Use Path class for sort_similar_images function and test it

This commit is contained in:
Cédric Leporcq 2021-10-09 16:19:33 +02:00
parent 4156e769d0
commit 1a78962012
5 changed files with 137 additions and 188 deletions

View File

@ -300,14 +300,14 @@ def compare(**kwargs):
mode='move', dry_run=dry_run, logger=logger)
if kwargs['revert_compare']:
summary, has_errors = collection.revertcompare(path, dry_run)
summary, result = collection.revert_compare(path)
else:
summary, has_errors = collection.sort_similar_images(path, kwargs['similarity'])
summary, result = collection.sort_similar_images(path, kwargs['similarity'])
if verbose or debug:
summary.print()
if has_errors:
if not result:
sys.exit(1)

View File

@ -518,18 +518,13 @@ class Collection(object):
# return file_path and subdir
yield file_path
def _create_directory(self, directory_path, path, media):
def _create_directory(self, directory_path, media):
"""Create a directory if it does not already exist.
:param Path directory_path: A fully qualified path of the directory to create.
:returns: bool
"""
try:
parts = directory_path.relative_to(path).parts
except ValueError:
# directory_path is not the subpath of path
pass
else:
parts = directory_path.relative_to(self.root).parts
for i, part in enumerate(parts):
dir_path = self.root / Path(*parts[0:i+1])
if dir_path.is_file():
@ -556,27 +551,6 @@ class Collection(object):
directory_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f'Create {directory_path}')
def create_directory(self, directory_path):
    """Create a directory if it does not already exist.

    Nothing is written to disk when ``self.dry_run`` is set.

    :param str directory_path: A fully qualified path of the directory
        to create.
    :returns: bool -- True if the path already exists or was created,
        False if the creation failed with an OSError.
    """
    if os.path.exists(directory_path):
        return True

    try:
        if not self.dry_run:
            # exist_ok closes the window between the existence check above
            # and the creation (another process may create it meanwhile).
            os.makedirs(directory_path, exist_ok=True)
    except OSError as error:
        # OSError is thrown for cases like no permission. Report it rather
        # than failing silently; the caller still gets False.
        self.logger.error(error)
        return False

    # NOTE(review): the source lost its indentation; the creation is assumed
    # to be logged (and reported as successful) in dry-run mode too -- confirm.
    self.logger.info(f'Create {directory_path}')
    return True
