Ability to retrieve metadata from Sqlite database and fixes

Cédric Leporcq 2021-09-26 17:44:13 +02:00
parent 86d88b72c8
commit 8e8afe9a89
7 changed files with 314 additions and 226 deletions
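In short: Media.get_metadata() can now fill its metadata dict from the .ordigi Sqlite database when a cached checksum still matches the file, falling back to exiftool otherwise. A minimal usage sketch (illustrative only; the paths are made up and the Sqlite constructor argument is assumed to be the collection root, as in Collection):

    from pathlib import Path

    from ordigi.database import Sqlite
    from ordigi.media import Media

    root = Path('~/Pictures/collection').expanduser()   # hypothetical collection root
    src_path = root / 'imports'
    file_path = src_path / 'photo.jpg'

    db = Sqlite(root)            # assumed constructor argument
    media = Media(file_path, src_path)

    # With cache=True and a db, metadata comes from the stored rows when the
    # recorded checksum still matches the file; otherwise it is re-read from exif.
    metadata = media.get_metadata(root, loc=None, db=db, cache=True)
    print(metadata['date_media'], metadata['album'], metadata['checksum'])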

View File

@ -3,9 +3,10 @@ General file system methods.
"""
from builtins import object
from copy import copy
from datetime import datetime, timedelta
import filecmp
from fnmatch import fnmatch
import hashlib
import inquirer
import logging
import os
@ -13,7 +14,6 @@ from pathlib import Path, PurePath
import re
import sys
import shutil
from datetime import datetime, timedelta
from ordigi import media
from ordigi.database import Sqlite
@ -21,7 +21,7 @@ from ordigi.media import Media, get_all_subclasses
from ordigi.images import Image, Images
from ordigi import request
from ordigi.summary import Summary
from ordigi.utils import get_date_regex, camel2snake
from ordigi import utils
class Collection(object):
@ -35,7 +35,7 @@ class Collection(object):
# Attributes
self.root = Path(root).expanduser().absolute()
if not os.path.exists(self.root):
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
@ -61,6 +61,8 @@ class Collection(object):
self.logger = logger
self.max_deep = max_deep
self.mode = mode
# List to store media metadata
self.medias = []
self.summary = Summary()
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
@ -140,7 +142,7 @@ class Collection(object):
# select matched folders
return folders[begin:end]
def get_part(self, item, mask, metadata, subdirs):
def get_part(self, item, mask, metadata):
"""Parse a specific folder's name given a mask and metadata.
:param item: Name of the item as defined in the path (e.g. date from %date)
@ -160,7 +162,7 @@ class Collection(object):
elif item == 'name':
# Remove date prefix added to the name.
part = basename
for i, rx in get_date_regex(basename):
for i, rx in utils.get_date_regex(basename):
part = re.sub(rx, '', part)
elif item == 'date':
date = metadata['date_media']
@ -169,10 +171,10 @@ class Collection(object):
date = self._check_for_early_morning_photos(date)
part = date.strftime(mask)
elif item == 'folder':
part = os.path.basename(subdirs)
part = os.path.basename(metadata['subdirs'])
elif item == 'folders':
folders = subdirs.parts
folders = Path(metadata['subdirs']).parts
folders = self._get_folders(folders, mask)
part = os.path.join(*folders)
@ -189,14 +191,13 @@ class Collection(object):
return part
def get_path_part(self, this_part, metadata, subdirs):
def get_path_part(self, this_part, metadata):
"""Build path part
:returns: part (string)"""
for item, regex in self.items.items():
matched = re.search(regex, this_part)
if matched:
part = self.get_part(item, matched.group()[1:-1], metadata,
subdirs)
part = self.get_part(item, matched.group()[1:-1], metadata)
part = part.strip()
@ -215,9 +216,15 @@ class Collection(object):
else:
this_part = re.sub(regex, part, this_part)
# Delete separator char at the beginning of the string if any:
if this_part:
regex = '[-_ .]'
if re.match(regex, this_part[0]):
this_part = this_part[1:]
return this_part
def get_path(self, metadata, subdirs, whitespace_sub='_'):
def get_path(self, metadata, whitespace_sub='_'):
"""path_format: {%Y-%d-%m}/%u{city}/{album}
Returns file path.
@ -230,7 +237,7 @@ class Collection(object):
for path_part in path_parts:
this_parts = path_part.split('|')
for this_part in this_parts:
this_part = self.get_path_part(this_part, metadata, subdirs)
this_part = self.get_path_part(this_part, metadata)
if this_part:
# Check if all masks are substituted
@ -244,7 +251,9 @@ class Collection(object):
break
# Else we continue for fallbacks
if len(path[-1]) == 0 or re.match(r'^\..*', path[-1]):
if path == []:
path = [ metadata['filename'] ]
elif len(path[-1]) == 0 or re.match(r'^\..*', path[-1]):
path[-1] = metadata['filename']
path_string = os.path.join(*path)
@ -257,80 +266,72 @@ class Collection(object):
return None
def checksum(self, file_path, blocksize=65536):
"""Create a hash value for the given file.
See http://stackoverflow.com/a/3431835/1318758.
:param str file_path: Path to the file to create a hash for.
:param int blocksize: Read blocks of this size from the file when
creating the hash.
:returns: str or None
"""
hasher = hashlib.sha256()
with open(file_path, 'rb') as f:
buf = f.read(blocksize)
while len(buf) > 0:
hasher.update(buf)
buf = f.read(blocksize)
return hasher.hexdigest()
return None
def checkcomp(self, dest_path, src_checksum):
def _checkcomp(self, dest_path, src_checksum):
"""Check file.
"""
# src_checksum = self.checksum(src_path)
if self.dry_run:
return src_checksum
return True
dest_checksum = self.checksum(dest_path)
dest_checksum = utils.checksum(dest_path)
if dest_checksum != src_checksum:
self.logger.info(f'Source checksum and destination checksum are not the same')
return False
return src_checksum
return True
def _get_row_data(self, table, metadata):
def _format_row_data(self, table, metadata):
row_data = {}
for title in self.db.tables[table]['header']:
key = camel2snake(title)
key = utils.camel2snake(title)
# Convert Path type to str
row_data[title] = metadata[key]
return row_data
def _add_db_data(self, dest_path, metadata):
loc_values = self._get_row_data('location', metadata)
loc_values = self._format_row_data('location', metadata)
metadata['location_id'] = self.db.add_row('location', loc_values)
row_data = self._get_row_data('metadata', metadata)
row_data = self._format_row_data('metadata', metadata)
self.db.add_row('metadata', row_data)
def _update_exif_data(self, dest_path, media):
updated = False
if self.album_from_folder:
media.file_path = dest_path
media.set_album_from_folder()
updated = True
if media.metadata['original_name'] in (False, ''):
media.set_value('original_name', self.filename)
updated = True
if self.album_from_folder:
album = media.metadata['album']
if album and album != '':
media.set_value('album', album)
updated = True
if updated:
return True
return False
def record_file(self, src_path, dest_path, src_checksum, media):
def record_file(self, src_path, dest_path, media):
"""Check file and record the file to db"""
# Check if file remains the same
checksum = self.checkcomp(dest_path, src_checksum)
has_errors = False
if checksum:
checksum = media.metadata['checksum']
if self._checkcomp(dest_path, checksum):
# change media file_path to dest_path
media.file_path = dest_path
if not self.dry_run:
updated = self._update_exif_data(dest_path, media)
if updated:
dest_checksum = self.checksum(dest_path)
checksum = utils.checksum(dest_path)
media.metadata['checksum'] = checksum
media.metadata['file_path'] = os.path.relpath(dest_path,
self.root)
media.metadata['checksum'] = checksum
self._add_db_data(dest_path, media.metadata)
self.summary.append((src_path, dest_path))
@ -349,7 +350,13 @@ class Collection(object):
self.logger.info(f'remove: {file_path}')
def sort_file(self, src_path, dest_path, remove_duplicates=False):
'''Copy or move file to dest_path.'''
'''
Copy or move file to dest_path.
Return True on success, None if no filesystem action was needed, False on
conflict.
:params: str, str, bool
:returns: bool or None
'''
mode = self.mode
dry_run = self.dry_run
@ -358,7 +365,10 @@ class Collection(object):
if(src_path == dest_path):
self.logger.info(f'File {dest_path} already sorted')
return None
elif os.path.isfile(dest_path):
elif dest_path.is_dir():
self.logger.warning(f'File {dest_path} is an existing directory')
return False
elif dest_path.is_file():
self.logger.warning(f'File {dest_path} already exists')
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
@ -383,40 +393,36 @@ class Collection(object):
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
def _solve_conflicts(self, conflict_file_list, media, remove_duplicates):
def _solve_conflicts(self, conflict_file_list, remove_duplicates):
has_errors = False
unresolved_conflicts = []
while conflict_file_list != []:
file_paths = conflict_file_list.pop()
src_path = file_paths['src_path']
src_checksum = file_paths['src_checksum']
dest_path = file_paths['dest_path']
src_path, dest_path, media = conflict_file_list.pop()
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
# remove from conflict file list if file has been successfully copied or ignored
n = 1
while result is False and n < 100:
# Add appendix to the name
pre, ext = os.path.splitext(dest_path)
suffix = dest_path.suffix
if n > 1:
regex = '_' + str(n-1) + ext
pre = re.split(regex, dest_path)[0]
dest_path = pre + '_' + str(n) + ext
# file_list[item]['dest_path'] = dest_path
file_paths['dest_path'] = dest_path
stem = dest_path.stem.rsplit('_' + str(n-1))[0]
else:
stem = dest_path.stem
dest_path = dest_path.parent / (stem + '_' + str(n) + suffix)
result = self.sort_file(src_path, dest_path, remove_duplicates)
n = n + 1
if result is False:
# n > 100:
unresolved_conflicts.append(file_paths)
unresolved_conflicts.append((src_path, dest_path, media))
self.logger.error(f'{self.mode}: too many appends for {dest_path}...')
self.summary.append((src_path, False))
has_errors = True
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, media)
dest_path, media)
if has_errors:
return False
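The renaming loop above resolves a conflict by appending an incrementing suffix to the destination stem, stripping the previous one each round, for at most 100 attempts. A standalone sketch of the naming progression (hypothetical destination path):

    from pathlib import Path

    dest_path = Path('2021/07/photo.jpg')   # hypothetical conflicting destination
    for n in range(1, 4):
        if n > 1:
            # strip the previous '_<n-1>' suffix before appending the new one
            stem = dest_path.stem.rsplit('_' + str(n - 1))[0]
        else:
            stem = dest_path.stem
        dest_path = dest_path.parent / (stem + '_' + str(n) + dest_path.suffix)
        print(dest_path)
    # 2021/07/photo_1.jpg
    # 2021/07/photo_2.jpg
    # 2021/07/photo_3.jpg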
@ -468,13 +474,8 @@ class Collection(object):
:param: Path
:return: int
"""
# if isinstance(path, str):
# # To remove trailing '/' chars
# path = Path(path)
# path = str(path)
return len(path.parts) - 1
# TODO move to utils.. or CPath..
def _get_files_in_path(self, path, glob='**/*', maxlevel=None, extensions=set()):
"""Recursively get files which match a path and extension.
@ -493,7 +494,8 @@ class Collection(object):
else:
level = len(subdirs.parts)
if file_path.parts[0] == '.ordigi': continue
if subdirs.parts != ():
if subdirs.parts[0] == '.ordigi': continue
if maxlevel is not None:
if level > maxlevel: continue
@ -513,25 +515,43 @@ class Collection(object):
# return file_path and subdir
yield file_path
def _create_directory(self, directory_path):
def _create_directory(self, directory_path, path, media):
"""Create a directory if it does not already exist.
:param Path: A fully qualified path of the directory to create.
:returns: bool
"""
try:
if directory_path.exists():
return True
else:
if not self.dry_run:
directory_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f'Create {directory_path}')
return True
except OSError:
# OSError is thrown for cases like no permission
parts = directory_path.relative_to(path).parts
except ValueError:
# directory_path is not the subpath of path
pass
else:
for i, part in enumerate(parts):
dir_path = self.root / Path(*parts[0:i+1])
if dir_path.is_file():
self.logger.warning(f'Target directory {dir_path} is a file')
# Rename the src_file
if self.interactive:
prompt = [
inquirer.Text('file_path', message="New name for "\
f"'{dir_path.name}' file"),
]
answers = inquirer.prompt(prompt, theme=self.theme)
file_path = dir_path.parent / answers['file_path']
else:
file_path = dir_path.parent / (dir_path.name + '_file')
return False
self.logger.warning(f'Renaming {dir_path} to {file_path}')
shutil.move(dir_path, file_path)
for media in self.medias:
if media.file_path == dir_path:
media.file_path = file_path
break
if not self.dry_run:
directory_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f'Create {directory_path}')
def create_directory(self, directory_path):
"""Create a directory if it does not already exist.
@ -608,7 +628,8 @@ class Collection(object):
conflict_file_list = []
file_list = [x for x in self._get_files_in_path(path, glob=self.glob)]
for src_path in file_list:
src_checksum = self.checksum(src_path)
# TODO: test this
media = Media(src_path, path, logger=self.logger)
path_parts = src_path.relative_to(self.root).parts
dedup_path = []
for path_part in path_parts:
@ -624,22 +645,18 @@ class Collection(object):
# Dedup path
dest_path = self.root.joinpath(*dedup_path)
self._create_directory(dest_path.parent.name)
src_path = str(src_path)
dest_path = str(dest_path)
self._create_directory(dest_path.parent.name, path, media)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, media)
dest_path, media)
elif result is False:
# There are conflicting files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
conflict_file_list.append((src_path, dest_path, copy(media)))
if conflict_file_list != []:
result = self._solve_conflicts(conflict_file_list, media, remove_duplicates)
result = self._solve_conflicts(conflict_file_list, remove_duplicates)
if not result:
has_errors = True
@ -667,6 +684,8 @@ class Collection(object):
Sort files into appropriate folder
"""
has_errors = False
result = False
files_data = []
for path in paths:
path = self._check_path(path)
conflict_file_list = []
@ -675,43 +694,47 @@ class Collection(object):
if self.interactive:
file_list = self._modify_selection(file_list)
print('Processing...')
# Get medias and paths
for src_path in file_list:
subdirs = src_path.relative_to(path).parent
# Process files
src_checksum = self.checksum(src_path)
media = Media(src_path, path, self.album_from_folder,
ignore_tags, self.interactive, self.logger,
self.use_date_filename, self.use_file_dates)
if media:
metadata = media.get_metadata(loc, self.db, self.cache)
metadata = media.get_metadata(self.root, loc, self.db, self.cache)
# Get the destination path according to metadata
file_path = Path(self.get_path(metadata, subdirs))
relpath = Path(self.get_path(metadata))
else:
# Keep same directory structure
file_path = src_path.relative_to(path)
relpath = src_path.relative_to(path)
dest_directory = self.root / file_path.parent
self._create_directory(dest_directory)
files_data.append((copy(media), relpath))
# Create directories
for media, relpath in files_data:
dest_directory = self.root / relpath.parent
self._create_directory(dest_directory, path, media)
# sort files and solve conflicts
for media, relpath in files_data:
# Convert paths to string
src_path = str(src_path)
dest_path = str(self.root / file_path)
src_path = media.file_path
dest_path = self.root / relpath
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, media)
dest_path, media)
elif result is False:
# There are conflicting files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
conflict_file_list.append((src_path, dest_path, media))
if conflict_file_list != []:
result = self._solve_conflicts(conflict_file_list, media,
remove_duplicates)
result = self._solve_conflicts(conflict_file_list, remove_duplicates)
if not result:
if result is False:
has_errors = True
return self.summary, has_errors
@ -719,7 +742,7 @@ class Collection(object):
def set_hash(self, result, src_path, dest_path, src_checksum):
if result:
# Check if file remains the same
result = self.checkcomp(dest_path, src_checksum)
result = self._checkcomp(dest_path, src_checksum)
has_errors = False
if result:
if not self.dry_run:
@ -776,7 +799,7 @@ class Collection(object):
for image in img_paths:
if not os.path.isfile(image):
continue
checksum1 = self.checksum(image)
checksum1 = utils.checksum(image)
# Process files
# media = Media(src_path, False, self.logger)
# TODO compare metadata
@ -786,7 +809,7 @@ class Collection(object):
moved_imgs = set()
for img_path in i.find_similar(image, similarity):
similar = True
checksum2 = self.checksum(img_path)
checksum2 = utils.checksum(img_path)
# move image into directory
name = os.path.splitext(os.path.basename(image))[0]
directory_name = 'similar_to_' + name
@ -836,7 +859,7 @@ class Collection(object):
img_path = os.path.join(dirname, subdir, file_name)
if os.path.isdir(img_path):
continue
checksum = self.checksum(img_path)
checksum = utils.checksum(img_path)
dest_path = os.path.join(dirname, os.path.basename(img_path))
result = self.move_file(img_path, dest_path, checksum)
if not result:

View File

@ -45,6 +45,7 @@ class Sqlite:
'FilePath': 'text not null',
'Checksum': 'text',
'Album': 'text',
'Title': 'text',
'LocationId': 'integer',
'DateMedia': 'text',
'DateOriginal': 'text',
@ -52,6 +53,7 @@ class Sqlite:
'DateModified': 'text',
'CameraMake': 'text',
'CameraModel': 'text',
'OriginalName':'text',
'SrcPath': 'text',
'Subdirs': 'text',
'Filename': 'text'
@ -114,13 +116,13 @@ class Sqlite:
return False
def _run(self, query, n=0):
result = None
result = False
result = self.cur.execute(query).fetchone()
if result:
return result[n]
else:
return None
return False
def _run_many(self, query):
self.cur.executemany(query, table_list)
@ -223,7 +225,7 @@ class Sqlite:
return self._run(query)
def get_location_data(self, LocationId, data):
query = f"select {data} from location where ROWID='{LocationId}'"
query = f"select '{data}' from location where ROWID='{LocationId}'"
return self._run(query)
def get_location(self, Latitude, Longitude, column):
@ -277,3 +279,5 @@ class Sqlite:
sql = f'delete from {table}'
self.cur.execute(sql)
self.con.commit()
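Rough sketch of how the new columns and lookups are meant to be used together when reusing cached rows (the relative path and collection root are hypothetical; compare the caching branch added to Media.get_metadata below):

    from pathlib import Path

    from ordigi import utils
    from ordigi.database import Sqlite

    root = Path('~/Pictures/collection').expanduser()   # hypothetical collection root
    db = Sqlite(root)                                    # assumed to point at <root>/.ordigi
    relpath = 'holidays/2021/photo.jpg'                  # FilePath as stored in the metadata table

    # Reuse cached rows only while the stored checksum still matches the file
    if db.get_checksum(relpath) == utils.checksum(root / relpath):
        title = db.get_metadata_data(relpath, 'Title')                  # new column
        original_name = db.get_metadata_data(relpath, 'OriginalName')   # new column
        location_id = db.get_metadata_data(relpath, 'LocationId')
        city = db.get_location_data(location_id, 'City')   # location column name assumed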

View File

@ -6,13 +6,14 @@ import inquirer
import logging
import mimetypes
import os
import re
import sys
# import pprint
# load modules
from dateutil.parser import parse
import re
from ordigi.exiftool import ExifTool, ExifToolCaching
from ordigi.utils import get_date_from_string
from ordigi import utils
from ordigi import request
@ -34,17 +35,14 @@ class Media():
extensions = PHOTO + AUDIO + VIDEO
def __init__(self, file_path, root, album_from_folder=False,
def __init__(self, file_path, src_path, album_from_folder=False,
ignore_tags=set(), interactive=False, logger=logging.getLogger(),
use_date_filename=False, use_file_dates=False):
"""
:params: Path, Path, bool, set, bool, Logger
"""
self.file_path = str(file_path)
self.root = str(root)
self.subdirs = str(file_path.relative_to(root).parent)
self.folder = str(file_path.parent.name)
self.filename = str(file_path.name)
self.file_path = file_path
self.src_path = src_path
self.album_from_folder = album_from_folder
self.exif_metadata = None
@ -222,7 +220,7 @@ class Media():
answers = inquirer.prompt(choices_list, theme=self.theme)
if not answers['date_list']:
answers = inquirer.prompt(prompt, theme=self.theme)
return get_date_from_string(answers['date_custom'])
return utils.get_date_from_string(answers['date_custom'])
else:
return answers['date_list']
@ -237,9 +235,9 @@ class Media():
basename = os.path.splitext(self.metadata['filename'])[0]
date_original = self.metadata['date_original']
if self.metadata['original_name']:
date_filename = get_date_from_string(self.metadata['original_name'])
date_filename = utils.get_date_from_string(self.metadata['original_name'])
else:
date_filename = get_date_from_string(basename)
date_filename = utils.get_date_from_string(basename)
date_original = self.metadata['date_original']
date_created = self.metadata['date_created']
@ -324,76 +322,99 @@ class Media():
else:
return answers['album']
def get_metadata(self, loc=None, db=None, cache=False):
def get_metadata(self, root, loc=None, db=None, cache=False):
"""Get a dictionary of metadata from exif.
All keys will be present and have a value of None if not obtained.
:returns: dict
"""
self.get_exif_metadata()
self.metadata = {}
# Retrieve selected metadata to dict
if not self.exif_metadata:
return self.metadata
self.metadata['checksum'] = utils.checksum(self.file_path)
for key in self.tags_keys:
db_checksum = False
location_id = None
if cache and db:
# Check if file_path is a subpath of root
if str(self.file_path).startswith(str(root)):
relpath = os.path.relpath(self.file_path, root)
db_checksum = db.get_checksum(relpath)
file_checksum = self.metadata['checksum']
# Check if checksums match
if db_checksum and db_checksum != file_checksum:
self.logger.error(f'{self.file_path} checksum has changed')
self.logger.error('(modified or corrupted file).')
self.logger.error(f'file_checksum={file_checksum},\ndb_checksum={db_checksum}')
self.logger.info('Use --reset-cache, check database integrity or try to restore the file')
# We don't want to silently ignore or correct this without
# resetting the cache as it could be due to file corruption
sys.exit(1)
if db_checksum:
# Get metadata from db
formated_data = None
for value in self._get_key_values(key):
for key in self.tags_keys:
if key in ('latitude', 'longitude', 'latitude_ref',
'longitude_ref', 'file_path'):
continue
label = utils.snake2camel(key)
value = db.get_metadata_data(relpath, label)
if 'date' in key:
formated_data = self.get_date_format(value)
elif key in ('latitude', 'longitude'):
formated_data = self.get_coordinates(key, value)
else:
if value is not None and value != '':
formated_data = value
formated_data = value
self.metadata[key] = formated_data
for key in 'src_path', 'subdirs', 'filename':
label = utils.snake2camel(key)
formated_data = db.get_metadata_data(relpath, label)
self.metadata[key] = formated_data
location_id = db.get_metadata_data(relpath, 'LocationId')
else:
self.metadata['src_path'] = str(self.src_path)
self.metadata['subdirs'] = str(self.file_path.relative_to(self.src_path).parent)
self.metadata['filename'] = self.file_path.name
# Get metadata from exif
self.get_exif_metadata()
# Retrieve selected metadata to dict
if not self.exif_metadata:
return self.metadata
for key in self.tags_keys:
formated_data = None
for value in self._get_key_values(key):
if 'date' in key:
formated_data = self.get_date_format(value)
elif key in ('latitude', 'longitude'):
formated_data = self.get_coordinates(key, value)
else:
formated_data = None
if formated_data:
# Use this data and break
break
if value is not None and value != '':
formated_data = value
else:
formated_data = None
if formated_data:
# Use this data and break
break
self.metadata[key] = formated_data
self.metadata['src_path'] = self.root
self.metadata['subdirs'] = self.subdirs
self.metadata['filename'] = self.filename
original_name = self.metadata['original_name']
if not original_name or original_name == '':
self.set_value('original_name', self.filename)
self.metadata[key] = formated_data
self.metadata['date_media'] = self.get_date_media()
self.metadata['location_id'] = location_id
if self.album_from_folder:
album = self.metadata['album']
folder = self.folder
if album and album != '':
if self.interactive:
answer = self._set_album(album, folder)
if answer == 'c':
self.metadata['album'] = input('album=')
self.set_value('album', folder)
if answer == 'a':
self.metadata['album'] = album
elif answer == 'f':
self.metadata['album'] = folder
if not album or album == '':
self.metadata['album'] = folder
self.set_value('album', folder)
loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
location_id = None
if cache and db:
location_id = db.get_metadata_data(self.file_path, 'LocationId')
loc_keys = ('latitude', 'longitude', 'latitude_ref', 'longitude_ref', 'city', 'state', 'country', 'default')
if location_id:
for key in loc_keys:
# use str to convert non string format data like latitude and
# longitude
self.metadata[key] = str(db.get_location(location_id, key.capitalize()))
self.metadata[key] = str(db.get_location_data(location_id,
utils.snake2camel(key)))
elif loc:
for key in 'latitude', 'longitude', 'latitude_ref', 'longitude_ref':
self.metadata[key] = None
place_name = loc.place_name(
self.metadata['latitude'],
self.metadata['longitude'],
@ -411,7 +432,22 @@ class Media():
for key in loc_keys:
self.metadata[key] = None
self.metadata['location_id'] = location_id
if self.album_from_folder:
album = self.metadata['album']
folder = self.file_path.parent.name
if album and album != '':
if self.interactive:
answer = self._set_album(album, folder)
if answer == 'c':
self.metadata['album'] = input('album=')
if answer == 'a':
self.metadata['album'] = album
elif answer == 'f':
self.metadata['album'] = folder
if not album or album == '':
self.metadata['album'] = folder
return self.metadata
@ -496,7 +532,7 @@ class Media():
:returns: bool
"""
return self.set_value('album', self.folder)
return self.set_value('album', self.file_path.parent.name)
def get_all_subclasses(cls=None):

View File

@ -1,7 +1,31 @@
from math import radians, cos, sqrt
from datetime import datetime
import hashlib
import re
def checksum(file_path, blocksize=65536):
"""Create a hash value for the given file.
See http://stackoverflow.com/a/3431835/1318758.
:param str file_path: Path to the file to create a hash for.
:param int blocksize: Read blocks of this size from the file when
creating the hash.
:returns: str or None
"""
hasher = hashlib.sha256()
with open(file_path, 'rb') as f:
buf = f.read(blocksize)
while len(buf) > 0:
hasher.update(buf)
buf = f.read(blocksize)
return hasher.hexdigest()
return None
def distance_between_two_points(lat1, lon1, lat2, lon2):
# As threshold is quite small use simple math
# From http://stackoverflow.com/questions/15736995/how-can-i-quickly-estimate-the-distance-between-two-latitude-longitude-points # noqa
@ -37,6 +61,7 @@ def get_date_regex(string, user_regex=None):
for i, rx in regex.items():
yield i, rx
def get_date_from_string(string, user_regex=None):
# If missing datetime from EXIF data check if filename is in datetime format.
# For this use a user provided regex if possible.
@ -75,17 +100,14 @@ def get_date_from_string(string, user_regex=None):
return date
# Conversion functions
# source:https://rodic.fr/blog/camelcase-and-snake_case-strings-conversion-with-python/
def snake2camel(name):
return re.sub(r'(?:^|_)([a-z])', lambda x: x.group(1).upper(), name)
def snake2camelback(name):
return re.sub(r'_([a-z])', lambda x: x.group(1).upper(), name)
def camel2snake(name):
return name[0].lower() + re.sub(r'(?!^)[A-Z]', lambda x: '_' + x.group(0).lower(), name[1:])
def camelback2snake(name):
return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
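Quick sanity check of the case-conversion helpers above, which map between the Sqlite column names and the snake_case metadata keys:

    from ordigi.utils import snake2camel, snake2camelback, camel2snake, camelback2snake

    assert snake2camel('camera_make') == 'CameraMake'           # metadata key -> Sqlite column
    assert camel2snake('CameraMake') == 'camera_make'           # Sqlite column -> metadata key
    assert snake2camelback('original_name') == 'originalName'
    assert camelback2snake('originalName') == 'original_name'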

View File

@ -16,7 +16,7 @@ from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exif
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media
from ordigi.utils import get_date_regex
from ordigi import utils
class TestCollection:
@ -75,8 +75,7 @@ class TestCollection:
for mask in masks:
matched = re.search(regex, mask)
if matched:
part = collection.get_part(item, mask[1:-1],
metadata, subdirs)
part = collection.get_part(item, mask[1:-1], metadata)
# check if part is correct
assert isinstance(part, str), file_path
if item == 'basename':
@ -93,7 +92,7 @@ class TestCollection:
assert part == file_path.suffix[1:], file_path
elif item == 'name':
expected_part = file_path.stem
for i, rx in get_date_regex(expected_part):
for i, rx in utils.get_date_regex(expected_part):
part = re.sub(rx, '', expected_part)
assert part == expected_part, file_path
elif item == 'custom':
@ -151,11 +150,11 @@ class TestCollection:
src_path = Path(self.src_path, 'test_exif', 'photo.png')
name = 'photo_' + mode + '.png'
dest_path = Path(tmp_path, name)
src_checksum = collection.checksum(src_path)
src_checksum = utils.checksum(src_path)
result_copy = collection.sort_file(src_path, dest_path)
assert result_copy
# Ensure files remain the same
assert collection.checkcomp(dest_path, src_checksum)
assert collection._checkcomp(dest_path, src_checksum)
if mode == 'copy':
assert src_path.exists()

View File

@ -18,13 +18,15 @@ class TestSqlite:
'FilePath': 'file_path',
'Checksum': 'checksum',
'Album': 'album',
'Title': 'title',
'LocationId': 2,
'DateTaken': datetime(2012, 3, 27),
'DateMedia': datetime(2012, 3, 27),
'DateOriginal': datetime(2013, 3, 27),
'DateCreated': 'date_created',
'DateModified': 'date_modified',
'CameraMake': 'camera_make',
'CameraModel': 'camera_model',
'OriginalName':'original_name',
'SrcPath': 'src_path',
'Subdirs': 'subdirs',
'Filename': 'filename'
@ -62,7 +64,7 @@ class TestSqlite:
def test_add_metadata_data(self):
result = tuple(self.sqlite.cur.execute("""select * from metadata where
rowid=1""").fetchone())
assert result == ('file_path', 'checksum', 'album', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'src_path', 'subdirs', 'filename')
assert result == ('file_path', 'checksum', 'album', 'title', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'original_name', 'src_path', 'subdirs', 'filename')
def test_get_checksum(self):
assert not self.sqlite.get_checksum('invalid')

View File

@ -28,48 +28,50 @@ class TestMetadata:
self.exif_data = ExifTool(file_path).asdict()
yield file_path, Media(file_path, self.src_path, album_from_folder=True, ignore_tags=self.ignore_tags)
def test_get_metadata(self):
def test_get_metadata(self, tmp_path):
for file_path, media in self.get_media():
result = media.get_metadata()
assert result
assert isinstance(media.metadata, dict), media.metadata
# check if all tags keys are present
for tags_key, tags in media.tags_keys.items():
assert tags_key in media.metadata
for tag in tags:
for tag_regex in self.ignore_tags:
assert not re.match(tag_regex, tag)
# Check for valid type
for key, value in media.metadata.items():
if value or value == '':
if 'date' in key:
assert isinstance(value, datetime)
elif key in ('latitude', 'longitude'):
assert isinstance(value, float)
# test get metadata from cache or exif
for root in self.src_path, tmp_path:
result = media.get_metadata(root)
assert result
assert isinstance(media.metadata, dict), media.metadata
# check if all tags keys are present
for tags_key, tags in media.tags_keys.items():
assert tags_key in media.metadata
for tag in tags:
for tag_regex in self.ignore_tags:
assert not re.match(tag_regex, tag)
# Check for valid type
for key, value in media.metadata.items():
if value or value == '':
if 'date' in key:
assert isinstance(value, datetime)
elif key in ('latitude', 'longitude'):
assert isinstance(value, float)
else:
assert isinstance(value, str)
else:
assert isinstance(value, str)
else:
assert value is None
assert value is None
if key == 'album':
for album in media._get_key_values('album'):
if album is not None and album != '':
assert value == album
if key == 'album':
for album in media._get_key_values('album'):
if album is not None and album != '':
assert value == album
break
else:
assert value == file_path.parent.name
# Check if has_exif_data() is True if 'date_original' key is
# present, else check if it's false
has_exif_data = False
for tag in media.tags_keys['date_original']:
if tag in media.exif_metadata:
if media.get_date_format(media.exif_metadata[tag]):
has_exif_data = True
assert media.has_exif_data()
break
else:
assert value == file_path.parent.name
# Check if has_exif_data() is True if 'date_original' key is
# present, else check if it's false
has_exif_data = False
for tag in media.tags_keys['date_original']:
if tag in media.exif_metadata:
if media.get_date_format(media.exif_metadata[tag]):
has_exif_data = True
assert media.has_exif_data()
break
if has_exif_data == False:
assert not media.has_exif_data()
if has_exif_data == False:
assert not media.has_exif_data()
def test_get_date_media(self):
# collection = Collection(tmp_path, self.path_format,
@ -78,7 +80,7 @@ class TestMetadata:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(file_path, self.src_path, use_date_filename=True,
use_file_dates=True)
metadata = media.get_metadata()
metadata = media.get_metadata(self.src_path)
date_media = media.get_date_media()
date_filename = None