Refactoring collection

This commit is contained in:
Cédric Leporcq 2021-11-03 21:29:06 +01:00
parent e04ad3248a
commit dde40149c2
4 changed files with 380 additions and 369 deletions

View File

@ -61,6 +61,14 @@ _filter_options = [
common media file extension for filtering. Ignored files remain in
the same directory structure""",
),
click.option(
'--ignore-tags',
'-I',
default=set(),
multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'',
),
click.option('--glob', '-g', default='**/*', help='Glob file selection'),
]
@ -71,14 +79,6 @@ _sort_options = [
is_flag=True,
help="Use images' folders as their album names.",
),
click.option(
'--ignore-tags',
'-I',
default=set(),
multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'',
),
click.option(
'--path-format',
'-p',
@ -133,14 +133,15 @@ def get_collection_config(root):
return Config(root.joinpath('.ordigi', 'ordigi.conf'))
def _get_paths(paths, root):
root = Path(root).absolute()
root = Path(root).expanduser().absolute()
if not paths:
paths = {root}
else:
paths = set()
for path in paths:
paths.add(Path(path).absolute())
paths.add(Path(path).expanduser().absolute())
return paths, root
@ -198,6 +199,7 @@ def _import(**kwargs):
filter_by_ext,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
logger,
opt['max_deep'],
kwargs['use_date_filename'],
@ -207,7 +209,7 @@ def _import(**kwargs):
loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout'])
summary = collection.sort_files(
src_paths, path_format, loc, import_mode, kwargs['remove_duplicates'], kwargs['ignore_tags']
src_paths, path_format, loc, import_mode, kwargs['remove_duplicates']
)
if log_level < 30:
@ -268,6 +270,7 @@ def _sort(**kwargs):
filter_by_ext,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
logger,
opt['max_deep'],
kwargs['use_date_filename'],
@ -277,7 +280,7 @@ def _sort(**kwargs):
loc = GeoLocation(opt['geocoder'], logger, opt['prefer_english_names'], opt['timeout'])
summary = collection.sort_files(
paths, path_format, loc, kwargs['remove_duplicates'], kwargs['ignore_tags']
paths, path_format, loc, kwargs['remove_duplicates']
)
if kwargs['clean']:
@ -380,7 +383,7 @@ def _init(**kwargs):
"""
Init media collection database.
"""
root = Path(kwargs['path']).absolute()
root = Path(kwargs['path']).expanduser().absolute()
config = get_collection_config(root)
opt = config.get_options()
log_level = log.level(kwargs['verbose'], kwargs['debug'])
@ -401,7 +404,7 @@ def _update(**kwargs):
"""
Update media collection database.
"""
root = Path(kwargs['path']).absolute()
root = Path(kwargs['path']).expanduser().absolute()
config = get_collection_config(root)
opt = config.get_options()
log_level = log.level(kwargs['verbose'], kwargs['debug'])
@ -424,7 +427,7 @@ def _check(**kwargs):
"""
log_level = log.level(kwargs['verbose'], kwargs['debug'])
logger = log.get_logger(level=log_level)
root = Path(kwargs['path']).absolute()
root = Path(kwargs['path']).expanduser().absolute()
config = get_collection_config(root)
opt = config.get_options()
collection = Collection(root, exclude=opt['exclude'], logger=logger)

View File

@ -250,59 +250,77 @@ class FPath:
return None
class Collection:
"""Class of the media collection."""
class CollectionDb:
def __init__(self, root):
self.sqlite = Sqlite(root)
def _format_row_data(self, table, metadata):
row_data = {}
for title in self.sqlite.tables[table]['header']:
key = utils.camel2snake(title)
row_data[title] = metadata[key]
return row_data
def add_file_data(self, metadata):
"""Save metadata informations to db"""
loc_values = self._format_row_data('location', metadata)
metadata['location_id'] = self.sqlite.add_row('location', loc_values)
row_data = self._format_row_data('metadata', metadata)
self.sqlite.add_row('metadata', row_data)
class FileIO:
"""File Input/Ouput operations for collection"""
def __init__(self, dry_run=False, logger=logging.getLogger()):
# Options
self.dry_run = dry_run
self.logger = logger.getChild(self.__class__.__name__)
def copy(self, src_path, dest_path):
if not self.dry_run:
shutil.copy2(src_path, dest_path)
self.logger.info(f'copy: {src_path} -> {dest_path}')
def move(self, src_path, dest_path):
if not self.dry_run:
# Move the file into the destination directory
shutil.move(src_path, dest_path)
self.logger.info(f'move: {src_path} -> {dest_path}')
def remove(self, path):
if not self.dry_run:
os.remove(path)
self.logger.info(f'remove: {path}')
class SortMedias:
"""Sort medias in collection"""
def __init__(
self,
root,
album_from_folder=False,
cache=False,
day_begins=0,
db=None,
dry_run=False,
exclude=set(),
filter_by_ext=set(),
glob='**/*',
interactive=False,
logger=logging.getLogger(),
max_deep=None,
use_date_filename=False,
use_file_dates=False,
):
# Attributes
self.root = root.expanduser().absolute()
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
self.db = Sqlite(self.root)
self.root = root
# Options
self.album_from_folder = album_from_folder
self.cache = cache
self.day_begins = day_begins
self.db = db
self.dry_run = dry_run
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(Media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
# List to store media metadata
self.medias = []
self.summary = Summary(self.root)
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.src_list = []
self.dest_list = []
self.summary = Summary(self.root)
# Constants
self.theme = request.load_theme()
@ -322,21 +340,6 @@ class Collection:
return True
def _format_row_data(self, table, metadata):
row_data = {}
for title in self.db.tables[table]['header']:
key = utils.camel2snake(title)
row_data[title] = metadata[key]
return row_data
def _add_db_data(self, metadata):
loc_values = self._format_row_data('location', metadata)
metadata['location_id'] = self.db.add_row('location', loc_values)
row_data = self._format_row_data('metadata', metadata)
self.db.add_row('metadata', row_data)
def _update_exif_data(self, media):
updated = False
if self.album_from_folder:
@ -356,30 +359,9 @@ class Collection:
return False
def remove(self, file_path):
if not self.dry_run:
os.remove(file_path)
self.logger.info(f'remove: {file_path}')
def remove_excluded_files(self):
result = True
for file_path in self.root.glob(self.glob):
if file_path.is_dir():
continue
else:
if self.root / '.ordigi' in file_path.parents:
continue
for exclude in self.exclude:
if fnmatch(file_path, exclude):
if not self.dry_run:
self.remove(file_path)
self.summary.append('remove', True, file_path)
break
return self.summary
def _check_file(self, src_path, dest_path, media):
def _record_file(self, src_path, dest_path, media, imp=False):
"""Check file and record the file to db"""
# Check if file remain the same
checksum = media.metadata['checksum']
if not self._checkcomp(dest_path, checksum):
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
@ -396,39 +378,118 @@ class Collection:
media.metadata['file_path'] = os.path.relpath(dest_path, self.root)
if not self.dry_run:
self.db.add_file_data(media.metadata)
if imp != 'copy' and self.root in src_path.parents:
self.db.sqlite.delete_filepath(str(src_path.relative_to(self.root)))
return True
def _copy(self, src_path, dest_path):
if not self.dry_run:
shutil.copy2(src_path, dest_path)
self.logger.info(f'copy: {src_path} -> {dest_path}')
def _set_summary(self, result, src_path, dest_path, imp=None):
if result:
if imp:
self.summary.append('import', True, src_path, dest_path)
else:
self.summary.append('sort', True, src_path, dest_path)
else:
if imp:
self.summary.append('import', False, src_path, dest_path)
else:
self.summary.append('sort', False, src_path, dest_path)
def _move(self, src_path, dest_path):
if not self.dry_run:
# Move the file into the destination directory
shutil.move(src_path, dest_path)
self.logger.info(f'move: {src_path} -> {dest_path}')
def sort_file(self, src_path, dest_path, media, imp=False):
"""Sort file and register it to db"""
file = FileIO(self.dry_run, self.logger)
def _remove(self, path):
if not self.dry_run:
self.remove(path)
if imp == 'copy':
file.copy(src_path, dest_path)
else:
file.move(src_path, dest_path)
self.logger.info(f'remove: {path}')
if self.db:
result = self._record_file(
src_path, dest_path, media, imp=imp
)
else:
result = True
def _record_file(self, src_path, dest_path, media, import_mode=False):
"""Check file and record the file to db"""
# Check if file remain the same
if not self._check_file(src_path, dest_path, media):
self.summary.append('check', False, src_path, dest_path)
return False
self._set_summary(result, src_path, dest_path, imp)
if not self.dry_run:
self._add_db_data(media.metadata)
if import_mode != 'copy' and self.root in src_path.parents:
self.db.delete_filepath(str(src_path.relative_to(self.root)))
return self.summary
return True
def _create_directories(self, files_data):
"""Create a directory if it does not already exist.
:param Path: A fully qualified path of the to create.
:returns: bool
"""
for media, relpath in files_data:
directory_path = self.root / relpath.parent
parts = directory_path.relative_to(self.root).parts
for i, _ in enumerate(parts):
dir_path = self.root / Path(*parts[0 : i + 1])
if dir_path.is_file():
self.logger.warning(f'Target directory {dir_path} is a file')
# Rename the src_file
if self.interactive:
prompt = [
inquirer.Text(
'file_path',
message="New name for" f"'{dir_path.name}' file",
),
]
answers = inquirer.prompt(prompt, theme=self.theme)
file_path = dir_path.parent / answers['file_path']
else:
file_path = dir_path.parent / (dir_path.name + '_file')
self.logger.warning(f'Renaming {dir_path} to {file_path}')
if not self.dry_run:
shutil.move(dir_path, file_path)
for med, _ in files_data:
if med.file_path == dir_path:
med.file_path = file_path
break
if not self.dry_run:
directory_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f'Create {directory_path}')
def check_conflicts(self, src_path, dest_path, remove_duplicates=False):
'''
Check if file can be copied or moved file to dest_path.
Return True if success, None is no filesystem action, False if
conflicts.
:params: str, str, bool
:returns: bool or None
'''
# check for collisions
if src_path == dest_path:
self.logger.info(f"File {dest_path} already sorted")
return 2
if dest_path.is_dir():
self.logger.info(f"File {dest_path} is a existing directory")
return 1
elif dest_path.is_file():
self.logger.info(f"File {dest_path} already exist")
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.logger.info(
f"File in source and destination are identical. Duplicate will be ignored."
)
return 3
else: # name is same, but file is different
self.logger.info(
f"File {src_path} and {dest_path} are different."
)
return 1
else:
return 1
else:
return 0
def _solve_conflicts(self, conflicts, remove_duplicates):
result = False
@ -456,9 +517,147 @@ class Collection:
yield (src_path, dest_path, media), conflict
def sort_medias(self, files_data, imp=None, remove_duplicates=False):
"""
sort files and solve conflicts
"""
file = FileIO(self.dry_run, self.logger)
# Create directories
self._create_directories(files_data)
conflicts = []
for media, relpath in files_data:
src_path = media.file_path
dest_path = self.root / relpath
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
if not conflict:
self.sort_file(
src_path, dest_path, media, imp=imp
)
elif conflict == 1:
# There is conflict and file are different
conflicts.append((src_path, dest_path, media))
elif conflict == 3:
# Same file checksum
if imp == 'move':
file.remove(src_path)
elif conflict == 2:
# File already sorted
pass
if conflicts != []:
for files_data, conflict in self._solve_conflicts(conflicts,
remove_duplicates):
src_path, dest_path, media = files_data
if not conflict:
self.sort_file(
src_path, dest_path, media, imp=imp
)
elif conflict == 1:
# There is unresolved conflict
self._set_summary(False, src_path, dest_path, imp)
elif conflict == 3:
# Same file checksum
if imp == 'move':
file.remove(src_path)
elif conflict == 2:
# File already sorted
pass
return self.summary
class Collection(SortMedias):
"""Class of the media collection."""
def __init__(
self,
root,
album_from_folder=False,
cache=False,
day_begins=0,
dry_run=False,
exclude=set(),
filter_by_ext=set(),
glob='**/*',
interactive=False,
ignore_tags=None,
logger=logging.getLogger(),
max_deep=None,
use_date_filename=False,
use_file_dates=False,
):
self.db = CollectionDb(root)
super().__init__(
root,
album_from_folder,
self.db,
dry_run,
interactive,
logger,
)
# Attributes
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
# Options
self.cache = cache
self.day_begins = day_begins
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(Media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.ignore_tags = ignore_tags
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
# List to store media metadata
self.medias = []
self.summary = Summary(self.root)
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.src_list = []
# Constants
self.theme = request.load_theme()
def remove_excluded_files(self):
file = FileIO(self.dry_run, self.logger)
result = True
for file_path in self.root.glob(self.glob):
if file_path.is_dir():
continue
else:
if self.root / '.ordigi' in file_path.parents:
continue
for exclude in self.exclude:
if fnmatch(file_path, exclude):
file.remove(file_path)
self.summary.append('remove', True, file_path)
break
return self.summary
def _split_part(self, dedup_regex, path_part, items):
"""Split part from regex
:returns: parts"""
"""
Split part from regex
:returns: parts
"""
regex = dedup_regex.pop(0)
parts = re.split(regex, path_part)
# Loop thought part, search matched regex part and proceed with
@ -544,48 +743,11 @@ class Collection:
# return file_path and subdir
yield file_path
def _create_directory(self, directory_path, media):
"""Create a directory if it does not already exist.
:param Path: A fully qualified path of the to create.
:returns: bool
"""
parts = directory_path.relative_to(self.root).parts
for i, part in enumerate(parts):
dir_path = self.root / Path(*parts[0 : i + 1])
if dir_path.is_file():
self.logger.warning(f'Target directory {dir_path} is a file')
# Rename the src_file
if self.interactive:
prompt = [
inquirer.Text(
'file_path',
message="New name for" f"'{dir_path.name}' file",
),
]
answers = inquirer.prompt(prompt, theme=self.theme)
file_path = dir_path.parent / answers['file_path']
else:
file_path = dir_path.parent / (dir_path.name + '_file')
self.logger.warning(f'Renaming {dir_path} to {file_path}')
shutil.move(dir_path, file_path)
for media in medias:
if media.file_path == dir_path:
media.file_path = file_path
break
if not self.dry_run:
directory_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f'Create {directory_path}')
def _check_path(self, path):
"""
:param: str path
:return: Path path
"""
path = Path(path).expanduser().absolute()
# some error checking
if not path.exists():
self.logger.error(f'Directory {path} does not exist')
@ -601,96 +763,6 @@ class Collection:
file_path, (int(datetime.now().timestamp()), int(date_media.timestamp()))
)
def check_conflicts(self, src_path, dest_path, remove_duplicates=False):
'''
Check if file can be copied or moved file to dest_path.
Return True if success, None is no filesystem action, False if
conflicts.
:params: str, str, bool
:returns: bool or None
'''
# check for collisions
if src_path == dest_path:
self.logger.info(f"File {dest_path} already sorted")
return 2
if dest_path.is_dir():
self.logger.info(f"File {dest_path} is a existing directory")
return 1
elif dest_path.is_file():
self.logger.info(f"File {dest_path} already exist")
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.logger.info(
f"File in source and destination are identical. Duplicate will be ignored."
)
return 3
else: # name is same, but file is different
self.logger.info(
f"File {src_path} and {dest_path} are different."
)
return 1
else:
return 1
else:
return 0
def _sort_medias(self, files_data, import_mode=None, remove_duplicates=False):
"""
sort files and solve conflicts
"""
# Create directories
for media, relpath in files_data:
dest_directory = self.root / relpath.parent
self._create_directory(dest_directory, media)
conflicts = []
for media, relpath in files_data:
src_path = media.file_path
dest_path = self.root / relpath
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
if not conflict:
self.sort_file(
src_path, dest_path, media, import_mode=import_mode
)
elif conflict == 1:
# There is conflict and file are different
conflicts.append((src_path, dest_path, media))
elif conflict == 3:
# Same file checksum
if import_mode == 'move':
self._remove(src_path)
self.dest_list.append(dest_path)
elif conflict == 2:
# File already sorted
self.dest_list.append(dest_path)
if conflicts != []:
for files_data, conflict in self._solve_conflicts(conflicts,
remove_duplicates):
src_path, dest_path, media = files_data
if not conflict:
self.sort_file(
src_path, dest_path, media, import_mode=import_mode
)
elif conflict == 1:
# There is unresolved conflict
if import_mode:
self.summary.append('import', False, src_path, dest_path)
else:
self.summary.append('sort', False, src_path, dest_path)
elif conflict == 3:
# Same file checksum
if import_mode == 'move':
self._remove(src_path)
self.dest_list.append(dest_path)
elif conflict == 2:
# File already sorted
self.dest_list.append(dest_path)
def _modify_selection(self):
"""
:params: list
@ -716,7 +788,7 @@ class Collection:
:returns: bool
"""
file_paths = [x for x in self._get_all_files()]
db_rows = [row['FilePath'] for row in self.db.get_rows('metadata')]
db_rows = [row['FilePath'] for row in self.db.sqlite.get_rows('metadata')]
for file_path in file_paths:
relpath = os.path.relpath(file_path, self.root)
# If file not in database
@ -733,18 +805,22 @@ class Collection:
return True
def _check_processed(self):
# Finally check if are files are successfully processed
n_fail = len(self.src_list) - len(self.dest_list)
if n_fail != 0:
self.logger.error(f"{n_fail} files have not be processed")
return False
def _get_media(self, file_path, src_dir, loc=None):
media = Media(
file_path,
src_dir,
self.album_from_folder,
self.ignore_tags,
self.interactive,
self.logger,
self.use_date_filename,
self.use_file_dates,
)
media.get_metadata(self.root, loc, self.db.sqlite, self.cache)
return self.check_db()
return media
def _get_medias_data(
self, src_dirs, import_mode=None, ignore_tags=set(), loc=None
):
def _get_medias(self, src_dirs, imp=None, loc=None):
"""Get medias data"""
for src_dir in src_dirs:
self.dest_list = []
@ -754,29 +830,25 @@ class Collection:
# Get medias and src_dirs
for src_path in self.src_list:
if self.root not in src_path.parents:
if not import_mode:
if not imp:
self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
sys.exit(1)
# Get file metadata
media = Media(
src_path,
src_dir,
self.album_from_folder,
ignore_tags,
self.interactive,
self.logger,
self.use_date_filename,
self.use_file_dates,
)
media.get_metadata(self.root, loc, self.db, self.cache)
media = self._get_media(src_path, src_dir, loc)
yield media
def _init_check_db(self, loc=None, ignore_tags=set()):
if self.db.is_empty('metadata'):
self.init(loc, ignore_tags)
def get_col_medias(self, loc):
for file_path in self._get_all_files():
media = self._get_media(file_path, self.root, loc)
media.metadata['file_path'] = os.path.relpath(file_path, self.root)
yield media, file_path
def _init_check_db(self, loc=None):
if self.db.sqlite.is_empty('metadata'):
self.init(loc)
elif not self.check_db():
self.logger.error('Db data is not accurate run `ordigi update`')
sys.exit(1)
@ -795,30 +867,16 @@ class Collection:
return src_list
def get_medias(self, loc, ignore_tags=set()):
for file_path in self._get_all_files():
media = Media(
file_path,
self.root,
ignore_tags=ignore_tags,
logger=self.logger,
use_date_filename=self.use_date_filename,
use_file_dates=self.use_file_dates,
)
metadata = media.get_metadata(self.root, loc, self.db, self.cache)
media.metadata['file_path'] = os.path.relpath(file_path, self.root)
yield media, file_path
def init(self, loc, ignore_tags=set()):
for media, file_path in self.get_medias(loc):
self._add_db_data(media.metadata)
def init(self, loc):
for media, file_path in self.get_col_medias(loc):
self.db.add_file_data(media.metadata)
self.summary.append('update', file_path)
return self.summary
def update(self, loc, ignore_tags=set()):
def update(self, loc):
file_paths = [x for x in self._get_all_files()]
db_rows = [row for row in self.db.get_rows('metadata')]
db_rows = [row for row in self.db.sqlite.get_rows('metadata')]
invalid_db_rows = set()
db_paths = set()
for db_row in db_rows:
@ -832,15 +890,7 @@ class Collection:
relpath = os.path.relpath(file_path, self.root)
# If file not in database
if relpath not in db_paths:
media = Media(
file_path,
self.root,
ignore_tags=ignore_tags,
logger=self.logger,
use_date_filename=self.use_date_filename,
use_file_dates=self.use_file_dates,
)
metadata = media.get_metadata(self.root, loc, self.db, self.cache)
media = self._get_media(file_path, self.root, loc)
media.metadata['file_path'] = relpath
# Check if file checksum is in invalid rows
row = []
@ -855,12 +905,12 @@ class Collection:
media.metadata['Filename'] = row['Filename']
break
# set row attribute to the file
self._add_db_data(media.metadata)
self.db.add_file_data(media.metadata)
self.summary.append('update', file_path)
# Finally delete invalid rows
for row in invalid_db_rows:
self.db.delete_filepath(row['FilePath'])
self.db.sqlite.delete_filepath(row['FilePath'])
return self.summary
@ -868,7 +918,7 @@ class Collection:
for file_path in self._get_all_files():
checksum = utils.checksum(file_path)
relpath = file_path.relative_to(self.root)
if checksum == self.db.get_checksum(relpath):
if checksum == self.db.sqlite.get_checksum(relpath):
self.summary.append('check',True, file_path)
else:
self.logger.error('{file_path} is corrupted')
@ -922,47 +972,20 @@ class Collection:
return self.summary
def sort_file(self, src_path, dest_path, media, import_mode=False):
if import_mode == 'copy':
self._copy(src_path, dest_path)
else:
self._move(src_path, dest_path)
result = self._record_file(
src_path, dest_path, media, import_mode=import_mode
)
if result:
self.dest_list.append(dest_path)
if import_mode:
self.summary.append('import', True, src_path, dest_path)
else:
self.summary.append('sort', True, src_path, dest_path)
else:
if import_mode:
self.summary.append('import', False, src_path, dest_path)
else:
self.summary.append('sort', False, src_path, dest_path)
return self.summary
def sort_files(
self, src_dirs, path_format, loc,
import_mode=False, remove_duplicates=False, ignore_tags=set()
imp=False, remove_duplicates=False
):
"""
Sort files into appropriate folder
"""
# Check db
self._init_check_db(loc, ignore_tags)
self._init_check_db(loc)
# Get medias data
files_data = []
subdirs = set()
for media in self._get_medias_data(
src_dirs,
import_mode=import_mode, ignore_tags=ignore_tags, loc=loc,
):
for media in self._get_medias(src_dirs, imp=imp, loc=loc):
# Get the destination path according to metadata
fpath = FPath(path_format, self.day_begins, self.logger)
relpath = Path(fpath.get_path(media.metadata))
@ -971,12 +994,12 @@ class Collection:
files_data.append((copy(media), relpath))
# Sort files and solve conflicts
self._sort_medias(files_data, import_mode, remove_duplicates)
self.summary = self.sort_medias(files_data, imp, remove_duplicates)
if import_mode != 'copy':
if imp != 'copy':
self._remove_empty_subdirs(subdirs, src_dirs)
if not self._check_processed():
if not self.check_db():
self.summary.append('check', False)
return self.summary
@ -1004,7 +1027,7 @@ class Collection:
# Get medias data
files_data = []
for media in self._get_medias_data(paths):
for media in self._get_medias(paths):
# Deduplicate the path
src_path = media.file_path
path_parts = src_path.relative_to(self.root).parts
@ -1024,9 +1047,9 @@ class Collection:
files_data.append((copy(media), relpath))
# Sort files and solve conflicts
self._sort_medias(files_data, remove_duplicates=remove_duplicates)
self.sort_medias(files_data, remove_duplicates=remove_duplicates)
if not self._check_processed():
if not self.check_db():
self.summary.append('check', False)
return self.summary
@ -1049,12 +1072,6 @@ class Collection:
if image.is_image():
yield image
def _get_media_data(self, img_path, path):
media = Media(img_path, path, self.logger)
media.get_metadata(self.root, db=self.db, cache=self.cache)
return media
def _find_similar_images(self, image, images, path, dest_dir, similarity=80):
files_data = []
if not image.img_path.is_file():
@ -1066,7 +1083,7 @@ class Collection:
for img_path in images.find_similar(image, similarity):
self.src_list.append(img_path)
media = self._get_media_data(img_path, path)
media = self._get_media(img_path, path)
relpath = directory_name / img_path.name
files_data.append((copy(media), relpath))
@ -1074,7 +1091,7 @@ class Collection:
if files_data:
# Found similar images to image
self.src_list.append(image.img_path)
media = self._get_media_data(image.img_path, path)
media = self._get_media(image.img_path, path)
relpath = directory_name / image.img_path.name
files_data.insert(0, (copy(media), relpath))
@ -1090,25 +1107,25 @@ class Collection:
images_paths = set(x for x in self._get_images(path))
images = Images(images_paths, logger=self.logger)
nb_row_ini = self.db.len('metadata')
nb_row_ini = self.db.sqlite.len('metadata')
for image in images_paths:
files_data = self._find_similar_images(
image, images, path, dest_dir, similarity
)
if files_data:
# Move the simlars file into the destination directory
self._sort_medias(files_data, remove_duplicates=remove_duplicates)
self.sort_medias(files_data, remove_duplicates=remove_duplicates)
nb_row_end = self.db.len('metadata')
nb_row_end = self.db.sqlite.len('metadata')
if nb_row_ini and nb_row_ini != nb_row_end:
self.logger.error('Nb of row have changed unexpectedly')
if not self._check_processed():
if not self.check_db():
self.summary.append('check', False)
return self.summary
def fill_data(self, path, key, loc=None, edit=False):
def fill_metadata(self, path, key, loc=None, overwrite=False):
"""Fill metadata and exif data for given key"""
self._init_check_db()
@ -1121,7 +1138,7 @@ class Collection:
print(f"Set {key} alone is not allowed")
return None
if edit:
if overwrite:
print(f"Edit {key} values:")
else:
print(f"Fill empty {key} values:")
@ -1129,24 +1146,14 @@ class Collection:
self.src_list = self._get_path_list(path)
for file_path in self.src_list:
media = Media(
file_path,
self.root,
self.album_from_folder,
ignore_tags,
self.interactive,
self.logger,
self.use_date_filename,
self.use_file_dates,
)
metadata = media.get_metadata(self.root, loc, self.db, self.cache)
media = self._get_media(file_path, self.root, loc)
print()
value = media.metadata[key]
if edit or not value:
if overwrite or not value:
print(f"FILE: '{file_path}'")
if edit:
if overwrite:
print(f"{key}: '{value}'")
if edit or not value:
if overwrite or not value:
# Prompt value for given key for file_path
# utils.open_file()
prompt = [
@ -1166,7 +1173,7 @@ class Collection:
media.metadata[key] = value
# Update database
self._add_db_data(media.metadata)
self.db.add_file_data(media.metadata)
# Update exif data
media.set_key_values(key, value)

View File

@ -35,7 +35,7 @@ class Media:
file_path,
src_dir,
album_from_folder=False,
ignore_tags=set(),
ignore_tags=None,
interactive=False,
logger=logging.getLogger(),
use_date_filename=False,
@ -49,6 +49,9 @@ class Media:
self.album_from_folder = album_from_folder
self.exif_metadata = None
if ignore_tags is None:
ignore_tags = set()
self.ignore_tags = ignore_tags
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)

View File

@ -31,12 +31,12 @@ class TestOrdigi:
)
cls.filter_options = (
('--exclude', '.DS_Store'),
('--ignore-tags', 'CreateDate'),
('--filter-by-ext', 'jpg'),
('--glob', '*'),
)
cls.sort_options = (
'--album-from-folder',
'--ignore-tags',
'--path-format',
'--remove-duplicates',
'--use-date-filename',
@ -74,7 +74,6 @@ class TestOrdigi:
arg_options = (
*self.filter_options,
('--ignore-tags', 'CreateDate'),
('--path-format', '{%Y}/{folder}/{name}.{ext}'),
)
@ -155,7 +154,6 @@ class TestOrdigi:
arg_options = (
*self.filter_options,
('--ignore-tags', 'CreateDate'),
('--path-format', '{%Y}/{folder}/{stem}.{ext}'),
)