Refactoring sort_file

Cédric Leporcq 2021-10-27 00:06:38 +02:00
parent 4184d753ac
commit 7c936fc32c
5 changed files with 557 additions and 404 deletions

View File

@ -6,7 +6,7 @@
day_begins=4
dirs_path={%Y}/{%m-%b}-{city}-{folder}
name={%Y%m%d-%H%M%S}-%u{original_name}.%l{ext}
name={%Y%m%d-%H%M%S}-%u{original_name}|%u{basename}.%l{ext}
[Exclusions]
path1=**/.directory

View File

@ -29,6 +29,12 @@ _logger_options = [
),
]
_input_options = [
click.option(
'--interactive', '-i', default=False, is_flag=True, help="Interactive mode"
),
]
_dry_run_options = [
click.option(
'--dry-run',
@ -38,7 +44,7 @@ _dry_run_options = [
)
]
_filter_option = [
_filter_options = [
click.option(
'--exclude',
'-e',
@ -59,6 +65,50 @@ _filter_option = [
click.option('--glob', '-g', default='**/*', help='Glob file selection'),
]
_sort_options = [
click.option(
'--album-from-folder',
default=False,
is_flag=True,
help="Use images' folders as their album names.",
),
click.option(
'--ignore-tags',
'-I',
default=set(),
multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'',
),
click.option(
'--path-format',
'-p',
default=None,
help='Custom featured path format',
),
click.option(
'--remove-duplicates',
'-R',
default=False,
is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash',
),
click.option(
'--use-date-filename',
'-f',
default=False,
is_flag=True,
help="Use filename date for media original date.",
),
click.option(
'--use-file-dates',
'-F',
default=False,
is_flag=True,
help="Use file date created or modified for media original date.",
),
]
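The option groups above are attached to each command through the add_options decorator used below; its definition is not part of this diff. A minimal sketch of such a helper, assuming the conventional click idiom (the project's actual implementation may differ):

def add_options(options):
    """Apply a list of click.option decorators to a command function."""
    def _add_options(func):
        # Apply in reverse so the options keep their declaration order in --help
        for option in reversed(options):
            func = option(func)
        return func
    return _add_options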
def print_help(command):
click.echo(command.get_help(click.Context(sort)))
@ -79,27 +129,25 @@ def _get_exclude(opt, exclude):
exclude = opt['exclude']
return set(exclude)
def get_collection_config(root):
return Config(os.path.join(root, '.ordigi', 'ordigi.conf'))
@click.command('sort')
def _get_paths(paths, root):
if not paths:
# Default to the destination root as the only source path
paths = [root]
paths = set(paths)
return paths, root
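A quick illustration of how _get_paths resolves the CLI arguments (the values here are made up):

# No source paths given: fall back to the destination root as the only source
paths, root = _get_paths((), '/photos')
# paths == {'/photos'}, root == '/photos'

# Explicit source paths are deduplicated into a set
paths, root = _get_paths(('2021', '2021', 'incoming'), '/photos')
# paths == {'2021', 'incoming'}, root == '/photos'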
@click.command('import')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@click.option(
'--album-from-folder',
default=False,
is_flag=True,
help="Use images' folders as their album names.",
)
@click.option(
'--destination',
'-d',
type=click.Path(file_okay=False),
default=None,
help='Sort files into this directory.',
)
@click.option('--clean', '-C', default=False, is_flag=True, help='Clean empty folders')
@add_options(_filter_options)
@add_options(_sort_options)
@click.option(
'--copy',
'-c',
@ -108,31 +156,70 @@ def get_collection_config(root):
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved',
)
@click.option(
'--ignore-tags',
'-I',
default=set(),
multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'',
)
@click.option(
'--interactive', '-i', default=False, is_flag=True, help="Interactive mode"
)
@click.option(
'--path-format',
'-p',
default=None,
help='set custom featured path format',
)
@click.option(
'--remove-duplicates',
'-R',
default=False,
is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash',
)
@click.argument('src', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _import(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.level(kwargs['verbose'], kwargs['debug'])
logger = log.get_logger(level=log_level)
src_paths = kwargs['src']
root = kwargs['dest']
src_paths, root = _get_paths(src_paths, root)
if kwargs['copy']:
import_mode = 'copy'
else:
import_mode = 'move'
config = get_collection_config(root)
opt = config.get_options()
path_format = opt['path_format']
if kwargs['path_format']:
path_format = kwargs['path_format']
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
collection = Collection(
root,
kwargs['album_from_folder'],
False,
opt['day_begins'],
kwargs['dry_run'],
exclude,
filter_by_ext,
kwargs['glob'],
kwargs['interactive'],
logger,
opt['max_deep'],
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout'])
summary = collection.sort_files(
src_paths, path_format, loc, import_mode, kwargs['remove_duplicates'], kwargs['ignore_tags']
)
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
@click.command('sort')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_options)
@add_options(_sort_options)
@click.option('--clean', '-C', default=False, is_flag=True, help='Clean empty folders')
@click.option(
'--reset-cache',
'-r',
@ -140,56 +227,25 @@ def get_collection_config(root):
is_flag=True,
help='Regenerate the hash.json and location.json database ',
)
@click.option(
'--use-date-filename',
'-f',
default=False,
is_flag=True,
help="Use filename date for media original date.",
)
@click.option(
'--use-file-dates',
'-F',
default=False,
is_flag=True,
help="Use file date created or modified for media original date.",
)
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def sort(**kwargs):
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _sort(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
root = kwargs['destination']
log_level = log.level(kwargs['verbose'], kwargs['debug'])
paths = kwargs['paths']
if kwargs['copy']:
mode = 'copy'
else:
mode = 'move'
logger = log.get_logger(level=log_level)
subdirs = kwargs['subdirs']
root = kwargs['dest']
paths, root = _get_paths(subdirs, root)
# Resolve each source path relative to the collection root
paths = set(os.path.join(root, path) for path in paths)
cache = True
if kwargs['reset_cache']:
cache = False
if len(paths) > 1:
if not root:
# Use last path argument as destination
root = paths[-1]
paths = paths[0:-1]
elif paths:
# Source and destination are the same
root = paths[0]
else:
logger.error(f'`ordigi sort` need at least one path argument')
sys.exit(1)
paths = set(paths)
config = get_collection_config(root)
opt = config.get_options()
@ -212,15 +268,14 @@ def sort(**kwargs):
kwargs['interactive'],
logger,
opt['max_deep'],
mode,
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout'])
summary, result = collection.sort_files(
paths, loc, kwargs['remove_duplicates'], kwargs['ignore_tags']
summary = collection.sort_files(
paths, path_format, loc, kwargs['remove_duplicates'], kwargs['ignore_tags']
)
if kwargs['clean']:
@ -229,14 +284,14 @@ def sort(**kwargs):
if log_level < 30:
summary.print()
if not result:
if summary.errors:
sys.exit(1)
@click.command('clean')
@add_options(_logger_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@add_options(_filter_options)
@click.option(
'--dedup-regex',
'-d',
@ -260,32 +315,25 @@ def sort(**kwargs):
is_flag=True,
help='True to remove files that are exactly the same in name and a file hash',
)
@click.option(
'--root',
'-r',
type=click.Path(file_okay=False),
default=None,
help='Root dir of media collection. If not set, use path',
)
@click.argument('path', required=True, nargs=1, type=click.Path())
def clean(**kwargs):
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _clean(**kwargs):
"""Remove empty folders
Usage: clean [--verbose|--debug] directory [removeRoot]"""
result = True
dry_run = kwargs['dry_run']
folders = kwargs['folders']
log_level = log.level(kwargs['verbose'], kwargs['debug'])
root = kwargs['root']
path = kwargs['path']
logger = log.get_logger(level=log_level)
subdirs = kwargs['subdirs']
root = kwargs['dest']
paths, root = _get_paths(subdirs, root)
# Resolve each source path relative to the collection root
paths = set(os.path.join(root, path) for path in paths)
clean_all = False
if not folders:
clean_all = True
if not root:
root = path
config = get_collection_config(root)
opt = config.get_options()
@ -303,30 +351,35 @@ def clean(**kwargs):
max_deep=opt['max_deep'],
)
if kwargs['path_string']:
dedup_regex = list(kwargs['dedup_regex'])
summary, result = collection.dedup_regex(
path, dedup_regex, kwargs['remove_duplicates']
)
for path in paths:
if kwargs['path_string']:
dedup_regex = list(kwargs['dedup_regex'])
collection.dedup_regex(
path, dedup_regex, kwargs['remove_duplicates']
)
if clean_all or folders:
summary = collection.remove_empty_folders(path)
if clean_all or folders:
collection.remove_empty_folders(path)
if kwargs['delete_excluded']:
summary = collection.remove_excluded_files()
if kwargs['delete_excluded']:
collection.remove_excluded_files()
summary = collection.summary
if log_level < 30:
summary.print()
if not result:
if summary.errors:
sys.exit(1)
@click.command('init')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def init(**kwargs):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files."""
def _init(**kwargs):
"""
Init media collection database.
"""
root = kwargs['path']
config = get_collection_config(root)
opt = config.get_options()
@ -344,8 +397,10 @@ def init(**kwargs):
@click.command('update')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def update(**kwargs):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files."""
def _update(**kwargs):
"""
Update media collection database.
"""
root = kwargs['path']
config = get_collection_config(root)
opt = config.get_options()
@ -363,8 +418,10 @@ def update(**kwargs):
@click.command('check')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def check(**kwargs):
"""check db and verify hashes"""
def _check(**kwargs):
"""
Check media collection.
"""
log_level = log.level(kwargs['verbose'], kwargs['debug'])
logger = log.get_logger(level=log_level)
root = kwargs['path']
@ -373,10 +430,10 @@ def check(**kwargs):
collection = Collection(root, exclude=opt['exclude'], logger=logger)
result = collection.check_db()
if result:
summary, result = collection.check_files()
summary = collection.check_files()
if log_level < 30:
summary.print()
if not result:
if summary.errors:
sys.exit(1)
else:
logger.error('Db data is not accurate, run `ordigi update`')
@ -386,7 +443,7 @@ def check(**kwargs):
@click.command('compare')
@add_options(_logger_options)
@add_options(_dry_run_options)
@add_options(_filter_option)
@add_options(_filter_options)
@click.option('--find-duplicates', '-f', default=False, is_flag=True)
@click.option(
'--output-dir',
@ -403,13 +460,6 @@ def check(**kwargs):
is_flag=True,
help='Revert compare',
)
@click.option(
'--root',
'-r',
type=click.Path(file_okay=False),
default=None,
help='Root dir of media collection. If not set, use path',
)
@click.option(
'--similar-to',
'-s',
@ -422,15 +472,23 @@ def check(**kwargs):
default=80,
help='Similarity level for images',
)
@click.argument('path', nargs=1, required=True)
def compare(**kwargs):
'''Compare files in directories'''
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _compare(**kwargs):
"""
Sort similar images in directories
"""
dry_run = kwargs['dry_run']
log_level = log.level(kwargs['verbose'], kwargs['debug'])
root = kwargs['root']
subdirs = kwargs['subdirs']
root = kwargs['dest']
paths, root = _get_paths(subdirs, root)
# Resolve each source path relative to the collection root
paths = set(os.path.join(root, path) for path in paths)
path = kwargs['path']
root = kwargs['root']
logger = log.get_logger(level=log_level)
if not root:
@ -451,15 +509,18 @@ def compare(**kwargs):
logger=logger,
)
if kwargs['revert_compare']:
summary, result = collection.revert_compare(path)
else:
summary, result = collection.sort_similar_images(path, kwargs['similarity'])
for path in paths:
if kwargs['revert_compare']:
collection.revert_compare(path)
else:
collection.sort_similar_images(path, kwargs['similarity'])
summary = collection.summary
if log_level < 30:
summary.print()
if not result:
if summary.errors:
sys.exit(1)
@ -468,12 +529,13 @@ def main(**kwargs):
pass
main.add_command(clean)
main.add_command(check)
main.add_command(compare)
main.add_command(init)
main.add_command(sort)
main.add_command(update)
main.add_command(_clean)
main.add_command(_check)
main.add_command(_compare)
main.add_command(_init)
main.add_command(_import)
main.add_command(_sort)
main.add_command(_update)
if __name__ == '__main__':

View File

@ -1,21 +1,19 @@
"""
General file system methods.
Collection methods.
"""
from builtins import object
from copy import copy
from datetime import datetime, timedelta
import filecmp
from fnmatch import fnmatch
import inquirer
import logging
import os
from pathlib import Path, PurePath
import re
import sys
import shutil
import sys
import logging
from pathlib import Path, PurePath
import inquirer
from ordigi import media
from ordigi.database import Sqlite
from ordigi.media import Media
from ordigi.images import Image, Images
@ -170,6 +168,17 @@ class FPath:
return part
def _set_case(self, regex, part, this_part):
# Capitalization
u_regex = '%u' + regex
l_regex = '%l' + regex
if re.search(u_regex, this_part):
this_part = re.sub(u_regex, part.upper(), this_part)
elif re.search(l_regex, this_part):
this_part = re.sub(l_regex, part.lower(), this_part)
else:
this_part = re.sub(regex, part, this_part)
return this_part
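A self-contained illustration of the %u/%l case markers that _set_case handles; the regex and values below are invented for the example, not taken from the diff:

import re

def set_case(regex, part, this_part):
    # Same logic as FPath._set_case: %u upper-cases the substituted part,
    # %l lower-cases it, otherwise the part is inserted unchanged.
    u_regex = '%u' + regex
    l_regex = '%l' + regex
    if re.search(u_regex, this_part):
        return re.sub(u_regex, part.upper(), this_part)
    if re.search(l_regex, this_part):
        return re.sub(l_regex, part.lower(), this_part)
    return re.sub(regex, part, this_part)

print(set_case('{ext}', 'JPG', '{%Y%m%d-%H%M%S}-{name}.%l{ext}'))
# -> {%Y%m%d-%H%M%S}-{name}.jpg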
def get_path_part(self, this_part, metadata):
"""Build path part
:returns: part (string)"""
@ -185,15 +194,7 @@ class FPath:
regex = '[-_ .]?(%[ul])?' + regex
this_part = re.sub(regex, part, this_part)
else:
# Capitalization
u_regex = '%u' + regex
l_regex = '%l' + regex
if re.search(u_regex, this_part):
this_part = re.sub(u_regex, part.upper(), this_part)
elif re.search(l_regex, this_part):
this_part = re.sub(l_regex, part.lower(), this_part)
else:
this_part = re.sub(regex, part, this_part)
this_part = self._set_case(regex, part, this_part)
# Delete separator char at the beginning of the string if any:
if this_part:
@ -263,7 +264,6 @@ class Collection:
interactive=False,
logger=logging.getLogger(),
max_deep=None,
mode='move',
use_date_filename=False,
use_file_dates=False,
):
@ -285,7 +285,7 @@ class Collection:
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(media.extensions)
self.filter_by_ext = filter_by_ext.union(Media.extensions)
else:
self.filter_by_ext = filter_by_ext
@ -293,10 +293,9 @@ class Collection:
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
self.mode = mode
# List to store media metadata
self.medias = []
self.summary = Summary()
self.summary = Summary(self.root)
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
@ -355,37 +354,6 @@ class Collection:
return False
def _record_file(self, src_path, dest_path, media):
"""Check file and record the file to db"""
# Check if file remain the same
record = False
checksum = media.metadata['checksum']
if self._checkcomp(dest_path, checksum):
# change media file_path to dest_path
media.file_path = dest_path
if not self.dry_run:
updated = self._update_exif_data(dest_path, media)
if updated:
checksum = utils.checksum(dest_path)
media.metadata['checksum'] = checksum
media.metadata['file_path'] = os.path.relpath(dest_path, self.root)
self._add_db_data(media.metadata)
if self.mode == 'move':
# Delete file path entry in db when file is moved inside collection
if self.root in src_path.parents:
self.db.delete_filepath(str(src_path.relative_to(self.root)))
self.summary.append((src_path, self.mode))
record = True
else:
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
self.summary.append((src_path, False))
return record
def remove(self, file_path):
if not self.dry_run:
os.remove(file_path)
@ -404,71 +372,71 @@ class Collection:
if fnmatch(file_path, exclude):
if not self.dry_run:
self.remove(file_path)
self.summary.append((file_path, 'delete'))
self.summary.append((file_path, 'remove_excluded'))
break
return self.summary
def sort_file(self, src_path, dest_path, remove_duplicates=False):
'''
Copy or move file to dest_path.
Return True if success, None if no filesystem action, False if
conflicts.
:params: str, str, bool
:returns: bool or None
'''
mode = self.mode
dry_run = self.dry_run
# check for collisions
if src_path == dest_path:
self.logger.info(f'File {dest_path} already sorted')
return None
elif dest_path.is_dir():
self.logger.info(f'File {dest_path} is an existing directory')
def _check_file(self, src_path, dest_path, media):
checksum = media.metadata['checksum']
if not self._checkcomp(dest_path, checksum):
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
self.summary.append((src_path, False))
return False
elif dest_path.is_file():
self.logger.info(f'File {dest_path} already exists')
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.logger.info(
f'Files in source and destination are identical. Duplicate will be ignored.'
)
if mode == 'move':
if not dry_run:
self.remove(src_path)
self.summary.append((src_path, 'delete'))
return None
else: # name is same, but file is different
self.logger.warning(
f'Files in source and destination are different.'
)
return False
else:
return False
else:
if mode == 'move':
if not dry_run:
# Move the processed file into the destination directory
shutil.move(src_path, dest_path)
self.logger.info(f'move: {src_path} -> {dest_path}')
elif mode == 'copy':
if not dry_run:
shutil.copy2(src_path, dest_path)
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
def _solve_conflicts(self, conflict_file_list, remove_duplicates):
# change media file_path to dest_path
media.file_path = dest_path
if not self.dry_run:
updated = self._update_exif_data(dest_path, media)
if updated:
checksum = utils.checksum(dest_path)
media.metadata['checksum'] = checksum
media.metadata['file_path'] = os.path.relpath(dest_path, self.root)
return True
def _copy(self, src_path, dest_path):
if not self.dry_run:
shutil.copy2(src_path, dest_path)
self.logger.info(f'copy: {src_path} -> {dest_path}')
def _move(self, src_path, dest_path):
if not self.dry_run:
# Move the file into the destination directory
shutil.move(src_path, dest_path)
self.logger.info(f'move: {src_path} -> {dest_path}')
def _remove(self, path):
if not self.dry_run:
self.remove(path)
self.logger.info(f'remove: {path}')
def _record_file(self, src_path, dest_path, media, import_mode=False):
"""Check file and record the file to db"""
# Check that the file remains the same
if not self._check_file(src_path, dest_path, media):
self.summary.append((src_path, False))
return False
if not self.dry_run:
self._add_db_data(media.metadata)
if import_mode != 'copy' and self.root in src_path.parents:
self.db.delete_filepath(str(src_path.relative_to(self.root)))
return True
def _solve_conflicts(self, conflicts, remove_duplicates):
result = False
unresolved_conflicts = []
while conflict_file_list != []:
src_path, dest_path, media = conflict_file_list.pop()
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
# Remove from the conflict file list if the file has been successfully copied or ignored
while conflicts != []:
src_path, dest_path, media = conflicts.pop()
# Check the conflict status again in case it has changed
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
n = 1
while result is False and n < 100:
while conflict == 1 and n < 100:
# Add appendix to the name
suffix = dest_path.suffix
if n > 1:
@ -476,25 +444,15 @@ class Collection:
else:
stem = dest_path.stem
dest_path = dest_path.parent / (stem + '_' + str(n) + suffix)
result = self.sort_file(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
n = n + 1
record = False
if result is True:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
if conflict == 1:
# Conflict still unresolved after the renaming attempts above
unresolved_conflicts.append((src_path, dest_path, media))
self.logger.error(f'{self.mode}: too many append for {dest_path}...')
self.summary.append((src_path, False))
self.logger.error(f"Too many appends for {dest_path}")
if record:
# result is true or None
self.dest_list.append(dest_path)
return record
yield (src_path, dest_path, media), conflict
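_solve_conflicts is now a generator, so its callers iterate over it and dispatch on the conflict code yielded with each file tuple. A minimal consumption sketch, assuming a Collection instance named collection and a conflicts list built as in the callers further down:

for (src_path, dest_path, media), conflict in collection._solve_conflicts(conflicts, remove_duplicates=True):
    if not conflict:
        # Conflict resolved by renaming: sort and record the file normally
        collection.sort_file(src_path, dest_path, media)
    elif conflict == 1:
        # Still conflicting after the renaming attempts
        collection.summary.append((src_path, False))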
def _split_part(self, dedup_regex, path_part, items):
"""Split part from regex
@ -641,6 +599,40 @@ class Collection:
file_path, (int(datetime.now().timestamp()), int(date_media.timestamp()))
)
def check_conflicts(self, src_path, dest_path, remove_duplicates=False):
'''
Check if the file can be copied or moved to dest_path.
Return a conflict code: 0 no conflict, 1 destination exists and is
different (or is a directory), 2 file already sorted (src == dest),
3 destination is an identical duplicate.
:params: str, str, bool
:returns: int
'''
# check for collisions
if src_path == dest_path:
self.logger.info(f"File {dest_path} already sorted")
return 2
if dest_path.is_dir():
self.logger.info(f"File {dest_path} is an existing directory")
return 1
elif dest_path.is_file():
self.logger.info(f"File {dest_path} already exists")
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.logger.info(
f"File in source and destination are identical. Duplicate will be ignored."
)
return 3
else: # name is same, but file is different
self.logger.info(
f"File {src_path} and {dest_path} are different."
)
return 1
else:
return 1
else:
return 0
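The integer codes returned above drive the callers below; their meaning is inferred from the branches (the names here are hypothetical, the diff itself only uses the raw integers):

from enum import IntEnum

class ConflictStatus(IntEnum):
    NO_CONFLICT = 0      # destination path is free
    DIFFERENT_FILE = 1   # destination exists (or is a directory) and differs
    ALREADY_SORTED = 2   # src_path == dest_path, nothing to do
    DUPLICATE = 3        # destination has identical content and may be removed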
def dedup_regex(self, path, dedup_regex, remove_duplicates=False):
# cycle throught files
result = False
@ -661,7 +653,7 @@ class Collection:
default = re.compile(r'([^-_ .]+[-_ .])')
dedup_regex = [date_num3, date_num2, default]
conflict_file_list = []
conflicts = []
self.src_list = [
x
for x in self._get_files_in_path(
@ -689,28 +681,42 @@ class Collection:
dest_path = self.root.joinpath(*dedup_path)
self._create_directory(dest_path.parent.name, media)
result = self.sort_file(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
record = False
if result is True:
result = False
if not conflict:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
# There is conflict files
conflict_file_list.append(src_path, dest_path, copy(media))
elif conflict == 1:
# There is conflict and file are different
conflicts.append((src_path, dest_path, media))
elif conflict in (2, 3):
result = True
if record:
if result:
# result is true or None
self.dest_list.append(dest_path)
if conflict_file_list != []:
record = self._solve_conflicts(conflict_file_list, remove_duplicates)
if conflicts != []:
for (src_path, dest_path, media), conflict in self._solve_conflicts(conflicts, remove_duplicates):
result = False
if not conflict:
self._record_file(src_path, dest_path, media)
elif conflict == 1:
# There is unresolved conflict
self.summary.append((src_path, False))
elif conflict in (2, 3):
result = True
if result:
# result is true or None
self.dest_list.append(dest_path)
if not self._check_processed():
return False
self.summary.append((None, False))
return self.summary, record
return self.summary
def _modify_selection(self):
"""
@ -780,7 +786,7 @@ class Collection:
def init(self, loc, ignore_tags=set()):
for media, file_path in self.get_medias(loc):
self._add_db_data(media.metadata)
self.summary.append((file_path, 'record'))
self.summary.append((file_path, 'update'))
return self.summary
@ -792,32 +798,33 @@ class Collection:
sys.exit(1)
def check_files(self):
result = True
for file_path in self._get_all_files():
checksum = utils.checksum(file_path)
relpath = file_path.relative_to(self.root)
if checksum == self.db.get_checksum(relpath):
self.summary.append((file_path, 'record'))
self.summary.append((file_path, 'check'))
else:
self.logger.error(f'{file_path} is corrupted')
self.summary.append((file_path, False))
result = False
return self.summary, result
return self.summary
def update(self, loc, ignore_tags=set()):
file_paths = [x for x in self._get_all_files()]
db_rows = [row for row in self.db.get_rows('metadata')]
invalid_db_rows = set()
db_paths = set()
for db_row in db_rows:
abspath = self.root / db_row['FilePath']
if abspath not in file_paths:
invalid_db_rows.add(db_row)
db_paths.add(db_row['FilePath'])
for file_path in file_paths:
relpath = os.path.relpath(file_path, self.root)
# If file not in database
if relpath not in db_rows:
if relpath not in db_paths:
media = Media(
file_path,
self.root,
@ -842,7 +849,7 @@ class Collection:
break
# set row attribute to the file
self._add_db_data(media.metadata)
self.summary.append((file_path, 'record'))
self.summary.append((file_path, 'update'))
# Finally delete invalid rows
for row in invalid_db_rows:
@ -879,27 +886,54 @@ class Collection:
return src_list
def sort_files(self, src_dirs, path_format, loc, remove_duplicates=False, ignore_tags=set()):
def sort_file(self, src_path, dest_path, media, import_mode=False):
if import_mode == 'copy':
self._copy(src_path, dest_path)
else:
self._move(src_path, dest_path)
if import_mode:
update = False
result = self._record_file(
src_path, dest_path, media, import_mode=import_mode
)
if result:
self.dest_list.append(dest_path)
if import_mode:
self.summary.append((src_path, 'import'))
else:
self.summary.append((src_path, 'sort'))
else:
self.summary.append((src_path, False))
return self.summary
def sort_files(self, src_dirs, path_format, loc, import_mode=False, remove_duplicates=False, ignore_tags=set()):
"""
Sort files into appropriate folder
"""
# Check db
self._init_check_db(loc, ignore_tags)
result = False
files_data = []
src_dirs_in_collection = set()
for src_dir in src_dirs:
self.dest_list = []
src_dir = self._check_path(src_dir)
conflict_file_list = []
conflicts = []
self.src_list = self._get_path_list(src_dir)
# Get medias and src_dirs
for src_path in self.src_list:
# List all src dirs in collection
if self.root in src_path.parents:
src_dirs_in_collection.add(src_path.parent)
else:
if not import_mode:
self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
sys.exit(1)
# Get file metadata
media = Media(
src_path,
@ -928,29 +962,50 @@ class Collection:
src_path = media.file_path
dest_path = self.root / relpath
result = self.sort_file(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
record = False
if result is True:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
# There is conflict files
conflict_file_list.append((src_path, dest_path, media))
if record:
# result is true or None
if not conflict:
self.sort_file(
src_path, dest_path, media, import_mode=import_mode
)
elif conflict == 1:
# There is conflict and file are different
conflicts.append((src_path, dest_path, media))
elif conflict == 3:
# Same file checksum
if import_mode == 'move':
self._remove(src_path)
self.dest_list.append(dest_path)
elif conflict == 2:
# File already sorted
self.dest_list.append(dest_path)
if conflict_file_list != []:
record = self._solve_conflicts(conflict_file_list, remove_duplicates)
if conflicts != []:
for (src_path, dest_path, media), conflict in self._solve_conflicts(conflicts, remove_duplicates):
if not conflict:
self.sort_file(
src_path, dest_path, media, import_mode=import_mode
)
elif conflict == 1:
# There is unresolved conflict
self.summary.append((src_path, False))
elif conflict == 3:
# Same file checksum
if import_mode == 'move':
self._remove(src_path)
self.dest_list.append(dest_path)
elif conflict == 2:
# File already sorted
self.dest_list.append(dest_path)
self.remove_empty_subdirs(src_dirs_in_collection)
if not self._check_processed():
record = False
self.summary.append((None, False))
return self.summary, record
return self.summary
def remove_empty_folders(self, directory, remove_root=True):
'Function to remove empty folders'
@ -972,16 +1027,10 @@ class Collection:
self.logger.info(f"Removing empty folder: {directory}")
if not self.dry_run:
os.rmdir(directory)
self.summary.append((directory, 'delete'))
self.summary.append((directory, 'remove_empty_folders'))
return self.summary
def move_file(self, img_path, dest_path):
if not self.dry_run:
shutil.move(img_path, dest_path)
self.logger.info(f'move: {img_path} -> {dest_path}')
def _get_images(self, path):
"""
:returns: iter
@ -1007,9 +1056,8 @@ class Collection:
self.logger.error('Db data is not accurate, run `ordigi init`')
sys.exit(1)
result = True
path = self._check_path(path)
images = set([x for x in self._get_images(path)])
images = set(x for x in self._get_images(path))
i = Images(images, logger=self.logger)
nb_row_ini = self.db.len('metadata')
for image in images:
@ -1032,34 +1080,32 @@ class Collection:
dest_directory.mkdir(exist_ok=True)
# Move the similar files into the destination directory
self.move_file(img_path, dest_path)
self._move(img_path, dest_path)
moved_imgs.add(img_path)
if self._record_file(img_path, dest_path, media):
self.summary.append((img_path, self.mode))
self.summary.append((img_path, 'sort'))
else:
self.summary.append((img_path, False))
result = False
if similar:
img_path = image.img_path
dest_path = dest_directory / img_path.name
self.move_file(img_path, dest_path)
self._move(img_path, dest_path)
moved_imgs.add(img_path)
if self._record_file(img_path, dest_path, media_ref):
self.summary.append((img_path, self.mode))
self.summary.append((img_path, 'sort'))
else:
self.summary.append((img_path, False))
result = False
nb_row_end = self.db.len('metadata')
if nb_row_ini and nb_row_ini != nb_row_end:
self.logger.error('Number of rows has changed unexpectedly')
result = False
if result:
result = self.check_db()
self.summary.append((None, False))
return self.summary, result
return self.summary
def revert_compare(self, path):
@ -1067,7 +1113,6 @@ class Collection:
self.logger.error('Db data is not accurate, run `ordigi init`')
sys.exit(1)
result = True
path = self._check_path(path)
dirnames = set()
moved_files = set()
@ -1084,13 +1129,12 @@ class Collection:
media = Media(src_path, path, self.logger)
metadata = media.get_metadata(self.root, db=self.db, cache=self.cache)
dest_path = Path(src_path.parent.parent, src_path.name)
self.move_file(src_path, dest_path)
self._move(src_path, dest_path)
moved_files.add(src_path)
if self._record_file(src_path, dest_path, media):
self.summary.append((src_path, self.mode))
self.summary.append((src_path, 'sort'))
else:
self.summary.append((src_path, False))
result = False
for dirname in dirnames:
# remove 'similar_to*' directories
@ -1102,13 +1146,12 @@ class Collection:
nb_row_end = self.db.len('metadata')
if nb_row_ini and nb_row_ini != nb_row_end:
self.logger.error('Number of rows has changed unexpectedly')
result = False
if result:
result = self.check_db()
self.summary.append((None, False))
return self.summary, result
return self.summary
def fill_data(self, path, key, loc=None, edit=False):
"""Fill metadata and exif data for given key"""
@ -1172,7 +1215,7 @@ class Collection:
# Update exif data
media.set_key_values(key, value)
self.summary.append((file_path, 'record'))
self.summary.append((file_path, 'update'))
return self.summary

View File

@ -2,43 +2,56 @@ from tabulate import tabulate
class Summary:
def __init__(self):
self.modes = ('record', 'copy', 'move', 'delete')
def __init__(self, path):
self.actions = (
'check',
'import',
'remove_empty_folders',
'remove_excluded',
'sort',
'update',
)
self.path = path
self.result = {}
for mode in self.modes:
self.result[mode] = 0
for action in self.actions:
self.result[action] = 0
self.errors = 0
self.errors_items = []
def append(self, row):
file_path, mode = row
file_path, action = row
if mode:
for m in self.modes:
if mode == m:
self.result[mode] += 1
if action:
for m in self.actions:
if action == m:
self.result[action] += 1
else:
self.errors += 1
self.errors_items.append(file_path)
if file_path:
self.errors_items.append(file_path)
def print(self):
print()
for mode in self.result:
nb = self.result[mode]
if self.result[mode] != 0:
if mode == 'record':
print(f"SUMMARY: {nb} files recorded.")
elif mode == 'copy':
print(f"SUMMARY: {nb} files copied.")
elif mode == 'move':
print(f"SUMMARY: {nb} files moved.")
else:
print(f"SUMMARY: {nb} files deleted.")
for action in self.result:
nb = self.result[action]
if self.result[action] != 0:
if action == 'check':
print(f"SUMMARY: {nb} files checked in {self.path}.")
elif action == 'import':
print(f"SUMMARY: {nb} files imported into {self.path}.")
elif action == 'sort':
print(f"SUMMARY: {nb} files sorted inside {self.path}.")
elif action == 'remove_excluded':
print(f"SUMMARY: {nb} files deleted in {self.path}.")
elif action == 'remove_empty_folders':
print(f"SUMMARY: {nb} empty folders removed in {self.path}.")
elif action == 'update':
print(f"SUMMARY: {nb} files updated in {self.path} database.")
if sum(self.result.values()) == 0 and not self.errors:
print(f"OK !!")
print(f"SUMMARY: no file imported, sorted or deleted from {self.path}.")
if self.errors > 0:
print()
@ -49,4 +62,3 @@ class Summary:
print(tabulate(errors_result, headers=errors_headers))
print()
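A short usage sketch of the reworked Summary class; the file names are made up, and the import path matches the one used in the tests below:

from pathlib import Path
from ordigi.summary import Summary

summary = Summary(Path('/photos'))
summary.append((Path('a.jpg'), 'import'))  # counted under 'import'
summary.append((Path('b.jpg'), 'sort'))    # counted under 'sort'
summary.append((Path('c.jpg'), False))     # counted as an error
summary.print()
# SUMMARY: 1 files imported into /photos.
# SUMMARY: 1 files sorted inside /photos.
# ...followed by the error table listing c.jpg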

View File

@ -1,24 +1,21 @@
# TODO to be removed later
from datetime import datetime
import inquirer
import os
import pytest
import shutil
import sqlite3
from pathlib import Path
import re
from sys import platform
from time import sleep
import pytest
import inquirer
from .conftest import randomize_files, randomize_db
from ordigi import constants
from ordigi.collection import Collection, FPath
from ordigi.database import Sqlite
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.geolocation import GeoLocation
from ordigi import log
from ordigi.media import Media
from ordigi import utils
from .conftest import randomize_files, randomize_db
from ordigi.summary import Summary
class TestFPath:
@ -68,7 +65,7 @@ class TestFPath:
exif_data = ExifToolCaching(str(file_path)).asdict()
loc = GeoLocation()
metadata = media.get_metadata(loc)
metadata = media.get_metadata(self.src_path, loc)
for item, regex in items.items():
for mask in masks:
matched = re.search(regex, mask)
@ -131,21 +128,30 @@ class TestCollection:
terminate_exiftool()
assert not exiftool_is_running()
def assert_import(self, summary, nb):
# Summary is created and there are no errors
assert summary.errors == 0
assert summary.result['import'] == nb
def assert_sort(self, summary, nb):
# Summary is created and there are no errors
assert summary.errors == 0
assert summary.result['sort'] == nb
def test_sort_files(self, tmp_path):
collection = Collection(tmp_path, album_from_folder=True,
logger=self.logger, mode='copy')
logger=self.logger)
loc = GeoLocation()
summary, result = collection.sort_files([self.src_path],
self.path_format, loc)
summary = collection.sort_files([self.src_path],
self.path_format, loc, import_mode='copy')
# Summary is created and there are no errors
assert summary, summary
assert result, result
self.assert_import(summary, 30)
summary, result = collection.check_files()
assert summary, summary
assert result, result
summary = collection.check_files()
assert summary.result['check'] == 30
assert not summary.errors
# check if album values are set
for file_path in tmp_path.glob('**/*'):
if '.db' not in str(file_path):
media = Media(file_path, tmp_path, album_from_folder=True)
@ -153,55 +159,66 @@ class TestCollection:
for value in media._get_key_values('album'):
assert value != '' or None
collection = Collection(tmp_path, album_from_folder=True)
# Try to change path format and sort files again
path = '{city}/{%Y}-{name}.%l{ext}'
summary = collection.sort_files([tmp_path],
self.path_format, loc)
self.assert_sort(summary, 24)
shutil.copytree(tmp_path / 'test_exif', tmp_path / 'test_exif_copy')
collection.summary = Summary(tmp_path)
assert sum(collection.summary.result.values()) == 0
summary = collection.update(loc)
assert summary.result['update'] == 2
assert not summary.errors
collection.summary = Summary(tmp_path)
summary = collection.update(loc)
assert not summary.result['update']
assert not summary.errors
# test with populated dest dir
randomize_files(tmp_path)
summary, result = collection.check_files()
assert summary, summary
assert not result, result
summary = collection.check_files()
assert summary.errors
collection = Collection(tmp_path, logger=self.logger)
# test summary update
collection.summary = Summary(tmp_path)
summary = collection.update(loc)
assert summary, summary
collection = Collection(tmp_path, mode='copy', album_from_folder=True)
loc = GeoLocation()
summary, result = collection.sort_files([self.src_path], self.path_format, loc)
assert summary, summary
assert result, result
# TODO check if path follow path_format
assert summary.result['update']
assert not summary.errors
def test_sort_files_invalid_db(self, tmp_path):
collection = Collection(tmp_path, mode='copy')
collection = Collection(tmp_path)
loc = GeoLocation()
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary, result = collection.sort_files([self.src_path],
self.path_format, loc)
summary = collection.sort_files([self.src_path],
self.path_format, loc, import_mode='copy')
def test_sort_file(self, tmp_path):
for mode in 'copy', 'move':
collection = Collection(tmp_path, mode=mode)
for import_mode in 'copy', 'move', False:
collection = Collection(tmp_path)
# copy mode
src_path = Path(self.src_path, 'test_exif', 'photo.png')
name = 'photo_' + mode + '.png'
media = Media(src_path, self.src_path)
metadata = media.get_metadata(tmp_path)
name = 'photo_' + str(import_mode) + '.png'
dest_path = Path(tmp_path, name)
src_checksum = utils.checksum(src_path)
result_copy = collection.sort_file(src_path, dest_path)
assert result_copy
summary = collection.sort_file(src_path, dest_path, media,
import_mode=import_mode)
assert not summary.errors
# Ensure files remain the same
assert collection._checkcomp(dest_path, src_checksum)
if mode == 'copy':
if import_mode == 'copy':
assert src_path.exists()
else:
assert not src_path.exists()
shutil.copyfile(dest_path, src_path)
# TODO check for conflicts
def test__get_files_in_path(self, tmp_path):
collection = Collection(tmp_path, exclude={'**/*.dng',}, max_deep=1,
use_date_filename=True, use_file_dates=True)
@ -217,16 +234,35 @@ class TestCollection:
collection = Collection(path, logger=self.logger)
loc = GeoLocation()
summary = collection.init(loc)
summary, result = collection.sort_similar_images(path, similarity=60)
summary = collection.sort_similar_images(path, similarity=60)
# Summary is created and there are no errors
assert summary, summary
assert result, result
assert not summary.errors
summary, result = collection.revert_compare(path)
summary = collection.revert_compare(path)
# Summary is created and there are no errors
assert summary, summary
assert result, result
assert not summary.errors
@pytest.mark.skip()
def test_fill_data(self, tmp_path, monkeypatch):
path = tmp_path / 'collection'
shutil.copytree(self.src_path, path)
collection = Collection(path, logger=self.logger)
# loc = GeoLocation()
import ipdb; ipdb.set_trace()
# def mockreturn(prompt, theme):
# return {'value': '03-12-2021 08:12:35'}
# monkeypatch.setattr(inquirer, 'prompt', mockreturn)
# collection.fill_data(path, 'date_original')
# # check if db value is set
# import ipdb; ipdb.set_trace()
# date = collection.db.get_metadata_data('test_exif/invalid.invalid',
# 'DateOriginal')
# assert date == '2021-03-12 08:12:35'
# Check if exif value is set
collection.fill_data(path, 'date_original', edit=True)