Refactoring Collection class and fixes

This commit is contained in:
Cédric Leporcq 2021-11-06 16:35:35 +01:00
parent dde40149c2
commit 34c9490580
5 changed files with 401 additions and 327 deletions

View File

@ -46,14 +46,14 @@ _dry_run_options = [
_filter_options = [
click.option(
'--exclude',
'-e',
'-E',
default=set(),
multiple=True,
help='Directories or files to exclude.',
),
click.option(
'--filter-by-ext',
'-f',
'--ext',
'-e',
default=set(),
multiple=True,
help="""Use filename
@ -187,7 +187,7 @@ def _import(**kwargs):
path_format = kwargs['path_format']
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
@ -196,7 +196,7 @@ def _import(**kwargs):
opt['day_begins'],
kwargs['dry_run'],
exclude,
filter_by_ext,
extensions,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
@ -258,7 +258,7 @@ def _sort(**kwargs):
path_format = kwargs['path_format']
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
@ -267,7 +267,7 @@ def _sort(**kwargs):
opt['day_begins'],
kwargs['dry_run'],
exclude,
filter_by_ext,
extensions,
kwargs['glob'],
kwargs['interactive'],
kwargs['ignore_tags'],
@ -342,13 +342,13 @@ def _clean(**kwargs):
opt = config.get_options()
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
dry_run=dry_run,
exclude=exclude,
filter_by_ext=filter_by_ext,
extensions=extensions,
glob=kwargs['glob'],
logger=logger,
max_deep=opt['max_deep'],
@ -480,12 +480,12 @@ def _compare(**kwargs):
opt = config.get_options()
exclude = _get_exclude(opt, kwargs['exclude'])
filter_by_ext = set(kwargs['filter_by_ext'])
extensions = set(kwargs['ext'])
collection = Collection(
root,
exclude=exclude,
filter_by_ext=filter_by_ext,
extensions=extensions,
glob=kwargs['glob'],
dry_run=dry_run,
logger=logger,

View File

@ -297,50 +297,226 @@ class FileIO:
self.logger.info(f'remove: {path}')
def rmdir(self, directory):
if not self.dry_run:
directory.rmdir()
class SortMedias:
"""Sort medias in collection"""
self.logger.info(f'remove dir: {directory}')
class Paths:
"""Get filtered files paths"""
def __init__(
self,
root,
album_from_folder=False,
db=None,
dry_run=False,
exclude=set(),
extensions=set(),
glob='**/*',
interactive=False,
logger=logging.getLogger(),
max_deep=None,
):
# Options
self.exclude = exclude
if '%media' in extensions:
extensions.remove('%media')
self.extensions = extensions.union(Media.extensions)
else:
self.extensions = extensions
self.glob = glob
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
self.paths_list = []
# Arguments
self.theme = request.load_theme()
def check(self, path):
"""
:param: str path
:return: Path path
"""
# some error checking
if not path.exists():
self.logger.error(f'Directory {path} does not exist')
sys.exit(1)
return path
def get_images(self, path):
"""
:returns: iter
"""
for src_path in self.get_files(path):
dirname = src_path.parent.name
if dirname.find('similar_to') == 0:
continue
image = Image(src_path)
if image.is_image():
yield image
def get_files(self, path):
"""Recursively get files which match a path and extension.
:param str path string: Path to start recursive file listing
:returns: Path file_path, Path subdirs
"""
for path0 in path.glob(self.glob):
if path0.is_dir():
continue
file_path = path0
subdirs = file_path.relative_to(path).parent
if self.glob == '*':
level = 0
else:
level = len(subdirs.parts)
if path / '.ordigi' in file_path.parents:
continue
if self.max_deep is not None:
if level > self.max_deep:
continue
if self.exclude:
matched = False
for exclude in self.exclude:
if fnmatch(file_path, exclude):
matched = True
break
if matched:
continue
if (
not self.extensions
or PurePath(file_path).suffix.lower()[1:] in self.extensions
):
# return file_path and subdir
yield file_path
def walklevel(self, src_dir, maxlevel=None):
"""
Walk into input directory recursively until desired maxlevel
source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below
"""
src_dir = str(src_dir)
if not os.path.isdir(src_dir):
return None
num_sep = src_dir.count(os.path.sep)
for root, dirs, files in os.walk(src_dir):
level = root.count(os.path.sep) - num_sep
yield root, dirs, files, level
if maxlevel is not None and level >= maxlevel:
del dirs[:]
def _modify_selection(self):
"""
:params: list
:return: list
"""
message = "Bellow the file selection list, modify selection if needed"
questions = [
inquirer.Checkbox(
'selection',
message=message,
choices=self.paths_list,
default=self.paths_list,
),
]
return inquirer.prompt(questions, theme=self.theme)['selection']
def get_paths_list(self, path):
self.paths_list = list(self.get_files(path))
if self.interactive:
self.paths_list = self._modify_selection()
print('Processing...')
return self.paths_list
class Medias:
"""Get media data in collection or source path"""
def __init__(
self,
paths,
root,
album_from_folder=False,
cache=False,
db=None,
interactive=False,
ignore_tags=None,
logger=logging.getLogger(),
use_date_filename=False,
use_file_dates=False,
):
# Modules
self.db = db
self.paths = paths
# Attributes
self.root = root
# Options
self.cache = cache
self.album_from_folder = album_from_folder
self.db = db
self.dry_run = dry_run
self.ignore_tags = ignore_tags
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.summary = Summary(self.root)
# List to store media metadata
self.medias = []
# Constants
# Arguments
self.theme = request.load_theme()
def _checkcomp(self, dest_path, src_checksum):
"""Check file."""
if self.dry_run:
return True
def get_media(self, file_path, src_dir, loc=None):
media = Media(
file_path,
src_dir,
self.album_from_folder,
self.ignore_tags,
self.interactive,
self.logger,
self.use_date_filename,
self.use_file_dates,
)
media.get_metadata(self.root, loc, self.db.sqlite, self.cache)
dest_checksum = utils.checksum(dest_path)
return media
if dest_checksum != src_checksum:
self.logger.info(
f'Source checksum and destination checksum are not the same'
)
return False
def get_medias(self, src_dirs, imp=False, loc=None):
"""Get medias data"""
for src_dir in src_dirs:
src_dir = self.paths.check(src_dir)
paths = self.paths.get_paths_list(src_dir)
return True
# Get medias and src_dirs
for src_path in paths:
if self.root not in src_path.parents:
if not imp:
self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
sys.exit(1)
def _update_exif_data(self, media):
# Get file metadata
media = self.get_media(src_path, src_dir, loc)
yield media
def update_exif_data(self, media):
updated = False
if self.album_from_folder:
media.set_album_from_folder()
@ -359,6 +535,51 @@ class SortMedias:
return False
class SortMedias:
"""Sort medias in collection"""
def __init__(
self,
fileio,
medias,
root,
db=None,
dry_run=False,
interactive=False,
logger=logging.getLogger(),
):
# Attributes
self.fileio = fileio
self.medias = medias
self.root = root
# Options
self.db = db
self.dry_run = dry_run
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.summary = Summary(self.root)
# Arguments
self.theme = request.load_theme()
def _checkcomp(self, dest_path, src_checksum):
"""Check file."""
if self.dry_run:
return True
dest_checksum = utils.checksum(dest_path)
if dest_checksum != src_checksum:
self.logger.info(
f'Source checksum and destination checksum are not the same'
)
return False
return True
def _record_file(self, src_path, dest_path, media, imp=False):
"""Check file and record the file to db"""
# Check if file remain the same
@ -368,10 +589,11 @@ class SortMedias:
self.summary.append('check', False, src_path, dest_path)
return False
# TODO put this to Medias class???
# change media file_path to dest_path
media.file_path = dest_path
if not self.dry_run:
updated = self._update_exif_data(media)
updated = self.medias.update_exif_data(media)
if updated:
checksum = utils.checksum(dest_path)
media.metadata['checksum'] = checksum
@ -385,7 +607,7 @@ class SortMedias:
return True
def _set_summary(self, result, src_path, dest_path, imp=None):
def _set_summary(self, result, src_path, dest_path, imp=False):
if result:
if imp:
self.summary.append('import', True, src_path, dest_path)
@ -400,12 +622,10 @@ class SortMedias:
def sort_file(self, src_path, dest_path, media, imp=False):
"""Sort file and register it to db"""
file = FileIO(self.dry_run, self.logger)
if imp == 'copy':
file.copy(src_path, dest_path)
self.fileio.copy(src_path, dest_path)
else:
file.move(src_path, dest_path)
self.fileio.move(src_path, dest_path)
if self.db:
result = self._record_file(
@ -517,11 +737,10 @@ class SortMedias:
yield (src_path, dest_path, media), conflict
def sort_medias(self, files_data, imp=None, remove_duplicates=False):
def sort_medias(self, files_data, imp=False, remove_duplicates=False):
"""
sort files and solve conflicts
"""
file = FileIO(self.dry_run, self.logger)
# Create directories
self._create_directories(files_data)
@ -542,7 +761,7 @@ class SortMedias:
elif conflict == 3:
# Same file checksum
if imp == 'move':
file.remove(src_path)
self.fileio.remove(src_path)
elif conflict == 2:
# File already sorted
pass
@ -562,7 +781,7 @@ class SortMedias:
elif conflict == 3:
# Same file checksum
if imp == 'move':
file.remove(src_path)
self.fileio.remove(src_path)
elif conflict == 2:
# File already sorted
pass
@ -570,9 +789,12 @@ class SortMedias:
return self.summary
# TODO clean varaible
# media
class Collection(SortMedias):
"""Class of the media collection."""
# TODO clean variables
def __init__(
self,
root,
@ -581,7 +803,7 @@ class Collection(SortMedias):
day_begins=0,
dry_run=False,
exclude=set(),
filter_by_ext=set(),
extensions=set(),
glob='**/*',
interactive=False,
ignore_tags=None,
@ -591,203 +813,85 @@ class Collection(SortMedias):
use_file_dates=False,
):
# Modules
self.db = CollectionDb(root)
self.fileio = FileIO(dry_run, logger)
self.paths = Paths(
exclude,
extensions,
glob,
interactive,
logger,
max_deep,
)
super().__init__(
self.medias = Medias(
self.paths,
root,
album_from_folder,
cache,
self.db,
interactive,
ignore_tags,
logger,
use_date_filename,
use_file_dates,
)
# Features
super().__init__(
self.fileio,
self.medias,
root,
self.db,
dry_run,
interactive,
logger,
)
# Attributes
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
# Options
self.cache = cache
self.day_begins = day_begins
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(Media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.ignore_tags = ignore_tags
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
# List to store media metadata
self.medias = []
self.summary = Summary(self.root)
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.src_list = []
# Constants
# Arguments
self.theme = request.load_theme()
def remove_excluded_files(self):
def get_collection_files(self, exclude=True):
if exclude:
exclude = self.paths.exclude
file = FileIO(self.dry_run, self.logger)
paths = Paths(
exclude,
interactive=self.interactive,
logger=self.logger,
)
for file_path in paths.get_files(self.root):
yield file_path
result = True
for file_path in self.root.glob(self.glob):
if file_path.is_dir():
continue
else:
if self.root / '.ordigi' in file_path.parents:
continue
def init(self, loc):
for file_path in self.get_collection_files():
media = self.medias.get_media(file_path, self.root, loc)
media.metadata['file_path'] = os.path.relpath(file_path, self.root)
for exclude in self.exclude:
if fnmatch(file_path, exclude):
file.remove(file_path)
self.summary.append('remove', True, file_path)
break
self.db.add_file_data(media.metadata)
self.summary.append('update', file_path)
return self.summary
def _split_part(self, dedup_regex, path_part, items):
"""
Split part from regex
:returns: parts
"""
regex = dedup_regex.pop(0)
parts = re.split(regex, path_part)
# Loop thought part, search matched regex part and proceed with
# next regex for others parts
for n, part in enumerate(parts):
if re.match(regex, part):
if part[0] in '-_ .':
if n > 0:
# move the separator to previous item
parts[n - 1] = parts[n - 1] + part[0]
items.append(part[1:])
else:
items.append(part)
elif dedup_regex != []:
# Others parts
self._split_part(dedup_regex, part, items)
else:
items.append(part)
return items
def walklevel(self, src_dir, maxlevel=None):
"""
Walk into input directory recursively until desired maxlevel
source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below
"""
src_dir = str(src_dir)
if not os.path.isdir(src_dir):
return None
num_sep = src_dir.count(os.path.sep)
for root, dirs, files in os.walk(src_dir):
level = root.count(os.path.sep) - num_sep
yield root, dirs, files, level
if maxlevel is not None and level >= maxlevel:
del dirs[:]
def level(self, path):
"""
:param: Path
:return: int
"""
return len(path.parts) - 1
def _get_files_in_path(self, path, glob='**/*', extensions=set()):
"""Recursively get files which match a path and extension.
:param str path string: Path to start recursive file listing
:param tuple(str) extensions: File extensions to include (whitelist)
:returns: Path file_path, Path subdirs
"""
for path0 in path.glob(glob):
if path0.is_dir():
continue
else:
file_path = path0
parts = file_path.parts
subdirs = file_path.relative_to(path).parent
if glob == '*':
level = 0
else:
level = len(subdirs.parts)
if self.root / '.ordigi' in file_path.parents:
continue
if self.max_deep is not None:
if level > self.max_deep:
continue
matched = False
for exclude in self.exclude:
if fnmatch(file_path, exclude):
matched = True
break
if matched:
continue
if (
extensions == set()
or PurePath(file_path).suffix.lower() in extensions
):
# return file_path and subdir
yield file_path
def _check_path(self, path):
"""
:param: str path
:return: Path path
"""
# some error checking
if not path.exists():
self.logger.error(f'Directory {path} does not exist')
sys.exit(1)
return path
def set_utime_from_metadata(self, date_media, file_path):
"""Set the modification time on the file based on the file name."""
# Initialize date taken to what's returned from the metadata function.
os.utime(
file_path, (int(datetime.now().timestamp()), int(date_media.timestamp()))
)
def _modify_selection(self):
"""
:params: list
:return: list
"""
message = "Bellow the file selection list, modify selection if needed"
questions = [
inquirer.Checkbox(
'selection',
message=message,
choices=self.src_list,
default=self.src_list,
),
]
return inquirer.prompt(questions, theme=self.theme)['selection']
def _get_all_files(self):
return [x for x in self._get_files_in_path(self.root)]
def check_db(self):
"""
Check if db FilePath match to collection filesystem
:returns: bool
"""
file_paths = [x for x in self._get_all_files()]
file_paths = list(self.get_collection_files())
db_rows = [row['FilePath'] for row in self.db.sqlite.get_rows('metadata')]
for file_path in file_paths:
relpath = os.path.relpath(file_path, self.root)
@ -805,47 +909,6 @@ class Collection(SortMedias):
return True
def _get_media(self, file_path, src_dir, loc=None):
media = Media(
file_path,
src_dir,
self.album_from_folder,
self.ignore_tags,
self.interactive,
self.logger,
self.use_date_filename,
self.use_file_dates,
)
media.get_metadata(self.root, loc, self.db.sqlite, self.cache)
return media
def _get_medias(self, src_dirs, imp=None, loc=None):
"""Get medias data"""
for src_dir in src_dirs:
self.dest_list = []
src_dir = self._check_path(src_dir)
self.src_list = self._get_path_list(src_dir)
# Get medias and src_dirs
for src_path in self.src_list:
if self.root not in src_path.parents:
if not imp:
self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
sys.exit(1)
# Get file metadata
media = self._get_media(src_path, src_dir, loc)
yield media
def get_col_medias(self, loc):
for file_path in self._get_all_files():
media = self._get_media(file_path, self.root, loc)
media.metadata['file_path'] = os.path.relpath(file_path, self.root)
yield media, file_path
def _init_check_db(self, loc=None):
if self.db.sqlite.is_empty('metadata'):
self.init(loc)
@ -853,30 +916,9 @@ class Collection(SortMedias):
self.logger.error('Db data is not accurate run `ordigi update`')
sys.exit(1)
def _get_path_list(self, path):
src_list = [
x
for x in self._get_files_in_path(
path, glob=self.glob,
extensions=self.filter_by_ext,
)
]
if self.interactive:
src_list = self._modify_selection()
print('Processing...')
return src_list
def init(self, loc):
for media, file_path in self.get_col_medias(loc):
self.db.add_file_data(media.metadata)
self.summary.append('update', file_path)
return self.summary
def update(self, loc):
file_paths = [x for x in self._get_all_files()]
db_rows = [row for row in self.db.sqlite.get_rows('metadata')]
file_paths = list(self.get_collection_files())
db_rows = list(self.db.sqlite.get_rows('metadata'))
invalid_db_rows = set()
db_paths = set()
for db_row in db_rows:
@ -890,7 +932,7 @@ class Collection(SortMedias):
relpath = os.path.relpath(file_path, self.root)
# If file not in database
if relpath not in db_paths:
media = self._get_media(file_path, self.root, loc)
media = self.medias.get_media(file_path, self.root, loc)
media.metadata['file_path'] = relpath
# Check if file checksum is in invalid rows
row = []
@ -915,7 +957,7 @@ class Collection(SortMedias):
return self.summary
def check_files(self):
for file_path in self._get_all_files():
for file_path in self.paths.get_files(self.root):
checksum = utils.checksum(file_path)
relpath = file_path.relative_to(self.root)
if checksum == self.db.sqlite.get_checksum(relpath):
@ -926,10 +968,32 @@ class Collection(SortMedias):
return self.summary
def _remove_empty_subdirs(self, directories, src_dirs):
def set_utime_from_metadata(self, date_media, file_path):
"""Set the modification time on the file based on the file name."""
# Initialize date taken to what's returned from the metadata function.
os.utime(
file_path, (int(datetime.now().timestamp()), int(date_media.timestamp()))
)
def remove_excluded_files(self):
"""Remove excluded files in collection"""
result = True
# get all files
for file_path in self.get_collection_files(exclude=False):
for exclude in self.paths.exclude:
if fnmatch(file_path, exclude):
self.fileio.remove(file_path)
self.summary.append('remove', True, file_path)
break
return self.summary
def remove_empty_subdirs(self, directories, src_dirs):
"""Remove empty subdir after moving files"""
parents = set()
for directory in directories:
self.logger.info(f'remove empty subdirs')
if not directory.is_dir():
continue
@ -939,14 +1003,13 @@ class Collection(SortMedias):
# if folder empty, delete it
files = os.listdir(directory)
if len(files) == 0:
if not self.dry_run:
directory.rmdir()
self.fileio.rmdir(directory)
if self.root in directory.parent.parents:
parents.add(directory.parent)
if parents != set():
self._remove_empty_subdirs(parents, src_dirs)
self.remove_empty_subdirs(parents, src_dirs)
def remove_empty_folders(self, directory, remove_root=True):
"""Remove empty sub-folders in collection"""
@ -973,8 +1036,8 @@ class Collection(SortMedias):
return self.summary
def sort_files(
self, src_dirs, path_format, loc,
imp=False, remove_duplicates=False
self, src_dirs, path_format, loc,
imp=False, remove_duplicates=False
):
"""
Sort files into appropriate folder
@ -985,7 +1048,7 @@ class Collection(SortMedias):
# Get medias data
files_data = []
subdirs = set()
for media in self._get_medias(src_dirs, imp=imp, loc=loc):
for media in self.medias.get_medias(src_dirs, imp=imp, loc=loc):
# Get the destination path according to metadata
fpath = FPath(path_format, self.day_begins, self.logger)
relpath = Path(fpath.get_path(media.metadata))
@ -997,7 +1060,7 @@ class Collection(SortMedias):
self.summary = self.sort_medias(files_data, imp, remove_duplicates)
if imp != 'copy':
self._remove_empty_subdirs(subdirs, src_dirs)
self.remove_empty_subdirs(subdirs, src_dirs)
if not self.check_db():
self.summary.append('check', False)
@ -1006,6 +1069,7 @@ class Collection(SortMedias):
def dedup_regex(self, paths, dedup_regex, remove_duplicates=False):
"""Deduplicate file path parts"""
# Check db
self._init_check_db()
@ -1027,14 +1091,13 @@ class Collection(SortMedias):
# Get medias data
files_data = []
for media in self._get_medias(paths):
for media in self.medias.get_medias(paths):
# Deduplicate the path
src_path = media.file_path
path_parts = src_path.relative_to(self.root).parts
dedup_path = []
for path_part in path_parts:
items = []
items = self._split_part(dedup_regex.copy(), path_part, items)
items = utils.split_part(dedup_regex.copy(), path_part)
filtered_items = []
for item in items:
@ -1054,24 +1117,6 @@ class Collection(SortMedias):
return self.summary
def _get_images(self, path):
"""
:returns: iter
"""
for src_path in self._get_files_in_path(
path, glob=self.glob,
extensions=self.filter_by_ext,
):
dirname = src_path.parent.name
if dirname.find('similar_to') == 0:
continue
image = Image(src_path)
if image.is_image():
yield image
def _find_similar_images(self, image, images, path, dest_dir, similarity=80):
files_data = []
if not image.img_path.is_file():
@ -1081,17 +1126,17 @@ class Collection(SortMedias):
directory_name = dest_dir / name.replace('.', '_')
for img_path in images.find_similar(image, similarity):
self.src_list.append(img_path)
self.paths.paths_list.append(img_path)
media = self._get_media(img_path, path)
media = self.medias.get_media(img_path, path)
relpath = directory_name / img_path.name
files_data.append((copy(media), relpath))
if files_data:
# Found similar images to image
self.src_list.append(image.img_path)
media = self._get_media(image.img_path, path)
self.paths.paths_list.append(image.img_path)
media = self.medias.get_media(image.img_path, path)
relpath = directory_name / image.img_path.name
files_data.insert(0, (copy(media), relpath))
@ -1103,9 +1148,9 @@ class Collection(SortMedias):
self._init_check_db()
dest_dir = self.root / 'similar_images'
path = self._check_path(path)
path = self.paths.check(path)
images_paths = set(x for x in self._get_images(path))
images_paths = set(self.paths.get_images(path))
images = Images(images_paths, logger=self.logger)
nb_row_ini = self.db.sqlite.len('metadata')
for image in images_paths:
@ -1143,10 +1188,10 @@ class Collection(SortMedias):
else:
print(f"Fill empty {key} values:")
self.src_list = self._get_path_list(path)
paths = self.paths.get_paths_list(path)
for file_path in self.src_list:
media = self._get_media(file_path, self.root, loc)
for file_path in paths:
media = self.medias.get_media(file_path, self.root, loc)
print()
value = media.metadata[key]
if overwrite or not value:

View File

@ -102,6 +102,36 @@ def get_date_from_string(string, user_regex=None):
return date
def split_part(dedup_regex, path_part, items=None):
"""
Split part from regex
:returns: parts
"""
if not items:
items = []
regex = dedup_regex.pop(0)
parts = re.split(regex, path_part)
# Loop thought part, search matched regex part and proceed with
# next regex for others parts
for n, part in enumerate(parts):
if re.match(regex, part):
if part[0] in '-_ .':
if n > 0:
# move the separator to previous item
parts[n - 1] = parts[n - 1] + part[0]
items.append(part[1:])
else:
items.append(part)
elif dedup_regex != []:
# Others parts
items = split_part(dedup_regex, part, items)
else:
items.append(part)
return items
# Conversion functions
# source:https://rodic.fr/blog/camelcase-and-snake_case-strings-conversion-with-python/

View File

@ -32,7 +32,7 @@ class TestOrdigi:
cls.filter_options = (
('--exclude', '.DS_Store'),
('--ignore-tags', 'CreateDate'),
('--filter-by-ext', 'jpg'),
('--ext', 'jpg'),
('--glob', '*'),
)
cls.sort_options = (

View File

@ -8,7 +8,7 @@ import pytest
import inquirer
from ordigi import constants
from ordigi.collection import Collection, FPath
from ordigi.collection import Collection, FPath, Paths
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.geolocation import GeoLocation
from ordigi import log
@ -143,7 +143,7 @@ class TestCollection:
logger=self.logger)
loc = GeoLocation()
summary = collection.sort_files([self.src_path],
self.path_format, loc, import_mode='copy')
self.path_format, loc, imp='copy')
self.assert_import(summary, 30)
@ -198,36 +198,36 @@ class TestCollection:
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary = collection.sort_files([self.src_path],
self.path_format, loc, import_mode='copy')
self.path_format, loc, imp='copy')
def test_sort_file(self, tmp_path):
for import_mode in 'copy', 'move', False:
for imp in ('copy', 'move', False):
collection = Collection(tmp_path)
# copy mode
src_path = Path(self.src_path, 'test_exif', 'photo.png')
media = Media(src_path, self.src_path)
metadata = media.get_metadata(tmp_path)
name = 'photo_' + str(import_mode) + '.png'
name = 'photo_' + str(imp) + '.png'
dest_path = Path(tmp_path, name)
src_checksum = utils.checksum(src_path)
summary = collection.sort_file(src_path, dest_path, media,
import_mode=import_mode)
imp=imp)
assert not summary.errors
# Ensure files remain the same
assert collection._checkcomp(dest_path, src_checksum)
if import_mode == 'copy':
if imp == 'copy':
assert src_path.exists()
else:
assert not src_path.exists()
shutil.copyfile(dest_path, src_path)
def test__get_files_in_path(self, tmp_path):
collection = Collection(tmp_path, exclude={'**/*.dng',}, max_deep=1,
use_date_filename=True, use_file_dates=True)
paths = [x for x in collection._get_files_in_path(self.src_path,
glob='**/photo*')]
assert len(paths) == 6
def test_get_files(self):
exclude={'**/*.dng',}
paths = Paths(exclude=exclude, max_deep=1)
paths = list(paths.get_files(self.src_path))
assert len(paths) == 9
assert Path(self.src_path, 'test_exif/photo.dng') not in paths
for path in paths:
assert isinstance(path, Path)
@ -248,7 +248,6 @@ class TestCollection:
shutil.copytree(self.src_path, path)
collection = Collection(path, logger=self.logger)
# loc = GeoLocation()
import ipdb; ipdb.set_trace()
# def mockreturn(prompt, theme):
# return {'value': '03-12-2021 08:12:35'}