From 9b055c88bd805b43b51be90c2510d576e9d226e1 Mon Sep 17 00:00:00 2001 From: Cedric Leporcq Date: Fri, 27 Aug 2021 12:45:25 +0200 Subject: [PATCH] Refactoring filesystem.py --- ordigi.py | 150 ++--- ordigi/{filesystem.py => collection.py} | 571 ++++++++++-------- ordigi/constants.py | 2 +- tests/conftest.py | 34 +- ...{test_filesystem.py => test_collection.py} | 61 +- tests/test_media.py | 6 +- tests/{test_dozo.py => test_ordigi.py} | 0 7 files changed, 452 insertions(+), 372 deletions(-) rename ordigi/{filesystem.py => collection.py} (76%) rename tests/{test_filesystem.py => test_collection.py} (77%) rename tests/{test_dozo.py => test_ordigi.py} (100%) diff --git a/ordigi.py b/ordigi.py index 0aa511d..059669a 100755 --- a/ordigi.py +++ b/ordigi.py @@ -11,29 +11,16 @@ from ordigi.config import Config from ordigi import constants from ordigi import log from ordigi.database import Db -from ordigi.filesystem import FileSystem +from ordigi.collection import Collection from ordigi.geolocation import GeoLocation from ordigi.media import Media, get_all_subclasses from ordigi.summary import Summary -FILESYSTEM = FileSystem() - def print_help(command): click.echo(command.get_help(click.Context(sort))) -@click.command('batch') -@click.option('--debug', default=False, is_flag=True, - help='Override the value in constants.py with True.') -def _batch(debug): - """Run batch() for all plugins. - """ - constants.debug = debug - plugins = Plugins() - plugins.run_batch() - - @click.command('sort') @click.option('--debug', default=False, is_flag=True, help='Override the value in constants.py with True.') @@ -57,10 +44,10 @@ def _batch(debug): searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' ) @click.option('--max-deep', '-m', default=None, help='Maximum level to proceed. Number from 0 to desired level.') -@click.option('--remove-duplicates', '-r', default=False, is_flag=True, +@click.option('--remove-duplicates', '-R', default=False, is_flag=True, help='True to remove files that are exactly the same in name\ and a file hash') -@click.option('--reset-cache', '-R', default=False, is_flag=True, +@click.option('--reset-cache', '-r', default=False, is_flag=True, help='Regenerate the hash.json and location.json database ') @click.option('--verbose', '-v', default=False, is_flag=True, help='True if you want to see details of file processing') @@ -85,10 +72,16 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext if reset_cache: cache = False - if not destination and paths: - destination = paths[-1] - paths = paths[0:-1] + if len(paths) > 1: + if not destination: + # Use last path argument as destination + destination = paths[-1] + paths = paths[0:-1] + elif paths: + # Source and destination are the same + destination = paths[0] else: + logger.error(f'`ordigi sort` need at least one path argument') sys.exit(1) paths = set(paths) @@ -110,13 +103,15 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext # Initialize Db db = Db(destination) - filesystem = FileSystem(cache, opt['day_begins'], dry_run, exclude_regex_list, - filter_by_ext, logger, max_deep, mode, opt['path_format']) + collection = Collection(opt['path_format'], destination, cache, + opt['day_begins'], dry_run, exclude_regex_list, filter_by_ext, + logger, max_deep, mode) - loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout']) + loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], + opt['timeout']) - summary, has_errors = filesystem.sort_files(paths, destination, db, - loc, remove_duplicates, ignore_tags) + summary, has_errors = collection.sort_files(paths, db, loc, + remove_duplicates, ignore_tags) if clean: remove_empty_folders(destination, logger) @@ -151,17 +146,54 @@ def remove_empty_folders(path, logger, remove_root=True): @click.command('clean') @click.option('--debug', default=False, is_flag=True, help='Override the value in constants.py with True.') -@click.option('--verbose', '-v', default=False, is_flag=True, +@click.option('--dedup-regex', '-d', default=set(), multiple=True, + help='Regex to match duplicate strings parts') +@click.option('--dry-run', default=False, is_flag=True, + help='Dry run only, no change made to the filesystem.') +@click.option('--folders', '-f', default=False, is_flag=True, + help='Remove empty folders') +@click.option('--max-deep', '-m', default=None, + help='Maximum level to proceed. Number from 0 to desired level.') +@click.option('--path-string', '-p', default=False, is_flag=True, + help='Deduplicate path string') +@click.option('--remove-duplicates', '-R', default=False, is_flag=True, + help='True to remove files that are exactly the same in name\ + and a file hash') +@click.option('--root', '-r', type=click.Path(file_okay=False), + default=None, help='Root dir of media collection. If not set, use path') +@click.option('--verbose', '-v', default=False, help='True if you want to see details of file processing') @click.argument('path', required=True, nargs=1, type=click.Path()) -def _clean(debug, verbose, path): +def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_duplicates, root, verbose, path): """Remove empty folders Usage: clean [--verbose|--debug] directory [removeRoot]""" logger = log.get_logger(verbose, debug) - remove_empty_folders(path, logger) + clean_all = False + if not folders: + clean_all = True + if not root: + root = path + if clean_all or folders: + remove_empty_folders(path, logger) + + config = Config(constants.CONFIG_FILE) + opt = config.get_options() + + if path_string: + # Initialize Db + db = Db(root) + collection = Collection(opt['path_format'], root, dry_run=dry_run, logger=logger, max_deep=max_deep, mode='move') + dedup_regex = list(dedup_regex) + summary, has_errors = collection.dedup_regex(path, dedup_regex, db, logger, remove_duplicates) + + if verbose or debug: + summary.write() + + if has_errors: + sys.exit(1) @click.command('generate-db') @@ -172,26 +204,8 @@ def _clean(debug, verbose, path): def _generate_db(path, debug): """Regenerate the hash.json database which contains all of the sha256 signatures of media files. """ - constants.debug = debug - result = Result() - path = os.path.abspath(os.path.expanduser(path)) - - if not os.path.isdir(path): - log.error('path is not a valid directory %s' % path) - sys.exit(1) - - db = Db(path) - db.backup_hash_db() - db.reset_hash_db() - - for current_file in FILESYSTEM.get_all_files(path): - result.append((current_file, True)) - db.add_hash(db.checksum(current_file), current_file) - log.progress() - - db.update_hash_db() - log.progress('', True) - result.write() + # TODO + pass @click.command('verify') @@ -200,25 +214,9 @@ def _generate_db(path, debug): @click.option('--debug', default=False, is_flag=True, help='Override the value in constants.py with True.') def _verify(path, debug): - constants.debug = debug - result = Result() - db = Db(path) - for checksum, file_path in db.all(): - if not os.path.isfile(file_path): - result.append((file_path, False)) - log.progress('x') - continue - - actual_checksum = db.checksum(file_path) - if checksum == actual_checksum: - result.append((file_path, True)) - log.progress() - else: - result.append((file_path, False)) - log.progress('x') - - log.progress('', True) - result.write() + """Verify hashes""" + # TODO + pass @click.command('compare') @@ -232,6 +230,8 @@ def _verify(path, debug): @click.option('--remove-duplicates', '-r', default=False, is_flag=True) @click.option('--revert-compare', '-R', default=False, is_flag=True, help='Revert\ compare') +@click.option('--root', '-r', type=click.Path(file_okay=False), + default=None, help='Root dir of media collection. If not set, use path') @click.option('--similar-to', '-s', default=False, help='Similar to given\ image') @click.option('--similarity', '-S', default=80, help='Similarity level for\ @@ -240,21 +240,26 @@ def _verify(path, debug): help='True if you want to see details of file processing') @click.argument('path', nargs=1, required=True) def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates, - revert_compare, similar_to, similarity, verbose, path): + revert_compare, root, similar_to, similarity, verbose, path): '''Compare files in directories''' logger = log.get_logger(verbose, debug) + if not root: + root = path + + config = Config(constants.CONFIG_FILE) + opt = config.get_options() + # Initialize Db - db = Db(path) + db = Db(root) - - filesystem = FileSystem(mode='move', dry_run=dry_run, logger=logger) + collection = Collection(path_format, root, mode='move', dry_run=dry_run, logger=logger) if revert_compare: - summary, has_errors = filesystem.revert_compare(path, db, dry_run) + summary, has_errors = collection.revert_compare(path, db, dry_run) else: - summary, has_errors = filesystem.sort_similar_images(path, db, + summary, has_errors = collection.sort_similar_images(path, db, similarity) if verbose or debug: @@ -274,7 +279,6 @@ main.add_command(_compare) main.add_command(_sort) main.add_command(_generate_db) main.add_command(_verify) -main.add_command(_batch) if __name__ == '__main__': diff --git a/ordigi/filesystem.py b/ordigi/collection.py similarity index 76% rename from ordigi/filesystem.py rename to ordigi/collection.py index 52f7417..467e656 100644 --- a/ordigi/filesystem.py +++ b/ordigi/collection.py @@ -7,27 +7,27 @@ import filecmp import hashlib import logging import os -import pathlib +from pathlib import Path import re import sys import shutil -import time from datetime import datetime, timedelta -from ordigi import constants - from ordigi import media from ordigi.media import Media, get_all_subclasses from ordigi.images import Images from ordigi.summary import Summary -class FileSystem(object): - """A class for interacting with the file system.""" +class Collection(object): + """Class of the media collection.""" - def __init__(self, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(), + def __init__(self, path_format, root, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(), filter_by_ext=set(), logger=logging.getLogger(), max_deep=None, - mode='copy', path_format=None): + mode='copy'): + + self.root = root + self.cache = cache self.day_begins = day_begins self.dry_run = dry_run @@ -43,38 +43,11 @@ class FileSystem(object): self.logger = logger self.max_deep = max_deep self.mode = mode - # TODO have to be removed - if path_format: - self.path_format = path_format - else: - self.path_format = os.path.join(constants.default_path, - constants.default_name) + self.path_format = path_format self.summary = Summary() self.whitespace_regex = '[ \t\n\r\f\v]+' - - def create_directory(self, directory_path): - """Create a directory if it does not already exist. - - :param str directory_name: A fully qualified path of the - to create. - :returns: bool - """ - try: - if os.path.exists(directory_path): - return True - else: - if not self.dry_run: - os.makedirs(directory_path) - self.logger.info(f'Create {directory_path}') - return True - except OSError: - # OSError is thrown for cases like no permission - pass - - return False - def get_items(self): return { 'album': '{album}', @@ -96,65 +69,26 @@ class FileSystem(object): 'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}' # search for date format string } - def walklevel(self, src_path, maxlevel=None): - """ - Walk into input directory recursively until desired maxlevel - source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below - """ - src_path = src_path.rstrip(os.path.sep) - if not os.path.isdir(src_path): - return None + def get_date_regex(self, string, user_regex=None): + if user_regex is not None: + matches = re.findall(user_regex, string) + else: + regex = { + # regex to match date format type %Y%m%d, %y%m%d, %d%m%Y, + # etc... + 'a': re.compile( + r'.*[_-]?(?P\d{4})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})'), + 'b': re.compile ( + r'[-_./](?P\d{4})[-_.]?(?P\d{2})[-_.]?(?P\d{2})[-_./]'), + # not very accurate + 'c': re.compile ( + r'[-_./](?P\d{2})[-_.]?(?P\d{2})[-_.]?(?P\d{2})[-_./]'), + 'd': re.compile ( + r'[-_./](?P\d{2})[-_.](?P\d{2})[-_.](?P\d{4})[-_./]') + } - num_sep = src_path.count(os.path.sep) - for root, dirs, files in os.walk(src_path): - level = root.count(os.path.sep) - num_sep - yield root, dirs, files, level - if maxlevel is not None and level >= maxlevel: - del dirs[:] - - def get_all_files(self, path, extensions=False, exclude_regex_list=set()): - """Recursively get all files which match a path and extension. - - :param str path string: Path to start recursive file listing - :param tuple(str) extensions: File extensions to include (whitelist) - :returns: generator - """ - if self.filter_by_ext != () and not extensions: - # Filtering files by extensions. - if '%media' in self.filter_by_ext: - extensions = set() - subclasses = get_all_subclasses() - for cls in subclasses: - extensions.update(cls.extensions) - else: - extensions = self.filter_by_ext - - # Create a list of compiled regular expressions to match against the file path - compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list] - for dirname, dirnames, filenames in os.walk(path): - if dirname == os.path.join(path, '.ordigi'): - continue - for filename in filenames: - # If file extension is in `extensions` - # And if file path is not in exclude regexes - # Then append to the list - filename_path = os.path.join(dirname, filename) - if ( - extensions == False - or os.path.splitext(filename)[1][1:].lower() in extensions - and not self.should_exclude(filename_path, compiled_regex_list, False) - ): - yield filename_path - - def check_for_early_morning_photos(self, date): - """check for early hour photos to be grouped with previous day""" - - if date.hour < self.day_begins: - self.logger.info('moving this photo to the previous day for\ - classification purposes (day_begins=' + str(self.day_begins) + ')') - date = date - timedelta(hours=date.hour+1) # push it to the day before for classificiation purposes - - return date + for i, rx in regex.items(): + yield i, rx def get_location_part(self, mask, part, place_name): """Takes a mask for a location and interpolates the actual place names. @@ -188,6 +122,16 @@ class FileSystem(object): return folder_name + def check_for_early_morning_photos(self, date): + """check for early hour photos to be grouped with previous day""" + + if date.hour < self.day_begins: + self.logger.info('moving this photo to the previous day for\ + classification purposes (day_begins=' + str(self.day_begins) + ')') + date = date - timedelta(hours=date.hour+1) # push it to the day before for classificiation purposes + + return date + def get_part(self, item, mask, metadata, db, subdirs, loc): """Parse a specific folder's name given a mask and metadata. @@ -198,14 +142,14 @@ class FileSystem(object): """ # Each item has its own custom logic and we evaluate a single item and return - # the evaluated string. + # the evaluated string. part = '' if item == 'basename': part = os.path.basename(metadata['base_name']) elif item == 'name': # Remove date prefix added to the name. part = metadata['base_name'] - for i, rx in self.match_date_from_string(metadata['base_name']): + for i, rx in self.get_date_regex(metadata['base_name']): part = re.sub(rx, '', part) elif item == 'date': date = self.get_date_taken(metadata) @@ -229,27 +173,49 @@ class FileSystem(object): part = os.path.basename(subdirs) elif item == 'folders': - folders = pathlib.Path(subdirs).parts + folders = Path(subdirs).parts folders = eval(mask) part = os.path.join(*folders) elif item in ('album','camera_make', 'camera_model', 'ext', - 'title'): + 'original_name', 'title'): if metadata[item]: part = metadata[item] - elif item == 'original_name': - # First we check if we have metadata['original_name']. - # We have to do this for backwards compatibility because - # we original did not store this back into EXIF. - if metadata[item]: - part = metadata['original_name'] elif item in 'custom': # Fallback string part = mask[1:-1] return part + def get_path_part(self, this_part, metadata, db, subdirs, loc): + """Build path part + :returns: part (string)""" + for item, regex in self.items.items(): + matched = re.search(regex, this_part) + if matched: + part = self.get_part(item, matched.group()[1:-1], metadata, db, + subdirs, loc) + + part = part.strip() + + if part == '': + # delete separator if any + regex = '[-_ .]?(%[ul])?' + regex + this_part = re.sub(regex, part, this_part) + else: + # Capitalization + u_regex = '%u' + regex + l_regex = '%l' + regex + if re.search(u_regex, this_part): + this_part = re.sub(u_regex, part.upper(), this_part) + elif re.search(l_regex, this_part): + this_part = re.sub(l_regex, part.lower(), this_part) + else: + this_part = re.sub(regex, part, this_part) + + return this_part + def get_path(self, metadata, db, loc, subdirs='', whitespace_sub='_'): """path_format: {%Y-%d-%m}/%u{city}/{album} @@ -262,34 +228,8 @@ class FileSystem(object): path_parts = path_format.split('/') for path_part in path_parts: this_parts = path_part.split('|') - # p = [] for this_part in this_parts: - # parts = '' - for item, regex in self.items.items(): - matched = re.search(regex, this_part) - if matched: - # parts = re.split(mask, this_part) - # parts = this_part.split('%')[1:] - part = self.get_part(item, matched.group()[1:-1], metadata, db, - subdirs, loc) - - part = part.strip() - - if part == '': - # delete separator if any - regex = '[-_ .]?(%[ul])?' + regex - this_part = re.sub(regex, part, this_part) - else: - # Capitalization - u_regex = '%u' + regex - l_regex = '%l' + regex - if re.search(u_regex, this_part): - this_part = re.sub(u_regex, part.upper(), this_part) - elif re.search(l_regex, this_part): - this_part = re.sub(l_regex, part.lower(), this_part) - else: - this_part = re.sub(regex, part, this_part) - + this_part = self.get_path_part(this_part, metadata, db, subdirs, loc) if this_part: # Check if all masks are substituted @@ -314,34 +254,13 @@ class FileSystem(object): return path_string - def match_date_from_string(self, string, user_regex=None): - if user_regex is not None: - matches = re.findall(user_regex, string) - else: - regex = { - # regex to match date format type %Y%m%d, %y%m%d, %d%m%Y, - # etc... - 'a': re.compile( - r'.*[_-]?(?P\d{4})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})[_-]?(?P\d{2})'), - 'b': re.compile ( - r'[-_./](?P\d{4})[-_.]?(?P\d{2})[-_.]?(?P\d{2})[-_./]'), - # not very accurate - 'c': re.compile ( - r'[-_./](?P\d{2})[-_.]?(?P\d{2})[-_.]?(?P\d{2})[-_./]'), - 'd': re.compile ( - r'[-_./](?P\d{2})[-_.](?P\d{2})[-_.](?P\d{4})[-_./]') - } - - for i, rx in regex.items(): - yield i, rx - def get_date_from_string(self, string, user_regex=None): # If missing datetime from EXIF data check if filename is in datetime format. # For this use a user provided regex if possible. # Otherwise assume a filename such as IMG_20160915_123456.jpg as default. matches = [] - for i, rx in self.match_date_from_string(string, user_regex): + for i, rx in self.get_date_regex(string, user_regex): match = re.findall(rx, string) if match != []: if i == 'c': @@ -445,43 +364,6 @@ class FileSystem(object): return src_checksum - def sort_file(self, src_path, dest_path, remove_duplicates=True): - '''Copy or move file to dest_path.''' - - mode = self.mode - dry_run = self.dry_run - - # check for collisions - if(src_path == dest_path): - self.logger.info(f'File {dest_path} already sorted') - return None - elif os.path.isfile(dest_path): - self.logger.info(f'File {dest_path} already exist') - if remove_duplicates: - if filecmp.cmp(src_path, dest_path): - self.logger.info(f'File in source and destination are identical. Duplicate will be ignored.') - if(mode == 'move'): - if not dry_run: - os.remove(src_path) - self.logger.info(f'remove: {src_path}') - return None - else: # name is same, but file is different - self.logger.info(f'File in source and destination are different.') - return False - else: - return False - else: - if(mode == 'move'): - if not dry_run: - # Move the processed file into the destination directory - shutil.move(src_path, dest_path) - self.logger.info(f'move: {src_path} -> {dest_path}') - elif mode == 'copy': - if not dry_run: - shutil.copy2(src_path, dest_path) - self.logger.info(f'copy: {src_path} -> {dest_path}') - return True - def check_file(self, src_path, dest_path, src_checksum, db): # Check if file remain the same @@ -502,6 +384,138 @@ class FileSystem(object): return self.summary, has_errors + def should_exclude(self, path, regex_list=set(), needs_compiled=False): + if(len(regex_list) == 0): + return False + + if(needs_compiled): + compiled_list = [] + for regex in regex_list: + compiled_list.append(re.compile(regex)) + regex_list = compiled_list + + return any(regex.search(path) for regex in regex_list) + + def walklevel(self, src_path, maxlevel=None): + """ + Walk into input directory recursively until desired maxlevel + source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below + """ + src_path = src_path.rstrip(os.path.sep) + if not os.path.isdir(src_path): + return None + + num_sep = src_path.count(os.path.sep) + for root, dirs, files in os.walk(src_path): + level = root.count(os.path.sep) - num_sep + yield root, dirs, files, level + if maxlevel is not None and level >= maxlevel: + del dirs[:] + + def remove(self, file_path): + if not self.dry_run: + os.remove(file_path) + self.logger.info(f'remove: {file_path}') + + def sort_file(self, src_path, dest_path, remove_duplicates=False): + '''Copy or move file to dest_path.''' + + mode = self.mode + dry_run = self.dry_run + + # check for collisions + if(src_path == dest_path): + self.logger.info(f'File {dest_path} already sorted') + return None + elif os.path.isfile(dest_path): + self.logger.warning(f'File {dest_path} already exist') + if remove_duplicates: + if filecmp.cmp(src_path, dest_path): + self.logger.info(f'File in source and destination are identical. Duplicate will be ignored.') + if(mode == 'move'): + self.remove(src_path) + return None + else: # name is same, but file is different + self.logger.warning(f'File in source and destination are different.') + return False + else: + return False + else: + if(mode == 'move'): + if not dry_run: + # Move the processed file into the destination directory + shutil.move(src_path, dest_path) + self.logger.info(f'move: {src_path} -> {dest_path}') + elif mode == 'copy': + if not dry_run: + shutil.copy2(src_path, dest_path) + self.logger.info(f'copy: {src_path} -> {dest_path}') + return True + + def solve_conflicts(self, conflict_file_list, db, remove_duplicates): + has_errors = False + unresolved_conflicts = [] + while conflict_file_list != []: + file_paths = conflict_file_list.pop() + src_path = file_paths['src_path'] + src_checksum = file_paths['src_checksum'] + dest_path = file_paths['dest_path'] + # Try to sort the file + result = self.sort_file(src_path, dest_path, remove_duplicates) + # remove to conflict file list if file as be successfully copied or ignored + n = 1 + while result is False and n < 100: + # Add appendix to the name + pre, ext = os.path.splitext(dest_path) + if n > 1: + regex = '_' + str(n-1) + ext + pre = re.split(regex, dest_path)[0] + dest_path = pre + '_' + str(n) + ext + # file_list[item]['dest_path'] = dest_path + file_paths['dest_path'] = dest_path + result = self.sort_file(src_path, dest_path, remove_duplicates) + n = n + 1 + + if result is False: + # n > 100: + unresolved_conflicts.append(file_paths) + self.logger.error(f'{self.mode}: too many append for {dest_path}...') + self.summary.append((src_path, False)) + has_errors = True + + if result: + self.summary, has_errors = self.check_file(src_path, + dest_path, src_checksum, db) + + if has_errors: + return False + else: + return True + + def _split_part(self, dedup_regex, path_part, items): + """Split part from regex + :returns: parts""" + regex = dedup_regex.pop(0) + parts = re.split(regex, path_part) + # Loop thought part, search matched regex part and proceed with + # next regex for others parts + for n, part in enumerate(parts): + if re.match(regex, part): + if part[0] in '-_ .': + if n > 0: + # move the separator to previous item + parts[n-1] = parts[n-1] + part[0] + items.append(part[1:]) + else: + items.append(part) + elif dedup_regex != []: + # Others parts + self._split_part(dedup_regex, part, items) + else: + items.append(part) + + return items + def get_files_in_path(self, path, extensions=set()): """Recursively get files which match a path and extension. @@ -539,49 +553,110 @@ class FileSystem(object): return file_list - def _conflict_solved(self, conflict_file_list, item, dest_path): - self.logger.warning(f'Same name already exists...renaming to: {dest_path}') - del(conflict_file_list[item]) + def create_directory(self, directory_path): + """Create a directory if it does not already exist. - def solve_conflicts(self, conflict_file_list, remove_duplicates): - file_list = conflict_file_list.copy() - for item, file_paths in enumerate(file_list): - src_path = file_paths['src_path'] - dest_path = file_paths['dest_path'] - # Try to sort the file - result = self.sort_file(src_path, dest_path, remove_duplicates) - # remove to conflict file list if file as be successfully copied or ignored - if result is True or None: - self._conflict_solved(conflict_file_list, item, dest_path) + :param str directory_name: A fully qualified path of the + to create. + :returns: bool + """ + try: + if os.path.exists(directory_path): + return True else: - n = 1 - while result is False: - if n > 100: - self.logger.warning(f'{self.mode}: to many append for {dest_path}...') - break - # Add appendix to the name - pre, ext = os.path.splitext(dest_path) - dest_path = pre + '_' + str(n) + ext - conflict_file_list[item]['dest_path'] = dest_path - result = self.sort_file(src_path, dest_path, remove_duplicates) - else: - self._conflict_solved(conflict_file_list, item, dest_path) + if not self.dry_run: + os.makedirs(directory_path) + self.logger.info(f'Create {directory_path}') + return True + except OSError: + # OSError is thrown for cases like no permission + pass - return result + return False - def sort_files(self, paths, destination, db, loc, remove_duplicates=False, + def check_path(self, path): + path = os.path.abspath(os.path.expanduser(path)) + + # some error checking + if not os.path.exists(path): + self.logger.error(f'Directory {path} does not exist') + sys.exit(1) + + return path + + def set_utime_from_metadata(self, date_taken, file_path): + """ Set the modification time on the file based on the file name. + """ + + # Initialize date taken to what's returned from the metadata function. + os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp()))) + + def dedup_regex(self, path, dedup_regex, db, logger, remove_duplicates=False): + # cycle throught files + has_errors = False + path = self.check_path(path) + # Delimiter regex + delim = r'[-_ .]' + # Numeric date item regex + d = r'\d{2}' + # Numeric date regex + + if len(dedup_regex) == 0: + date_num2 = re.compile(fr'([^0-9]{d}{delim}{d}{delim}|{delim}{d}{delim}{d}[^0-9])') + date_num3 = re.compile(fr'([^0-9]{d}{delim}{d}{delim}{d}{delim}|{delim}{d}{delim}{d}{delim}{d}[^0-9])') + default = re.compile(r'([^-_ .]+[-_ .])') + dedup_regex = [ + date_num3, + date_num2, + default + ] + + conflict_file_list = [] + for src_path, _ in self.get_files_in_path(path): + src_checksum = self.checksum(src_path) + file_path = Path(src_path).relative_to(self.root) + path_parts = file_path.parts + dedup_path = [] + for path_part in path_parts: + items = [] + items = self._split_part(dedup_regex.copy(), path_part, items) + + filtered_items = [] + for item in items: + if item not in filtered_items: + filtered_items.append(item) + + dedup_path.append(''.join(filtered_items)) + + # Dedup path + dest_path = os.path.join(self.root, *dedup_path) + self.create_directory(os.path.dirname(dest_path)) + + result = self.sort_file(src_path, dest_path, remove_duplicates) + if result: + self.summary, has_errors = self.check_file(src_path, + dest_path, src_checksum, db) + elif result is False: + # There is conflict files + conflict_file_list.append({'src_path': src_path, + 'src_checksum': src_checksum, 'dest_path': dest_path}) + + if conflict_file_list != []: + result = self.solve_conflicts(conflict_file_list, db, remove_duplicates) + + if not result: + has_errors = True + + return self.summary, has_errors + + def sort_files(self, paths, db, loc, remove_duplicates=False, ignore_tags=set()): """ Sort files into appropriate folder """ has_errors = False for path in paths: - # some error checking - if not os.path.exists(path): - self.logger.error(f'Directory {path} does not exist') - - path = os.path.expanduser(path) - + path = self.check_path(path) conflict_file_list = [] for src_path, subdirs in self.get_files_in_path(path, extensions=self.filter_by_ext): @@ -596,39 +671,27 @@ class FileSystem(object): # Keep same directory structure file_path = os.path.relpath(src_path, path) - dest_directory = os.path.join(destination, + dest_directory = os.path.join(self.root, os.path.dirname(file_path)) - dest_path = os.path.join(destination, file_path) + dest_path = os.path.join(self.root, file_path) self.create_directory(dest_directory) + result = self.sort_file(src_path, dest_path, remove_duplicates) if result is False: # There is conflict files - conflict_file_list.append({'src_path': src_path, 'dest_path': dest_path}) - result = self.solve_conflicts(conflict_file_list, remove_duplicates) + conflict_file_list.append({'src_path': src_path, + 'src_checksum': src_checksum, 'dest_path': dest_path}) - if result is True: - self.summary, has_errors = self.check_file(src_path, - dest_path, src_checksum, db) - elif result is None: - has_errors = False - else: - self.summary.append((src_path, False)) - has_errors = True + if conflict_file_list != []: + result = self.solve_conflicts(conflict_file_list, db, remove_duplicates) + + if not result: + has_errors = True return self.summary, has_errors - def check_path(self, path): - path = os.path.abspath(os.path.expanduser(path)) - - # some error checking - if not os.path.exists(path): - self.logger.error(f'Directory {path} does not exist') - sys.exit(1) - - return path - def set_hash(self, result, src_path, dest_path, src_checksum, db): if result: # Check if file remain the same @@ -758,21 +821,3 @@ class FileSystem(object): return self.summary, has_errors - def set_utime_from_metadata(self, date_taken, file_path): - """ Set the modification time on the file based on the file name. - """ - - # Initialize date taken to what's returned from the metadata function. - os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp()))) - - def should_exclude(self, path, regex_list=set(), needs_compiled=False): - if(len(regex_list) == 0): - return False - - if(needs_compiled): - compiled_list = [] - for regex in regex_list: - compiled_list.append(re.compile(regex)) - regex_list = compiled_list - - return any(regex.search(path) for regex in regex_list) diff --git a/ordigi/constants.py b/ordigi/constants.py index 135a76f..5749307 100644 --- a/ordigi/constants.py +++ b/ordigi/constants.py @@ -36,7 +36,7 @@ script_directory = path.dirname(path.dirname(path.abspath(__file__))) #: Accepted language in responses from MapQuest accepted_language = 'en' -# check python version, required in filesystem.py to trigger appropriate method +# check python version, required in collection.py to trigger appropriate method python_version = version_info.major CONFIG_FILE = f'{application_directory}/ordigi.conf' diff --git a/tests/conftest.py b/tests/conftest.py index f6a7e07..4abe3f9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,12 @@ """ pytest test configuration """ from configparser import RawConfigParser +import os import pytest -from pathlib import Path +from pathlib import Path, PurePath +import random import shutil +import string import tempfile from ordigi.config import Config @@ -17,20 +20,35 @@ def reset_singletons(): _ExifToolProc.instance = None -def copy_sample_files(): - src_path = tempfile.mkdtemp(prefix='ordigi-src') +@pytest.fixture(scope="session") +def sample_files_paths(tmpdir_factory): + tmp_path = tmpdir_factory.mktemp("ordigi-src-") paths = Path(ORDIGI_PATH, 'samples/test_exif').glob('*') file_paths = [x for x in paths if x.is_file()] for file_path in file_paths: - source_path = Path(src_path, file_path.name) + source_path = tmp_path.join(file_path.name) shutil.copyfile(file_path, source_path) - return src_path, file_paths + return tmp_path, file_paths + + +def randomize_files(dest_dir): + # Get files randomly + paths = Path(dest_dir).glob('*') + for path, subdirs, files in os.walk(dest_dir): + for name in files: + file_path = PurePath(path, name) + if bool(random.getrandbits(1)): + with open(file_path, 'wb') as fout: + fout.write(os.urandom(random.randrange(128, 2048))) + if bool(random.getrandbits(1)): + dest_path = PurePath(path, file_path.stem + '_1'+ file_path.suffix) + shutil.copyfile(file_path, dest_path) @pytest.fixture(scope="module") def conf_path(): - tmp_path = tempfile.mkdtemp(prefix='ordigi-') + conf_dir = tempfile.mkdtemp(prefix='ordigi-') conf = RawConfigParser() conf['Path'] = { 'day_begins': '4', @@ -40,11 +58,11 @@ def conf_path(): conf['Geolocation'] = { 'geocoder': 'Nominatium' } - conf_path = Path(tmp_path, "ordigi.conf") + conf_path = Path(conf_dir, "ordigi.conf") config = Config(conf_path) config.write(conf) yield conf_path - shutil.rmtree(tmp_path) + shutil.rmtree(conf_dir) diff --git a/tests/test_filesystem.py b/tests/test_collection.py similarity index 77% rename from tests/test_filesystem.py rename to tests/test_collection.py index f55e1d4..a20494b 100644 --- a/tests/test_filesystem.py +++ b/tests/test_collection.py @@ -7,11 +7,11 @@ import re from sys import platform from time import sleep -from .conftest import copy_sample_files +from .conftest import randomize_files from ordigi import constants from ordigi.database import Db from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool -from ordigi.filesystem import FileSystem +from ordigi.collection import Collection from ordigi.geolocation import GeoLocation from ordigi.media import Media @@ -20,9 +20,11 @@ from ordigi.media import Media class TestDb: pass -class TestFilesystem: - def setup_class(cls): - cls.src_paths, cls.file_paths = copy_sample_files() +class TestCollection: + + @pytest.fixture(autouse=True) + def setup_class(cls, sample_files_paths): + cls.src_paths, cls.file_paths = sample_files_paths cls.path_format = constants.default_path + '/' + constants.default_name def teardown_class(self): @@ -34,8 +36,8 @@ class TestFilesystem: Test all parts """ # Item to search for: - filesystem = FileSystem() - items = filesystem.get_items() + collection = Collection(self.path_format, tmp_path) + items = collection.get_items() masks = [ '{album}', '{basename}', @@ -73,7 +75,7 @@ class TestFilesystem: for mask in masks: matched = re.search(regex, mask) if matched: - part = filesystem.get_part(item, mask[1:-1], + part = collection.get_part(item, mask[1:-1], metadata, Db(tmp_path), subdirs, loc) # check if part is correct assert isinstance(part, str), file_path @@ -92,7 +94,7 @@ class TestFilesystem: assert part == file_path.suffix[1:], file_path elif item == 'name': expected_part = file_path.stem - for i, rx in filesystem.match_date_from_string(expected_part): + for i, rx in collection.get_date_regex(expected_part): part = re.sub(rx, '', expected_part) assert part == expected_part, file_path elif item == 'custom': @@ -112,21 +114,21 @@ class TestFilesystem: assert part == '', file_path - def test_get_date_taken(self): - filesystem = FileSystem() + def test_get_date_taken(self, tmp_path): + collection = Collection(self.path_format, tmp_path) for file_path in self.file_paths: exif_data = ExifToolCaching(str(file_path)).asdict() media = Media(str(file_path)) metadata = media.get_metadata() - date_taken = filesystem.get_date_taken(metadata) + date_taken = collection.get_date_taken(metadata) date_filename = None for tag in media.tags_keys['original_name']: if tag in exif_data: - date_filename = filesystem.get_date_from_string(exif_data[tag]) + date_filename = collection.get_date_from_string(exif_data[tag]) break if not date_filename: - date_filename = filesystem.get_date_from_string(file_path.name) + date_filename = collection.get_date_from_string(file_path.name) if media.metadata['date_original']: assert date_taken == media.metadata['date_original'] @@ -139,31 +141,40 @@ class TestFilesystem: def test_sort_files(self, tmp_path): db = Db(tmp_path) - filesystem = FileSystem(path_format=self.path_format) + collection = Collection(self.path_format, tmp_path) loc = GeoLocation() - summary, has_errors = filesystem.sort_files([self.src_paths], - tmp_path, db, loc) + summary, has_errors = collection.sort_files([self.src_paths], + db, loc) # Summary is created and there is no errors assert summary, summary assert not has_errors, has_errors + randomize_files(tmp_path) + collection = Collection(self.path_format, tmp_path) + loc = GeoLocation() + summary, has_errors = collection.sort_files([self.src_paths], + db, loc) + + # Summary is created and there is no errors + assert summary, summary + assert not has_errors, has_errors # TODO check if path follow path_format - # TODO make another class? + def test_sort_file(self, tmp_path): for mode in 'copy', 'move': - filesystem = FileSystem(path_format=self.path_format, mode=mode) + collection = Collection(self.path_format, tmp_path, mode=mode) # copy mode src_path = Path(self.src_paths, 'photo.png') name = 'photo_' + mode + '.png' dest_path = Path(tmp_path, name) - src_checksum = filesystem.checksum(src_path) - result_copy = filesystem.sort_file(src_path, dest_path) + src_checksum = collection.checksum(src_path) + result_copy = collection.sort_file(src_path, dest_path) assert result_copy # Ensure files remain the same - assert filesystem.checkcomp(dest_path, src_checksum) + assert collection.checkcomp(dest_path, src_checksum) if mode == 'copy': assert src_path.exists() @@ -175,7 +186,9 @@ class TestFilesystem: # TODO check date -# filesystem.sort_files + def test_filter_part(): + _filter_part(dedup_regex, path_part, items) + assert #- Sort similar images into a directory -# filesystem.sort_similar +# collection.sort_similar diff --git a/tests/test_media.py b/tests/test_media.py index bcb935d..1290737 100644 --- a/tests/test_media.py +++ b/tests/test_media.py @@ -5,7 +5,6 @@ import re import shutil import tempfile -from .conftest import copy_sample_files from ordigi import constants from ordigi.media import Media from ordigi.images import Images @@ -16,8 +15,9 @@ CACHING = True class TestMetadata: - def setup_class(cls): - cls.src_paths, cls.file_paths = copy_sample_files() + @pytest.fixture(autouse=True) + def setup_class(cls, sample_files_paths): + cls.src_paths, cls.file_paths = sample_files_paths cls.ignore_tags = ('EXIF:CreateDate', 'File:FileModifyDate', 'File:FileAccessDate', 'EXIF:Make', 'Composite:LightValue') diff --git a/tests/test_dozo.py b/tests/test_ordigi.py similarity index 100% rename from tests/test_dozo.py rename to tests/test_ordigi.py