From 1a78962012968dbf0813b3a2373255d431e192b3 Mon Sep 17 00:00:00 2001 From: Cedric Leporcq Date: Sat, 9 Oct 2021 16:19:33 +0200 Subject: [PATCH] Use Path class for sort_similar_images function and test it --- ordigi.py | 6 +- ordigi/collection.py | 245 ++++++++++++++------------------------- ordigi/images.py | 51 ++++---- tests/test_collection.py | 21 +++- tests/test_media.py | 2 - 5 files changed, 137 insertions(+), 188 deletions(-) diff --git a/ordigi.py b/ordigi.py index cdc4f58..327b505 100755 --- a/ordigi.py +++ b/ordigi.py @@ -300,14 +300,14 @@ def compare(**kwargs): mode='move', dry_run=dry_run, logger=logger) if kwargs['revert_compare']: - summary, has_errors = collection.revertcompare(path, dry_run) + summary, result = collection.revert_compare(path) else: - summary, has_errors = collection.sort_similar_images(path, kwargs['similarity']) + summary, result = collection.sort_similar_images(path, kwargs['similarity']) if verbose or debug: summary.print() - if has_errors: + if not result: sys.exit(1) diff --git a/ordigi/collection.py b/ordigi/collection.py index fd0e82b..b3a0ae4 100644 --- a/ordigi/collection.py +++ b/ordigi/collection.py @@ -518,65 +518,39 @@ class Collection(object): # return file_path and subdir yield file_path - def _create_directory(self, directory_path, path, media): + def _create_directory(self, directory_path, media): """Create a directory if it does not already exist. :param Path: A fully qualified path of the to create. :returns: bool """ - try: - parts = directory_path.relative_to(path).parts - except ValueError: - # directory_path is not the subpath of path - pass - else: - for i, part in enumerate(parts): - dir_path = self.root / Path(*parts[0:i+1]) - if dir_path.is_file(): - self.logger.warning(f'Target directory {dir_path} is a file') - # Rename the src_file - if self.interactive: - prompt = [ - inquirer.Text('file_path', message="New name for"\ - f"'{dir_path.name}' file"), - ] - answers = inquirer.prompt(prompt, theme=self.theme) - file_path = dir_path.parent / answers['file_path'] - else: - file_path = dir_path.parent / (dir_path.name + '_file') + parts = directory_path.relative_to(self.root).parts + for i, part in enumerate(parts): + dir_path = self.root / Path(*parts[0:i+1]) + if dir_path.is_file(): + self.logger.warning(f'Target directory {dir_path} is a file') + # Rename the src_file + if self.interactive: + prompt = [ + inquirer.Text('file_path', message="New name for"\ + f"'{dir_path.name}' file"), + ] + answers = inquirer.prompt(prompt, theme=self.theme) + file_path = dir_path.parent / answers['file_path'] + else: + file_path = dir_path.parent / (dir_path.name + '_file') - self.logger.warning(f'Renaming {dir_path} to {file_path}') - shutil.move(dir_path, file_path) - for media in medias: - if media.file_path == dir_path: - media.file_path = file_path - break + self.logger.warning(f'Renaming {dir_path} to {file_path}') + shutil.move(dir_path, file_path) + for media in medias: + if media.file_path == dir_path: + media.file_path = file_path + break if not self.dry_run: directory_path.mkdir(parents=True, exist_ok=True) self.logger.info(f'Create {directory_path}') - def create_directory(self, directory_path): - """Create a directory if it does not already exist. - - :param str directory_name: A fully qualified path of the - to create. - :returns: bool - """ - try: - if os.path.exists(directory_path): - return True - else: - if not self.dry_run: - os.makedirs(directory_path) - self.logger.info(f'Create {directory_path}') - return True - except OSError: - # OSError is thrown for cases like no permission - pass - - return False - def _check_path(self, path): """ :param: str path @@ -591,16 +565,6 @@ class Collection(object): return path - def check_path(self, path): - path = os.path.abspath(os.path.expanduser(path)) - - # some error checking - if not os.path.exists(path): - self.logger.error(f'Directory {path} does not exist') - sys.exit(1) - - return path - def set_utime_from_metadata(self, date_media, file_path): """ Set the modification time on the file based on the file name. """ @@ -648,7 +612,7 @@ class Collection(object): # Dedup path dest_path = self.root.joinpath(*dedup_path) - self._create_directory(dest_path.parent.name, path, media) + self._create_directory(dest_path.parent.name, media) result = self.sort_file(src_path, dest_path, remove_duplicates) @@ -720,20 +684,16 @@ class Collection(object): media = Media(src_path, path, self.album_from_folder, ignore_tags, self.interactive, self.logger, self.use_date_filename, self.use_file_dates) - if media: - metadata = media.get_metadata(self.root, loc, self.db, self.cache) - # Get the destination path according to metadata - relpath = Path(self.get_path(metadata)) - else: - # Keep same directory structure - relpath = src_path.relative_to(path) + metadata = media.get_metadata(self.root, loc, self.db, self.cache) + # Get the destination path according to metadata + relpath = Path(self.get_path(metadata)) files_data.append((copy(media), relpath)) # Create directories for media, relpath in files_data: dest_directory = self.root / relpath.parent - self._create_directory(dest_directory, path, media) + self._create_directory(dest_directory, media) # sort files and solve conflicts for media, relpath in files_data: @@ -764,40 +724,11 @@ class Collection(object): return self.summary, record - def set_hash(self, result, src_path, dest_path, src_checksum): - if result: - # Check if file remain the same - result = self._checkcomp(dest_path, src_checksum) - has_errors = False - if result: - if not self.dry_run: - self._add_db_data(dest_path, metadata, checksum) - - if dest_path: - self.logger.info(f'{src_path} -> {dest_path}') - - self.summary.append((src_path, dest_path)) - - else: - self.logger.error(f'Files {src_path} and {dest_path} are not identical') - # sys.exit(1) - self.summary.append((src_path, False)) - has_errors = True - else: - self.summary.append((src_path, False)) - has_errors = True - - return has_errors - - def move_file(self, img_path, dest_path, checksum): + def move_file(self, img_path, dest_path): if not self.dry_run: - try: - shutil.move(img_path, dest_path) - except OSError as error: - self.logger.error(error) + shutil.move(img_path, dest_path) self.logger.info(f'move: {img_path} -> {dest_path}') - return self.set_hash(True, img_path, dest_path, checksum) def _get_images(self, path): """ @@ -813,88 +744,86 @@ class Collection(object): image = Image(src_path) if image.is_image(): - yield src_path + yield image def sort_similar_images(self, path, similarity=80): - has_errors = False + result = True path = self._check_path(path) - img_paths = set([ x for x in self._get_images(path) ]) - i = Images(img_paths, logger=self.logger) - for image in img_paths: - if not os.path.isfile(image): + images = set([ x for x in self._get_images(path) ]) + i = Images(images, logger=self.logger) + for image in images: + if not image.img_path.is_file(): continue - checksum1 = utils.checksum(image) - # Process files - # media = Media(src_path, False, self.logger) - # TODO compare metadata - # if media: - # metadata = media.get_metadata() + media_ref = Media(image.img_path, path, self.logger) + # Todo: compare metadata? + metadata = media_ref.get_metadata(self.root, db=self.db, cache=self.cache) similar = False moved_imgs = set() for img_path in i.find_similar(image, similarity): similar = True - checksum2 = utils.checksum(img_path) + media = Media(img_path, path, self.logger) + metadata = media.get_metadata(self.root, db=self.db, cache=self.cache) # move image into directory - name = os.path.splitext(os.path.basename(image))[0] + name = img_path.stem directory_name = 'similar_to_' + name - dest_directory = os.path.join(os.path.dirname(img_path), - directory_name) - dest_path = os.path.join(dest_directory, os.path.basename(img_path)) + dest_directory = img_path.parent / directory_name + dest_path = dest_directory / img_path.name + dest_directory.mkdir(exist_ok=True) - result = self.create_directory(dest_directory) # Move the simlars file into the destination directory - if result: - result = self.move_file(img_path, dest_path, checksum2) - moved_imgs.add(img_path) - if not result: - has_errors = True + self.move_file(img_path, dest_path) + moved_imgs.add(img_path) + if self._record_file(img_path, dest_path, media): + self.summary.append((img_path, dest_path)) else: - has_errors = True - + self.summary.append((img_path, False)) + result = False if similar: - dest_path = os.path.join(dest_directory, - os.path.basename(image)) - result = self.move_file(image, dest_path, checksum1) - moved_imgs.add(image) - if not result: - has_errors = True + img_path = image.img_path + dest_path = dest_directory / img_path.name + self.move_file(img_path, dest_path) + moved_imgs.add(img_path) + if self._record_file(img_path, dest_path, media_ref): + self.summary.append((img_path, dest_path)) + else: + self.summary.append((img_path, False)) + result = False - # for moved_img in moved_imgs: - # os.remove(moved_img) - - return self.summary, has_errors + return self.summary, result def revert_compare(self, path): - has_errors = False - path = self.check_path(path) - for dirname, dirnames, filenames, level in self.walklevel(path, None): - if dirname == os.path.join(path, '.ordigi'): - continue + result = True + path = self._check_path(path) + dirnames = set() + moved_files = set() + for src_path in self._get_files_in_path(path, glob=self.glob, + extensions=self.filter_by_ext): + dirname = src_path.parent.name if dirname.find('similar_to') == 0: - continue + dirnames.add(src_path.parent) - for subdir in dirnames: - if subdir.find('similar_to') == 0: - file_names = os.listdir(os.path.abspath(os.path.join(dirname, subdir))) - for file_name in file_names: - # move file to initial folder - img_path = os.path.join(dirname, subdir, file_name) - if os.path.isdir(img_path): - continue - checksum = utils.checksum(img_path) - dest_path = os.path.join(dirname, os.path.basename(img_path)) - result = self.move_file(img_path, dest_path, checksum) - if not result: - has_errors = True - # remove directory - try: - os.rmdir(os.path.join (dirname, subdir)) - except OSError as error: - self.logger.error(error) + # move file to initial folder and update metadata + media = Media(src_path, path, self.logger) + metadata = media.get_metadata(self.root, db=self.db, cache=self.cache) + dest_path = Path(src_path.parent.parent, src_path.name) + self.move_file(src_path, dest_path) + moved_files.add(src_path) + if self._record_file(src_path, dest_path, media): + self.summary.append((src_path, dest_path)) + else: + self.summary.append((src_path, False)) + result = False - return self.summary, has_errors + for dirname in dirnames: + # remove 'similar_to*' directories + try: + dirname.rmdir() + except OSError as error: + self.logger.error(error) + + return self.summary, result diff --git a/ordigi/images.py b/ordigi/images.py index 1e06cb2..9e6988b 100644 --- a/ordigi/images.py +++ b/ordigi/images.py @@ -61,8 +61,11 @@ class Image(): return True def get_hash(self): - with img.open(self.img_path) as img_path: - return imagehash.average_hash(img_path, self.hash_size).hash + try: + with img.open(self.img_path) as image: + return imagehash.average_hash(image, self.hash_size).hash + except (OSError, UnidentifiedImageError): + return None class Images(): @@ -75,33 +78,31 @@ class Images(): #: Valid extensions for image files. extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2') - def __init__(self, img_paths=set(), hash_size=8, logger=logging.getLogger()): + def __init__(self, images=set(), hash_size=8, logger=logging.getLogger()): - self.img_paths = img_paths + self.images = images self.duplicates = [] self.hash_size = hash_size self.logger = logger def add_images(self, file_paths): - ''':returns: img_path generator - ''' for img_path in file_paths: image = Image(img_path) if image.is_image(): - self.img_paths.add(img_path) + self.images.add(image) def get_images_hashes(self): """Get image hashes""" hashes = {} # Searching for duplicates. - for img_path in self.img_paths: - with img.open(img_path) as img: + for image in self.images: + with img.open(image.img_path) as img: yield imagehash.average_hash(img, self.hash_size) def find_duplicates(self, img_path): """Find duplicates""" duplicates = [] - for temp_hash in get_images_hashes(self.img_paths): + for temp_hash in get_images_hashes(): if temp_hash in hashes: self.logger.info("Duplicate {} \nfound for image {}\n".format(img_path, hashes[temp_hash])) duplicates.append(img_path) @@ -136,28 +137,34 @@ class Images(): return similarity_img def find_similar(self, image, similarity=80): - ''' + """ Find similar images :returns: img_path generator - ''' - hash1 = '' - image = Image(image) - if image.is_image(): - hash1 = image.get_hash() + """ + hash1 = image.get_hash() - self.logger.info(f'Finding similar images to {image}') + if hash1 is None: + return None + + self.logger.info(f'Finding similar images to {image.img_path}') threshold = 1 - similarity/100 diff_limit = int(threshold*(self.hash_size**2)) - for img_path in self.img_paths: - if img_path == image: + for img in self.images: + if not img.img_path.is_file(): continue - hash2 = image.get_hash() + if img.img_path == image.img_path: + continue + hash2 = img.get_hash() + # Be sure that hash are not None + if hash2 is None: + continue + img_diff = self.diff(hash1, hash2) if img_diff <= diff_limit: similarity_img = self.similarity(img_diff) - self.logger.info(f'{img_path} image found {similarity_img}% similar to {image}') - yield img_path + self.logger.info(f'{img.img_path} image found {similarity_img}% similar to {image}') + yield img.img_path diff --git a/tests/test_collection.py b/tests/test_collection.py index 87f5ad9..6062990 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -11,10 +11,11 @@ from time import sleep from .conftest import randomize_files, randomize_db from ordigi import constants +from ordigi.collection import Collection from ordigi.database import Sqlite from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool -from ordigi.collection import Collection from ordigi.geolocation import GeoLocation +from ordigi import log from ordigi.media import Media from ordigi import utils @@ -179,7 +180,21 @@ class TestCollection: for path in paths: assert isinstance(path, Path) + def test_sort_similar_images(self, tmp_path): + path = tmp_path / 'collection' + shutil.copytree(self.src_path, path) + logger = log.get_logger(True, True) + collection = Collection(path, None, mode='move', logger=logger) + summary, result = collection.sort_similar_images(path, similarity=60) + + # Summary is created and there is no errors + assert summary, summary + assert result, result + + summary, result = collection.revert_compare(path) + + # Summary is created and there is no errors + assert summary, summary + assert result, result -# TODO Sort similar images into a directory -# collection.sort_similar diff --git a/tests/test_media.py b/tests/test_media.py index 1adb745..db8f13f 100644 --- a/tests/test_media.py +++ b/tests/test_media.py @@ -74,8 +74,6 @@ class TestMetadata: assert not media.has_exif_data() def test_get_date_media(self): - # collection = Collection(tmp_path, self.path_format, - # use_date_filename=True, use_file_dates=True) for file_path in self.file_paths: exif_data = ExifToolCaching(str(file_path)).asdict() media = Media(file_path, self.src_path, use_date_filename=True,