def _check_path(self, path):
"""
:param: str path
@ -591,16 +565,6 @@ class Collection(object):
return path
def check_path(self, path):
    """Return the absolute, user-expanded form of ``path``.

    Logs an error and exits the program with status 1 when the resulting
    path does not exist.

    :param str path: Path to normalize; a leading ``~`` is expanded.
    :returns: str -- the absolute path
    """
    expanded = os.path.expanduser(path)
    path = os.path.abspath(expanded)

    # Abort early: nothing can be processed from a missing location
    missing = not os.path.exists(path)
    if missing:
        self.logger.error(f'Directory {path} does not exist')
        sys.exit(1)

    return path
def set_utime_from_metadata(self, date_media, file_path):
""" Set the modification time on the file based on the file name.
"""
@ -648,7 +612,7 @@ class Collection(object):
# Dedup path
dest_path = self.root.joinpath(*dedup_path)
self._create_directory(dest_path.parent.name, path, media)
self._create_directory(dest_path.parent.name, media)
result = self.sort_file(src_path, dest_path, remove_duplicates)
@ -720,20 +684,16 @@ class Collection(object):
media = Media(src_path, path, self.album_from_folder,
ignore_tags, self.interactive, self.logger,
self.use_date_filename, self.use_file_dates)
if media:
metadata = media.get_metadata(self.root, loc, self.db, self.cache)
# Get the destination path according to metadata
relpath = Path(self.get_path(metadata))
else:
# Keep same directory structure
relpath = src_path.relative_to(path)
files_data.append((copy(media), relpath))
# Create directories
for media, relpath in files_data:
dest_directory = self.root / relpath.parent
self._create_directory(dest_directory, path, media)
self._create_directory(dest_directory, media)
# sort files and solve conflicts
for media, relpath in files_data:
@ -764,40 +724,11 @@ class Collection(object):
return self.summary, record
def set_hash(self, result, src_path, dest_path, src_checksum):
if result:
# Check if file remain the same
result = self._checkcomp(dest_path, src_checksum)
has_errors = False
if result:
def move_file(self, img_path, dest_path):
if not self.dry_run:
self._add_db_data(dest_path, metadata, checksum)
if dest_path:
self.logger.info(f'{src_path} -> {dest_path}')
self.summary.append((src_path, dest_path))
else:
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
# sys.exit(1)
self.summary.append((src_path, False))
has_errors = True
else:
self.summary.append((src_path, False))
has_errors = True
return has_errors
def move_file(self, img_path, dest_path, checksum):
if not self.dry_run:
try:
shutil.move(img_path, dest_path)
except OSError as error:
self.logger.error(error)
self.logger.info(f'move: {img_path} -> {dest_path}')
return self.set_hash(True, img_path, dest_path, checksum)
def _get_images(self, path):
"""
@ -813,88 +744,86 @@ class Collection(object):
image = Image(src_path)
if image.is_image():
yield src_path
yield image
def sort_similar_images(self, path, similarity=80):
has_errors = False
result = True
path = self._check_path(path)
img_paths = set([ x for x in self._get_images(path) ])
i = Images(img_paths, logger=self.logger)
for image in img_paths:
if not os.path.isfile(image):
images = set([ x for x in self._get_images(path) ])
i = Images(images, logger=self.logger)
for image in images:
if not image.img_path.is_file():
continue
checksum1 = utils.checksum(image)
# Process files
# media = Media(src_path, False, self.logger)
# TODO compare metadata
# if media:
# metadata = media.get_metadata()
media_ref = Media(image.img_path, path, self.logger)
# Todo: compare metadata?
metadata = media_ref.get_metadata(self.root, db=self.db, cache=self.cache)
similar = False
moved_imgs = set()
for img_path in i.find_similar(image, similarity):
similar = True
checksum2 = utils.checksum(img_path)
media = Media(img_path, path, self.logger)
metadata = media.get_metadata(self.root, db=self.db, cache=self.cache)
# move image into directory
name = os.path.splitext(os.path.basename(image))[0]
name = img_path.stem
directory_name = 'similar_to_' + name
dest_directory = os.path.join(os.path.dirname(img_path),
directory_name)
dest_path = os.path.join(dest_directory, os.path.basename(img_path))
dest_directory = img_path.parent / directory_name
dest_path = dest_directory / img_path.name
dest_directory.mkdir(exist_ok=True)
result = self.create_directory(dest_directory)
# Move the similar files into the destination directory
if result:
result = self.move_file(img_path, dest_path, checksum2)
self.move_file(img_path, dest_path)
moved_imgs.add(img_path)
if not result:
has_errors = True
if self._record_file(img_path, dest_path, media):
self.summary.append((img_path, dest_path))
else:
has_errors = True
self.summary.append((img_path, False))
result = False
if similar:
dest_path = os.path.join(dest_directory,
os.path.basename(image))
result = self.move_file(image, dest_path, checksum1)
moved_imgs.add(image)
if not result:
has_errors = True
img_path = image.img_path
dest_path = dest_directory / img_path.name
self.move_file(img_path, dest_path)
moved_imgs.add(img_path)
if self._record_file(img_path, dest_path, media_ref):
self.summary.append((img_path, dest_path))
else:
self.summary.append((img_path, False))
result = False
# for moved_img in moved_imgs:
# os.remove(moved_img)
return self.summary, has_errors
return self.summary, result
def revert_compare(self, path):
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.ordigi'):
continue
result = True
path = self._check_path(path)
dirnames = set()
moved_files = set()
for src_path in self._get_files_in_path(path, glob=self.glob,
extensions=self.filter_by_ext):
dirname = src_path.parent.name
if dirname.find('similar_to') == 0:
continue
dirnames.add(src_path.parent)
for subdir in dirnames:
if subdir.find('similar_to') == 0:
file_names = os.listdir(os.path.abspath(os.path.join(dirname, subdir)))
for file_name in file_names:
# move file to initial folder
img_path = os.path.join(dirname, subdir, file_name)
if os.path.isdir(img_path):
continue
checksum = utils.checksum(img_path)
dest_path = os.path.join(dirname, os.path.basename(img_path))
result = self.move_file(img_path, dest_path, checksum)
if not result:
has_errors = True
# remove directory
# move file to initial folder and update metadata
media = Media(src_path, path, self.logger)
metadata = media.get_metadata(self.root, db=self.db, cache=self.cache)
dest_path = Path(src_path.parent.parent, src_path.name)
self.move_file(src_path, dest_path)
moved_files.add(src_path)
if self._record_file(src_path, dest_path, media):
self.summary.append((src_path, dest_path))
else:
self.summary.append((src_path, False))
result = False
for dirname in dirnames:
# remove 'similar_to*' directories
try:
os.rmdir(os.path.join (dirname, subdir))
dirname.rmdir()
except OSError as error:
self.logger.error(error)
return self.summary, has_errors
return self.summary, result

