Add Glob option for filtering path

This commit is contained in:
Cédric Leporcq 2021-09-18 22:06:34 +02:00
parent 6af9d5d879
commit 63b154b8f3
9 changed files with 381 additions and 237 deletions

View File

@ -9,8 +9,8 @@ dirs_path={%Y}/{%m-%b}-{city}-{folder}
name={%Y%m%d-%H%M%S}-%u{original_name}.%l{ext}
[Exclusions]
name1=.directory
name2=.DS_Store
path1=**/.directory
path2=**/.DS_Store
[Geolocation]
geocoder=Nominatim

181
ordigi.py
View File

@ -3,7 +3,6 @@
import os
import re
import sys
from datetime import datetime
import click
@ -16,17 +15,56 @@ from ordigi.media import Media, get_all_subclasses
from ordigi.summary import Summary
_logger_options = [
click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.'),
click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
]
_dry_run_options = [
click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
]
_filter_option = [
click.option('--exclude', '-e', default=set(), multiple=True,
help='Directories or files to exclude.'),
click.option('--filter-by-ext', '-f', default=set(), multiple=True,
help="""Use filename
extension to filter files for sorting. If value is '*', use
common media file extension for filtering. Ignored files remain in
the same directory structure""" ),
click.option('--glob', '-g', default='**/*',
help='Glob file selection')
]
def print_help(command):
    """Echo the help text of a click command.

    :param command: click command whose help text is rendered
    """
    # The help text is rendered against a context created for ``sort``.
    context = click.Context(sort)
    help_text = command.get_help(context)
    click.echo(help_text)
def add_options(options):
    """Build a decorator that applies a shared list of option decorators.

    :param options: sequence of decorators (e.g. ``click.option(...)``)
    :returns: decorator applying every entry of ``options`` to a function
    """
    def _add_options(func):
        # Walk the list from its last entry to its first, which mirrors the
        # order in which the same decorators would run if they were stacked.
        wrapped = func
        for position in range(len(options) - 1, -1, -1):
            wrapped = options[position](wrapped)
        return wrapped

    return _add_options
def _get_exclude(opt, exclude):
    """Return the exclusion patterns to apply, as a set.

    Patterns given on the command line take precedence; the configuration
    is only consulted when none were passed.

    :param dict opt: options read from the configuration file
    :param exclude: patterns passed with ``--exclude`` (may be empty)
    :returns: set of patterns, empty when neither source defines any
    """
    if not exclude:
        # The 'exclude' key is only present when the configuration has an
        # [Exclusions] section, so indexing it directly could raise KeyError.
        exclude = opt.get('exclude') or ()
    return set(exclude)
