diff --git a/ordigi/collection.py b/ordigi/collection.py
index d1357a5..5670629 100644
--- a/ordigi/collection.py
+++ b/ordigi/collection.py
@@ -3,9 +3,10 @@ General file system methods.
 """
 
 from builtins import object
+from copy import copy
+from datetime import datetime, timedelta
 import filecmp
 from fnmatch import fnmatch
-import hashlib
 import inquirer
 import logging
 import os
@@ -13,7 +14,6 @@ from pathlib import Path, PurePath
 import re
 import sys
 import shutil
-from datetime import datetime, timedelta
 
 from ordigi import media
 from ordigi.database import Sqlite
@@ -21,7 +21,7 @@ from ordigi.media import Media, get_all_subclasses
 from ordigi.images import Image, Images
 from ordigi import request
 from ordigi.summary import Summary
-from ordigi.utils import get_date_regex, camel2snake
+from ordigi import utils
 
 
 class Collection(object):
@@ -35,7 +35,7 @@ class Collection(object):
 
         # Attributes
         self.root = Path(root).expanduser().absolute()
-        if not os.path.exists(self.root):
+        if not self.root.exists():
             logger.error(f'Directory {self.root} does not exist')
             sys.exit(1)
@@ -61,6 +61,8 @@ class Collection(object):
         self.logger = logger
         self.max_deep = max_deep
         self.mode = mode
+        # List to store media metadata
+        self.medias = []
         self.summary = Summary()
         self.use_date_filename = use_date_filename
         self.use_file_dates = use_file_dates
@@ -140,7 +142,7 @@ class Collection(object):
         # select matched folders
         return folders[begin:end]
 
-    def get_part(self, item, mask, metadata, subdirs):
+    def get_part(self, item, mask, metadata):
         """Parse a specific folder's name given a mask and metadata.
 
         :param item: Name of the item as defined in the path (i.e. date from %date)
@@ -160,7 +162,7 @@ class Collection(object):
         elif item == 'name':
             # Remove date prefix added to the name.
             part = basename
-            for i, rx in get_date_regex(basename):
+            for i, rx in utils.get_date_regex(basename):
                 part = re.sub(rx, '', part)
         elif item == 'date':
             date = metadata['date_media']
@@ -169,10 +171,10 @@ class Collection(object):
                 date = self._check_for_early_morning_photos(date)
             part = date.strftime(mask)
         elif item == 'folder':
-            part = os.path.basename(subdirs)
+            part = os.path.basename(metadata['subdirs'])
 
         elif item == 'folders':
-            folders = subdirs.parts
+            folders = Path(metadata['subdirs']).parts
             folders = self._get_folders(folders, mask)
 
             part = os.path.join(*folders)
@@ -189,14 +191,13 @@ class Collection(object):
 
         return part
 
-    def get_path_part(self, this_part, metadata, subdirs):
+    def get_path_part(self, this_part, metadata):
         """Build path part
         :returns: part (string)"""
 
         for item, regex in self.items.items():
             matched = re.search(regex, this_part)
             if matched:
-                part = self.get_part(item, matched.group()[1:-1], metadata,
-                        subdirs)
+                part = self.get_part(item, matched.group()[1:-1], metadata)
 
                 part = part.strip()
@@ -215,9 +216,15 @@ class Collection(object):
             else:
                 this_part = re.sub(regex, part, this_part)
 
+        # Delete separator char at the beginning of the string if any:
+        if this_part:
+            regex = '[-_ .]'
+            if re.match(regex, this_part[0]):
+                this_part = this_part[1:]
+
         return this_part
 
