Refactoring filesystem.py

This commit is contained in:
Cédric Leporcq 2021-08-27 12:45:25 +02:00
parent 0fea0fcfd4
commit 9b055c88bd
7 changed files with 452 additions and 372 deletions

150
ordigi.py
View File

@ -11,29 +11,16 @@ from ordigi.config import Config
from ordigi import constants
from ordigi import log
from ordigi.database import Db
from ordigi.filesystem import FileSystem
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media, get_all_subclasses
from ordigi.summary import Summary
FILESYSTEM = FileSystem()
def print_help(command):
    """Print the help text of a click command to standard output.

    :param command: click command object whose help should be displayed.
    """
    # The context is created from the command being described, so no other
    # command object needs to be in scope.
    click.echo(command.get_help(click.Context(command)))
@click.command('batch')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _batch(debug):
"""Run batch() for all plugins.
"""
constants.debug = debug
plugins = Plugins()
plugins.run_batch()
@click.command('sort')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@ -57,10 +44,10 @@ def _batch(debug):
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@click.option('--remove-duplicates', '-r', default=False, is_flag=True,
@click.option('--remove-duplicates', '-R', default=False, is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash')
@click.option('--reset-cache', '-R', default=False, is_flag=True,
@click.option('--reset-cache', '-r', default=False, is_flag=True,
help='Regenerate the hash.json and location.json database ')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@ -85,10 +72,16 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
if reset_cache:
cache = False
if not destination and paths:
destination = paths[-1]
paths = paths[0:-1]
if len(paths) > 1:
if not destination:
# Use last path argument as destination
destination = paths[-1]
paths = paths[0:-1]
elif paths:
# Source and destination are the same
destination = paths[0]
else:
logger.error(f'`ordigi sort` need at least one path argument')
sys.exit(1)
paths = set(paths)
@ -110,13 +103,15 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
# Initialize Db
db = Db(destination)
filesystem = FileSystem(cache, opt['day_begins'], dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, opt['path_format'])
collection = Collection(opt['path_format'], destination, cache,
opt['day_begins'], dry_run, exclude_regex_list, filter_by_ext,
logger, max_deep, mode)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'],
opt['timeout'])
summary, has_errors = filesystem.sort_files(paths, destination, db,
loc, remove_duplicates, ignore_tags)
summary, has_errors = collection.sort_files(paths, db, loc,
remove_duplicates, ignore_tags)
if clean:
remove_empty_folders(destination, logger)
@ -151,17 +146,54 @@ def remove_empty_folders(path, logger, remove_root=True):
@click.command('clean')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--verbose', '-v', default=False, is_flag=True,
@click.option('--dedup-regex', '-d', default=set(), multiple=True,
help='Regex to match duplicate strings parts')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--folders', '-f', default=False, is_flag=True,
help='Remove empty folders')
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@click.option('--path-string', '-p', default=False, is_flag=True,
help='Deduplicate path string')
@click.option('--remove-duplicates', '-R', default=False, is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash')
@click.option('--root', '-r', type=click.Path(file_okay=False),
default=None, help='Root dir of media collection. If not set, use path')
@click.option('--verbose', '-v', default=False,
help='True if you want to see details of file processing')
@click.argument('path', required=True, nargs=1, type=click.Path())
def _clean(debug, verbose, path):
def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_duplicates, root, verbose, path):
"""Remove empty folders
Usage: clean [--verbose|--debug] directory [removeRoot]"""
logger = log.get_logger(verbose, debug)
remove_empty_folders(path, logger)
clean_all = False
if not folders:
clean_all = True
if not root:
root = path
if clean_all or folders:
remove_empty_folders(path, logger)
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
if path_string:
# Initialize Db
db = Db(root)
collection = Collection(opt['path_format'], root, dry_run=dry_run, logger=logger, max_deep=max_deep, mode='move')
dedup_regex = list(dedup_regex)
summary, has_errors = collection.dedup_regex(path, dedup_regex, db, logger, remove_duplicates)
if verbose or debug:
summary.write()
if has_errors:
sys.exit(1)
@click.command('generate-db')
@ -172,26 +204,8 @@ def _clean(debug, verbose, path):
def _generate_db(path, debug):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files.
"""
constants.debug = debug
result = Result()
path = os.path.abspath(os.path.expanduser(path))
if not os.path.isdir(path):
log.error('path is not a valid directory %s' % path)
sys.exit(1)
db = Db(path)
db.backup_hash_db()
db.reset_hash_db()
for current_file in FILESYSTEM.get_all_files(path):
result.append((current_file, True))
db.add_hash(db.checksum(current_file), current_file)
log.progress()
db.update_hash_db()
log.progress('', True)
result.write()
# TODO
pass
@click.command('verify')
@ -200,25 +214,9 @@ def _generate_db(path, debug):
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _verify(path, debug):
constants.debug = debug
result = Result()
db = Db(path)
for checksum, file_path in db.all():
if not os.path.isfile(file_path):
result.append((file_path, False))
log.progress('x')
continue
actual_checksum = db.checksum(file_path)
if checksum == actual_checksum:
result.append((file_path, True))
log.progress()
else:
result.append((file_path, False))
log.progress('x')
log.progress('', True)
result.write()
"""Verify hashes"""
# TODO
pass
@click.command('compare')
@ -232,6 +230,8 @@ def _verify(path, debug):
@click.option('--remove-duplicates', '-r', default=False, is_flag=True)
@click.option('--revert-compare', '-R', default=False, is_flag=True, help='Revert\
compare')
@click.option('--root', '-r', type=click.Path(file_okay=False),
default=None, help='Root dir of media collection. If not set, use path')
@click.option('--similar-to', '-s', default=False, help='Similar to given\
image')
@click.option('--similarity', '-S', default=80, help='Similarity level for\
@ -240,21 +240,26 @@ def _verify(path, debug):
help='True if you want to see details of file processing')
@click.argument('path', nargs=1, required=True)
def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
revert_compare, similar_to, similarity, verbose, path):
revert_compare, root, similar_to, similarity, verbose, path):
'''Compare files in directories'''
logger = log.get_logger(verbose, debug)
if not root:
root = path
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
# Initialize Db
db = Db(path)
db = Db(root)
filesystem = FileSystem(mode='move', dry_run=dry_run, logger=logger)
collection = Collection(path_format, root, mode='move', dry_run=dry_run, logger=logger)
if revert_compare:
summary, has_errors = filesystem.revert_compare(path, db, dry_run)
summary, has_errors = collection.revert_compare(path, db, dry_run)
else:
summary, has_errors = filesystem.sort_similar_images(path, db,
summary, has_errors = collection.sort_similar_images(path, db,
similarity)
if verbose or debug:
@ -274,7 +279,6 @@ main.add_command(_compare)
main.add_command(_sort)
main.add_command(_generate_db)
main.add_command(_verify)
main.add_command(_batch)
if __name__ == '__main__':

View File

@ -7,27 +7,27 @@ import filecmp
import hashlib
import logging
import os
import pathlib
from pathlib import Path
import re
import sys
import shutil
import time
from datetime import datetime, timedelta
from ordigi import constants
from ordigi import media
from ordigi.media import Media, get_all_subclasses
from ordigi.images import Images
from ordigi.summary import Summary
class FileSystem(object):
"""A class for interacting with the file system."""
class Collection(object):
"""Class of the media collection."""
def __init__(self, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
def __init__(self, path_format, root, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=set(), logger=logging.getLogger(), max_deep=None,
mode='copy', path_format=None):
mode='copy'):
self.root = root
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
@ -43,38 +43,11 @@ class FileSystem(object):
self.logger = logger
self.max_deep = max_deep
self.mode = mode
# TODO have to be removed
if path_format:
self.path_format = path_format
else:
self.path_format = os.path.join(constants.default_path,
constants.default_name)
self.path_format = path_format
self.summary = Summary()
self.whitespace_regex = '[ \t\n\r\f\v]+'
def create_directory(self, directory_path):
    """Create a directory (and its missing parents) if it does not exist.

    :param str directory_path: Path of the directory to create.
    :returns: bool -- True when the path already exists, was created, or
        would have been created in dry-run mode; False when the creation
        raised an OSError.
    """
    if os.path.exists(directory_path):
        return True

    try:
        if not self.dry_run:
            # exist_ok avoids a spurious failure when the directory appears
            # between the existence check above and this call.
            os.makedirs(directory_path, exist_ok=True)
    except OSError as error:
        # OSError is raised for cases like missing permissions; report it
        # instead of failing silently.
        self.logger.error(f'Unable to create {directory_path}: {error}')
        return False

    self.logger.info(f'Create {directory_path}')
    return True
def get_items(self):
return {
'album': '{album}',
@ -96,65 +69,26 @@ class FileSystem(object):
'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}' # search for date format string
}
def walklevel(self, src_path, maxlevel=None):
"""
Walk into input directory recursively until desired maxlevel
source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below
"""
src_path = src_path.rstrip(os.path.sep)
if not os.path.isdir(src_path):
return None
def get_date_regex(self, string, user_regex=None):
if user_regex is not None:
matches = re.findall(user_regex, string)
else:
regex = {
# regex to match date format type %Y%m%d, %y%m%d, %d%m%Y,
# etc...
'a': re.compile(
r'.*[_-]?(?P<year>\d{4})[_-]?(?P<month>\d{2})[_-]?(?P<day>\d{2})[_-]?(?P<hour>\d{2})[_-]?(?P<minute>\d{2})[_-]?(?P<second>\d{2})'),
'b': re.compile (
r'[-_./](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
# not very accurate
'c': re.compile (
r'[-_./](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
'd': re.compile (
r'[-_./](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})[-_./]')
}
num_sep = src_path.count(os.path.sep)
for root, dirs, files in os.walk(src_path):
level = root.count(os.path.sep) - num_sep
yield root, dirs, files, level
if maxlevel is not None and level >= maxlevel:
del dirs[:]
def get_all_files(self, path, extensions=False, exclude_regex_list=set()):
"""Recursively get all files which match a path and extension.
:param str path string: Path to start recursive file listing
:param tuple(str) extensions: File extensions to include (whitelist)
:returns: generator
"""
if self.filter_by_ext != () and not extensions:
# Filtering files by extensions.
if '%media' in self.filter_by_ext:
extensions = set()
subclasses = get_all_subclasses()
for cls in subclasses:
extensions.update(cls.extensions)
else:
extensions = self.filter_by_ext
# Create a list of compiled regular expressions to match against the file path
compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list]
for dirname, dirnames, filenames in os.walk(path):
if dirname == os.path.join(path, '.ordigi'):
continue
for filename in filenames:
# If file extension is in `extensions`
# And if file path is not in exclude regexes
# Then append to the list
filename_path = os.path.join(dirname, filename)
if (
extensions == False
or os.path.splitext(filename)[1][1:].lower() in extensions
and not self.should_exclude(filename_path, compiled_regex_list, False)
):
yield filename_path
def check_for_early_morning_photos(self, date):
"""check for early hour photos to be grouped with previous day"""
if date.hour < self.day_begins:
self.logger.info('moving this photo to the previous day for\
classification purposes (day_begins=' + str(self.day_begins) + ')')
date = date - timedelta(hours=date.hour+1) # push it to the day before for classificiation purposes
return date
for i, rx in regex.items():
yield i, rx
def get_location_part(self, mask, part, place_name):
"""Takes a mask for a location and interpolates the actual place names.
@ -188,6 +122,16 @@ class FileSystem(object):
return folder_name
def check_for_early_morning_photos(self, date):
    """Group photos taken before ``self.day_begins`` with the previous day.

    :param datetime date: Date the media was taken.
    :returns: datetime -- ``date`` moved back by ``date.hour + 1`` hours when
        its hour is lower than ``self.day_begins``, otherwise ``date``
        unchanged.
    """
    if date.hour < self.day_begins:
        self.logger.info(
            'moving this photo to the previous day for classification '
            f'purposes (day_begins={self.day_begins})'
        )
        # Subtracting hour + 1 lands in the last hour of the day before,
        # which is enough for classification purposes.
        date = date - timedelta(hours=date.hour + 1)
    return date
def get_part(self, item, mask, metadata, db, subdirs, loc):
"""Parse a specific folder's name given a mask and metadata.
@ -198,14 +142,14 @@ class FileSystem(object):
"""
# Each item has its own custom logic and we evaluate a single item and return
# the evaluated string.
# the evaluated string.
part = ''
if item == 'basename':
part = os.path.basename(metadata['base_name'])
elif item == 'name':
# Remove date prefix added to the name.
part = metadata['base_name']
for i, rx in self.match_date_from_string(metadata['base_name']):
for i, rx in self.get_date_regex(metadata['base_name']):
part = re.sub(rx, '', part)
elif item == 'date':
date = self.get_date_taken(metadata)
@ -229,27 +173,49 @@ class FileSystem(object):
part = os.path.basename(subdirs)
elif item == 'folders':
folders = pathlib.Path(subdirs).parts
folders = Path(subdirs).parts
folders = eval(mask)
part = os.path.join(*folders)
elif item in ('album','camera_make', 'camera_model', 'ext',
'title'):
'original_name', 'title'):
if metadata[item]:
part = metadata[item]
elif item == 'original_name':
# First we check if we have metadata['original_name'].
# We have to do this for backwards compatibility because
# we original did not store this back into EXIF.
if metadata[item]:
part = metadata['original_name']
elif item in 'custom':
# Fallback string
part = mask[1:-1]
return part
def get_path_part(self, this_part, metadata, db, subdirs, loc):
    """Substitute every known mask found in one path-format part.

    For each ``(item, regex)`` pair of ``self.items`` that matches
    ``this_part``, the value is obtained from ``self.get_part`` and
    inserted in place of the mask. A ``%u`` or ``%l`` prefix in front of
    the mask upper-cases or lower-cases the value. When the value is
    empty, the mask is removed together with an optional leading
    separator (one of ``-_ .``) and case prefix.

    :param str this_part: Path-format part, e.g. ``%u{city}-{album}``.
    :param metadata: Media metadata handed to ``get_part``.
    :param db: Database handed to ``get_part``.
    :param subdirs: Sub-directories handed to ``get_part``.
    :param loc: Geolocation helper handed to ``get_part``.
    :returns: part (string)
    """
    for item, regex in self.items.items():
        matched = re.search(regex, this_part)
        if not matched:
            continue

        part = self.get_part(item, matched.group()[1:-1], metadata, db,
                subdirs, loc)
        part = part.strip()

        if part == '':
            # delete separator if any
            this_part = re.sub('[-_ .]?(%[ul])?' + regex, '', this_part)
            continue

        # Capitalization
        u_regex = '%u' + regex
        l_regex = '%l' + regex
        if re.search(u_regex, this_part):
            pattern, value = u_regex, part.upper()
        elif re.search(l_regex, this_part):
            pattern, value = l_regex, part.lower()
        else:
            pattern, value = regex, part

        # A callable replacement inserts the value literally; a plain string
        # would be parsed as a template, so a backslash in the metadata
        # (e.g. "AC\DC") would raise re.error or be rewritten.
        this_part = re.sub(pattern, lambda _match, value=value: value,
                this_part)

    return this_part