@click.command('sort')
@add_options(_logger_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@click.option('--album-from-folder', default=False, is_flag=True,
help="Use images' folders as their album names.")
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--destination', '-d', type=click.Path(file_okay=False),
default=None, help='Sort files into this directory.')
@click.option('--clean', '-C', default=False, is_flag=True,
@ -34,16 +72,10 @@ def print_help(command):
@click.option('--copy', '-c', default=False, is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved')
@click.option('--exclude-regex', '-e', default=set(), multiple=True,
help='Regular expression for directories or files to exclude.')
@click.option('--filter-by-ext', '-f', default=set(), multiple=True, help='''Use filename
extension to filter files for sorting. If value is '*', use
common media file extension for filtering. Ignored files remain in
the same directory structure''' )
@click.option('--ignore-tags', '-i', default=set(), multiple=True,
@click.option('--ignore-tags', '-I', default=set(), multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--interactive', default=False, is_flag=True,
@click.option('--interactive', '-i', default=False, is_flag=True,
help="Interactive mode")
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@ -52,28 +84,31 @@ def print_help(command):
and a file hash')
@click.option('--reset-cache', '-r', default=False, is_flag=True,
help='Regenerate the hash.json and location.json database ')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(album_from_folder, debug, dry_run, destination, clean, copy,
exclude_regex, interactive, filter_by_ext, ignore_tags,
max_deep, remove_duplicates, reset_cache, verbose, paths):
def sort(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
if copy:
debug = kwargs['debug']
destination = kwargs['destination']
verbose = kwargs['verbose']
paths = kwargs['paths']
if kwargs['copy']:
mode = 'copy'
else:
mode = 'move'
logger = log.get_logger(verbose, debug)
max_deep = kwargs['max_deep']
if max_deep is not None:
max_deep = int(max_deep)
cache = True
if reset_cache:
if kwargs['reset_cache']:
cache = False
if len(paths) > 1:
@ -89,28 +124,25 @@ def _sort(album_from_folder, debug, dry_run, destination, clean, copy,
sys.exit(1)
paths = set(paths)
filter_by_ext = set(filter_by_ext)
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
exclude_regex = opt['exclude_regex']
exclude_regex_list = set(exclude_regex)
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
collection = Collection(destination, opt['path_format'],
album_from_folder, cache, opt['day_begins'], dry_run,
exclude_regex_list, filter_by_ext, interactive,
kwargs['album_from_folder'], cache, opt['day_begins'], kwargs['dry_run'],
exclude, filter_by_ext, kwargs['glob'], kwargs['interactive'],
logger, max_deep, mode)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'],
opt['timeout'])
summary, has_errors = collection.sort_files(paths, loc,
remove_duplicates, ignore_tags)
kwargs['remove_duplicates'], kwargs['ignore_tags'])
if clean:
if kwargs['clean']:
remove_empty_folders(destination, logger)
if verbose or debug:
@ -141,12 +173,11 @@ def remove_empty_folders(path, logger, remove_root=True):
@click.command('clean')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@add_options(_logger_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@click.option('--dedup-regex', '-d', default=set(), multiple=True,
help='Regex to match duplicate strings parts')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--folders', '-f', default=False, is_flag=True,
help='Remove empty folders')
@click.option('--max-deep', '-m', default=None,
@ -158,15 +189,20 @@ def remove_empty_folders(path, logger, remove_root=True):
and a file hash')
@click.option('--root', '-r', type=click.Path(file_okay=False),
default=None, help='Root dir of media collection. If not set, use path')
@click.option('--verbose', '-v', default=False,
help='True if you want to see details of file processing')
@click.argument('path', required=True, nargs=1, type=click.Path())
def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_duplicates, root, verbose, path):
def clean(**kwargs):
"""Remove empty folders
Usage: clean [--verbose|--debug] directory [removeRoot]"""
logger = log.get_logger(verbose, debug)
debug = kwargs['debug']
dry_run = kwargs['dry_run']
folders = kwargs['folders']
root = kwargs['root']
verbose = kwargs['verbose']
path = kwargs['path']
logger = log.get_logger(verbose, debug)
clean_all = False
if not folders:
clean_all = True
@ -176,10 +212,15 @@ def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_d
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
if path_string:
collection = Collection(root, opt['path_format'], dry_run=dry_run, logger=logger, max_deep=max_deep, mode='move')
dedup_regex = list(dedup_regex)
summary, has_errors = collection.dedup_regex(path, dedup_regex, logger, remove_duplicates)
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
if kwargs['path_string']:
collection = Collection(root, opt['path_format'], dry_run=dry_run,
exclude=exclude, filter_by_ext=filter_by_ext, glob=kwargs['glob'],
logger=logger, max_deep=kwargs['max_deep'], mode='move')
dedup_regex = list(kwargs['dedup_regex'])
summary, has_errors = collection.dedup_regex(path, dedup_regex, logger, kwargs['remove_duplicates'])
if clean_all or folders:
remove_empty_folders(path, logger)
@ -192,11 +233,10 @@ def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_d
@click.command('generate-db')
@add_options(_logger_options)
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _generate_db(path, debug):
def generate_db(**kwargs):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files.
"""
# TODO
@ -204,21 +244,19 @@ def _generate_db(path, debug):
@click.command('verify')
@add_options(_logger_options)
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _verify(path, debug):
def verify(**kwargs):
"""Verify hashes"""
# TODO
pass
@click.command('compare')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@add_options(_logger_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@click.option('--find-duplicates', '-f', default=False, is_flag=True)
@click.option('--output-dir', '-o', default=False, is_flag=True, help='output\
dir')
@ -231,27 +269,35 @@ def _verify(path, debug):
image')
@click.option('--similarity', '-S', default=80, help='Similarity level for\
images')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('path', nargs=1, required=True)
def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
revert_compare, root, similar_to, similarity, verbose, path):
def compare(**kwargs):
'''Compare files in directories'''
logger = log.get_logger(verbose, debug)
debug = kwargs['debug']
dry_run = kwargs['dry_run']
root = kwargs['root']
verbose = kwargs['verbose']
path = kwargs['path']
logger = log.get_logger(verbose, debug)
if not root:
root = path
root = kwargs['path']
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
collection = Collection(root, None, mode='move', dry_run=dry_run, logger=logger)
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
if revert_compare:
summary, has_errors = collection.revert_compare(path, dry_run)
collection = Collection(root, None, exclude=exclude,
filter_by_ext=filter_by_ext, glob=kwargs['glob'],
mode='move', dry_run=dry_run, logger=logger)
if kwargs['revert_compare']:
summary, has_errors = collection.revertcompare(path, dry_run)
else:
summary, has_errors = collection.sort_similar_images(path, similarity)
summary, has_errors = collection.sort_similar_images(path, kwargs['similarity'])
if verbose or debug:
summary.write()
@ -261,16 +307,17 @@ def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
@click.group()
def main():
def main(**kwargs):
pass
main.add_command(_clean)
main.add_command(_compare)
main.add_command(_sort)
main.add_command(_generate_db)
main.add_command(_verify)
main.add_command(clean)
main.add_command(compare)
main.add_command(sort)
main.add_command(generate_db)
main.add_command(verify)
if __name__ == '__main__':
main()

View File

@ -4,10 +4,11 @@ General file system methods.
from builtins import object
import filecmp
from fnmatch import fnmatch
import hashlib
import logging
import os
from pathlib import Path
from pathlib import Path, PurePath
import re
import sys
import shutil
@ -16,7 +17,7 @@ from datetime import datetime, timedelta
from ordigi import media
from ordigi.database import Sqlite
from ordigi.media import Media, get_all_subclasses
from ordigi.images import Images
from ordigi.images import Image, Images
from ordigi.summary import Summary
from ordigi.utils import get_date_regex, camel2snake
@ -25,9 +26,9 @@ class Collection(object):
"""Class of the media collection."""
def __init__(self, root, path_format, album_from_folder=False,
cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=set(), interactive=False, logger=logging.getLogger(),
max_deep=None, mode='copy'):
cache=False, day_begins=0, dry_run=False, exclude=set(),
filter_by_ext=set(), glob='**/*', interactive=False,
logger=logging.getLogger(), max_deep=None, mode='copy'):
# Attributes
self.root = Path(root).expanduser().absolute()
@ -43,7 +44,7 @@ class Collection(object):
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
self.exclude_regex_list = exclude_regex_list
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
@ -51,6 +52,7 @@ class Collection(object):
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.items = self.get_items()
self.interactive = interactive
self.logger = logger
@ -91,6 +93,47 @@ class Collection(object):
return date
def _get_folders(self, folders, mask):
    """Select folder names according to an index or slice mask.

    The mask mimics Python indexing on the ``folders`` sequence:

    * ``folders``: every folder
    * ``folders[1]``: a single folder
    * ``folders[1:3]``, ``folders[1:]``, ``folders[:2]``: a range of folders

    Only non-negative indexes are supported. An index or range that selects
    nothing yields ``['']`` so the caller can still join the result.

    :param folders: sequence of folder names (e.g. ``Path.parts``)
    :param str mask: selection mask
    :returns: list of the selected folder names
    """
    folders = list(folders)

    slice_match = re.search(r'(\d*)\s*:\s*(\d*)', mask)
    if slice_match:
        begin = int(slice_match.group(1) or 0)
        # A missing end means "up to the last folder"; an oversized end is
        # clamped by the slice itself, so the last folder is never dropped.
        end = int(slice_match.group(2)) if slice_match.group(2) else len(folders)
        return folders[begin:end] or ['']

    index_match = re.search(r'\d+', mask)
    if index_match:
        index = int(index_match.group())
        if index >= len(folders):
            # index is out of range, use ''
            return ['']
        # Wrapped in a list: a bare string would be unpacked character by
        # character when the caller joins the result.
        return [folders[index]]

    # No index at all: every folder is selected
    return folders or ['']
def get_part(self, item, mask, metadata, subdirs):
"""Parse a specific folder's name given a mask and metadata.
@ -123,9 +166,8 @@ class Collection(object):
part = os.path.basename(subdirs)
elif item == 'folders':
folders = Path(subdirs).parts
folders = eval(mask)
folders = subdirs.parts
folders = self._get_folders(folders, mask)
part = os.path.join(*folders)
elif item in ('album','camera_make', 'camera_model', 'city', 'country',
@ -169,7 +211,7 @@ class Collection(object):
return this_part
def get_path(self, metadata, subdirs='', whitespace_sub='_'):
def get_path(self, metadata, subdirs, whitespace_sub='_'):
"""path_format: {%Y-%d-%m}/%u{city}/{album}
Returns file path.
@ -295,28 +337,6 @@ class Collection(object):
return self.summary, has_errors
def should_exclude(self, path, regex_list=set()):
if(len(regex_list) == 0):
return False
return any(regex.search(path) for regex in regex_list)
def walklevel(self, src_path, maxlevel=None):
"""
Walk into input directory recursively until desired maxlevel
source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below
"""
src_path = src_path.rstrip(os.path.sep)
if not os.path.isdir(src_path):
return None
num_sep = src_path.count(os.path.sep)
for root, dirs, files in os.walk(src_path):
level = root.count(os.path.sep) - num_sep
yield root, dirs, files, level
if maxlevel is not None and level >= maxlevel:
del dirs[:]
def remove(self, file_path):
if not self.dry_run:
os.remove(file_path)
@ -421,43 +441,90 @@ class Collection(object):
return items
def get_files_in_path(self, path, extensions=set()):
def walklevel(self, src_path, maxlevel=None):
    """Walk a directory tree, reporting the depth of each visited directory.

    source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below

    :param src_path: str or Path of the directory to walk
    :param int maxlevel: deepest level to descend into, None for no limit
    :returns: generator of (root, dirs, files, level) where level is 0 for
        ``src_path`` itself; yields nothing if ``src_path`` is not a directory
    """
    src_path = str(src_path)
    if not os.path.isdir(src_path):
        return

    for root, dirs, files in os.walk(src_path):
        # Depth is taken from the path relative to src_path instead of a raw
        # separator count, so a trailing separator on src_path cannot skew it.
        relative = os.path.relpath(root, src_path)
        if relative == os.curdir:
            level = 0
        else:
            level = relative.count(os.path.sep) + 1
        yield root, dirs, files, level
        if maxlevel is not None and level >= maxlevel:
            # Emptying dirs in place stops os.walk from descending further
            del dirs[:]
def level(self, path):
    """Return the depth of a path, counted from its first component.

    :param Path path: path to measure
    :returns: int, the number of path components minus one
    """
    components = path.parts
    return len(components) - 1
# TODO move to utils.. or CPath..
def _get_files_in_path(self, path, glob='**/*', maxlevel=None, extensions=None):
    """Yield the files under ``path`` selected by glob, depth, exclusions
    and extension.

    :param Path path: directory where the search starts
    :param str glob: glob pattern, relative to ``path``
    :param int maxlevel: deepest sub-directory level to accept, where 0 means
        files located directly in ``path``; None for no limit
    :param extensions: file extensions to include (whitelist), compared
        case-insensitively, with or without the leading dot; empty or None
        accepts every file
    :returns: generator of Path, one per selected file
    """
    excludes = self.exclude or ()
    if isinstance(excludes, str):
        # A lone pattern would otherwise be iterated character by character
        excludes = (excludes,)

    wanted = {ext.lower().lstrip('.') for ext in extensions or ()}

    for file_path in path.glob(glob):
        if file_path.is_dir():
            continue

        relative = file_path.relative_to(path)

        # Skip the '.ordigi' directory found at the root of ``path``. The
        # relative path is tested because ``path`` may be absolute.
        if relative.parts[:1] == ('.ordigi',):
            continue

        # Number of sub-directories between ``path`` and the file
        level = len(relative.parts) - 1
        if maxlevel is not None and level > maxlevel:
            continue

        # The file is dropped as soon as one exclusion pattern matches
        if any(fnmatch(str(file_path), pattern) for pattern in excludes):
            continue

        # ``suffix`` keeps its leading dot, the whitelist does not
        if wanted and file_path.suffix.lower().lstrip('.') not in wanted:
            continue

        yield file_path
def _create_directory(self, directory_path):
    """Create a directory, and its missing parents, unless it already exists.

    In dry-run mode nothing is written to the filesystem but the call still
    reports success.

    :param directory_path: Path (or str) of the directory to create
    :returns: bool, True when the directory exists or was created (or would
        have been created in dry-run mode), False when creation failed
    """
    directory_path = Path(directory_path)
    try:
        if directory_path.exists():
            return True

        if not self.dry_run:
            directory_path.mkdir(parents=True, exist_ok=True)
        self.logger.info(f'Create {directory_path}')
        return True
    except OSError as error:
        # OSError is thrown for cases like no permission; report it so the
        # failure is visible to the user instead of being silently dropped.
        self.logger.error(f'Unable to create {directory_path}: {error}')

    return False
def create_directory(self, directory_path):
"""Create a directory if it does not already exist.
@ -480,6 +547,20 @@ class Collection(object):
return False
def _check_path(self, path):
    """Turn a user-supplied path into an absolute Path and check it exists.

    The program exits with status 1 when the path does not exist.

    :param str path: path to check, ``~`` is expanded
    :returns: Path, the absolute path
    """
    resolved = Path(path).expanduser().absolute()
    if resolved.exists():
        return resolved

    # Missing path: report it and stop
    self.logger.error(f'Directory {resolved} does not exist')
    sys.exit(1)
def check_path(self, path):
path = os.path.abspath(os.path.expanduser(path))
@ -500,7 +581,7 @@ class Collection(object):
def dedup_regex(self, path, dedup_regex, logger, remove_duplicates=False):
# cycle through files
has_errors = False
path = self.check_path(path)
path = self._check_path(path)
# Delimiter regex
delim = r'[-_ .]'
# Numeric date item regex
@ -518,11 +599,9 @@ class Collection(object):
]
conflict_file_list = []
for filename, subdirs in self.get_files_in_path(path):
file_path = os.path.join(path, subdirs, filename)
for src_path in self._get_files_in_path(path, glob=self.glob):
src_checksum = self.checksum(src_path)
file_path = Path(src_path).relative_to(self.root)
path_parts = file_path.parts
path_parts = src_path.relative_to(self.root).parts
dedup_path = []
for path_part in path_parts:
items = []
@ -536,8 +615,11 @@ class Collection(object):
dedup_path.append(''.join(filtered_items))
# Dedup path
dest_path = os.path.join(self.root, *dedup_path)
self.create_directory(os.path.dirname(dest_path))
dest_path = self.root.joinpath(*dedup_path)
self._create_directory(dest_path.parent.name)
src_path = str(src_path)
dest_path = str(dest_path)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
@ -563,28 +645,29 @@ class Collection(object):
"""
has_errors = False
for path in paths:
path = self.check_path(path)
path = self._check_path(path)
conflict_file_list = []
for filename, subdirs in self.get_files_in_path(path,
for src_path in self._get_files_in_path(path, glob=self.glob,
extensions=self.filter_by_ext):
src_path = os.path.join(path, subdirs, filename)
subdirs = src_path.relative_to(path).parent
# Process files
src_checksum = self.checksum(src_path)
media = Media(path, subdirs, filename, self.album_from_folder, ignore_tags,
media = Media(src_path, path, self.album_from_folder, ignore_tags,
self.interactive, self.logger)
if media:
metadata = media.get_metadata(loc, self.db, self.cache)
# Get the destination path according to metadata
file_path = self.get_path(metadata, subdirs=subdirs)
file_path = Path(self.get_path(metadata, subdirs))
else:
# Keep same directory structure
file_path = os.path.relpath(src_path, path)
file_path = src_path.relative_to(path)
dest_directory = os.path.join(self.root,
os.path.dirname(file_path))
dest_path = os.path.join(self.root, file_path)
dest_directory = self.root / file_path.parent
self._create_directory(dest_directory)
self.create_directory(dest_directory)
# Convert paths to string
src_path = str(src_path)
dest_path = str(self.root / file_path)
result = self.sort_file(src_path, dest_path, remove_duplicates)
@ -640,65 +723,70 @@ class Collection(object):
self.logger.info(f'move: {img_path} -> {dest_path}')
return self.set_hash(True, img_path, dest_path, checksum)
def sort_similar_images(self, path, similarity=80):
def _get_images(self, path):
"""
:returns: iter
"""
for src_path in self._get_files_in_path(path, glob=self.glob,
extensions=self.filter_by_ext):
dirname = src_path.parent.name
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.ordigi'):
continue
if dirname.find('similar_to') == 0:
continue
file_paths = set()
for filename in filenames:
file_paths.add(os.path.join(dirname, filename))
image = Image(src_path)
i = Images(file_paths, logger=self.logger)
if image.is_image():
yield src_path
images = set([ i for i in i.get_images() ])
for image in images:
if not os.path.isfile(image):
continue
checksum1 = self.checksum(image)
# Process files
# media = Media(src_path, False, self.logger)
# TODO compare metadata
# if media:
# metadata = media.get_metadata()
similar = False
moved_imgs = set()
for img_path in i.find_similar(image, similarity):
similar = True
checksum2 = self.checksum(img_path)
# move image into directory
name = os.path.splitext(os.path.basename(image))[0]
directory_name = 'similar_to_' + name
dest_directory = os.path.join(os.path.dirname(img_path),
directory_name)
dest_path = os.path.join(dest_directory, os.path.basename(img_path))
def sort_similar_images(self, path, similarity=80):
result = self.create_directory(dest_directory)
# Move the simlars file into the destination directory
if result:
result = self.move_file(img_path, dest_path, checksum2)
moved_imgs.add(img_path)
if not result:
has_errors = True
else:
has_errors = True
has_errors = False
path = self._check_path(path)
img_paths = set([ x for x in self._get_images(path) ])
i = Images(img_paths, logger=self.logger)
for image in img_paths:
if not os.path.isfile(image):
continue
checksum1 = self.checksum(image)
# Process files
# media = Media(src_path, False, self.logger)
# TODO compare metadata
# if media:
# metadata = media.get_metadata()
similar = False
moved_imgs = set()
for img_path in i.find_similar(image, similarity):
similar = True
checksum2 = self.checksum(img_path)
# move image into directory
name = os.path.splitext(os.path.basename(image))[0]
directory_name = 'similar_to_' + name
dest_directory = os.path.join(os.path.dirname(img_path),
directory_name)
dest_path = os.path.join(dest_directory, os.path.basename(img_path))
if similar:
dest_path = os.path.join(dest_directory,
os.path.basename(image))
result = self.move_file(image, dest_path, checksum1)
moved_imgs.add(image)
result = self.create_directory(dest_directory)
# Move the similar file into the destination directory
if result:
result = self.move_file(img_path, dest_path, checksum2)
moved_imgs.add(img_path)
if not result:
has_errors = True
else:
has_errors = True
# for moved_img in moved_imgs:
# os.remove(moved_img)
if similar:
dest_path = os.path.join(dest_directory,
os.path.basename(image))
result = self.move_file(image, dest_path, checksum1)
moved_imgs.add(image)
if not result:
has_errors = True
# for moved_img in moved_imgs:
# os.remove(moved_img)
return self.summary, has_errors

View File

@ -86,7 +86,7 @@ class Config:
options['day_begins'] = 0
if 'Exclusions' in self.conf:
options['exclude_regex'] = [value for key, value in self.conf.items('Exclusions')]
options['exclude'] = [value for key, value in self.conf.items('Exclusions')]
return options

View File

@ -75,33 +75,33 @@ class Images():
#: Valid extensions for image files.
extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
def __init__(self, img_paths=None, hash_size=8, logger=logging.getLogger()):
    """Set up a collection of image paths.

    :param img_paths: iterable of image paths to start with, None for none
    :param int hash_size: size passed to the image hash computation
    :param logger: logger used to report findings
    """
    # Each instance gets its own set: add_images() adds to it in place, so a
    # set() default in the signature would be shared between instances.
    self.img_paths = set() if img_paths is None else set(img_paths)
    self.duplicates = []
    self.hash_size = hash_size
    self.logger = logger
def get_images(self):
def add_images(self, file_paths):
''':returns: img_path generator
'''
for img_path in self.file_paths:
for img_path in file_paths:
image = Image(img_path)
if image.is_image():
yield img_path
self.img_paths.add(img_path)
def get_images_hashes(self):
    """Yield the average hash of every image in ``self.img_paths``.

    :returns: generator of the values returned by ``imagehash.average_hash``
    """
    for img_path in self.img_paths:
        # The opened image must not be bound to the name ``img``: assigning
        # that name here would make it local to this function and the
        # ``img.open`` call would fail with UnboundLocalError.
        with img.open(img_path) as image:
            yield imagehash.average_hash(image, self.hash_size)
def find_duplicates(self, img_path):
"""Find duplicates"""
duplicates = []
for temp_hash in get_images_hashes(self.file_paths):
for temp_hash in get_images_hashes(self.img_paths):
if temp_hash in hashes:
self.logger.info("Duplicate {} \nfound for image {}\n".format(img_path, hashes[temp_hash]))
duplicates.append(img_path)
@ -150,7 +150,7 @@ class Images():
threshold = 1 - similarity/100
diff_limit = int(threshold*(self.hash_size**2))
for img_path in self.get_images():
for img_path in self.img_paths:
if img_path == image:
continue
hash2 = image.get_hash()

View File

@ -30,12 +30,16 @@ class Media():
extensions = PHOTO + AUDIO + VIDEO
def __init__(self, path, subdirs, filename, album_from_folder=False, ignore_tags=set(),
def __init__(self, file_path, root, album_from_folder=False, ignore_tags=set(),
interactive=False, logger=logging.getLogger()):
self.path = path
self.subdirs = subdirs
self.filename = filename
self.file_path = os.path.join(path, subdirs, filename)
"""
:params: Path, Path, bool, set, bool, Logger
"""
self.file_path = str(file_path)
self.root = str(root)
self.subdirs = str(file_path.relative_to(root).parent)
self.folder = str(file_path.parent.name)
self.filename = str(file_path.name)
self.album_from_folder = album_from_folder
self.ignore_tags = ignore_tags
@ -262,14 +266,14 @@ class Media():
self.metadata[key] = formated_data
self.metadata['src_path'] = self.path
self.metadata['src_path'] = self.root
self.metadata['subdirs'] = self.subdirs
self.metadata['filename'] = self.filename
self.metadata['date_taken'] = self.get_date_taken()
if self.album_from_folder:
album = self.metadata['album']
folder = os.path.basename(self.subdirs)
folder = self.folder
if album and album != '':
if self.interactive:
print(f"Conflict for file: {self.file_path}")
@ -351,7 +355,7 @@ class Media():
:returns: value (str)
"""
return ExifTool(self.file_path, self.logger).setvalue(tag, value)
return ExifTool(self.file_path, logger=self.logger).setvalue(tag, value)
def set_date_taken(self, date_key, time):
"""Set the date/time a photo was taken.
@ -400,9 +404,7 @@ class Media():
:returns: bool
"""
folder = os.path.basename(os.path.dirname(self.file_path))
return self.set_value('album', folder)
return self.set_value('album', self.folder)
def get_all_subclasses(cls=None):

View File

@ -22,12 +22,11 @@ def reset_singletons():
@pytest.fixture(scope="session")
def sample_files_paths(tmpdir_factory):
tmp_path = tmpdir_factory.mktemp("ordigi-src-")
paths = Path(ORDIGI_PATH, 'samples/test_exif').glob('*')
tmp_path = Path(tmpdir_factory.mktemp("ordigi-src-"))
path = Path(ORDIGI_PATH, 'samples/test_exif')
shutil.copytree(path, tmp_path / path.name)
paths = Path(tmp_path).glob('**/*')
file_paths = [x for x in paths if x.is_file()]
for file_path in file_paths:
source_path = tmp_path.join(file_path.name)
shutil.copyfile(file_path, source_path)
return tmp_path, file_paths

View File

@ -22,7 +22,7 @@ class TestCollection:
@pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths):
cls.src_paths, cls.file_paths = sample_files_paths
cls.src_path, cls.file_paths = sample_files_paths
cls.path_format = constants.default_path + '/' + constants.default_name
def teardown_class(self):
@ -57,9 +57,9 @@ class TestCollection:
'{%Y-%m-%b}'
]
subdirs = Path('a', 'b', 'c', 'd')
for file_path in self.file_paths:
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path))
media = Media(file_path, self.src_path)
subdirs = file_path.relative_to(self.src_path).parent
exif_tags = {}
for key in ('album', 'camera_make', 'camera_model', 'latitude',
'longitude', 'original_name', 'title'):
@ -83,10 +83,7 @@ class TestCollection:
elif item == 'folder':
assert part == subdirs.name, file_path
elif item == 'folders':
if platform == "win32":
assert '\\' in part, file_path
else:
assert '/' in part, file_path
assert part in str(subdirs)
elif item == 'ext':
assert part == file_path.suffix[1:], file_path
elif item == 'name':
@ -115,7 +112,7 @@ class TestCollection:
collection = Collection(tmp_path, self.path_format)
for file_path in self.file_paths:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path))
media = Media(file_path, self.src_path)
metadata = media.get_metadata()
date_taken = media.get_date_taken()
@ -139,22 +136,22 @@ class TestCollection:
def test_sort_files(self, tmp_path):
collection = Collection(tmp_path, self.path_format, album_from_folder=True)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_paths], loc)
summary, has_errors = collection.sort_files([self.src_path], loc)
# Summary is created and there is no errors
assert summary, summary
assert not has_errors, has_errors
for file_path in tmp_path.glob('*/**/*.*'):
for file_path in tmp_path.glob('**/*'):
if '.db' not in str(file_path):
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path), album_from_folder=True)
media = Media(file_path, tmp_path, album_from_folder=True)
media.get_exif_metadata()
for value in media._get_key_values('album'):
assert value != '' or None
# test with populated dest dir
randomize_files(tmp_path)
summary, has_errors = collection.sort_files([self.src_paths], loc)
summary, has_errors = collection.sort_files([self.src_path], loc)
assert summary, summary
assert not has_errors, has_errors
@ -165,14 +162,14 @@ class TestCollection:
loc = GeoLocation()
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary, has_errors = collection.sort_files([self.src_paths], loc)
summary, has_errors = collection.sort_files([self.src_path], loc)
def test_sort_file(self, tmp_path):
for mode in 'copy', 'move':
collection = Collection(tmp_path, self.path_format, mode=mode)
# copy mode
src_path = Path(self.src_paths, 'photo.png')
src_path = Path(self.src_path, 'test_exif', 'photo.png')
name = 'photo_' + mode + '.png'
dest_path = Path(tmp_path, name)
src_checksum = collection.checksum(src_path)
@ -191,6 +188,15 @@ class TestCollection:
# TODO check date
#- Sort similar images into a directory
def test__get_files_in_path(self, tmp_path):
collection = Collection(tmp_path, self.path_format, exclude='**/*.dng')
paths = [x for x in collection._get_files_in_path(self.src_path,
maxlevel=1, glob='**/photo*')]
assert len(paths) == 6
for path in paths:
assert isinstance(path, Path)
# TODO Sort similar images into a directory
# collection.sort_similar

View File

@ -18,14 +18,14 @@ class TestMetadata:
@pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths):
cls.src_paths, cls.file_paths = sample_files_paths
cls.src_path, cls.file_paths = sample_files_paths
cls.ignore_tags = ('EXIF:CreateDate', 'File:FileModifyDate',
'File:FileAccessDate', 'EXIF:Make', 'Composite:LightValue')
def get_media(self):
for file_path in self.file_paths:
self.exif_data = ExifTool(str(file_path)).asdict()
yield file_path, Media(os.path.dirname(file_path), '', os.path.basename(file_path), album_from_folder=True, ignore_tags=self.ignore_tags)
self.exif_data = ExifTool(file_path).asdict()
yield file_path, Media(file_path, self.src_path, album_from_folder=True, ignore_tags=self.ignore_tags)
def test_get_metadata(self):
for file_path, media in self.get_media():
@ -51,8 +51,10 @@ class TestMetadata:
assert value is None
if key == 'album':
if 'with-album' in str(file_path):
assert value == "Test Album"
for album in media._get_key_values('album'):
if album is not None and album != '':
assert value == album
break
else:
assert value == file_path.parent.name