Compare commits

...

17 Commits

Author SHA1 Message Date
Cédric Leporcq d54ccd64f6 Update LICENSE file and .gitignore 2022-08-28 17:11:03 +02:00
Cédric Leporcq 836792429f Change command line options for log levels 2022-08-28 14:22:30 +02:00
Cédric Leporcq b7435c4eac Modify input text function and add Input class 2022-08-28 14:19:58 +02:00
Cédric Leporcq 723f549f73 Modify db checking for better performance 2022-08-28 14:19:58 +02:00
Cédric Leporcq 47b9aa57ae Add checksum dict 2022-08-28 14:19:58 +02:00
Cédric Leporcq 9e32052ce3 Verify checksum in check_db 2022-08-28 14:19:58 +02:00
Cédric Leporcq ed58383ea0 Add --checksum option to update command 2022-08-28 14:19:58 +02:00
Cédric Leporcq f6816c6c01 Allow importing or sorting single files 2022-08-28 07:50:43 +02:00
Cédric Leporcq b7f0cafe98 Update .gitignore 2022-08-28 07:50:43 +02:00
Cédric Leporcq 573a63998e Revamp and fix options 2022-08-28 07:50:43 +02:00
Cédric Leporcq 01b47c8c40 Fix date detection in filenames 2022-08-28 07:50:43 +02:00
Cédric Leporcq 52768f64db Add 60s tolerance when comparing date_original, date_filename and date_created 2022-08-28 07:50:43 +02:00
Cédric Leporcq cdfa408206 Fix get_date_from_string function 2022-08-28 07:50:43 +02:00
Cédric Leporcq eee3c71f6a Remove unused import 2022-08-28 07:50:43 +02:00
Cédric Leporcq 1eb2a2c6e0 Fix setting original name in EXIF metadata 2022-08-28 07:50:43 +02:00
Cédric Leporcq 58e282fd87 Fix database 2022-08-28 07:50:43 +02:00
Cédric Leporcq a1ba0663b6 Fix edit metadata 2022-08-28 07:50:43 +02:00
15 changed files with 889 additions and 651 deletions

.gitignore (144 changes)
View File

@ -1,19 +1,139 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Compiled python modules.
*.pyc
/build/
/.coverage
docs/_build
# Other
/diagnostics.lua
docs/Ordigi_data_scheme.odg
# Setuptools distribution folder.
/dist/
# Python egg metadata, regenerated from source files by setuptools.
/*.egg-info
/env/
/htmlcov
/ressources
/Session.vim
/tags

LICENSE (713 changes)

File diff suppressed because it is too large.

View File

@ -1,5 +1,17 @@
[Exif]
#album_from_folder=False
fill_date_original=True
#cache=True
#ignore_tags=None
use_date_filename=True
#use_file_dates=False
[Filters]
exclude= ["**/.directory", "**/.DS_Store"]
exclude=["**/.directory", "**/.DS_Store"]
#extensions=None
#glob=**/*
#max_deep=None
remove_duplicates=True
[Geolocation]
geocoder=Nominatim
@ -15,5 +27,9 @@ day_begins=4
# Path format
dirs_path=<%Y>/<%m-%b>_<location>_<folder>
name=<%Y%m%d-%H%M%S>_<<original_name>|<name>>.%l<ext>
name=<%Y%m%d-%H%M%S>_<<name>.%l<ext>|<original_name>>
# name=<%Y%m%d-%H%M%S>-%u<original_name>.%l<ext>
[Terminal]
dry_run=False
interactive=False

View File

@ -5,17 +5,33 @@ import sys
import click
from ordigi import constants, log, LOG
from ordigi import log, LOG
from ordigi.collection import Collection
from ordigi import constants
from ordigi.geolocation import GeoLocation
from ordigi import utils
_logger_options = [
click.option(
'--quiet',
'-q',
default=False,
is_flag=True,
help='Log level set to ERROR',
),
click.option(
'--verbose',
'-v',
default='WARNING',
help='Log level [WARNING,INFO,DEBUG,NOTSET]',
default=False,
is_flag=True,
help='Log level set to INFO',
),
click.option(
'--debug',
'-d',
default=False,
is_flag=True,
help='Log level set to DEBUG',
),
]
@ -85,7 +101,7 @@ _sort_options = [
click.option(
'--path-format',
'-p',
default=None,
default=constants.DEFAULT_PATH_FORMAT,
help='Custom featured path format',
),
click.option(
@ -147,15 +163,10 @@ def _cli_get_location(collection):
)
def _cli_sort(collection, src_paths, import_mode, remove_duplicates):
def _cli_sort(collection, src_paths, import_mode):
loc = _cli_get_location(collection)
path_format = collection.opt['Path']['path_format']
LOG.debug(f'path_format: {path_format}')
return collection.sort_files(
src_paths, path_format, loc, import_mode, remove_duplicates
)
return collection.sort_files(src_paths, loc, import_mode)
@click.group()
@ -172,7 +183,7 @@ def _check(**kwargs):
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
collection = Collection(root)
@ -182,6 +193,7 @@ def _check(**kwargs):
if log_level < 30:
summary.print()
if summary.errors:
LOG.error('Db data is not accurate, run `ordigi update --checksum`')
sys.exit(1)
else:
LOG.error('Db data is not accurate, run `ordigi update`')
@ -194,7 +206,7 @@ def _check(**kwargs):
@add_options(_filter_options)
@click.option(
'--dedup-regex',
'-d',
'-D',
default=None,
multiple=True,
help='Regex to match duplicate strings parts',
@ -218,10 +230,10 @@ def _check(**kwargs):
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('collection', required=True, nargs=1, type=click.Path())
def _clean(**kwargs):
"""Remove empty folders"""
"""Clean media collection"""
folders = kwargs['folders']
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
subdirs = kwargs['subdirs']
@ -231,24 +243,21 @@ def _clean(**kwargs):
collection = Collection(
root,
{
"dry_run": kwargs['dry_run'],
"extensions": kwargs['ext'],
"glob": kwargs['glob'],
'dry_run': kwargs['dry_run'],
'extensions': kwargs['ext'],
'glob': kwargs['glob'],
'remove_duplicates': kwargs['remove_duplicates'],
},
)
# os.path.join(
# TODO make function to remove duplicates
# path_format = collection.opt['Path']['path_format']
# summary = collection.sort_files(
# paths, path_format, None, remove_duplicates=kwargs['remove_duplicates']
# )
# summary = collection.sort_files(paths, None)
if kwargs['path_string']:
dedup_regex = set(kwargs['dedup_regex'])
collection.dedup_path(
paths, dedup_regex, kwargs['remove_duplicates']
)
collection.dedup_path(paths, dedup_regex)
for path in paths:
if folders:
@ -274,7 +283,7 @@ def _clean(**kwargs):
def _clone(**kwargs):
"""Clone media collection to another location"""
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
src_path = Path(kwargs['src']).expanduser().absolute()
@ -327,16 +336,17 @@ def _compare(**kwargs):
subdirs = kwargs['subdirs']
root = kwargs['collection']
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
paths, root = _get_paths(subdirs, root)
collection = Collection(
root,
{
"extensions": kwargs['ext'],
"glob": kwargs['glob'],
"dry_run": kwargs['dry_run'],
'extensions': kwargs['ext'],
'glob': kwargs['glob'],
'dry_run': kwargs['dry_run'],
'remove_duplicates': kwargs['remove_duplicates'],
},
)
@ -363,16 +373,25 @@ def _compare(**kwargs):
multiple=True,
help="Select exif tags groups to edit",
)
@click.option(
'--overwrite',
'-O',
default=False,
is_flag=True,
help="Overwrite db and exif value by key value",
)
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('path', required=True, nargs=1, type=click.Path())
def _edit(**kwargs):
"""Edit EXIF metadata in files or directories"""
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
paths, root = _get_paths(kwargs['subdirs'], kwargs['path'])
overwrite = kwargs['overwrite']
collection = Collection(
root,
{
@ -389,13 +408,11 @@ def _edit(**kwargs):
'camera_make',
'camera_model',
'city',
'coordinates',
'country',
# 'date_created',
'date_media',
# 'date_modified',
'date_original',
'default',
'latitude',
'location',
'longitude',
@ -410,6 +427,9 @@ def _edit(**kwargs):
keys = set(editable_keys)
else:
keys = set(kwargs['key'])
if 'coordinates' in keys:
keys.remove('coordinates')
keys.update(['latitude', 'longitude'])
location = False
for key in keys:
@ -417,10 +437,6 @@ def _edit(**kwargs):
LOG.error(f"key '{key}' is not valid")
sys.exit(1)
if key == 'coordinates':
keys.remove('coordinates')
keys.update(['latitude', 'longitude'])
if key in (
'city',
'latitude',
@ -436,7 +452,7 @@ def _edit(**kwargs):
else:
loc = None
summary = collection.edit_metadata(paths, keys, loc, overwrite=True)
summary = collection.edit_metadata(paths, keys, loc, overwrite)
if log_level < 30:
summary.print()
@ -453,7 +469,7 @@ def _init(**kwargs):
Init media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
collection = Collection(root)
@ -490,7 +506,7 @@ def _import(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
src_paths, root = _get_paths(kwargs['src'], kwargs['dest'])
@ -509,7 +525,7 @@ def _import(**kwargs):
'dry_run': kwargs['dry_run'],
'interactive': kwargs['interactive'],
'path_format': kwargs['path_format'],
'remove_duplicates': kwargs['remove_duplicates'],
}
)
@ -517,7 +533,7 @@ def _import(**kwargs):
import_mode = 'copy'
else:
import_mode = 'move'
summary = _cli_sort(collection, src_paths, import_mode, kwargs['remove_duplicates'])
summary = _cli_sort(collection, src_paths, import_mode)
if log_level < 30:
summary.print()
@ -546,7 +562,7 @@ def _sort(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
paths, root = _get_paths(kwargs['subdirs'], kwargs['dest'])
@ -566,10 +582,11 @@ def _sort(**kwargs):
'glob': kwargs['glob'],
'dry_run': kwargs['dry_run'],
'interactive': kwargs['interactive'],
'remove_duplicates': kwargs['remove_duplicates'],
}
)
summary = _cli_sort(collection, paths, False, kwargs['remove_duplicates'])
summary = _cli_sort(collection, paths, False)
if kwargs['clean']:
collection.remove_empty_folders(root)
@ -583,18 +600,25 @@ def _sort(**kwargs):
@cli.command('update')
@add_options(_logger_options)
@click.option(
'--checksum',
'-c',
default=False,
is_flag=True,
help='Update checksum, assuming files were changed by the user',
)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _update(**kwargs):
"""
Update media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log_level = log.get_level(kwargs['quiet'], kwargs['verbose'], kwargs['debug'])
log.console(LOG, level=log_level)
collection = Collection(root)
loc = _cli_get_location(collection)
summary = collection.update(loc)
summary = collection.update(loc, kwargs['checksum'])
if log_level < 30:
summary.print()
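
A minimal sketch of driving the reworked check/update workflow through click's test runner, mirroring how the test suite below exercises these commands (the collection path is illustrative):

    from click.testing import CliRunner
    from ordigi import cli

    runner = CliRunner()
    collection = '/path/to/collection'  # illustrative path

    # Refresh stored checksums after files were modified on purpose
    runner.invoke(cli._update, ['--checksum', collection])

    # Verify that the database and the filesystem agree again
    result = runner.invoke(cli._check, [collection])
    assert result.exit_code == 0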

View File

@ -138,8 +138,9 @@ class FPath:
elif item == 'name':
# Remove date prefix added to the name.
part = stem
for regex in utils.get_date_regex().values():
part = re.sub(regex, '', part)
date_filename, regex, sep = utils.get_date_from_string(stem)
if date_filename:
part = re.sub(regex, sep, part)
# Delete separator
if re.search('^[-_ .]', part):
part = part[1:]
@ -272,7 +273,7 @@ class CollectionDb:
def __init__(self, root):
self.sqlite = Sqlite(root)
def _set_row_data(self, table, metadata):
def _get_row_data(self, table, metadata):
row_data = {}
for title in self.sqlite.tables[table]['header']:
key = utils.camel2snake(title)
@ -283,11 +284,11 @@ class CollectionDb:
def add_file_data(self, metadata):
"""Save metadata informations to db"""
if metadata['latitude'] and metadata['longitude']:
loc_values = self._set_row_data('location', metadata)
loc_values = self._get_row_data('location', metadata)
metadata['location_id'] = self.sqlite.upsert_location(loc_values)
if metadata['file_path']:
row_data = self._set_row_data('metadata', metadata)
row_data = self._get_row_data('metadata', metadata)
self.sqlite.upsert_metadata(row_data)
@ -359,7 +360,7 @@ class Paths:
:return: Path path
"""
if not path.exists():
self.log.error(f'Directory {path} does not exist')
self.log.error(f'Path {path} does not exist')
sys.exit(1)
return path
@ -476,6 +477,7 @@ class SortMedias:
db=None,
dry_run=False,
interactive=False,
remove_duplicates=False,
):
# Arguments
@ -488,9 +490,11 @@ class SortMedias:
self.dry_run = dry_run
self.interactive = interactive
self.log = LOG.getChild(self.__class__.__name__)
self.remove_duplicates = remove_duplicates
self.summary = Summary(self.root)
# Attributes
self.input = request.Input()
self.theme = request.load_theme()
def _checkcomp(self, dest_path, src_checksum):
@ -519,7 +523,7 @@ class SortMedias:
# change media file_path to dest_path
if not self.dry_run:
updated = self.medias.update_exif_data(metadata)
updated = self.medias.update_exif_data(metadata, imp)
if updated:
checksum = utils.checksum(dest_path)
metadata['checksum'] = checksum
@ -577,14 +581,10 @@ class SortMedias:
self.log.warning(f'Target directory {dir_path} is a file')
# Rename the src_file
if self.interactive:
prompt = [
inquirer.Text(
'file_path',
message="New name for" f"'{dir_path.name}' file",
),
]
answers = inquirer.prompt(prompt, theme=self.theme)
file_path = dir_path.parent / answers['file_path']
answer = self.input.text(
"New name for" f"'{dir_path.name}' file"
)
file_path = dir_path.parent / answer
else:
file_path = dir_path.parent / (dir_path.name + '_file')
@ -599,7 +599,7 @@ class SortMedias:
directory_path.mkdir(parents=True, exist_ok=True)
self.log.info(f'Create {directory_path}')
def check_conflicts(self, src_path, dest_path, remove_duplicates=False):
def check_conflicts(self, src_path, dest_path):
"""
Check if file can be copied or moved to dest_path.
"""
@ -615,7 +615,7 @@ class SortMedias:
if dest_path.is_file():
self.log.info(f"File {dest_path} already exist")
if remove_duplicates:
if self.remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.log.info(
"File in source and destination are identical. Duplicate will be ignored."
@ -632,15 +632,15 @@ class SortMedias:
return 0
def _solve_conflicts(self, conflicts, remove_duplicates):
def _solve_conflicts(self, conflicts):
unresolved_conflicts = []
while conflicts != []:
src_path, dest_path, metadata = conflicts.pop()
# Check for conflict status again in case it has changed
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path)
for i in range(1, 100):
for i in range(1, 1000):
if conflict != 1:
break
@ -651,7 +651,7 @@ class SortMedias:
else:
stem = dest_path.stem
dest_path = dest_path.parent / (stem + '_' + str(i) + suffix)
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path)
if conflict == 1:
# i = 100:
@ -662,7 +662,7 @@ class SortMedias:
yield (src_path, dest_path, metadata), conflict
def sort_medias(self, imp=False, remove_duplicates=False):
def sort_medias(self, imp=False):
"""
sort files and solve conflicts
"""
@ -673,7 +673,7 @@ class SortMedias:
for src_path, metadata in self.medias.datas.items():
dest_path = self.root / metadata['file_path']
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
conflict = self.check_conflicts(src_path, dest_path)
if not conflict:
self.sort_file(
@ -691,9 +691,7 @@ class SortMedias:
pass
if conflicts != []:
for files_data, conflict in self._solve_conflicts(
conflicts, remove_duplicates
):
for files_data, conflict in self._solve_conflicts(conflicts):
src_path, dest_path, metadata = files_data
if not conflict:
@ -726,13 +724,13 @@ class Collection(SortMedias):
self.log = LOG.getChild(self.__class__.__name__)
# Get config options
self.opt = self.get_config_options()
self.opt, default_options = self.get_config_options()
# Set client options
for option, value in cli_options.items():
if value not in (None, set()):
for section in self.opt:
if option in self.opt[section]:
for section in self.opt:
if option in self.opt[section]:
if value != default_options[section][option]:
if option == 'exclude':
self.opt[section][option].union(set(value))
elif option in ('ignore_tags', 'extensions'):
@ -759,6 +757,7 @@ class Collection(SortMedias):
self.paths,
root,
self.opt['Exif'],
{},
self.db,
self.opt['Terminal']['interactive'],
)
@ -771,6 +770,7 @@ class Collection(SortMedias):
self.db,
self.opt['Terminal']['dry_run'],
self.opt['Terminal']['interactive'],
self.opt['Filters']['remove_duplicates'],
)
# Attributes
@ -791,7 +791,7 @@ class Collection(SortMedias):
"""Get collection config"""
config = Config(self.root.joinpath('.ordigi', 'ordigi.conf'))
return config.get_config_options()
return config.get_config_options(), config.get_default_options()
def _set_option(self, section, option, cli_option):
"""if client option is set overwrite collection option value"""
@ -817,7 +817,7 @@ class Collection(SortMedias):
def init(self, loc):
"""Init collection db"""
for file_path in self.get_collection_files():
metadata = self.medias.get_metadata(file_path, self.root, loc)
metadata = self.medias.get_metadata(file_path, self.root, loc=loc)
metadata['file_path'] = os.path.relpath(file_path, self.root)
self.db.add_file_data(metadata)
@ -825,7 +825,60 @@ class Collection(SortMedias):
return self.summary
def check_db(self):
def check_files(self):
"""Check file integrity."""
for file_path in self.paths.get_files(self.root):
checksum = utils.checksum(file_path)
relpath = file_path.relative_to(self.root)
if checksum == self.db.sqlite.get_checksum(relpath):
self.summary.append('check', True, file_path)
else:
self.log.error(f'{file_path} is corrupted')
self.summary.append('check', False, file_path)
return self.summary
def file_in_db(self, file_path, db_rows):
# Assuming file_path is inside collection root dir
relpath = os.path.relpath(file_path, self.root)
# If file not in database
if relpath not in db_rows:
return False
return True
def _check_file(self, file_path, file_checksum):
"""Check if file checksum as changed"""
relpath = os.path.relpath(file_path, self.root)
db_checksum = self.db.sqlite.get_checksum(relpath)
# Check if checksum match
if not db_checksum:
return None
if db_checksum != file_checksum:
self.log.warning(f'{file_path} checksum has changed')
self.log.info(
f'file_checksum={file_checksum},\ndb_checksum={db_checksum}'
)
return False
return True
def check_file(self, file_path):
self.medias.checksums[file_path] = utils.checksum(file_path)
if self._check_file(file_path, self.medias.checksums[file_path]):
return True
# We don't want to silently ignore or correct this without
# resetting the cache as it could be due to file corruption
self.log.error('Modified or corrupted file.')
self.log.info(
'Use ordigi update --checksum or --reset-cache, check database integrity or try to restore the file'
)
return False
def check_db(self, checksums=True):
"""
Check if db FilePath entries match the collection filesystem
:returns: bool
@ -833,12 +886,13 @@ class Collection(SortMedias):
file_paths = list(self.get_collection_files())
db_rows = [row['FilePath'] for row in self.db.sqlite.get_rows('metadata')]
for file_path in file_paths:
relpath = os.path.relpath(file_path, self.root)
# If file not in database
if relpath not in db_rows:
result = self.file_in_db(file_path, db_rows)
if not result:
self.log.error('Db data is not accurate')
self.log.info(f'{file_path} not in db')
return False
elif checksums and not self.check_file(file_path):
return False
nb_files = len(file_paths)
nb_row = len(db_rows)
@ -856,10 +910,10 @@ class Collection(SortMedias):
self.log.error('Db data is not accurate, run `ordigi update`')
sys.exit(1)
def _init_check_db(self, loc=None):
def _init_check_db(self, checksums=True, loc=None):
if self.db.sqlite.is_empty('metadata'):
self.init(loc)
elif not self.check_db():
elif not self.check_db(checksums):
self.log.error('Db data is not accurate, run `ordigi update`')
sys.exit(1)
@ -882,12 +936,13 @@ class Collection(SortMedias):
return self.summary
def update(self, loc):
def update(self, loc, update_checksum=False):
"""Update collection db"""
file_paths = list(self.get_collection_files())
db_rows = list(self.db.sqlite.get_rows('metadata'))
invalid_db_rows = set()
db_paths = set()
self.log.info(f"Update database:")
for db_row in db_rows:
abspath = self.root / db_row['FilePath']
if abspath not in file_paths:
@ -897,9 +952,24 @@ class Collection(SortMedias):
for file_path in file_paths:
relpath = os.path.relpath(file_path, self.root)
metadata = {}
self.medias.checksums[file_path] = utils.checksum(file_path)
if (
not self._check_file(file_path, self.medias.checksums[file_path])
and update_checksum
):
# metadata will fill checksum from file
metadata = self.medias.get_metadata(file_path, self.root, loc=loc)
metadata['file_path'] = relpath
# set row attribute to the file
self.db.add_file_data(metadata)
self.log.info(f"Update '{file_path}' checksum to db")
self.summary.append('update', file_path)
# If file not in database
if relpath not in db_paths:
metadata = self.medias.get_metadata(file_path, self.root, loc)
metadata = self.medias.get_metadata(file_path, self.root, loc=loc)
metadata['file_path'] = relpath
# Check if file checksum is in invalid rows
row = []
@ -915,24 +985,13 @@ class Collection(SortMedias):
break
# set row attribute to the file
self.db.add_file_data(metadata)
self.log.info(f"Add '{file_path}' to db")
self.summary.append('update', file_path)
# Finally delete invalid rows
for row in invalid_db_rows:
self.db.sqlite.delete_filepath(row['FilePath'])
return self.summary
def check_files(self):
"""Check file integrity."""
for file_path in self.paths.get_files(self.root):
checksum = utils.checksum(file_path)
relpath = file_path.relative_to(self.root)
if checksum == self.db.sqlite.get_checksum(relpath):
self.summary.append('check', True, file_path)
else:
self.log.error(f'{file_path} is corrupted')
self.summary.append('check', False, file_path)
self.log.info(f"Delete invalid row : '{row['FilePath']}' from db")
return self.summary
@ -1002,17 +1061,15 @@ class Collection(SortMedias):
return self.summary
def sort_files(
self, src_dirs, path_format, loc, imp=False, remove_duplicates=False
):
def sort_files(self, src_dirs, loc, imp=False):
"""
Sort files into appropriate folder
"""
# Check db
self._init_check_db(loc)
self._init_check_db(loc=loc)
# if path format client option is set overwrite it
self._set_option('Path', 'path_format', path_format)
path_format = self.opt['Path']['path_format']
self.log.debug(f'path_format: {path_format}')
# Get medias data
subdirs = set()
@ -1026,7 +1083,7 @@ class Collection(SortMedias):
self.medias.datas[src_path] = copy(metadata)
# Sort files and solve conflicts
self.summary = self.sort_medias(imp, remove_duplicates)
self.summary = self.sort_medias(imp)
if imp != 'copy':
self.remove_empty_subdirs(subdirs, src_dirs)
@ -1036,7 +1093,7 @@ class Collection(SortMedias):
return self.summary
def dedup_path(self, paths, dedup_regex=None, remove_duplicates=False):
def dedup_path(self, paths, dedup_regex=None):
"""Deduplicate file path parts"""
# Check db
@ -1077,7 +1134,7 @@ class Collection(SortMedias):
self.medias.datas[src_path] = copy(metadata)
# Sort files and solve conflicts
self.sort_medias(remove_duplicates=remove_duplicates)
self.sort_medias()
if not self.check_db():
self.summary.append('check', False)
@ -1109,7 +1166,7 @@ class Collection(SortMedias):
return True
def sort_similar_images(self, path, similarity=80, remove_duplicates=False):
def sort_similar_images(self, path, similarity=80):
"""Sort similar images using imagehash library"""
# Check db
self._init_check_db()
@ -1128,7 +1185,7 @@ class Collection(SortMedias):
)
if similar_images:
# Move the similar files into the destination directory
self.sort_medias(remove_duplicates=remove_duplicates)
self.sort_medias()
nb_row_end = self.db.sqlite.len('metadata')
if nb_row_ini and nb_row_ini != nb_row_end:
@ -1141,35 +1198,43 @@ class Collection(SortMedias):
def edit_metadata(self, paths, keys, loc=None, overwrite=False):
"""Edit metadata and exif data for given key"""
self._init_check_db()
if self.db.sqlite.is_empty('metadata'):
self.init(loc)
for file_path, media in self.medias.get_medias_datas(paths, loc=loc):
result = False
media.metadata['file_path'] = os.path.relpath(file_path, self.root)
if not self.check_file(file_path):
self.log.error('Db data is not accurate, run `ordigi update`')
sys.exit(1)
exif = WriteExif(
file_path,
media.metadata,
ignore_tags=self.opt['Exif']['ignore_tags'],
)
for key in keys:
print()
value = media.metadata[key]
if overwrite or not value:
print(f"FILE: '{file_path}'")
if overwrite:
if overwrite and value:
print(f"{key}: '{value}'")
if overwrite or not value:
# Prompt value for given key for file_path
prompt = [
inquirer.Text('value', message=key),
]
answer = inquirer.prompt(prompt, theme=self.theme)
# answer = {'value': '03-12-2021 08:12:35'}
# Validate value
answer = self.input.text(key)
# Check value
if key in ('date_original', 'date_created', 'date_modified'):
# Check date format
value = media.get_date_format(answer['value'])
value = media.get_date_format(answer)
else:
value = answer['value']
if not value.isalnum():
value = answer
while not value.isalnum():
if not value: break
print("Invalid entry, use alphanumeric chars")
value = inquirer.prompt(prompt, theme=self.theme)
result = False
if value:
media.metadata[key] = value
if key == 'location':
@ -1179,36 +1244,26 @@ class Collection(SortMedias):
media.metadata['longitude'] = coordinates['longitude']
media.set_location_from_coordinates(loc)
# Update database
self.db.add_file_data(media.metadata)
# Update exif data
if key in (
'date_original',
'album',
'title',
'latitude',
'location',
'longitude',
'latitude_ref',
'longitude_ref',
):
exif = WriteExif(
file_path,
media.metadata,
ignore_tags=self.opt['Exif']['ignore_tags'],
if key == 'location':
result = exif.set_key_values(
'latitude', media.metadata['latitude']
)
if key == 'location':
result = exif.set_key_values(
'latitude', media.metadata['latitude']
)
result = exif.set_key_values(
'longitude', media.metadata['longitude']
)
else:
result = exif.set_key_values(key, value)
if result:
self.summary.append('update', True, file_path)
else:
self.summary.append('update', False, file_path)
result = exif.set_key_values(
'longitude', media.metadata['longitude']
)
elif key in exif.get_tags().keys():
result = exif.set_key_values(key, value)
# Update checksum
media.metadata['checksum'] = utils.checksum(file_path)
# Update database
self.db.add_file_data(media.metadata)
if result:
self.summary.append('update', True, file_path)
else:
self.summary.append('update', False, file_path)
return self.summary
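
A minimal sketch of how the new checksum handling in Collection is meant to be used from Python; the collection path is illustrative, while the class and method names are taken from this diff:

    from pathlib import Path
    from ordigi.collection import Collection
    from ordigi.geolocation import GeoLocation

    collection = Collection(Path('~/Pictures/collection').expanduser())

    # Re-hash every file and compare it against the checksum stored in the db
    summary = collection.check_files()
    if summary.errors:
        # Checksums differ: restore the files, or, if they were changed on
        # purpose, refresh the stored values as `ordigi update --checksum` does
        collection.update(GeoLocation(), update_checksum=True)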

View File

@ -53,9 +53,9 @@ class Config:
else:
self.conf = conf
self.options = self.set_default_options()
self.options = self.get_default_options()
def set_default_options(self) -> dict:
def get_default_options(self) -> dict:
# Initialize with default options
return {
'Exif': {
@ -71,6 +71,7 @@ class Config:
'extensions': None,
'glob': '**/*',
'max_deep': None,
'remove_duplicates': False,
},
'Geolocation': {
'geocoder': constants.DEFAULT_GEOCODER,
@ -137,6 +138,7 @@ class Config:
'dry_run',
'interactive',
'prefer_english_names',
'remove_duplicates',
'use_date_filename',
'use_file_dates',
}

View File

@ -310,7 +310,9 @@ class Sqlite:
value = None
self.cur.execute('SELECT * FROM location')
for row in self.cur:
distance = distance_between_two_points(latitude, longitude, row[0], row[1])
distance = distance_between_two_points(
latitude, longitude, row['Latitude'], row['Longitude']
)
# If closer than the threshold, reuse the lookup
if distance < shorter_distance and distance <= threshold_m:
shorter_distance = distance
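
The switch from positional indexes to row['Latitude'] and row['Longitude'] presumably relies on the cursor returning name-addressable rows; a small standalone illustration with sqlite3.Row (table and column names follow this diff):

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.row_factory = sqlite3.Row  # rows become addressable by column name
    con.execute('CREATE TABLE location (Latitude REAL, Longitude REAL)')
    con.execute('INSERT INTO location VALUES (45.76, 4.84)')

    row = con.execute('SELECT * FROM location').fetchone()
    print(row['Latitude'], row['Longitude'])  # 45.76 4.84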

View File

@ -46,9 +46,16 @@ def file_logger(logger, file, level=30):
logger.addHandler(handler)
def get_level(verbose):
"""Return int logging level from string"""
if verbose.isnumeric():
def get_level(quiet=False, verbose=False, debug=False, num=None):
"""Return int logging level from command line args"""
if num and num.isnumeric():
return int(num)
return int(logging.getLevelName(verbose))
if debug:
return int(logging.getLevelName('DEBUG'))
if verbose:
return int(logging.getLevelName('INFO'))
if quiet:
return int(logging.getLevelName('ERROR'))
return int(logging.getLevelName('WARNING'))
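
A short sketch of the resulting flag-to-level mapping; the numeric values come from the standard logging module and the module path ordigi.log is taken from this diff:

    import logging
    from ordigi import log

    assert log.get_level() == logging.WARNING           # default
    assert log.get_level(quiet=True) == logging.ERROR   # -q / --quiet
    assert log.get_level(verbose=True) == logging.INFO  # -v / --verbose
    assert log.get_level(debug=True) == logging.DEBUG   # -d / --debug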

View File

@ -62,7 +62,7 @@ class ExifMetadata:
]
tags_keys['latitude_ref'] = ['EXIF:GPSLatitudeRef']
tags_keys['longitude_ref'] = ['EXIF:GPSLongitudeRef']
tags_keys['original_name'] = ['XMP:OriginalFileName']
tags_keys['original_name'] = ['EXIF:OriginalFileName', 'XMP:OriginalFileName']
# Remove ignored tag from list
for tag_regex in self.ignore_tags:
@ -279,6 +279,7 @@ class Media(ReadExif):
ignore_tags=None,
interactive=False,
cache=True,
checksum=None,
use_date_filename=False,
use_file_dates=False,
):
@ -292,6 +293,11 @@ class Media(ReadExif):
self.album_from_folder = album_from_folder
self.cache = cache
if checksum:
self.checksum = checksum
else:
self.checksum = utils.checksum(file_path)
self.interactive = interactive
self.log = LOG.getChild(self.__class__.__name__)
self.metadata = None
@ -339,11 +345,8 @@ class Media(ReadExif):
sys.exit()
if not answers['date_list']:
prompt = [
inquirer.Text('date_custom', message="date"),
]
answers = inquirer.prompt(prompt, theme=self.theme)
return self.get_date_format(answers['date_custom'])
answer = self.prompt.text("date")
return self.get_date_format(answer)
return answers['date_list']
@ -359,9 +362,9 @@ class Media(ReadExif):
stem = os.path.splitext(filename)[0]
date_original = self.metadata['date_original']
if self.metadata['original_name']:
date_filename = utils.get_date_from_string(self.metadata['original_name'])
date_filename, _, _ = utils.get_date_from_string(self.metadata['original_name'])
else:
date_filename = utils.get_date_from_string(stem)
date_filename, _, _ = utils.get_date_from_string(stem)
self.log.debug(f'date_filename: {date_filename}')
date_original = self.metadata['date_original']
@ -370,31 +373,35 @@ class Media(ReadExif):
file_modify_date = self.metadata['file_modify_date']
if self.metadata['date_original']:
if date_filename and date_filename != date_original:
self.log.warning(
f"{filename} time mark is different from {date_original}"
)
if self.interactive:
# Ask for keep date taken, filename time, or neither
choices = [
(f"date original:'{date_original}'", date_original),
(f"date filename:'{date_filename}'", date_filename),
("custom", None),
]
default = f'{date_original}'
return self._get_date_media_interactive(choices, default)
timedelta = abs(date_original - date_filename)
if timedelta.total_seconds() > 60:
self.log.warning(
f"{filename} time mark is different from {date_original}"
)
if self.interactive:
# Ask for keep date taken, filename time, or neither
choices = [
(f"date original:'{date_original}'", date_original),
(f"date filename:'{date_filename}'", date_filename),
("custom", None),
]
default = f'{date_original}'
return self._get_date_media_interactive(choices, default)
return self.metadata['date_original']
self.log.warning(f"could not find original date for {self.file_path}")
self.log.warning(f"could not find date original for {self.file_path}")
if self.use_date_filename and date_filename:
self.log.info(
f"use date from filename:{date_filename} for {self.file_path}"
)
if date_created and date_filename > date_created:
self.log.warning(
f"{filename} time mark is more recent than {date_created}"
)
timedelta = abs(date_created - date_filename)
if timedelta.total_seconds() > 60:
self.log.warning(
f"{filename} time mark is more recent than {date_created}"
)
return date_created
if self.interactive:
@ -457,17 +464,12 @@ class Media(ReadExif):
default=f'{album}',
),
]
prompt = [
inquirer.Text('custom', message="album"),
]
answers = inquirer.prompt(choices_list, theme=self.theme)
if not answers:
sys.exit()
if not answers['album']:
answers = inquirer.prompt(prompt, theme=self.theme)
return answers['custom']
return self.input.text("album")
return answers['album']
@ -523,31 +525,6 @@ class Media(ReadExif):
return db.get_metadata(relpath, 'LocationId')
def _check_file(self, db, root):
"""Check if file_path is a subpath of root"""
if str(self.file_path).startswith(str(root)):
relpath = os.path.relpath(self.file_path, root)
db_checksum = db.get_checksum(relpath)
file_checksum = self.metadata['checksum']
# Check if checksum match
if db_checksum and db_checksum != file_checksum:
self.log.error(f'{self.file_path} checksum has changed')
self.log.error('(modified or corrupted file).')
self.log.error(
f'file_checksum={file_checksum},\ndb_checksum={db_checksum}'
)
self.log.info(
'Use --reset-cache, check database integrity or try to restore the file'
)
# We d'ont want to silently ignore or correct this without
# resetting the cache as is could be due to file corruption
sys.exit(1)
return relpath, db_checksum
return None, None
def set_location_from_db(self, location_id, db):
self.metadata['location_id'] = location_id
@ -607,17 +584,17 @@ class Media(ReadExif):
All keys will be present and have a value of None if not obtained.
"""
self.metadata = {}
self.metadata['checksum'] = utils.checksum(self.file_path)
self.metadata['checksum'] = self.checksum
db_checksum = False
location_id = None
if cache and db:
relpath, db_checksum = self._check_file(db, root)
if cache and db and str(self.file_path).startswith(str(root)):
relpath = os.path.relpath(self.file_path, root)
db_checksum = db.get_checksum(relpath)
if db_checksum:
location_id = self._set_metadata_from_db(db, relpath)
self.set_location_from_db(location_id, db)
else:
# file not in db
self.metadata['src_dir'] = str(self.src_dir)
self.metadata['subdirs'] = str(
self.file_path.relative_to(self.src_dir).parent
@ -661,6 +638,7 @@ class Medias:
paths,
root,
exif_options,
checksums=None,
db=None,
interactive=False,
):
@ -673,6 +651,11 @@ class Medias:
self.root = root
# Options
if checksums:
self.checksums = checksums
else:
self.checksums = {}
self.exif_opt = exif_options
self.ignore_tags = self.exif_opt['ignore_tags']
@ -684,7 +667,7 @@ class Medias:
self.datas = {}
self.theme = request.load_theme()
def get_media(self, file_path, src_dir):
def get_media(self, file_path, src_dir, checksum=None):
media = Media(
file_path,
src_dir,
@ -692,6 +675,7 @@ class Medias:
self.exif_opt['ignore_tags'],
self.interactive,
self.exif_opt['cache'],
checksum,
self.exif_opt['use_date_filename'],
self.exif_opt['use_file_dates'],
)
@ -699,7 +683,14 @@ class Medias:
return media
def get_media_data(self, file_path, src_dir, loc=None):
media = self.get_media(file_path, src_dir)
"""Get media class instance with metadata"""
if self.checksums and file_path in self.checksums.keys():
checksum = self.checksums[file_path]
else:
checksum = None
media = self.get_media(file_path, src_dir, checksum)
media.get_metadata(
self.root, loc, self.db.sqlite, self.exif_opt['cache']
)
@ -714,6 +705,11 @@ class Medias:
"""Get paths"""
for src_dir in src_dirs:
src_dir = self.paths.check(src_dir)
if src_dir.is_file():
yield src_dir.parent, src_dir
continue
paths = self.paths.get_paths_list(src_dir)
# Get medias and src_dirs
@ -730,7 +726,7 @@ class Medias:
"""Get medias datas"""
for src_dir, src_path in self.get_paths(src_dirs, imp=imp):
# Get file metadata
media = self.get_media_data(src_path, src_dir, loc)
media = self.get_media_data(src_path, src_dir, loc=loc)
yield src_path, media
@ -738,11 +734,11 @@ class Medias:
"""Get medias data"""
for src_dir, src_path in self.get_paths(src_dirs, imp=imp):
# Get file metadata
metadata = self.get_metadata(src_path, src_dir, loc)
metadata = self.get_metadata(src_path, src_dir, loc=loc)
yield src_path, metadata
def update_exif_data(self, metadata):
def update_exif_data(self, metadata, imp=False):
file_path = self.root / metadata['file_path']
exif = WriteExif(
@ -752,8 +748,8 @@ class Medias:
)
updated = False
if metadata['original_name'] in (None, ''):
exif.set_value('original_name', metadata['filename'])
if imp and metadata['original_name'] in (None, ''):
exif.set_key_values('original_name', metadata['filename'])
updated = True
if self.exif_opt['album_from_folder']:
exif.set_album_from_folder()
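
A brief illustration of the new 60-second tolerance applied before warning about a mismatch between the EXIF date and the date parsed from the file name (plain datetime arithmetic, independent of the Media class):

    from datetime import datetime

    date_original = datetime(2021, 12, 3, 8, 12, 35)  # from EXIF
    date_filename = datetime(2021, 12, 3, 8, 13, 5)   # parsed from the file name

    timedelta = abs(date_original - date_filename)
    if timedelta.total_seconds() > 60:
        print('time mark is different')  # not reached: the dates are only 30 s apart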

View File

@ -1,5 +1,6 @@
import inquirer
from blessed import Terminal
from colorama import init,Fore,Style,Back
term = Terminal()
@ -34,6 +35,15 @@ def load_theme():
return inquirer.themes.load_theme_from_dict(custom_theme)
class Input():
def __init__(self):
init()
def text(self, message):
return input(f'{Fore.BLUE}[{Fore.YELLOW}?{Fore.BLUE}]{Fore.WHITE} {message}: ')
# def edit_prompt(self, key: str, value: str) -> str:
# print(f"Date conflict for file: {self.file_path}")

View File

@ -69,17 +69,17 @@ def get_date_regex(user_regex=None):
# regex to match date format type %Y%m%d, %y%m%d, %d%m%Y,
# etc...
'a': re.compile(
r'.*[_-]?(?P<year>\d{4})[_-]?(?P<month>\d{2})[_-]?(?P<day>\d{2})[_-]?(?P<hour>\d{2})[_-]?(?P<minute>\d{2})[_-]?(?P<second>\d{2})'
r'[-_./ ](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_.]?(?P<hour>\d{2})[-_.]?(?P<minute>\d{2})[-_.]?(?P<second>\d{2})([-_./ ])'
),
'b': re.compile(
r'[-_./](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'
r'[-_./ ](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})([-_./ ])'
),
# not very accurate
'c': re.compile(
r'[-_./](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'
r'[-_./ ](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})([-_./ ])'
),
'd': re.compile(
r'[-_./](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})[-_./]'
r'[-_./ ](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})([-_./ ])'
),
}
@ -96,15 +96,18 @@ def get_date_from_string(string):
# Otherwise assume a filename such as IMG_20160915_123456.jpg as default.
matches = []
sep = ''
for i, regex in DATE_REGEX.items():
match = re.findall(regex, string)
if match != []:
sep = match[0][3]
if i == 'c':
match = [('20' + match[0][0], match[0][1], match[0][2])]
elif i == 'd':
# reorder items
match = [(match[0][2], match[0][1], match[0][0])]
# matches = match + matches
else:
match = [(match[0][0], match[0][1], match[0][2])]
if len(match) != 1:
# The time string is not unique
continue
@ -119,9 +122,11 @@ def get_date_from_string(string):
date_object = tuple(map(int, matches[0][0]))
date = datetime(*date_object)
except (KeyError, ValueError):
return None
return None, matches[0][1], sep
return date
return date, matches[0][1], sep
return None, None, sep
def match_date_regex(regex, value):
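
get_date_from_string now returns a (date, regex, separator) tuple instead of a bare date; a sketch of how callers in this diff unpack it, with an illustrative file name stem:

    import re
    from ordigi import utils

    stem = 'IMG_2016-09-15_123456'
    date_filename, regex, sep = utils.get_date_from_string(stem)
    if date_filename:
        # Strip the matched date prefix but keep its trailing separator,
        # as FPath does when rebuilding the <name> part
        part = re.sub(regex, sep, stem)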

View File

@ -5,6 +5,7 @@ import pytest
import inquirer
from ordigi import cli
from ordigi.request import Input
CONTENT = "content"
@ -26,7 +27,7 @@ class TestOrdigi:
def setup_class(cls, sample_files_paths):
cls.runner = CliRunner()
cls.src_path, cls.file_paths = sample_files_paths
cls.logger_options = (('--verbose', 'DEBUG'),)
cls.logger_options = ('--debug',)
cls.filter_options = (
('--ignore-tags', 'CreateDate'),
('--ext', 'jpg'),
@ -81,21 +82,23 @@ class TestOrdigi:
def test_edit(self, monkeypatch):
bool_options = ()
bool_options = (
*self.logger_options,
)
arg_options = (
*self.logger_options,
*self.filter_options,
)
def mockreturn(prompt, theme):
return {'value': '03-12-2021 08:12:35'}
def mockreturn(self, message):
return '03-12-2021 08:12:35'
monkeypatch.setattr(inquirer, 'prompt', mockreturn)
monkeypatch.setattr(Input, 'text', mockreturn)
args = (
'--key',
'date_original',
'--overwrite',
str(self.src_path.joinpath('test_exif/photo.png')),
str(self.src_path),
)
@ -107,6 +110,7 @@ class TestOrdigi:
def test_sort(self):
bool_options = (
*self.logger_options,
# '--interactive',
'--dry-run',
'--album-from-folder',
@ -117,7 +121,6 @@ class TestOrdigi:
)
arg_options = (
*self.logger_options,
*self.filter_options,
('--path-format', '{%Y}/{folder}/{name}.{ext}'),
@ -132,35 +135,29 @@ class TestOrdigi:
def test_clone(self, tmp_path):
arg_options = (
*self.logger_options,
)
paths = (str(self.src_path), str(tmp_path))
self.assert_cli(cli._init, [str(self.src_path)])
self.assert_cli(cli._clone, ['--dry-run', '--verbose', 'DEBUG', *paths])
self.assert_cli(cli._clone, ['--dry-run', *self.logger_options, *paths])
self.assert_cli(cli._clone, paths)
def assert_init(self):
for opt, arg in self.logger_options:
self.assert_cli(cli._init, [opt, arg, str(self.src_path)])
self.assert_cli(cli._init, [*self.logger_options, str(self.src_path)])
def assert_update(self):
file_path = Path(ORDIGI_PATH, 'samples/test_exif/photo.cr2')
dest_path = self.src_path / 'photo_moved.cr2'
shutil.copyfile(file_path, dest_path)
for opt, arg in self.logger_options:
self.assert_cli(cli._update, [opt, arg, str(self.src_path)])
self.assert_cli(cli._update, [*self.logger_options, str(self.src_path)])
self.assert_cli(cli._update, ['--checksum', str(self.src_path)])
def assert_check(self):
for opt, arg in self.logger_options:
self.assert_cli(cli._check, [opt, arg, str(self.src_path)])
self.assert_cli(cli._check, [*self.logger_options, str(self.src_path)])
def assert_clean(self):
bool_options = (
*self.logger_options,
# '--interactive',
'--dry-run',
'--delete-excluded',
@ -170,7 +167,6 @@ class TestOrdigi:
)
arg_options = (
*self.logger_options,
*self.filter_options,
('--dedup-regex', r'\d{4}-\d{2}'),
)
@ -187,11 +183,11 @@ class TestOrdigi:
def test_init_update_check_clean(self):
self.assert_init()
self.assert_update()
self.assert_check()
self.assert_clean()
def test_import(self, tmp_path):
bool_options = (
*self.logger_options,
# '--interactive',
'--dry-run',
'--album-from-folder',
@ -202,11 +198,9 @@ class TestOrdigi:
)
arg_options = (
*self.logger_options,
('--exclude', '.DS_Store'),
*self.filter_options,
('--path-format', '{%Y}/{folder}/{stem}.{ext}'),
)
paths = (str(self.src_path), str(tmp_path))
@ -219,6 +213,7 @@ class TestOrdigi:
def test_compare(self):
bool_options = (
*self.logger_options,
# '--interactive',
'--dry-run',
'--find-duplicates',
@ -226,7 +221,6 @@ class TestOrdigi:
)
arg_options = (
*self.logger_options,
*self.filter_options,
# ('--similar-to', ''),
('--similarity', '65'),
@ -240,6 +234,9 @@ class TestOrdigi:
self.assert_cli(cli._compare, paths)
self.assert_options(cli._compare, bool_options, arg_options, paths)
def test_check(self):
self.assert_check()
def test_needsfiles(tmpdir):
assert tmpdir

View File

@ -8,13 +8,14 @@ import inquirer
from ordigi import LOG
from ordigi import constants
from ordigi import utils
from ordigi.summary import Summary
from ordigi.collection import Collection, FPath, Paths
from ordigi.exiftool import ExifTool, ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.geolocation import GeoLocation
from ordigi.media import Media, ReadExif
from ordigi import utils
from ordigi.request import Input
from .conftest import randomize_files, randomize_db
from ordigi.summary import Summary
LOG.setLevel(10)
@ -137,11 +138,12 @@ class TestCollection:
assert summary.success_table.sum('sort') == nb
def test_sort_files(self, tmp_path):
cli_options = {'album_from_folder': True, 'cache': False}
cli_options = {
'album_from_folder': True, 'cache': False, 'path_format': self.path_format
}
collection = Collection(tmp_path, cli_options=cli_options)
loc = GeoLocation()
summary = collection.sort_files([self.src_path],
self.path_format, loc, imp='copy')
summary = collection.sort_files([self.src_path], loc, imp='copy')
self.assert_import(summary, 29)
@ -166,16 +168,16 @@ class TestCollection:
collection = Collection(tmp_path, cli_options=cli_options)
# Try to change path format and sort files again
path_format = 'test_exif/<city>/<%Y>-<name>.%l<ext>'
summary = collection.sort_files([tmp_path], path_format, loc)
summary = collection.sort_files([tmp_path], loc)
self.assert_sort(summary, 27)
self.assert_sort(summary, 23)
shutil.copytree(tmp_path / 'test_exif', tmp_path / 'test_exif_copy')
collection.summary = Summary(tmp_path)
assert collection.summary.success_table.sum() == 0
summary = collection.update(loc)
assert summary.success_table.sum('update') == 29
assert summary.success_table.sum() == 29
assert summary.success_table.sum('update') == 2
assert summary.success_table.sum() == 2
assert not summary.errors
collection.summary = Summary(tmp_path)
summary = collection.update(loc)
@ -195,12 +197,11 @@ class TestCollection:
assert not summary.errors
def test_sort_files_invalid_db(self, tmp_path):
collection = Collection(tmp_path)
collection = Collection(tmp_path, {'path_format': self.path_format})
loc = GeoLocation()
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary = collection.sort_files([self.src_path],
self.path_format, loc, imp='copy')
summary = collection.sort_files([self.src_path], loc, imp='copy')
def test_sort_file(self, tmp_path):
for imp in ('copy', 'move', False):
@ -218,7 +219,8 @@ class TestCollection:
)
assert not summary.errors
# Ensure files remain the same
assert collection._checkcomp(dest_path, src_checksum)
if not imp:
assert collection._checkcomp(dest_path, src_checksum)
if imp == 'copy':
assert src_path.exists()
@ -256,10 +258,10 @@ class TestCollection:
shutil.copytree(self.src_path, path)
collection = Collection(path, {'cache': False})
def mockreturn(prompt, theme):
return {'value': '03-12-2021 08:12:35'}
def mockreturn(self, message):
return '03-12-2021 08:12:35'
monkeypatch.setattr(inquirer, 'prompt', mockreturn)
monkeypatch.setattr(Input, 'text', mockreturn)
collection.edit_metadata({path}, {'date_original'}, overwrite=True)
# check if db value is set
@ -277,10 +279,10 @@ class TestCollection:
collection = Collection(path, {'cache': False})
loc = GeoLocation()
def mockreturn(prompt, theme):
return {'value': 'lyon'}
def mockreturn(self, message):
return 'lyon'
monkeypatch.setattr(inquirer, 'prompt', mockreturn)
monkeypatch.setattr(Input, 'text', mockreturn)
collection.edit_metadata({path}, {'location'}, loc, True)
# check if db value is set

View File

@ -28,7 +28,7 @@ class TestSqlite:
'CameraMake': 'camera_make',
'CameraModel': 'camera_model',
'OriginalName':'original_name',
'SrcPath': 'src_path',
'SrcDir': 'src_dir',
'Subdirs': 'subdirs',
'Filename': 'filename'
}
@ -44,8 +44,8 @@ class TestSqlite:
'Location': 'location'
}
cls.sqlite.add_row('metadata', row_data)
cls.sqlite.add_row('location', location_data)
cls.sqlite.upsert_metadata(row_data)
cls.sqlite.upsert_location(location_data)
# cls.sqlite.add_metadata_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
# cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'location')
@ -66,6 +66,7 @@ class TestSqlite:
result = tuple(self.sqlite.cur.execute("""select * from metadata where
rowid=1""").fetchone())
assert result == (
1,
'file_path',
'checksum',
'album',
@ -79,7 +80,7 @@ class TestSqlite:
'camera_make',
'camera_model',
'original_name',
'src_path',
'src_dir',
'subdirs',
'filename'
)
@ -96,7 +97,9 @@ class TestSqlite:
result = tuple(self.sqlite.cur.execute("""select * from location where
rowid=1""").fetchone())
assert result == (
24.2, 7.3,
1,
24.2,
7.3,
'latitude_ref',
'longitude_ref',
'city',

View File

@ -90,10 +90,10 @@ class TestMedia:
date_filename = None
for tag in media.tags_keys['original_name']:
if tag in exif_data:
date_filename = get_date_from_string(exif_data[tag])
date_filename, _, _ = get_date_from_string(exif_data[tag])
break
if not date_filename:
date_filename = get_date_from_string(file_path.name)
date_filename, _, _ = get_date_from_string(file_path.name)
if media.metadata['date_original']:
assert date_media == media.metadata['date_original']