def get_path(self, metadata, db, loc, subdirs='', whitespace_sub='_'):
"""path_format: {%Y-%d-%m}/%u{city}/{album}
@ -262,34 +228,8 @@ class FileSystem(object):
path_parts = path_format.split('/')
for path_part in path_parts:
this_parts = path_part.split('|')
# p = []
for this_part in this_parts:
# parts = ''
for item, regex in self.items.items():
matched = re.search(regex, this_part)
if matched:
# parts = re.split(mask, this_part)
# parts = this_part.split('%')[1:]
part = self.get_part(item, matched.group()[1:-1], metadata, db,
subdirs, loc)
part = part.strip()
if part == '':
# delete separator if any
regex = '[-_ .]?(%[ul])?' + regex
this_part = re.sub(regex, part, this_part)
else:
# Capitalization
u_regex = '%u' + regex
l_regex = '%l' + regex
if re.search(u_regex, this_part):
this_part = re.sub(u_regex, part.upper(), this_part)
elif re.search(l_regex, this_part):
this_part = re.sub(l_regex, part.lower(), this_part)
else:
this_part = re.sub(regex, part, this_part)
this_part = self.get_path_part(this_part, metadata, db, subdirs, loc)
if this_part:
# Check if all masks are substituted
@ -314,34 +254,13 @@ class FileSystem(object):
return path_string
def match_date_from_string(self, string, user_regex=None):
if user_regex is not None:
matches = re.findall(user_regex, string)
else:
regex = {
# regex to match date format type %Y%m%d, %y%m%d, %d%m%Y,
# etc...
'a': re.compile(
r'.*[_-]?(?P<year>\d{4})[_-]?(?P<month>\d{2})[_-]?(?P<day>\d{2})[_-]?(?P<hour>\d{2})[_-]?(?P<minute>\d{2})[_-]?(?P<second>\d{2})'),
'b': re.compile (
r'[-_./](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
# not very accurate
'c': re.compile (
r'[-_./](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
'd': re.compile (
r'[-_./](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})[-_./]')
}
for i, rx in regex.items():
yield i, rx
def get_date_from_string(self, string, user_regex=None):
# If missing datetime from EXIF data check if filename is in datetime format.
# For this use a user provided regex if possible.
# Otherwise assume a filename such as IMG_20160915_123456.jpg as default.
matches = []
for i, rx in self.match_date_from_string(string, user_regex):
for i, rx in self.get_date_regex(string, user_regex):
match = re.findall(rx, string)
if match != []:
if i == 'c':
@ -445,43 +364,6 @@ class FileSystem(object):
return src_checksum
def sort_file(self, src_path, dest_path, remove_duplicates=True):
'''Copy or move file to dest_path.'''
mode = self.mode
dry_run = self.dry_run
# check for collisions
if(src_path == dest_path):
self.logger.info(f'File {dest_path} already sorted')
return None
elif os.path.isfile(dest_path):
self.logger.info(f'File {dest_path} already exist')
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
self.logger.info(f'File in source and destination are identical. Duplicate will be ignored.')
if(mode == 'move'):
if not dry_run:
os.remove(src_path)
self.logger.info(f'remove: {src_path}')
return None
else: # name is same, but file is different
self.logger.info(f'File in source and destination are different.')
return False
else:
return False
else:
if(mode == 'move'):
if not dry_run:
# Move the processed file into the destination directory
shutil.move(src_path, dest_path)
self.logger.info(f'move: {src_path} -> {dest_path}')
elif mode == 'copy':
if not dry_run:
shutil.copy2(src_path, dest_path)
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
def check_file(self, src_path, dest_path, src_checksum, db):
# Check if file remain the same
@ -502,6 +384,138 @@ class FileSystem(object):
return self.summary, has_errors
def should_exclude(self, path, regex_list=(), needs_compiled=False):
    """Tell whether ``path`` matches at least one exclusion pattern.

    :param str path: Path to test.
    :param regex_list: Patterns searched in ``path``: compiled patterns, or
        pattern strings when ``needs_compiled`` is True. The default is an
        immutable empty tuple so no state is shared between calls.
    :param bool needs_compiled: Compile every entry of ``regex_list``
        before searching.
    :returns: bool -- False when ``regex_list`` is empty or nothing matches.
    """
    if len(regex_list) == 0:
        return False

    if needs_compiled:
        regex_list = [re.compile(regex) for regex in regex_list]

    return any(regex.search(path) for regex in regex_list)
def walklevel(self, src_path, maxlevel=None):
    """
    Walk ``src_path`` recursively, stopping the descent at ``maxlevel``.

    source: https://stackoverflow.com/questions/229186/os-walk-without-digging-into-directories-below

    :param str src_path: Directory to walk; trailing separators are ignored.
    :param maxlevel: Deepest level to descend into (0 is ``src_path``
        itself); None means no limit.
    :returns: generator of ``(root, dirs, files, level)`` tuples; nothing is
        yielded when ``src_path`` is not a directory.
    """
    top = src_path.rstrip(os.path.sep)
    if not os.path.isdir(top):
        return None

    base_depth = top.count(os.path.sep)
    for current, subdirs, filenames in os.walk(top):
        depth = current.count(os.path.sep) - base_depth
        yield current, subdirs, filenames, depth
        if maxlevel is None or depth < maxlevel:
            continue
        # Emptying the list in place tells os.walk not to go any deeper
        subdirs[:] = []
def remove(self, file_path):
    """Delete ``file_path`` unless dry-run mode is on, then log the removal.

    :param file_path: Path of the file to delete.
    """
    keep_on_disk = self.dry_run
    if not keep_on_disk:
        os.remove(file_path)

    # The removal is logged in dry-run mode too
    self.logger.info(f'remove: {file_path}')
def sort_file(self, src_path, dest_path, remove_duplicates=False):
    '''Copy or move file to dest_path.

    :param src_path: Path of the file to sort.
    :param dest_path: Path the file should end up at.
    :param bool remove_duplicates: When the destination already exists,
        compare both files and treat identical content as a duplicate.
    :returns: True when the file was copied or moved (or would have been in
        dry-run mode; a mode other than 'move' or 'copy' does nothing and
        also returns True), None when nothing had to be done (already
        sorted, or identical duplicate), False when the destination exists
        and the conflict is not resolved here.
    '''
    mode = self.mode
    dry_run = self.dry_run

    # check for collisions
    if src_path == dest_path:
        self.logger.info(f'File {dest_path} already sorted')
        return None

    if os.path.isfile(dest_path):
        self.logger.warning(f'File {dest_path} already exist')
        if not remove_duplicates:
            return False

        # shallow=False forces a content comparison. The default only
        # compares the os.stat() signatures (type, size, mtime), which could
        # get a different file deleted as a "duplicate" in move mode.
        if filecmp.cmp(src_path, dest_path, shallow=False):
            self.logger.info(f'File in source and destination are identical. Duplicate will be ignored.')
            if mode == 'move':
                self.remove(src_path)
            return None

        # name is same, but file is different
        self.logger.warning(f'File in source and destination are different.')
        return False

    if mode == 'move':
        if not dry_run:
            # Move the processed file into the destination directory
            shutil.move(src_path, dest_path)
        self.logger.info(f'move: {src_path} -> {dest_path}')
    elif mode == 'copy':
        if not dry_run:
            shutil.copy2(src_path, dest_path)
        self.logger.info(f'copy: {src_path} -> {dest_path}')

    return True
def solve_conflicts(self, conflict_file_list, db, remove_duplicates):
    """Retry conflicting files under a numbered destination name.

    Every entry is popped from ``conflict_file_list`` and passed to
    ``self.sort_file``. While the destination is refused, ``_1``, ``_2``,
    ... ``_99`` is appended to the original file stem and the entry's
    ``'dest_path'`` is updated accordingly.

    :param list conflict_file_list: Dicts with the keys ``'src_path'``,
        ``'src_checksum'`` and ``'dest_path'``; the list is emptied.
    :param db: Database handed to ``self.check_file``.
    :param bool remove_duplicates: Handed to ``self.sort_file``.
    :returns: bool -- True when no entry failed, False when at least one
        entry ran out of names or was reported in error by
        ``self.check_file``.
    """
    has_errors = False

    while conflict_file_list:
        file_paths = conflict_file_list.pop()
        src_path = file_paths['src_path']
        src_checksum = file_paths['src_checksum']
        dest_path = file_paths['dest_path']

        # Try to sort the file
        result = self.sort_file(src_path, dest_path, remove_duplicates)

        # Candidate names are always derived from the original destination,
        # so the previous suffix never has to be stripped again.
        stem, ext = os.path.splitext(dest_path)
        n = 1
        while result is False and n < 100:
            # Add appendix to the name
            dest_path = f'{stem}_{n}{ext}'
            file_paths['dest_path'] = dest_path
            result = self.sort_file(src_path, dest_path, remove_duplicates)
            n += 1

        if result is False:
            # Every candidate name was refused
            self.logger.error(f'{self.mode}: too many append for {dest_path}...')
            self.summary.append((src_path, False))
            has_errors = True
        elif result:
            self.summary, check_failed = self.check_file(src_path,
                    dest_path, src_checksum, db)
            # Accumulate: a later success must not hide an earlier failure
            if check_failed:
                has_errors = True

    return not has_errors
def _split_part(self, dedup_regex, path_part, items):
    """Split ``path_part`` with the first regex and collect the pieces.

    The first regex is popped from ``dedup_regex``. Pieces matching it are
    appended to ``items`` (without their leading separator, if any); the
    other pieces are split again with the remaining regexes, or appended
    as they are when no regex is left.

    :param list dedup_regex: Regexes to apply in order; consumed in place
        and shared with the recursive calls.
    :param str path_part: Path part to split.
    :param list items: Accumulator the pieces are appended to.
    :returns: items
    """
    pattern = dedup_regex.pop(0)
    pieces = re.split(pattern, path_part)

    for index, piece in enumerate(pieces):
        if not re.match(pattern, piece):
            if dedup_regex != []:
                # NOTE(review): the same list is handed to every sibling
                # piece, so later pieces see fewer regexes - confirm intent.
                self._split_part(dedup_regex, piece, items)
            else:
                items.append(piece)
            continue

        if piece[0] not in '-_ .':
            items.append(piece)
            continue

        if index > 0:
            # move the separator to previous item
            # NOTE(review): the previous piece is already in ``items``, so
            # this update of ``pieces`` does not reach the result.
            pieces[index - 1] = pieces[index - 1] + piece[0]
        items.append(piece[1:])

    return items
def get_files_in_path(self, path, extensions=set()):
"""Recursively get files which match a path and extension.
@ -539,49 +553,110 @@ class FileSystem(object):
return file_list
def _conflict_solved(self, conflict_file_list, item, dest_path):
self.logger.warning(f'Same name already exists...renaming to: {dest_path}')
del(conflict_file_list[item])
def create_directory(self, directory_path):
"""Create a directory if it does not already exist.
def solve_conflicts(self, conflict_file_list, remove_duplicates):
file_list = conflict_file_list.copy()
for item, file_paths in enumerate(file_list):
src_path = file_paths['src_path']
dest_path = file_paths['dest_path']
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
# remove to conflict file list if file as be successfully copied or ignored
if result is True or None:
self._conflict_solved(conflict_file_list, item, dest_path)
:param str directory_name: A fully qualified path of the
to create.
:returns: bool
"""
try:
if os.path.exists(directory_path):
return True
else:
n = 1
while result is False:
if n > 100:
self.logger.warning(f'{self.mode}: to many append for {dest_path}...')
break
# Add appendix to the name
pre, ext = os.path.splitext(dest_path)
dest_path = pre + '_' + str(n) + ext
conflict_file_list[item]['dest_path'] = dest_path
result = self.sort_file(src_path, dest_path, remove_duplicates)
else:
self._conflict_solved(conflict_file_list, item, dest_path)
if not self.dry_run:
os.makedirs(directory_path)
self.logger.info(f'Create {directory_path}')
return True
except OSError:
# OSError is thrown for cases like no permission
pass
return result
return False
def sort_files(self, paths, destination, db, loc, remove_duplicates=False,
def check_path(self, path):
    """Return the absolute, user-expanded form of ``path``.

    :param str path: Path to check; ``~`` is expanded.
    :returns: str -- absolute path.
    :raises SystemExit: with exit code 1, after logging an error, when the
        path does not exist.
    """
    resolved = os.path.abspath(os.path.expanduser(path))

    # some error checking
    if os.path.exists(resolved):
        return resolved

    self.logger.error(f'Directory {resolved} does not exist')
    sys.exit(1)
def set_utime_from_metadata(self, date_taken, file_path):
    """Stamp ``file_path`` with the date the media was taken.

    The access time is set to the current time and the modification time
    to ``date_taken``; both are truncated to whole seconds.

    :param datetime date_taken: Date used as modification time.
    :param file_path: File whose times are updated.
    """
    accessed = int(datetime.now().timestamp())
    modified = int(date_taken.timestamp())
    os.utime(file_path, (accessed, modified))
def dedup_regex(self, path, dedup_regex, db, logger, remove_duplicates=False):
    """Remove repeated name fragments from the paths of the files in ``path``.

    Every part of each file path (relative to ``self.root``) is split with
    ``self._split_part``; fragments already seen in the same part are
    dropped and the file is sorted to the resulting path under
    ``self.root``.

    :param str path: Directory whose files are processed.
    :param list dedup_regex: Regexes used to split path parts; when empty,
        default regexes for numeric dates and delimited words are used.
    :param db: Database handed to ``check_file`` and ``solve_conflicts``.
    :param logger: Accepted for interface compatibility; not used by this
        method.
    :param bool remove_duplicates: Handed to ``sort_file``.
    :returns: tuple ``(self.summary, has_errors)``.
    :raises ValueError: from ``Path.relative_to`` when a file is not located
        under ``self.root``.
    """
    has_errors = False
    path = self.check_path(path)

    if len(dedup_regex) == 0:
        # Delimiter regex
        delim = r'[-_ .]'
        # Numeric date item regex
        d = r'\d{2}'
        # Numeric date regexes: two or three two-digit groups separated by
        # delimiters and bounded by a non-digit on one side
        date_num2 = re.compile(fr'([^0-9]{d}{delim}{d}{delim}|{delim}{d}{delim}{d}[^0-9])')
        date_num3 = re.compile(fr'([^0-9]{d}{delim}{d}{delim}{d}{delim}|{delim}{d}{delim}{d}{delim}{d}[^0-9])')
        # A run of non-delimiter characters followed by one delimiter
        default = re.compile(r'([^-_ .]+[-_ .])')
        dedup_regex = [
            date_num3,
            date_num2,
            default
        ]

    conflict_file_list = []
    # cycle through files
    for src_path, _ in self.get_files_in_path(path):
        src_checksum = self.checksum(src_path)
        path_parts = Path(src_path).relative_to(self.root).parts

        dedup_path = []
        for path_part in path_parts:
            # _split_part consumes its regex list, hence the copy
            items = self._split_part(dedup_regex.copy(), path_part, [])

            # Keep the first occurrence of every fragment, in order
            filtered_items = []
            for item in items:
                if item not in filtered_items:
                    filtered_items.append(item)

            dedup_path.append(''.join(filtered_items))

        # Dedup path
        dest_path = os.path.join(self.root, *dedup_path)
        self.create_directory(os.path.dirname(dest_path))

        result = self.sort_file(src_path, dest_path, remove_duplicates)
        if result:
            self.summary, check_failed = self.check_file(src_path,
                    dest_path, src_checksum, db)
            # Accumulate: a later success must not hide an earlier failure
            if check_failed:
                has_errors = True
        elif result is False:
            # There is conflict files
            conflict_file_list.append({'src_path': src_path,
                'src_checksum': src_checksum, 'dest_path': dest_path})

    if conflict_file_list != []:
        result = self.solve_conflicts(conflict_file_list, db, remove_duplicates)
        if not result:
            has_errors = True

    return self.summary, has_errors
def sort_files(self, paths, db, loc, remove_duplicates=False,
ignore_tags=set()):
"""
Sort files into appropriate folder
"""
has_errors = False
for path in paths:
# some error checking
if not os.path.exists(path):
self.logger.error(f'Directory {path} does not exist')
path = os.path.expanduser(path)
path = self.check_path(path)
conflict_file_list = []
for src_path, subdirs in self.get_files_in_path(path,
extensions=self.filter_by_ext):
@ -596,39 +671,27 @@ class FileSystem(object):
# Keep same directory structure
file_path = os.path.relpath(src_path, path)
dest_directory = os.path.join(destination,
dest_directory = os.path.join(self.root,
os.path.dirname(file_path))
dest_path = os.path.join(destination, file_path)
dest_path = os.path.join(self.root, file_path)
self.create_directory(dest_directory)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result is False:
# There is conflict files
conflict_file_list.append({'src_path': src_path, 'dest_path': dest_path})
result = self.solve_conflicts(conflict_file_list, remove_duplicates)
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
if result is True:
self.summary, has_errors = self.check_file(src_path,
dest_path, src_checksum, db)
elif result is None:
has_errors = False
else:
self.summary.append((src_path, False))
has_errors = True
if conflict_file_list != []:
result = self.solve_conflicts(conflict_file_list, db, remove_duplicates)
if not result:
has_errors = True
return self.summary, has_errors
def check_path(self, path):
path = os.path.abspath(os.path.expanduser(path))
# some error checking
if not os.path.exists(path):
self.logger.error(f'Directory {path} does not exist')
sys.exit(1)
return path
def set_hash(self, result, src_path, dest_path, src_checksum, db):
if result:
# Check if file remain the same
@ -758,21 +821,3 @@ class FileSystem(object):
return self.summary, has_errors
def set_utime_from_metadata(self, date_taken, file_path):
""" Set the modification time on the file based on the file name.
"""
# Initialize date taken to what's returned from the metadata function.
os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp())))
def should_exclude(self, path, regex_list=set(), needs_compiled=False):
if(len(regex_list) == 0):
return False
if(needs_compiled):
compiled_list = []
for regex in regex_list:
compiled_list.append(re.compile(regex))
regex_list = compiled_list
return any(regex.search(path) for regex in regex_list)

