Refactoring collection options (1)

This commit is contained in:
Cédric Leporcq 2021-11-13 10:03:53 +01:00
parent 21be384563
commit 0fdf09ea42
6 changed files with 297 additions and 253 deletions

View File

@ -7,7 +7,7 @@ import sys
import click
from ordigi import log, LOG
from ordigi import constants, log, LOG
from ordigi.config import Config
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
@ -65,6 +65,7 @@ _filter_options = [
click.option('--glob', '-g', default='**/*', help='Glob file selection'),
]
_sort_options = [
click.option(
'--album-from-folder',
@ -75,7 +76,7 @@ _sort_options = [
click.option(
'--path-format',
'-p',
default=None,
default=constants.DEFAULT_PATH_FORMAT,
help='Custom featured path format',
),
click.option(
@ -122,11 +123,6 @@ def _get_exclude(opt, exclude):
return set(exclude)
def get_collection_config(root):
return Config(root.joinpath('.ordigi', 'ordigi.conf'))
def _get_paths(paths, root):
root = Path(root).expanduser().absolute()
if not paths:
@ -139,146 +135,28 @@ def _get_paths(paths, root):
return paths, root
@click.command('import')
@click.command('check')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_options)
@add_options(_sort_options)
@click.option(
'--copy',
'-c',
default=False,
is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved',
)
@click.argument('src', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _import(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
@click.argument('path', required=True, nargs=1, type=click.Path())
def _check(**kwargs):
"""
Check media collection.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
root = kwargs['dest']
src_paths = kwargs['src']
src_paths, root = _get_paths(src_paths, root)
if kwargs['copy']:
import_mode = 'copy'
collection = Collection(root)
result = collection.check_db()
if result:
summary = collection.check_files()
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
else:
import_mode = 'move'
config = get_collection_config(root)
opt = config.get_options()
path_format = opt['path_format']
if kwargs['path_format']:
path_format = kwargs['path_format']
exclude = _get_exclude(opt, kwargs['exclude'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
kwargs['album_from_folder'],
False,
opt['day_begins'],
kwargs['dry_run'],
exclude,
extensions,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
opt['max_deep'],
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
summary = collection.sort_files(
src_paths, path_format, loc, import_mode, kwargs['remove_duplicates']
)
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
@click.command('sort')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_options)
@add_options(_sort_options)
@click.option('--clean', '-C', default=False, is_flag=True, help='Clean empty folders')
@click.option(
'--reset-cache',
'-r',
default=False,
is_flag=True,
help='Regenerate the hash.json and location.json database ',
)
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _sort(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
subdirs = kwargs['subdirs']
root = kwargs['dest']
paths, root = _get_paths(subdirs, root)
cache = True
if kwargs['reset_cache']:
cache = False
config = get_collection_config(root)
opt = config.get_options()
path_format = opt['path_format']
if kwargs['path_format']:
path_format = kwargs['path_format']
exclude = _get_exclude(opt, kwargs['exclude'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
kwargs['album_from_folder'],
cache,
opt['day_begins'],
kwargs['dry_run'],
exclude,
extensions,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
opt['max_deep'],
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
summary = collection.sort_files(
paths, path_format, loc, kwargs['remove_duplicates']
)
if kwargs['clean']:
collection.remove_empty_folders(root)
if log_level < 30:
summary.print()
if summary.errors:
LOG.logger.error('Db data is not accurate run `ordigi update`')
sys.exit(1)
@ -323,25 +201,19 @@ def _clean(**kwargs):
root = kwargs['collection']
paths, root = _get_paths(subdirs, root)
clean_all = False
if not folders:
clean_all = True
config = get_collection_config(root)
opt = config.get_options()
exclude = _get_exclude(opt, kwargs['exclude'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
dry_run=dry_run,
exclude=exclude,
extensions=extensions,
exclude=kwargs['exclude'],
extensions=kwargs['ext'],
glob=kwargs['glob'],
max_deep=opt['max_deep'],
)
# TODO
# summary = collection.sort_files(
# paths, remove_duplicates=kwargs['remove_duplicates']
# )
if kwargs['path_string']:
dedup_regex = set(kwargs['dedup_regex'])
collection.dedup_path(
@ -349,7 +221,7 @@ def _clean(**kwargs):
)
for path in paths:
if clean_all or folders:
if folders:
collection.remove_empty_folders(path)
if kwargs['delete_excluded']:
@ -364,74 +236,6 @@ def _clean(**kwargs):
sys.exit(1)
@click.command('init')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _init(**kwargs):
"""
Init media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
config = get_collection_config(root)
opt = config.get_options()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
collection = Collection(root, exclude=opt['exclude'])
summary = collection.init(loc)
if log_level < 30:
summary.print()
@click.command('update')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _update(**kwargs):
"""
Update media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
config = get_collection_config(root)
opt = config.get_options()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
collection = Collection(root, exclude=opt['exclude'])
summary = collection.update(loc)
if log_level < 30:
summary.print()
@click.command('check')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _check(**kwargs):
"""
Check media collection.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
config = get_collection_config(root)
opt = config.get_options()
collection = Collection(root, exclude=opt['exclude'])
result = collection.check_db()
if result:
summary = collection.check_files()
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
else:
LOG.logger.error('Db data is not accurate run `ordigi update`')
sys.exit(1)
@click.command('compare')
@add_options(_logger_options)
@add_options(_dry_run_options)
@ -465,16 +269,10 @@ def _compare(**kwargs):
log.console(LOG, level=log_level)
paths, root = _get_paths(subdirs, root)
config = get_collection_config(root)
opt = config.get_options()
exclude = _get_exclude(opt, kwargs['exclude'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
exclude=exclude,
extensions=extensions,
exclude=kwargs['exclude'],
extensions=kwargs['ext'],
glob=kwargs['glob'],
dry_run=dry_run,
)
@ -491,6 +289,175 @@ def _compare(**kwargs):
sys.exit(1)
@click.command('init')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _init(**kwargs):
"""
Init media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
collection = Collection(root)
# TODO retrieve collection.opt
geocoder='Nominatim'
prefer_english_names=False
timeout=1
loc = GeoLocation(geocoder, prefer_english_names, timeout)
summary = collection.init(loc)
if log_level < 30:
summary.print()
@click.command('import')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_options)
@add_options(_sort_options)
@click.option(
'--copy',
'-c',
default=False,
is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved',
)
@click.argument('src', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _import(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
src_paths, root = _get_paths(kwargs['src'], kwargs['dest'])
if kwargs['copy']:
import_mode = 'copy'
else:
import_mode = 'move'
collection = Collection(
root,
kwargs['album_from_folder'],
False,
kwargs['dry_run'],
kwargs['exclude'],
kwargs['ext'],
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
# TODO retrieve collection.opt
# Use loc function
geocoder='Nominatim'
prefer_english_names=False
timeout=1
loc = GeoLocation(geocoder, prefer_english_names, timeout)
summary = collection.sort_files(
src_paths, kwargs['path_format'], loc, import_mode, kwargs['remove_duplicates']
)
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
@click.command('sort')
@add_options(_logger_options)
@add_options(_input_options)
@add_options(_dry_run_options)
@add_options(_filter_options)
@add_options(_sort_options)
@click.option('--clean', '-C', default=False, is_flag=True, help='Clean empty folders')
@click.option(
'--reset-cache',
'-r',
default=False,
is_flag=True,
help='Regenerate the hash.json and location.json database ',
)
@click.argument('subdirs', required=False, nargs=-1, type=click.Path())
@click.argument('dest', required=True, nargs=1, type=click.Path())
def _sort(**kwargs):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
"""
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
paths, root = _get_paths(kwargs['subdirs'], kwargs['dest'])
cache = not kwargs['reset_cache']
collection = Collection(
root,
kwargs['album_from_folder'],
cache,
kwargs['dry_run'],
kwargs['exclude'],
kwargs['ext'],
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
kwargs['use_date_filename'],
kwargs['use_file_dates'],
)
# TODO retrieve collection.opt
geocoder='Nominatim'
prefer_english_names=False
timeout=1
loc = GeoLocation(geocoder, prefer_english_names, timeout)
summary = collection.sort_files(
paths, kwargs['path_format'], loc, kwargs['remove_duplicates']
)
if kwargs['clean']:
collection.remove_empty_folders(root)
if log_level < 30:
summary.print()
if summary.errors:
sys.exit(1)
@click.command('update')
@add_options(_logger_options)
@click.argument('path', required=True, nargs=1, type=click.Path())
def _update(**kwargs):
"""
Update media collection database.
"""
root = Path(kwargs['path']).expanduser().absolute()
log_level = log.get_level(kwargs['verbose'])
log.console(LOG, level=log_level)
geocoder='Nominatim'
prefer_english_names=False
timeout=1
loc = GeoLocation(geocoder, prefer_english_names, timeout)
collection = Collection(root)
summary = collection.update(loc)
if log_level < 30:
summary.print()
@click.group()
def main(**kwargs):
pass

View File

@ -333,10 +333,10 @@ class Paths:
def check(self, path):
"""
:param: str path
Check if path exist
:param: Path path
:return: Path path
"""
# some error checking
if not path.exists():
self.log.error(f'Directory {path} does not exist')
sys.exit(1)
@ -696,19 +696,40 @@ class Collection(SortMedias):
root,
album_from_folder=False,
cache=False,
day_begins=0,
dry_run=False,
exclude=None,
extensions=None,
glob='**/*',
interactive=False,
ignore_tags=None,
max_deep=None,
use_date_filename=False,
use_file_dates=False,
):
# Modules
day_begins=0
max_deep=None
# Options
self.day_begins = day_begins
self.log = LOG.getChild(self.__class__.__name__)
self.glob = glob
# Check if collection path is valid
if not root.exists():
self.log.error(f'Collection path {root} does not exist')
sys.exit(1)
# def get_collection_config(root):
# return Config(root.joinpath('.ordigi', 'ordigi.conf'))
# TODO Collection options
# config = get_collection_config(root)
# opt = config.get_options()
# exclude = _get_exclude(opt, kwargs['exclude'])
# path_format = opt['path_format']
# if kwargs['path_format']:
# path_format = kwargs['path_format']
self.db = CollectionDb(root)
self.fileio = FileIO(dry_run)
self.paths = Paths(
@ -741,19 +762,8 @@ class Collection(SortMedias):
interactive,
)
# Arguments
if not self.root.exists():
self.log.error(f'Directory {self.root} does not exist')
sys.exit(1)
# Options
self.day_begins = day_begins
self.glob = glob
self.log = LOG.getChild(self.__class__.__name__)
self.summary = Summary(self.root)
# Attributes
self.summary = Summary(self.root)
self.theme = request.load_theme()
def get_collection_files(self, exclude=True):

View File

@ -23,6 +23,7 @@ APPLICATION_DIRECTORY = get_config_dir('ordigi')
DEFAULT_PATH = '{%Y-%m-%b}/{album}|{city}'
DEFAULT_NAME = '{%Y-%m-%d_%H-%M-%S}-{name}-{title}.%l{ext}'
DEFAULT_PATH_FORMAT = DEFAULT_PATH + '/' + DEFAULT_NAME
DEFAULT_GEOCODER = 'Nominatim'
CONFIG_FILE = APPLICATION_DIRECTORY / 'ordigi.conf'

View File

@ -3,12 +3,15 @@ from blessed import Terminal
term = Terminal()
# TODO allow exit from inquierer prompt
# TODO fix 'opening_prompt_color': term.yellow,
def load_theme():
"""
Customize inquirer
source:https://github.com/magmax/python-inquirer/blob/master/inquirer/themes.py
"""
custom_theme = {
'List': {
'opening_prompt_color': term.yellow,
},
'Question': {
'brackets_color': term.dodgerblue4,
'default_color': term.yellow,
@ -29,3 +32,38 @@ def load_theme():
}
return inquirer.themes.load_theme_from_dict(custom_theme)
# def edit_prompt(self, key: str, value: str) -> str:
# print(f"Date conflict for file: {self.file_path}")
# choices_list = [
# inquirer.List(
# 'edit',
# message=f"Edit '{key}' metadata",
# choices = [
# (f"{key}: '{value}'", value),
# ("custom", None),
# ],
# default=value,
# ),
# ]
# answers = inquirer.prompt(choices_list, theme=self.theme)
# if not answers['edit']:
# prompt = [
# inquirer.Text('edit', message="value"),
# ]
# answers = inquirer.prompt(prompt, theme=self.theme)
# return self.get_date_format(answers['edit'])
# else:
# return answers['date_list']
# choices = [
# (f"date original:'{date_original}'", date_original),
# (f"date filename:'{date_filename}'", date_filename),
# ("custom", None),
# ]
# default = f'{date_original}'
# return self._get_date_media_interactive(choices, default)

View File

@ -141,3 +141,16 @@ def camel2snake(name):
return name[0].lower() + re.sub(
r'(?!^)[A-Z]', lambda x: '_' + x.group(0).lower(), name[1:]
)
import os
import platform
import subprocess
def open_file(path):
if platform.system() == "Windows":
os.startfile(path)
elif platform.system() == "Darwin":
subprocess.Popen(["open", path])
else:
subprocess.Popen(["xdg-open", path])

View File

@ -40,9 +40,9 @@ class TestOrdigi:
'--use-file-dates',
)
def assert_cli(self, command, attributes):
def assert_cli(self, command, attributes, state=0):
result = self.runner.invoke(command, [*attributes])
assert result.exit_code == 0, attributes
assert result.exit_code == state, (command, attributes)
def assert_options(self, command, bool_options, arg_options, paths):
for bool_option in bool_options:
@ -57,6 +57,21 @@ class TestOrdigi:
*bool_options, *arg_options_list, *paths,
])
def test_commands(self):
# Check if fail if path not exist
commands = [
cli._check,
cli._clean,
cli._compare,
cli._import,
cli._init,
cli._sort,
cli._update,
]
for command in commands:
self.assert_cli(command, ['not_exist'], state=1)
def test_sort(self):
bool_options = (
# '--interactive',