Add final basic checkup

This commit is contained in:
Cédric Leporcq 2021-09-29 07:36:47 +02:00
parent 8e8afe9a89
commit 4156e769d0
4 changed files with 82 additions and 55 deletions

View File

@ -144,16 +144,16 @@ def sort(**kwargs):
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'],
opt['timeout'])
summary, has_errors = collection.sort_files(paths, loc,
summary, result = collection.sort_files(paths, loc,
kwargs['remove_duplicates'], kwargs['ignore_tags'])
if kwargs['clean']:
remove_empty_folders(destination, logger)
if verbose or debug:
summary.write()
summary.print()
if has_errors:
if not result:
sys.exit(1)
@ -225,15 +225,15 @@ def clean(**kwargs):
exclude=exclude, filter_by_ext=filter_by_ext, glob=kwargs['glob'],
logger=logger, max_deep=kwargs['max_deep'], mode='move')
dedup_regex = list(kwargs['dedup_regex'])
summary, has_errors = collection.dedup_regex(path, dedup_regex, logger, kwargs['remove_duplicates'])
summary, result = collection.dedup_regex(path, dedup_regex, kwargs['remove_duplicates'])
if clean_all or folders:
remove_empty_folders(path, logger)
if verbose or debug:
summary.write()
summary.print()
if has_errors:
if not result:
sys.exit(1)
@ -305,7 +305,7 @@ def compare(**kwargs):
summary, has_errors = collection.sort_similar_images(path, kwargs['similarity'])
if verbose or debug:
summary.write()
summary.print()
if has_errors:
sys.exit(1)

View File