View File

@ -36,7 +36,7 @@ script_directory = path.dirname(path.dirname(path.abspath(__file__)))
#: Accepted language in responses from MapQuest
accepted_language = 'en'
# check python version, required in filesystem.py to trigger appropriate method
# check python version, required in collection.py to trigger appropriate method
python_version = version_info.major
CONFIG_FILE = f'{application_directory}/ordigi.conf'

View File

@ -1,9 +1,12 @@
""" pytest test configuration """
from configparser import RawConfigParser
import os
import pytest
from pathlib import Path
from pathlib import Path, PurePath
import random
import shutil
import string
import tempfile
from ordigi.config import Config
@ -17,20 +20,35 @@ def reset_singletons():
_ExifToolProc.instance = None
def copy_sample_files():
src_path = tempfile.mkdtemp(prefix='ordigi-src')
@pytest.fixture(scope="session")
def sample_files_paths(tmpdir_factory):
tmp_path = tmpdir_factory.mktemp("ordigi-src-")
paths = Path(ORDIGI_PATH, 'samples/test_exif').glob('*')
file_paths = [x for x in paths if x.is_file()]
for file_path in file_paths:
source_path = Path(src_path, file_path.name)
source_path = tmp_path.join(file_path.name)
shutil.copyfile(file_path, source_path)
return src_path, file_paths
return tmp_path, file_paths
def randomize_files(dest_dir):
    """Randomly alter the files found under ``dest_dir``.

    Each file is, with probability 1/2, overwritten with 128 to 2047 random
    bytes and then, independently with probability 1/2, copied next to
    itself as ``<stem>_1<suffix>``.

    :param dest_dir: Directory walked recursively.
    """
    for dirpath, _subdirs, filenames in os.walk(dest_dir):
        for name in filenames:
            file_path = PurePath(dirpath, name)

            if random.getrandbits(1):
                with open(file_path, 'wb') as fout:
                    fout.write(os.urandom(random.randrange(128, 2048)))

            if random.getrandbits(1):
                dest_path = PurePath(dirpath, file_path.stem + '_1' + file_path.suffix)
                shutil.copyfile(file_path, dest_path)