-    def get_path(self, metadata, subdirs, whitespace_sub='_'):
+    def get_path(self, metadata, whitespace_sub='_'):
         """path_format: {%Y-%d-%m}/%u{city}/{album}
 
         Returns file path.
@@ -230,7 +237,7 @@ class Collection(object):
         for path_part in path_parts:
             this_parts = path_part.split('|')
             for this_part in this_parts:
-                this_part = self.get_path_part(this_part, metadata, subdirs)
+                this_part = self.get_path_part(this_part, metadata)
 
                 if this_part:
                     # Check if all masks are substituted
@@ -244,7 +251,9 @@ class Collection(object):
                     break
             # Else we continue for fallbacks
 
-        if len(path[-1]) == 0 or re.match(r'^\..*', path[-1]):
+        if path == []:
+            path = [ metadata['filename'] ]
+        elif len(path[-1]) == 0 or re.match(r'^\..*', path[-1]):
             path[-1] = metadata['filename']
 
         path_string = os.path.join(*path)
@@ -257,80 +266,72 @@ class Collection(object):
 
         return None
 
-    def checksum(self, file_path, blocksize=65536):
-        """Create a hash value for the given file.
-
-        See http://stackoverflow.com/a/3431835/1318758.
-
-        :param str file_path: Path to the file to create a hash for.
-        :param int blocksize: Read blocks of this size from the file when
-            creating the hash.
-        :returns: str or None
-        """
-        hasher = hashlib.sha256()
-        with open(file_path, 'rb') as f:
-            buf = f.read(blocksize)
-
-            while len(buf) > 0:
-                hasher.update(buf)
-                buf = f.read(blocksize)
-            return hasher.hexdigest()
-        return None
-
-    def checkcomp(self, dest_path, src_checksum):
+    def _checkcomp(self, dest_path, src_checksum):
         """Check file.
         """
-        # src_checksum = self.checksum(src_path)
-
         if self.dry_run:
-            return src_checksum
+            return True
 
-        dest_checksum = self.checksum(dest_path)
+        dest_checksum = utils.checksum(dest_path)
         if dest_checksum != src_checksum:
             self.logger.info(f'Source checksum and destination checksum are not the same')
             return False
 
-        return src_checksum
+        return True
 
-    def _get_row_data(self, table, metadata):
+    def _format_row_data(self, table, metadata):
         row_data = {}
         for title in self.db.tables[table]['header']:
-            key = camel2snake(title)
+            key = utils.camel2snake(title)
+            # Convert Path type to str
             row_data[title] = metadata[key]
 
         return row_data
 
     def _add_db_data(self, dest_path, metadata):
-        loc_values = self._get_row_data('location', metadata)
+        loc_values = self._format_row_data('location', metadata)
         metadata['location_id'] = self.db.add_row('location', loc_values)
 
-        row_data = self._get_row_data('metadata', metadata)
+        row_data = self._format_row_data('metadata', metadata)
         self.db.add_row('metadata', row_data)
 
     def _update_exif_data(self, dest_path, media):
+        updated = False
         if self.album_from_folder:
-            media.file_path = dest_path
             media.set_album_from_folder()
+            updated = True
+        if media.metadata['original_name'] in (False, ''):
+            media.set_value('original_name', media.metadata['filename'])
+            updated = True
+        if self.album_from_folder:
+            album = media.metadata['album']
+            if album and album != '':
+                media.set_value('album', album)
+                updated = True
+
+        if updated:
             return True
 
         return False
 
-    def record_file(self, src_path, dest_path, src_checksum, media):
+    def record_file(self, src_path, dest_path, media):
         """Check file and record the file to db"""
 
         # Check if file remain the same
-        checksum = self.checkcomp(dest_path, src_checksum)
         has_errors = False
-        if checksum:
+        checksum = media.metadata['checksum']
+        if self._checkcomp(dest_path, checksum):
+            # change media file_path to dest_path
+            media.file_path = dest_path
             if not self.dry_run:
                 updated = self._update_exif_data(dest_path, media)
                 if updated:
-                    dest_checksum = self.checksum(dest_path)
+                    checksum = utils.checksum(dest_path)
+                    media.metadata['checksum'] = checksum
 
             media.metadata['file_path'] = os.path.relpath(dest_path, self.root)
-            media.metadata['checksum'] = checksum
             self._add_db_data(dest_path, media.metadata)
             self.summary.append((src_path, dest_path))
@@ -349,7 +350,13 @@ class Collection(object):
         self.logger.info(f'remove: {file_path}')
 
     def sort_file(self, src_path, dest_path, remove_duplicates=False):
-        '''Copy or move file to dest_path.'''
+        '''
+        Copy or move file to dest_path.
+        Return True if success, None if no filesystem action, False if
+        conflicts.
+        :params: str, str, bool
+        :returns: bool or None
+        '''
 
         mode = self.mode
         dry_run = self.dry_run
@@ -358,7 +365,10 @@ class Collection(object):
         if(src_path == dest_path):
             self.logger.info(f'File {dest_path} already sorted')
             return None
-        elif os.path.isfile(dest_path):
+        elif dest_path.is_dir():
+            self.logger.warning(f'File {dest_path} is an existing directory')
+            return False
+        elif dest_path.is_file():
             self.logger.warning(f'File {dest_path} already exist')
             if remove_duplicates:
                 if filecmp.cmp(src_path, dest_path):
@@ -383,40 +393,36 @@ class Collection(object):
             self.logger.info(f'copy: {src_path} -> {dest_path}')
         return True
 
-    def _solve_conflicts(self, conflict_file_list, media, remove_duplicates):
+    def _solve_conflicts(self, conflict_file_list, remove_duplicates):
         has_errors = False
         unresolved_conflicts = []
         while conflict_file_list != []:
-            file_paths = conflict_file_list.pop()
-            src_path = file_paths['src_path']
-            src_checksum = file_paths['src_checksum']
-            dest_path = file_paths['dest_path']
+            src_path, dest_path, media = conflict_file_list.pop()
 
            # Try to sort the file
            result = self.sort_file(src_path, dest_path, remove_duplicates)

            # remove to conflict file list if file as be successfully copied or ignored
            n = 1
            while result is False and n < 100:
                # Add appendix to the name
-                pre, ext = os.path.splitext(dest_path)
+                suffix = dest_path.suffix
                 if n > 1:
-                    regex = '_' + str(n-1) + ext
-                    pre = re.split(regex, dest_path)[0]
-                dest_path = pre + '_' + str(n) + ext
-                # file_list[item]['dest_path'] = dest_path
-                file_paths['dest_path'] = dest_path
+                    stem = dest_path.stem.rsplit('_' + str(n-1))[0]
+                else:
+                    stem = dest_path.stem
+                dest_path = dest_path.parent / (stem + '_' + str(n) + suffix)
                result = self.sort_file(src_path, dest_path, remove_duplicates)
                n = n + 1

            if result is False:
                # n > 100:
-                unresolved_conflicts.append(file_paths)
+                unresolved_conflicts.append((src_path, dest_path, media))
                self.logger.error(f'{self.mode}: too many append for {dest_path}...')
                self.summary.append((src_path, False))
                has_errors = True

            if result:
                self.summary, has_errors = self.record_file(src_path,
-                        dest_path, src_checksum, media)
+                        dest_path, media)

        if has_errors:
            return False
@@ -468,13 +474,8 @@ class Collection(object):
         :param: Path
         :return: int
         """
