From 38d7cb684107fc7a5491f822b0c04f91d6ae277c Mon Sep 17 00:00:00 2001 From: Cedric Leporcq Date: Sat, 23 Oct 2021 07:51:53 +0200 Subject: [PATCH] Refactoring and fix class --- ordigi.py | 11 +-- ordigi/collection.py | 142 +++++++++++++++++++++++++++++--------- ordigi/database.py | 29 +++++--- ordigi/media.py | 37 ++++++---- requirements.txt | 3 - tests/test_collection.py | 26 +++---- tests/test_geolocation.py | 8 ++- 7 files changed, 174 insertions(+), 82 deletions(-) diff --git a/ordigi.py b/ordigi.py index 5085649..5037a93 100755 --- a/ordigi.py +++ b/ordigi.py @@ -202,7 +202,6 @@ def sort(**kwargs): collection = Collection( root, - path_format, kwargs['album_from_folder'], cache, opt['day_begins'], @@ -296,14 +295,12 @@ def clean(**kwargs): collection = Collection( root, - opt['path_format'], dry_run=dry_run, exclude=exclude, filter_by_ext=filter_by_ext, glob=kwargs['glob'], logger=logger, max_deep=opt['max_deep'], - mode='move', ) if kwargs['path_string']: @@ -337,7 +334,7 @@ def init(**kwargs): logger = log.get_logger(level=log_level) loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout']) - collection = Collection(root, None, exclude=opt['exclude'], mode='move', logger=logger) + collection = Collection(root, exclude=opt['exclude'], logger=logger) summary = collection.init(loc) if log_level < 30: @@ -356,7 +353,7 @@ def update(**kwargs): logger = log.get_logger(level=log_level) loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout']) - collection = Collection(root, None, exclude=opt['exclude'], mode='move', logger=logger) + collection = Collection(root, exclude=opt['exclude'], logger=logger) summary = collection.update(loc) if log_level < 30: @@ -373,7 +370,7 @@ def check(**kwargs): root = kwargs['path'] config = get_collection_config(root) opt = config.get_options() - collection = Collection(root, None, exclude=opt['exclude'], mode='move', logger=logger) + collection = Collection(root, exclude=opt['exclude'], logger=logger) result = collection.check_db() if result: summary, result = collection.check_files() @@ -447,11 +444,9 @@ def compare(**kwargs): collection = Collection( root, - None, exclude=exclude, filter_by_ext=filter_by_ext, glob=kwargs['glob'], - mode='move', dry_run=dry_run, logger=logger, ) diff --git a/ordigi/collection.py b/ordigi/collection.py index 0e74abf..4bdd1d3 100644 --- a/ordigi/collection.py +++ b/ordigi/collection.py @@ -253,7 +253,6 @@ class Collection: def __init__( self, root, - path_format, album_from_folder=False, cache=False, day_begins=0, @@ -264,7 +263,7 @@ class Collection: interactive=False, logger=logging.getLogger(), max_deep=None, - mode='copy', + mode='move', use_date_filename=False, use_file_dates=False, ): @@ -275,7 +274,6 @@ class Collection: logger.error(f'Directory {self.root} does not exist') sys.exit(1) - self.path_format = path_format self.db = Sqlite(self.root) # Options @@ -522,17 +520,17 @@ class Collection: return items - def walklevel(self, src_path, maxlevel=None): + def walklevel(self, src_dir, maxlevel=None): """ Walk into input directory recursively until desired maxlevel source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below """ - src_path = str(src_path) - if not os.path.isdir(src_path): + src_dir = str(src_dir) + if not os.path.isdir(src_dir): return None - num_sep = src_path.count(os.path.sep) - for root, dirs, files in os.walk(src_path): + num_sep = src_dir.count(os.path.sep) + for root, dirs, files in os.walk(src_dir): level = root.count(os.path.sep) - num_sep yield root, dirs, files, level if maxlevel is not None and level >= maxlevel: @@ -765,8 +763,7 @@ class Collection: return self.check_db() - def init(self, loc, ignore_tags=set()): - record = True + def get_medias(self, loc, ignore_tags=set()): for file_path in self._get_all_files(): media = Media( file_path, @@ -778,11 +775,22 @@ class Collection: ) metadata = media.get_metadata(self.root, loc, self.db, self.cache) media.metadata['file_path'] = os.path.relpath(file_path, self.root) + yield media, file_path + + def init(self, loc, ignore_tags=set()): + for media, file_path in self.get_medias(loc): self._add_db_data(media.metadata) self.summary.append((file_path, 'record')) return self.summary + def _init_check_db(self, loc=None, ignore_tags=set()): + if self.db.is_empty('metadata'): + self.init(loc, ignore_tags) + elif not self.check_db(): + self.logger.error('Db data is not accurate run `ordigi update`') + sys.exit(1) + def check_files(self): result = True for file_path in self._get_all_files(): @@ -857,44 +865,45 @@ class Collection: if parents != set(): self.remove_empty_subdirs(parents) - def sort_files(self, paths, loc, remove_duplicates=False, ignore_tags=set()): + def _get_path_list(self, path): + src_list = [ + x + for x in self._get_files_in_path( + path, glob=self.glob, + extensions=self.filter_by_ext, + ) + ] + if self.interactive: + src_list = self._modify_selection() + print('Processing...') + + return src_list + + def sort_files(self, src_dirs, path_format, loc, remove_duplicates=False, ignore_tags=set()): """ Sort files into appropriate folder """ # Check db - if [x for x in self.db.get_rows('metadata')] == []: - self.init(loc, ignore_tags) - elif not self.check_db(): - self.logger.error('Db data is not accurate run `ordigi update`') - sys.exit(1) + self._init_check_db(loc, ignore_tags) result = False files_data = [] src_dirs_in_collection = set() - for path in paths: + for src_dir in src_dirs: self.dest_list = [] - path = self._check_path(path) + src_dir = self._check_path(src_dir) conflict_file_list = [] - self.src_list = [ - x - for x in self._get_files_in_path( - path, glob=self.glob, - extensions=self.filter_by_ext, - ) - ] - if self.interactive: - self.src_list = self._modify_selection() - print('Processing...') + self.src_list = self._get_path_list(src_dir) - # Get medias and paths + # Get medias and src_dirs for src_path in self.src_list: - # List all src_dirs in collection + # List all src dirs in collection if self.root in src_path.parents: src_dirs_in_collection.add(src_path.parent) - # Process files + # Get file metadata media = Media( src_path, - path, + src_dir, self.album_from_folder, ignore_tags, self.interactive, @@ -904,7 +913,7 @@ class Collection: ) metadata = media.get_metadata(self.root, loc, self.db, self.cache) # Get the destination path according to metadata - fpath = FPath(self.path_format, self.day_begins, self.logger) + fpath = FPath(path_format, self.day_begins, self.logger) relpath = Path(fpath.get_path(metadata)) files_data.append((copy(media), relpath)) @@ -916,7 +925,6 @@ class Collection: # sort files and solve conflicts for media, relpath in files_data: - # Convert paths to string src_path = media.file_path dest_path = self.root / relpath @@ -1102,3 +1110,69 @@ class Collection: return self.summary, result + def fill_data(self, path, key, loc=None, edit=False): + """Fill metadata and exif data for given key""" + self._init_check_db() + + if key in ( + 'latitude', + 'longitude', + 'latitude_ref', + 'longitude_ref', + ): + print(f"Set {key} alone is not allowed") + return None + + if edit: + print(f"Edit {key} values:") + else: + print(f"Fill empty {key} values:") + + self.src_list = self._get_path_list(path) + + for file_path in self.src_list: + media = Media( + file_path, + self.root, + self.album_from_folder, + ignore_tags, + self.interactive, + self.logger, + self.use_date_filename, + self.use_file_dates, + ) + metadata = media.get_metadata(self.root, loc, self.db, self.cache) + print() + value = media.metadata[key] + if edit or not value: + print(f"FILE: '{file_path}'") + if edit: + print(f"{key}: '{value}'") + if edit or not value: + # Prompt value for given key for file_path + # utils.open_file() + prompt = [ + inquirer.Text('value', message=key), + ] + answer = inquirer.prompt(prompt, theme=self.theme) + # Validate value + if key in ('date_original', 'date_created', 'date_modified'): + # Check date format + value = str(media.get_date_format(answer['value'])) + else: + if not answer[key].isalnum(): + print("Invalid entry, use alphanumeric chars") + value = inquirer.prompt(prompt, theme=self.theme) + + # print(f"{key}='{value}'") + + media.metadata[key] = value + # Update database + self._add_db_data(media.metadata) + # Update exif data + media.set_key_values(key, value) + + self.summary.append((file_path, 'record')) + + return self.summary + diff --git a/ordigi/database.py b/ordigi/database.py index ed2f0fe..704c560 100644 --- a/ordigi/database.py +++ b/ordigi/database.py @@ -49,7 +49,7 @@ class Sqlite: 'CameraMake': 'text', 'CameraModel': 'text', 'OriginalName': 'text', - 'SrcPath': 'text', + 'SrcDir': 'text', 'Subdirs': 'text', 'Filename': 'text', } @@ -109,6 +109,21 @@ class Sqlite: return False + def get_rows(self, table): + """Cycle through rows in table + :params: str + :return: iter + """ + self.cur.execute(f'select * from {table}') + for row in self.cur: + yield row + + def is_empty(self, table): + if [x for x in self.get_rows(table)] == []: + return True + + return False + def _run(self, query, n=0): result = False result = self.cur.execute(query).fetchone() @@ -234,8 +249,8 @@ class Sqlite: self.cur.execute(f'SELECT * FROM {table}').fetchall() def get_location_nearby(self, latitude, longitude, Column, threshold_m=3000): - """Find a name for a location in the database. - + """ + Find a name for a location in the database. :param float latitude: Latitude of the location. :param float longitude: Longitude of the location. :param int threshold_m: Location in the database must be this close to @@ -282,11 +297,3 @@ class Sqlite: sql = f'select count() from {table}' return self._run(sql) - def get_rows(self, table): - """Cycle through rows in table - :params: str - :return: iter - """ - self.cur.execute(f'select * from {table}') - for row in self.cur: - yield row diff --git a/ordigi/media.py b/ordigi/media.py index 659f9b1..654a5e2 100644 --- a/ordigi/media.py +++ b/ordigi/media.py @@ -33,7 +33,7 @@ class Media: def __init__( self, file_path, - src_path, + src_dir, album_from_folder=False, ignore_tags=set(), interactive=False, @@ -45,7 +45,7 @@ class Media: :params: Path, Path, bool, set, bool, Logger """ self.file_path = file_path - self.src_path = src_path + self.src_dir = src_dir self.album_from_folder = album_from_folder self.exif_metadata = None @@ -214,14 +214,14 @@ class Media: default=default, ), ] - prompt = [ - inquirer.Text('date_custom', message="date"), - ] - answers = inquirer.prompt(choices_list, theme=self.theme) + if not answers['date_list']: + prompt = [ + inquirer.Text('date_custom', message="date"), + ] answers = inquirer.prompt(prompt, theme=self.theme) - return utils.get_date_from_string(answers['date_custom']) + return self.get_date_format(answers['date_custom']) else: return answers['date_list'] @@ -236,9 +236,9 @@ class Media: basename = os.path.splitext(self.metadata['filename'])[0] date_original = self.metadata['date_original'] if self.metadata['original_name']: - date_filename = utils.get_date_from_string(self.metadata['original_name']) + date_filename = self.get_date_format(self.metadata['original_name']) else: - date_filename = utils.get_date_from_string(basename) + date_filename = self.get_date_format(basename) date_original = self.metadata['date_original'] date_created = self.metadata['date_created'] @@ -386,16 +386,16 @@ class Media: else: formated_data = value self.metadata[key] = formated_data - for key in 'src_path', 'subdirs', 'filename': + for key in 'src_dir', 'subdirs', 'filename': label = utils.snake2camel(key) formated_data = db.get_metadata_data(relpath, label) self.metadata[key] = formated_data location_id = db.get_metadata_data(relpath, 'LocationId') else: - self.metadata['src_path'] = str(self.src_path) + self.metadata['src_dir'] = str(self.src_dir) self.metadata['subdirs'] = str( - self.file_path.relative_to(self.src_path).parent + self.file_path.relative_to(self.src_dir).parent ) self.metadata['filename'] = self.file_path.name # Get metadata from exif @@ -500,6 +500,19 @@ class Media: """ return ExifTool(self.file_path, logger=self.logger).setvalue(tag, value) + def set_key_values(self, key, value): + """Set tags values for given key""" + status = True + if self.exif_metadata is None: + return False + + for tag in self.tags_keys[key]: + if tag in self.exif_metadata: + if not self.set_value(tag, value): + status = False + + return status + def set_date_media(self, time): """Set the date/time a photo was taken. diff --git a/requirements.txt b/requirements.txt index e5103aa..6358f83 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,7 @@ click==6.6 imagehash==4.2.1 inquirer -requests==2.20.0 -Send2Trash==1.3.0 configparser==3.5.0 tabulate==0.7.7 Pillow==8.0 pyheif_pillow_opener=0.1 -six==1.9 diff --git a/tests/test_collection.py b/tests/test_collection.py index 02e88ea..054a994 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -1,5 +1,6 @@ # TODO to be removed later from datetime import datetime +import inquirer import os import pytest import shutil @@ -131,10 +132,11 @@ class TestCollection: assert not exiftool_is_running() def test_sort_files(self, tmp_path): - collection = Collection(tmp_path, self.path_format, - album_from_folder=True, logger=self.logger) + collection = Collection(tmp_path, album_from_folder=True, + logger=self.logger, mode='copy') loc = GeoLocation() - summary, result = collection.sort_files([self.src_path], loc) + summary, result = collection.sort_files([self.src_path], + self.path_format, loc) # Summary is created and there is no errors assert summary, summary @@ -157,13 +159,13 @@ class TestCollection: assert summary, summary assert not result, result - collection = Collection(tmp_path, None, mode='move', logger=self.logger) + collection = Collection(tmp_path, logger=self.logger) summary = collection.update(loc) assert summary, summary - collection = Collection(tmp_path, self.path_format, album_from_folder=True) + collection = Collection(tmp_path, mode='copy', album_from_folder=True) loc = GeoLocation() - summary, result = collection.sort_files([self.src_path], loc) + summary, result = collection.sort_files([self.src_path], self.path_format, loc) assert summary, summary assert result, result @@ -171,16 +173,17 @@ class TestCollection: # TODO check if path follow path_format def test_sort_files_invalid_db(self, tmp_path): - collection = Collection(tmp_path, self.path_format) + collection = Collection(tmp_path, mode='copy') loc = GeoLocation() randomize_db(tmp_path) with pytest.raises(sqlite3.DatabaseError) as e: - summary, result = collection.sort_files([self.src_path], loc) + summary, result = collection.sort_files([self.src_path], + self.path_format, loc) def test_sort_file(self, tmp_path): for mode in 'copy', 'move': - collection = Collection(tmp_path, self.path_format, mode=mode) + collection = Collection(tmp_path, mode=mode) # copy mode src_path = Path(self.src_path, 'test_exif', 'photo.png') name = 'photo_' + mode + '.png' @@ -200,8 +203,7 @@ class TestCollection: # TODO check for conflicts def test__get_files_in_path(self, tmp_path): - collection = Collection(tmp_path, self.path_format, - exclude={'**/*.dng',}, max_deep=1, + collection = Collection(tmp_path, exclude={'**/*.dng',}, max_deep=1, use_date_filename=True, use_file_dates=True) paths = [x for x in collection._get_files_in_path(self.src_path, glob='**/photo*')] @@ -212,7 +214,7 @@ class TestCollection: def test_sort_similar_images(self, tmp_path): path = tmp_path / 'collection' shutil.copytree(self.src_path, path) - collection = Collection(path, None, mode='move', logger=self.logger) + collection = Collection(path, logger=self.logger) loc = GeoLocation() summary = collection.init(loc) summary, result = collection.sort_similar_images(path, similarity=60) diff --git a/tests/test_geolocation.py b/tests/test_geolocation.py index ad88e17..a6ae203 100644 --- a/tests/test_geolocation.py +++ b/tests/test_geolocation.py @@ -1,3 +1,4 @@ +from ordigi.utils import distance_between_two_points from ordigi.geolocation import GeoLocation import pytest @@ -8,8 +9,11 @@ class TestGeoLocation: def test_coordinates_by_name(self): coordinates = self.loc.coordinates_by_name('Sunnyvale, CA') - assert coordinates['latitude'] == 37.3688301 - assert coordinates['longitude'] == -122.036349 + latitude = coordinates['latitude'] + longitude = coordinates['longitude'] + distance = distance_between_two_points(latitude, longitude, 37.3745086, -122.0581602) + + assert distance <= 3000 def test_place_name(self): place_name = self.loc.place_name(lat=37.368, lon=-122.03)