@pytest.fixture(scope="module")
def conf_path():
tmp_path = tempfile.mkdtemp(prefix='ordigi-')
conf_dir = tempfile.mkdtemp(prefix='ordigi-')
conf = RawConfigParser()
conf['Path'] = {
'day_begins': '4',
@ -40,11 +58,11 @@ def conf_path():
conf['Geolocation'] = {
'geocoder': 'Nominatium'
}
conf_path = Path(tmp_path, "ordigi.conf")
conf_path = Path(conf_dir, "ordigi.conf")
config = Config(conf_path)
config.write(conf)
yield conf_path
shutil.rmtree(tmp_path)
shutil.rmtree(conf_dir)

View File

@ -7,11 +7,11 @@ import re
from sys import platform
from time import sleep
from .conftest import copy_sample_files
from .conftest import randomize_files
from ordigi import constants
from ordigi.database import Db
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.filesystem import FileSystem
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media
@ -20,9 +20,11 @@ from ordigi.media import Media
class TestDb:
pass
class TestFilesystem:
def setup_class(cls):
cls.src_paths, cls.file_paths = copy_sample_files()
class TestCollection:
@pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths):
cls.src_paths, cls.file_paths = sample_files_paths
cls.path_format = constants.default_path + '/' + constants.default_name
def teardown_class(self):
@ -34,8 +36,8 @@ class TestFilesystem:
Test all parts
"""
# Item to search for:
filesystem = FileSystem()
items = filesystem.get_items()
collection = Collection(self.path_format, tmp_path)
items = collection.get_items()
masks = [
'{album}',
'{basename}',
@ -73,7 +75,7 @@ class TestFilesystem:
for mask in masks:
matched = re.search(regex, mask)
if matched:
part = filesystem.get_part(item, mask[1:-1],
part = collection.get_part(item, mask[1:-1],
metadata, Db(tmp_path), subdirs, loc)
# check if part is correct
assert isinstance(part, str), file_path
@ -92,7 +94,7 @@ class TestFilesystem:
assert part == file_path.suffix[1:], file_path
elif item == 'name':
expected_part = file_path.stem
for i, rx in filesystem.match_date_from_string(expected_part):
for i, rx in collection.get_date_regex(expected_part):
part = re.sub(rx, '', expected_part)
assert part == expected_part, file_path
elif item == 'custom':
@ -112,21 +114,21 @@ class TestFilesystem:
assert part == '', file_path
def test_get_date_taken(self):
filesystem = FileSystem()
def test_get_date_taken(self, tmp_path):
collection = Collection(self.path_format, tmp_path)
for file_path in self.file_paths:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(str(file_path))
metadata = media.get_metadata()
date_taken = filesystem.get_date_taken(metadata)
date_taken = collection.get_date_taken(metadata)
date_filename = None
for tag in media.tags_keys['original_name']:
if tag in exif_data:
date_filename = filesystem.get_date_from_string(exif_data[tag])
date_filename = collection.get_date_from_string(exif_data[tag])
break
if not date_filename:
date_filename = filesystem.get_date_from_string(file_path.name)
date_filename = collection.get_date_from_string(file_path.name)
if media.metadata['date_original']:
assert date_taken == media.metadata['date_original']
@ -139,31 +141,40 @@ class TestFilesystem:
def test_sort_files(self, tmp_path):
db = Db(tmp_path)
filesystem = FileSystem(path_format=self.path_format)
collection = Collection(self.path_format, tmp_path)
loc = GeoLocation()
summary, has_errors = filesystem.sort_files([self.src_paths],
tmp_path, db, loc)
summary, has_errors = collection.sort_files([self.src_paths],
db, loc)
# Summary is created and there are no errors
assert summary, summary
assert not has_errors, has_errors
randomize_files(tmp_path)
collection = Collection(self.path_format, tmp_path)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_paths],
db, loc)
# Summary is created and there are no errors
assert summary, summary
assert not has_errors, has_errors
# TODO check if path follows path_format
# TODO make another class?
def test_sort_file(self, tmp_path):
for mode in 'copy', 'move':
filesystem = FileSystem(path_format=self.path_format, mode=mode)
collection = Collection(self.path_format, tmp_path, mode=mode)
# copy mode
src_path = Path(self.src_paths, 'photo.png')
name = 'photo_' + mode + '.png'
dest_path = Path(tmp_path, name)
src_checksum = filesystem.checksum(src_path)
result_copy = filesystem.sort_file(src_path, dest_path)
src_checksum = collection.checksum(src_path)
result_copy = collection.sort_file(src_path, dest_path)
assert result_copy
# Ensure files remain the same
assert filesystem.checkcomp(dest_path, src_checksum)
assert collection.checkcomp(dest_path, src_checksum)
if mode == 'copy':
assert src_path.exists()
@ -175,7 +186,9 @@ class TestFilesystem:
# TODO check date
# collection.sort_files
def test_filter_part():
_filter_part(dedup_regex, path_part, items)
assert
#- Sort similar images into a directory
# filesystem.sort_similar
# collection.sort_similar

View File

@ -5,7 +5,6 @@ import re
import shutil
import tempfile
from .conftest import copy_sample_files
from ordigi import constants
from ordigi.media import Media
from ordigi.images import Images
@ -16,8 +15,9 @@ CACHING = True
class TestMetadata:
def setup_class(cls):
cls.src_paths, cls.file_paths = copy_sample_files()
@pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths):
cls.src_paths, cls.file_paths = sample_files_paths
cls.ignore_tags = ('EXIF:CreateDate', 'File:FileModifyDate',
'File:FileAccessDate', 'EXIF:Make', 'Composite:LightValue')