-        # if isinstance(path, str):
-        #     # To remove trailing '/' chars
-        #     path = Path(path)
-        # path = str(path)
         return len(path.parts) - 1
 
-    # TODO move to utils.. or CPath..
     def _get_files_in_path(self, path, glob='**/*', maxlevel=None,
             extensions=set()):
         """Recursively get files which match a path and extension.
@@ -493,7 +494,8 @@ class Collection(object):
             else:
                 level = len(subdirs.parts)
 
-            if file_path.parts[0] == '.ordigi': continue
+            if subdirs.parts != ():
+                if subdirs.parts[0] == '.ordigi': continue
 
             if maxlevel is not None:
                 if level > maxlevel: continue
@@ -513,25 +515,43 @@ class Collection(object):
                 # return file_path and subdir
                 yield file_path
 
-    def _create_directory(self, directory_path):
+    def _create_directory(self, directory_path, path, media):
         """Create a directory if it does not already exist.
 
         :param Path: A fully qualified path of the to create.
         :returns: bool
         """
         try:
-            if directory_path.exists():
-                return True
-            else:
-                if not self.dry_run:
-                    directory_path.mkdir(parents=True, exist_ok=True)
-                self.logger.info(f'Create {directory_path}')
-                return True
-        except OSError:
-            # OSError is thrown for cases like no permission
+            parts = directory_path.relative_to(path).parts
+        except ValueError:
+            # directory_path is not the subpath of path
             pass
+        else:
+            for i, part in enumerate(parts):
+                dir_path = self.root / Path(*parts[0:i+1])
+                if dir_path.is_file():
+                    self.logger.warning(f'Target directory {dir_path} is a file')
+                    # Rename the src_file
+                    if self.interactive:
+                        prompt = [
+                            inquirer.Text('file_path', message="New name for"\
+                                    f"'{dir_path.name}' file"),
+                        ]
+                        answers = inquirer.prompt(prompt, theme=self.theme)
+                        file_path = dir_path.parent / answers['file_path']
+                    else:
+                        file_path = dir_path.parent / (dir_path.name + '_file')
 
-        return False
+                    self.logger.warning(f'Renaming {dir_path} to {file_path}')
+                    shutil.move(dir_path, file_path)
+                    for media in self.medias:
+                        if media.file_path == dir_path:
+                            media.file_path = file_path
+                            break
+
+        if not self.dry_run:
+            directory_path.mkdir(parents=True, exist_ok=True)
+        self.logger.info(f'Create {directory_path}')
 
     def create_directory(self, directory_path):
         """Create a directory if it does not already exist.
@@ -608,7 +628,8 @@ class Collection(object):
             conflict_file_list = []
             file_list = [x for x in self._get_files_in_path(path, glob=self.glob)]
             for src_path in file_list:
-                src_checksum = self.checksum(src_path)
+                # TODO to test it
+                media = Media(src_path, path, logger=self.logger)
                 path_parts = src_path.relative_to(self.root).parts
                 dedup_path = []
                 for path_part in path_parts:
@@ -624,22 +645,18 @@ class Collection(object):
 
                 # Dedup path
                 dest_path = self.root.joinpath(*dedup_path)