View File

@ -61,8 +61,11 @@ class Image():
return True
def get_hash(self):
with img.open(self.img_path) as img_path:
return imagehash.average_hash(img_path, self.hash_size).hash
try:
with img.open(self.img_path) as image:
return imagehash.average_hash(image, self.hash_size).hash
except (OSError, UnidentifiedImageError):
return None
class Images():
@ -75,33 +78,31 @@ class Images():
#: Valid extensions for image files.
extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
def __init__(self, img_paths=set(), hash_size=8, logger=logging.getLogger()):
def __init__(self, images=set(), hash_size=8, logger=logging.getLogger()):
self.img_paths = img_paths
self.images = images
self.duplicates = []
self.hash_size = hash_size
self.logger = logger
def add_images(self, file_paths):
''':returns: img_path generator
'''
for img_path in file_paths:
image = Image(img_path)
if image.is_image():
self.img_paths.add(img_path)
self.images.add(image)
def get_images_hashes(self):
"""Get image hashes"""
hashes = {}
# Searching for duplicates.
for img_path in self.img_paths:
with img.open(img_path) as img:
for image in self.images:
with img.open(image.img_path) as img:
yield imagehash.average_hash(img, self.hash_size)
def find_duplicates(self, img_path):
"""Find duplicates"""
duplicates = []
for temp_hash in get_images_hashes(self.img_paths):
for temp_hash in get_images_hashes():
if temp_hash in hashes:
self.logger.info("Duplicate {} \nfound for image {}\n".format(img_path, hashes[temp_hash]))
duplicates.append(img_path)
@ -136,28 +137,34 @@ class Images():
return similarity_img
def find_similar(self, image, similarity=80):
'''
"""
Find similar images
:returns: img_path generator
'''
hash1 = ''
image = Image(image)
if image.is_image():
"""
hash1 = image.get_hash()
self.logger.info(f'Finding similar images to {image}')
if hash1 is None:
return None
self.logger.info(f'Finding similar images to {image.img_path}')
threshold = 1 - similarity/100
diff_limit = int(threshold*(self.hash_size**2))
for img_path in self.img_paths:
if img_path == image:
for img in self.images:
if not img.img_path.is_file():
continue
hash2 = image.get_hash()
if img.img_path == image.img_path:
continue
hash2 = img.get_hash()
# Make sure the hash is not None
if hash2 is None:
continue
img_diff = self.diff(hash1, hash2)
if img_diff <= diff_limit:
similarity_img = self.similarity(img_diff)
self.logger.info(f'{img_path} image found {similarity_img}% similar to {image}')
yield img_path
self.logger.info(f'{img.img_path} image found {similarity_img}% similar to {image}')
yield img.img_path

View File

@ -11,10 +11,11 @@ from time import sleep
from .conftest import randomize_files, randomize_db
from ordigi import constants
from ordigi.collection import Collection
from ordigi.database import Sqlite
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi import log
from ordigi.media import Media
from ordigi import utils
@ -179,7 +180,21 @@ class TestCollection:
for path in paths:
assert isinstance(path, Path)
def test_sort_similar_images(self, tmp_path):
    """Sort the similar images of a copied collection, then revert the sort.

    Both operations must return a truthy summary and a truthy result.
    """
    collection_path = tmp_path / 'collection'
    shutil.copytree(self.src_path, collection_path)
    collection = Collection(
        collection_path,
        None,
        mode='move',
        logger=log.get_logger(True, True),
    )

    # Sorting: a summary is created and there are no errors
    summary, result = collection.sort_similar_images(collection_path, similarity=60)
    assert summary, summary
    assert result, result

    # Reverting: a summary is created and there are no errors
    summary, result = collection.revert_compare(collection_path)
    assert summary, summary
    assert result, result
# TODO Sort similar images into a directory
# collection.sort_similar

View File

@ -74,8 +74,6 @@ class TestMetadata:
assert not media.has_exif_data()
def test_get_date_media(self):
# collection = Collection(tmp_path, self.path_format,
# use_date_filename=True, use_file_dates=True)
for file_path in self.file_paths:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(file_path, self.src_path, use_date_filename=True,