@ -68,6 +68,9 @@ class Collection(object):
self.use_file_dates = use_file_dates
self.whitespace_regex = '[ \t\n\r\f\v]+'
self.src_list = []
self.dest_list = []
# Constants
self.theme = request.load_theme()
@ -315,11 +318,11 @@ class Collection(object):
return False
def record_file(self, src_path, dest_path, media):
def _record_file(self, src_path, dest_path, media):
"""Check file and record the file to db"""
# Check if file remain the same
has_errors = False
record = False
checksum = media.metadata['checksum']
if self._checkcomp(dest_path, checksum):
# change media file_path to dest_path
@ -335,14 +338,13 @@ class Collection(object):
self._add_db_data(dest_path, media.metadata)
self.summary.append((src_path, dest_path))
record = True
else:
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
# sys.exit(1)
self.summary.append((src_path, False))
has_errors = True
return self.summary, has_errors
return record
def remove(self, file_path):
if not self.dry_run:
@ -394,7 +396,7 @@ class Collection(object):
return True
def _solve_conflicts(self, conflict_file_list, remove_duplicates):
has_errors = False
result = False
unresolved_conflicts = []
while conflict_file_list != []:
src_path, dest_path, media = conflict_file_list.pop()
@ -413,21 +415,22 @@ class Collection(object):
result = self.sort_file(src_path, dest_path, remove_duplicates)
n = n + 1
if result is False:
record = False
if result is True:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
# n > 100:
unresolved_conflicts.append((src_path, dest_path, media))
self.logger.error(f'{self.mode}: too many append for {dest_path}...')
self.summary.append((src_path, False))
has_errors = True
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, media)
if record:
# result is true or None
self.dest_list.append(dest_path)
if has_errors:
return False
else:
return True
return record
def _split_part(self, dedup_regex, path_part, items):
"""Split part from regex
@ -605,9 +608,9 @@ class Collection(object):
# Initialize date taken to what's returned from the metadata function.
os.utime(file_path, (int(datetime.now().timestamp()), int(date_media.timestamp())))
def dedup_regex(self, path, dedup_regex, logger, remove_duplicates=False):
def dedup_regex(self, path, dedup_regex, remove_duplicates=False):
# cycle throught files
has_errors = False
result = False
path = self._check_path(path)
# Delimiter regex
delim = r'[-_ .]'
@ -626,8 +629,8 @@ class Collection(object):
]
conflict_file_list = []
file_list = [x for x in self._get_files_in_path(path, glob=self.glob)]
for src_path in file_list:
self.src_list = [x for x in self._get_files_in_path(path, glob=self.glob)]
for src_path in self.src_list:
# TODO to test it
media = Media(src_path, path, logger=self.logger)
path_parts = src_path.relative_to(self.root).parts
@ -648,22 +651,29 @@ class Collection(object):
self._create_directory(dest_path.parent.name, path, media)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, media)
elif result is False:
record = False
if result is True:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
# There is conflict files
conflict_file_list.append(src_path, dest_path, copy(media))
if record:
# result is true or None
self.dest_list.append(dest_path)
if conflict_file_list != []:
result = self._solve_conflicts(conflict_file_list, remove_duplicates)
record = self._solve_conflicts(conflict_file_list, remove_duplicates)
if not result:
has_errors = True
if not self._check_processed():
return False
return self.summary, has_errors
return self.summary, record
def _modify_selection(self, file_list):
def _modify_selection(self):
"""
:params: list
:return: list
@ -672,31 +682,40 @@ class Collection(object):
questions = [
inquirer.Checkbox('selection',
message=message,
choices=file_list,
default=file_list,
choices=self.src_list,
default=self.src_list,
),
]
return inquirer.prompt(questions, theme=self.theme)['selection']
def _check_processed(self):
# Finally check if are files are successfully processed
n_fail = len(self.src_list) - len(self.dest_list)
if n_fail != 0:
self.logger.error("{n_fail} files have not be processed")
return False
return True
def sort_files(self, paths, loc, remove_duplicates=False,
ignore_tags=set()):
"""
Sort files into appropriate folder
"""
has_errors = False
result = False
files_data = []
for path in paths:
self.dest_list = []
path = self._check_path(path)
conflict_file_list = []
file_list = [x for x in self._get_files_in_path(path,
self.src_list = [x for x in self._get_files_in_path(path,
glob=self.glob, extensions=self.filter_by_ext)]
if self.interactive:
file_list = self._modify_selection(file_list)
self.src_list = self._modify_selection()
print('Processing...')
# Get medias and paths
for src_path in file_list:
for src_path in self.src_list:
# Process files
media = Media(src_path, path, self.album_from_folder,
ignore_tags, self.interactive, self.logger,
@ -724,20 +743,26 @@ class Collection(object):
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, media)
elif result is False:
record = False
if result is True:
record = self._record_file(src_path, dest_path, media)
elif result is None:
record = True
else:
# There is conflict files
conflict_file_list.append((src_path, dest_path, media))
if record:
# result is true or None
self.dest_list.append(dest_path)
if conflict_file_list != []:
result = self._solve_conflicts(conflict_file_list, remove_duplicates)
record = self._solve_conflicts(conflict_file_list, remove_duplicates)
if result is False:
has_errors = True
if not self._check_processed():
record = False
return self.summary, has_errors
return self.summary, record
def set_hash(self, result, src_path, dest_path, src_checksum):
if result:

View File

@ -18,7 +18,7 @@ class Summary(object):
self.error += 1
self.error_items.append(id)
def write(self):
def print(self):
if self.error > 0:
error_headers = ["File"]
error_result = []

View File

@ -114,11 +114,11 @@ class TestCollection:
def test_sort_files(self, tmp_path):
collection = Collection(tmp_path, self.path_format, album_from_folder=True)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_path], loc)
summary, result = collection.sort_files([self.src_path], loc)
# Summary is created and there is no errors
assert summary, summary
assert not has_errors, has_errors
assert result, result
for file_path in tmp_path.glob('**/*'):
if '.db' not in str(file_path):
@ -129,10 +129,12 @@ class TestCollection:
# test with populated dest dir
randomize_files(tmp_path)
summary, has_errors = collection.sort_files([self.src_path], loc)
collection = Collection(tmp_path, self.path_format, album_from_folder=True)
loc = GeoLocation()
summary, result = collection.sort_files([self.src_path], loc)
assert summary, summary
assert not has_errors, has_errors
assert result, result
# TODO check if path follow path_format
def test_sort_files_invalid_db(self, tmp_path):
@ -140,7 +142,7 @@ class TestCollection:
loc = GeoLocation()
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary, has_errors = collection.sort_files([self.src_path], loc)
summary, result = collection.sort_files([self.src_path], loc)
def test_sort_file(self, tmp_path):