-                self._create_directory(dest_path.parent.name)
-
-                src_path = str(src_path)
-                dest_path = str(dest_path)
+                self._create_directory(dest_path.parent.name, path, media)
 
                 result = self.sort_file(src_path, dest_path, remove_duplicates)
                 if result:
                     self.summary, has_errors = self.record_file(src_path,
-                            dest_path, src_checksum, media)
+                            dest_path, media)
                 elif result is False:
                     # There is conflict files
-                    conflict_file_list.append({'src_path': src_path,
-                        'src_checksum': src_checksum, 'dest_path': dest_path})
+                    conflict_file_list.append((src_path, dest_path, copy(media)))
 
             if conflict_file_list != []:
-                result = self._solve_conflicts(conflict_file_list, media, remove_duplicates)
+                result = self._solve_conflicts(conflict_file_list, remove_duplicates)
 
             if not result:
                 has_errors = True
@@ -667,6 +684,8 @@ class Collection(object):
         Sort files into appropriate folder
         """
         has_errors = False
+        result = False
+        files_data = []
         for path in paths:
             path = self._check_path(path)
             conflict_file_list = []
@@ -675,43 +694,47 @@ class Collection(object):
             if self.interactive:
                 file_list = self._modify_selection(file_list)
                 print('Processing...')
+
+            # Get medias and paths
             for src_path in file_list:
-                subdirs = src_path.relative_to(path).parent
                 # Process files
-                src_checksum = self.checksum(src_path)
                 media = Media(src_path, path, self.album_from_folder,
                         ignore_tags, self.interactive, self.logger,
                         self.use_date_filename, self.use_file_dates)
                 if media:
-                    metadata = media.get_metadata(loc, self.db, self.cache)
+                    metadata = media.get_metadata(self.root, loc, self.db, self.cache)
                     # Get the destination path according to metadata
-                    file_path = Path(self.get_path(metadata, subdirs))
+                    relpath = Path(self.get_path(metadata))
                 else:
                     # Keep same directory structure
-                    file_path = src_path.relative_to(path)
+                    relpath = src_path.relative_to(path)
 
-                dest_directory = self.root / file_path.parent
-                self._create_directory(dest_directory)
+                files_data.append((copy(media), relpath))
 
+            # Create directories
+            for media, relpath in files_data:
+                dest_directory = self.root / relpath.parent
+                self._create_directory(dest_directory, path, media)
+
+            # sort files and solve conflicts
+            for media, relpath in files_data:
                 # Convert paths to string
-                src_path = str(src_path)
-                dest_path = str(self.root / file_path)
+                src_path = media.file_path
+                dest_path = self.root / relpath
 
                 result = self.sort_file(src_path, dest_path, remove_duplicates)
                 if result:
                     self.summary, has_errors = self.record_file(src_path,
-                            dest_path, src_checksum, media)
+                            dest_path, media)
                 elif result is False:
                     # There is conflict files
-                    conflict_file_list.append({'src_path': src_path,
-                        'src_checksum': src_checksum, 'dest_path': dest_path})
+                    conflict_file_list.append((src_path, dest_path, media))
 
             if conflict_file_list != []:
-                result = self._solve_conflicts(conflict_file_list, media,
-                        remove_duplicates)
+                result = self._solve_conflicts(conflict_file_list, remove_duplicates)
 
-        if not result:
+        if result is False:
             has_errors = True
 
         return self.summary, has_errors
@@ -719,7 +742,7 @@ class Collection(object):
     def set_hash(self, result, src_path, dest_path, src_checksum):
         if result:
             # Check if file remain the same
-            result = self.checkcomp(dest_path, src_checksum)
+            result = self._checkcomp(dest_path, src_checksum)
             has_errors = False
             if result:
                 if not self.dry_run:
@@ -776,7 +799,7 @@ class Collection(object):
         for image in img_paths:
             if not os.path.isfile(image):
                 continue
-            checksum1 = self.checksum(image)
+            checksum1 = utils.checksum(image)
             # Process files
             # media = Media(src_path, False, self.logger)
             # TODO compare metadata
@@ -786,7 +809,7 @@ class Collection(object):
             moved_imgs = set()
             for img_path in i.find_similar(image, similarity):
                 similar = True
-                checksum2 = self.checksum(img_path)
+                checksum2 = utils.checksum(img_path)
                 # move image into directory
                 name = os.path.splitext(os.path.basename(image))[0]
                 directory_name = 'similar_to_' + name
@@ -836,7 +859,7 @@ class Collection(object):
                 img_path = os.path.join(dirname, subdir, file_name)
                 if os.path.isdir(img_path):
                     continue
-                checksum = self.checksum(img_path)
+                checksum = utils.checksum(img_path)
                 dest_path = os.path.join(dirname, os.path.basename(img_path))
                 result = self.move_file(img_path, dest_path, checksum)
                 if not result:
diff --git a/ordigi/database.py b/ordigi/database.py
index dee0a68..b057e08 100644
--- a/ordigi/database.py
+++ b/ordigi/database.py
@@ -45,6 +45,7 @@ class Sqlite:
                 'FilePath': 'text not null',
                 'Checksum': 'text',
                 'Album': 'text',
+                'Title': 'text',
                 'LocationId': 'integer',
                 'DateMedia': 'text',
                 'DateOriginal': 'text',
@@ -52,6 +53,7 @@ class Sqlite:
                 'DateModified': 'text',
                 'CameraMake': 'text',
                 'CameraModel': 'text',
+                'OriginalName': 'text',
                 'SrcPath': 'text',
                 'Subdirs': 'text',
                 'Filename': 'text'
@@ -114,13 +116,13 @@ class Sqlite:
 
         return False
 
     def _run(self, query, n=0):
-        result = None
+        result = False
         result = self.cur.execute(query).fetchone()
 
         if result:
             return result[n]
         else:
-            return None
+            return False
 
     def _run_many(self, query):
         self.cur.executemany(query, table_list)
@@ -223,7 +225,7 @@ class Sqlite:
         return self._run(query)
 
     def get_location_data(self, LocationId, data):
-        query = f"select {data} from location where ROWID='{LocationId}'"
+        query = f"select '{data}' from location where ROWID='{LocationId}'"
         return self._run(query)
 
     def get_location(self, Latitude, Longitude, column):
@@ -277,3 +279,5 @@ class Sqlite:
             sql = f'delete from {table}'
             self.cur.execute(sql)
             self.con.commit()
+
+
diff --git a/ordigi/media.py b/ordigi/media.py
index d8a2999..09ec35a 100644
--- a/ordigi/media.py
+++ b/ordigi/media.py
@@ -6,13 +6,14 @@ import inquirer
 import logging
 import mimetypes
 import os
+import re
+import sys
 # import pprint
 
 # load modules
 from dateutil.parser import parse
-import re
 from ordigi.exiftool import ExifTool, ExifToolCaching
-from ordigi.utils import get_date_from_string
+from ordigi import utils
 from ordigi import request
 
@@ -34,17 +35,14 @@ class Media():
 
     extensions = PHOTO + AUDIO + VIDEO
 
-    def __init__(self, file_path, root, album_from_folder=False,
+    def __init__(self, file_path, src_path, album_from_folder=False,
             ignore_tags=set(), interactive=False, logger=logging.getLogger(),
             use_date_filename=False, use_file_dates=False):
         """
         :params: Path, Path, bool, set, bool, Logger
         """
-        self.file_path = str(file_path)
-        self.root = str(root)
-        self.subdirs = str(file_path.relative_to(root).parent)
-        self.folder = str(file_path.parent.name)
-        self.filename = str(file_path.name)
+        self.file_path = file_path
+        self.src_path = src_path
 
         self.album_from_folder = album_from_folder
         self.exif_metadata = None
@@ -222,7 +220,7 @@ class Media():
             answers = inquirer.prompt(choices_list, theme=self.theme)
 
             if not answers['date_list']:
                 answers = inquirer.prompt(prompt, theme=self.theme)
-                return get_date_from_string(answers['date_custom'])
+                return utils.get_date_from_string(answers['date_custom'])
             else:
                 return answers['date_list']
@@ -237,9 +235,9 @@ class Media():
         basename = os.path.splitext(self.metadata['filename'])[0]
         date_original = self.metadata['date_original']
         if self.metadata['original_name']:
-            date_filename = get_date_from_string(self.metadata['original_name'])
+            date_filename = utils.get_date_from_string(self.metadata['original_name'])
         else:
-            date_filename = get_date_from_string(basename)
+            date_filename = utils.get_date_from_string(basename)
 
         date_original = self.metadata['date_original']
         date_created = self.metadata['date_created']
@@ -324,76 +322,99 @@ class Media():
         else:
             return answers['album']
 
-    def get_metadata(self, loc=None, db=None, cache=False):
+    def get_metadata(self, root, loc=None, db=None, cache=False):
         """Get a dictionary of metadata from exif.
         All keys will be present and have a value of None if not obtained.
         :returns: dict
         """
-        self.get_exif_metadata()
-
         self.metadata = {}
-        # Retrieve selected metadata to dict
-        if not self.exif_metadata:
-            return self.metadata
+        self.metadata['checksum'] = utils.checksum(self.file_path)
 
-        for key in self.tags_keys:
+        db_checksum = False
+        location_id = None
+
+        if cache and db:
+            # Check if file_path is a subpath of root
+            if str(self.file_path).startswith(str(root)):
+                relpath = os.path.relpath(self.file_path, root)
+                db_checksum = db.get_checksum(relpath)
+                file_checksum = self.metadata['checksum']
+                # Check if checksum match
+                if db_checksum and db_checksum != file_checksum:
+                    self.logger.error(f'{self.file_path} checksum has changed')
+                    self.logger.error('(modified or corrupted file).')
+                    self.logger.error(f'file_checksum={file_checksum},\ndb_checksum={db_checksum}')
+                    self.logger.info('Use --reset-cache, check database integrity or try to restore the file')
+                    # We don't want to silently ignore or correct this without
+                    # resetting the cache as it could be due to file corruption
+                    sys.exit(1)
+
+        if db_checksum:
+            # Get metadata from db
             formated_data = None
-            for value in self._get_key_values(key):
+            for key in self.tags_keys:
+                if key in ('latitude', 'longitude', 'latitude_ref',
+                        'longitude_ref', 'file_path'):
+                    continue
+                label = utils.snake2camel(key)
+                value = db.get_metadata_data(relpath, label)
                 if 'date' in key:
                     formated_data = self.get_date_format(value)
-                elif key in ('latitude', 'longitude'):
-                    formated_data = self.get_coordinates(key, value)
                 else:
-                    if value is not None and value != '':
-                        formated_data = value
+                    formated_data = value
+                self.metadata[key] = formated_data
+            for key in 'src_path', 'subdirs', 'filename':
+                label = utils.snake2camel(key)
+                formated_data = db.get_metadata_data(relpath, label)
+                self.metadata[key] = formated_data
+
+            location_id = db.get_metadata_data(relpath, 'LocationId')
+        else:
+            self.metadata['src_path'] = str(self.src_path)
+            self.metadata['subdirs'] = str(self.file_path.relative_to(self.src_path).parent)
+            self.metadata['filename'] = self.file_path.name
+            # Get metadata from exif
+
+            self.get_exif_metadata()
+
+            # Retrieve selected metadata to dict
+            if not self.exif_metadata:
+                return self.metadata
+
+            for key in self.tags_keys:
+                formated_data = None
+                for value in self._get_key_values(key):
+                    if 'date' in key:
+                        formated_data = self.get_date_format(value)
+                    elif key in ('latitude', 'longitude'):
+                        formated_data = self.get_coordinates(key, value)
                     else:
-                        formated_data = None
-                if formated_data:
-                    # Use this data and break
+                        if value is not None and value != '':
+                            formated_data = value
+                        else:
+                            formated_data = None
+                    if formated_data:
+                        # Use this data and break
+                        break
 
-            self.metadata[key] = formated_data
-
-        self.metadata['src_path'] = self.root
-        self.metadata['subdirs'] = self.subdirs
-        self.metadata['filename'] = self.filename
-
-        original_name = self.metadata['original_name']
-        if not original_name or original_name == '':
-            self.set_value('original_name', self.filename)
+                self.metadata[key] = formated_data
 
         self.metadata['date_media'] = self.get_date_media()
+        self.metadata['location_id'] = location_id
 
-        if self.album_from_folder:
-            album = self.metadata['album']
-            folder = self.folder
-            if album and album != '':
-                if self.interactive:
-                    answer = self._set_album(album, folder)
-                    if answer == 'c':
-                        self.metadata['album'] = input('album=')
-                        self.set_value('album', folder)
-                    if answer == 'a':
-                        self.metadata['album'] = album
-                    elif answer == 'f':
-                        self.metadata['album'] = folder
-
-            if not album or album == '':
-                self.metadata['album'] = folder
-                self.set_value('album', folder)
-
-        loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
-        location_id = None
-        if cache and db:
-            location_id = db.get_metadata_data(self.file_path, 'LocationId')
+        loc_keys = ('latitude', 'longitude', 'latitude_ref', 'longitude_ref', 'city', 'state', 'country', 'default')
 
         if location_id:
             for key in loc_keys:
                 # use str to convert non string format data like latitude and
                 # longitude
-                self.metadata[key] = str(db.get_location(location_id, key.capitalize()))
+                self.metadata[key] = str(db.get_location_data(location_id,
+                    utils.snake2camel(key)))
         elif loc:
+            for key in 'latitude', 'longitude', 'latitude_ref', 'longitude_ref':
+                self.metadata[key] = None
+
             place_name = loc.place_name(
                 self.metadata['latitude'],
                 self.metadata['longitude'],
@@ -411,7 +432,22 @@ class Media():
             for key in loc_keys:
                 self.metadata[key] = None
 
-        self.metadata['location_id'] = location_id
+
+        if self.album_from_folder:
+            album = self.metadata['album']
+            folder = self.file_path.parent.name
+            if album and album != '':
+                if self.interactive:
+                    answer = self._set_album(album, folder)
+                    if answer == 'c':
+                        self.metadata['album'] = input('album=')
+                    if answer == 'a':
+                        self.metadata['album'] = album
+                    elif answer == 'f':
+                        self.metadata['album'] = folder
+
+            if not album or album == '':
+                self.metadata['album'] = folder
 
         return self.metadata
@@ -496,7 +532,7 @@ class Media():
 
         :returns: bool
         """
-        return self.set_value('album', self.folder)
+        return self.set_value('album', self.file_path.parent.name)
 
 
 def get_all_subclasses(cls=None):
diff --git a/ordigi/utils.py b/ordigi/utils.py
index fe674cd..c861706 100644
--- a/ordigi/utils.py
+++ b/ordigi/utils.py
@@ -1,7 +1,31 @@
 from math import radians, cos, sqrt
+from datetime import datetime
+import hashlib
 import re
+
+
+def checksum(file_path, blocksize=65536):
+    """Create a hash value for the given file.
+
+    See http://stackoverflow.com/a/3431835/1318758.
+
+    :param str file_path: Path to the file to create a hash for.
+    :param int blocksize: Read blocks of this size from the file when
+        creating the hash.
+    :returns: str or None
+    """
+    hasher = hashlib.sha256()
+    with open(file_path, 'rb') as f:
+        buf = f.read(blocksize)
+
+        while len(buf) > 0:
+            hasher.update(buf)
+            buf = f.read(blocksize)
+        return hasher.hexdigest()
+    return None
+
+
 def distance_between_two_points(lat1, lon1, lat2, lon2):
     # As threshold is quite small use simple math
     # From http://stackoverflow.com/questions/15736995/how-can-i-quickly-estimate-the-distance-between-two-latitude-longitude-points  # noqa
@@ -37,6 +61,7 @@ def get_date_regex(string, user_regex=None):
     for i, rx in regex.items():
         yield i, rx
 
+
 def get_date_from_string(string, user_regex=None):
     # If missing datetime from EXIF data check if filename is in datetime format.
     # For this use a user provided regex if possible.
@@ -75,17 +100,14 @@ def get_date_from_string(string, user_regex=None):
 
     return date
 
+
 # Conversion functions
 # source:https://rodic.fr/blog/camelcase-and-snake_case-strings-conversion-with-python/
 def snake2camel(name):
     return re.sub(r'(?:^|_)([a-z])', lambda x: x.group(1).upper(), name)
 
-def snake2camelback(name):
-    return re.sub(r'_([a-z])', lambda x: x.group(1).upper(), name)
-
 def camel2snake(name):
     return name[0].lower() + re.sub(r'(?!^)[A-Z]', lambda x: '_' + x.group(0).lower(), name[1:])
 
-def camelback2snake(name):
-    return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+
diff --git a/tests/test_collection.py b/tests/test_collection.py
index 266ac24..a84b7ea 100644
--- a/tests/test_collection.py
+++ b/tests/test_collection.py
@@ -16,7 +16,7 @@ from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exif
 from ordigi.collection import Collection
 from ordigi.geolocation import GeoLocation
 from ordigi.media import Media
-from ordigi.utils import get_date_regex
+from ordigi import utils
 
 
 class TestCollection:
@@ -75,8 +75,7 @@ class TestCollection:
             for mask in masks:
                 matched = re.search(regex, mask)
                 if matched:
-                    part = collection.get_part(item, mask[1:-1],
-                            metadata, subdirs)
+                    part = collection.get_part(item, mask[1:-1], metadata)
                     # check if part is correct
                     assert isinstance(part, str), file_path
                     if item == 'basename':
@@ -93,7 +92,7 @@ class TestCollection:
                         assert part == file_path.suffix[1:], file_path
                     elif item == 'name':
                         expected_part = file_path.stem
-                        for i, rx in get_date_regex(expected_part):
+                        for i, rx in utils.get_date_regex(expected_part):
                             part = re.sub(rx, '', expected_part)
                         assert part == expected_part, file_path
                     elif item == 'custom':
@@ -151,11 +150,11 @@ class TestCollection:
         src_path = Path(self.src_path, 'test_exif', 'photo.png')
         name = 'photo_' + mode + '.png'
         dest_path = Path(tmp_path, name)
-        src_checksum = collection.checksum(src_path)
+        src_checksum = utils.checksum(src_path)
         result_copy = collection.sort_file(src_path, dest_path)
         assert result_copy
         # Ensure files remain the same
-        assert collection.checkcomp(dest_path, src_checksum)
+        assert collection._checkcomp(dest_path, src_checksum)
 
         if mode == 'copy':
             assert src_path.exists()
diff --git a/tests/test_database.py b/tests/test_database.py
index 27d48c9..4763588 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -18,13 +18,15 @@ class TestSqlite:
             'FilePath': 'file_path',
             'Checksum': 'checksum',
             'Album': 'album',
+            'Title': 'title',
             'LocationId': 2,
-            'DateTaken': datetime(2012, 3, 27),
+            'DateMedia': datetime(2012, 3, 27),
             'DateOriginal': datetime(2013, 3, 27),
             'DateCreated': 'date_created',
             'DateModified': 'date_modified',
             'CameraMake': 'camera_make',
             'CameraModel': 'camera_model',
+            'OriginalName': 'original_name',
             'SrcPath': 'src_path',
             'Subdirs': 'subdirs',
             'Filename': 'filename'
@@ -62,7 +64,7 @@ class TestSqlite:
 
     def test_add_metadata_data(self):
         result = tuple(self.sqlite.cur.execute("""select * from metadata where rowid=1""").fetchone())
-        assert result == ('file_path', 'checksum', 'album', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'src_path', 'subdirs', 'filename')
+        assert result == ('file_path', 'checksum', 'album', 'title', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'original_name', 'src_path', 'subdirs', 'filename')
 
     def test_get_checksum(self):
         assert not self.sqlite.get_checksum('invalid')
diff --git a/tests/test_media.py b/tests/test_media.py
index a0d9c4f..1adb745 100644
--- a/tests/test_media.py
+++ b/tests/test_media.py
@@ -28,48 +28,50 @@ class TestMetadata:
         self.exif_data = ExifTool(file_path).asdict()
         yield file_path, Media(file_path, self.src_path, album_from_folder=True,
                 ignore_tags=self.ignore_tags)
 
-    def test_get_metadata(self):
+    def test_get_metadata(self, tmp_path):
         for file_path, media in self.get_media():
-            result = media.get_metadata()
-            assert result
-            assert isinstance(media.metadata, dict), media.metadata
-            #check if all tags key are present
-            for tags_key, tags in media.tags_keys.items():
-                assert tags_key in media.metadata
-                for tag in tags:
-                    for tag_regex in self.ignore_tags:
-                        assert not re.match(tag_regex, tag)
-            # Check for valid type
-            for key, value in media.metadata.items():
-                if value or value == '':
-                    if 'date' in key:
-                        assert isinstance(value, datetime)
-                    elif key in ('latitude', 'longitude'):
-                        assert isinstance(value, float)
+            # test get metadata from cache or exif
+            for root in self.src_path, tmp_path:
+                result = media.get_metadata(root)
+                assert result
+                assert isinstance(media.metadata, dict), media.metadata
+                #check if all tags key are present
+                for tags_key, tags in media.tags_keys.items():
+                    assert tags_key in media.metadata
+                    for tag in tags:
+                        for tag_regex in self.ignore_tags:
+                            assert not re.match(tag_regex, tag)
+                # Check for valid type
+                for key, value in media.metadata.items():
+                    if value or value == '':
+                        if 'date' in key:
+                            assert isinstance(value, datetime)
+                        elif key in ('latitude', 'longitude'):
+                            assert isinstance(value, float)
+                        else:
+                            assert isinstance(value, str)
                     else:
-                        assert isinstance(value, str)
-                else:
-                    assert value is None
+                        assert value is None
 
-                if key == 'album':
-                    for album in media._get_key_values('album'):
-                        if album is not None and album != '':
-                            assert value == album
+                    if key == 'album':
+                        for album in media._get_key_values('album'):
+                            if album is not None and album != '':
+                                assert value == album
+                                break
+                        else:
+                            assert value == file_path.parent.name
+
+                # Check if has_exif_data() is True if 'date_original' key is
+                # present, else check if it's false
+                has_exif_data = False
+                for tag in media.tags_keys['date_original']:
+                    if tag in media.exif_metadata:
+                        if media.get_date_format(media.exif_metadata[tag]):
+                            has_exif_data = True
+                            assert media.has_exif_data()
                             break
-                    else:
-                        assert value == file_path.parent.name
-
-            # Check if has_exif_data() is True if 'date_original' key is
-            # present, else check if it's false
-            has_exif_data = False
-            for tag in media.tags_keys['date_original']:
-                if tag in media.exif_metadata:
-                    if media.get_date_format(media.exif_metadata[tag]):
-                        has_exif_data = True
-                        assert media.has_exif_data()
-                        break
-            if has_exif_data == False:
-                assert not media.has_exif_data()
+                if has_exif_data == False:
+                    assert not media.has_exif_data()
 
     def test_get_date_media(self):
         # collection = Collection(tmp_path, self.path_format,
@@ -78,7 +80,7 @@ class TestMetadata:
         exif_data = ExifToolCaching(str(file_path)).asdict()
         media = Media(file_path, self.src_path, use_date_filename=True,
                 use_file_dates=True)
-        metadata = media.get_metadata()
+        metadata = media.get_metadata(self.src_path)
         date_media = media.get_date_media()
         date_filename = None