Add sort command

This commit is contained in:
Cédric Leporcq 2021-07-16 21:26:42 +02:00
parent f25b8ccd99
commit fc1f4343b0
6 changed files with 396 additions and 27 deletions

View File

@ -3,6 +3,7 @@
import os
import re
import sys
import logging
from datetime import datetime
import click
@ -22,19 +23,24 @@ from elodie.config import load_config
from elodie.filesystem import FileSystem
from elodie.localstorage import Db
from import Media, get_all_subclasses
from import Media
from import Audio
from import Photo
from import Video
from elodie.plugins.plugins import Plugins
from elodie.result import Result
from elodie.summary import Summary
from elodie.external.pyexiftool import ExifTool
from elodie.dependencies import get_exiftool
from elodie import constants
FILESYSTEM = FileSystem()
def import_file(_file, destination, db, album_from_folder, action, trash, allow_duplicates):
def print_help(command):
def import_file(_file, destination, db, album_from_folder, mode, trash, allow_duplicates):
"""Set file metadata and move it to destination.
@ -57,7 +63,7 @@ def import_file(_file, destination, db, album_from_folder, action, trash, allow_
dest_path = FILESYSTEM.process_file(_file, destination, db,
media, album_from_folder, action, allowDuplicate=allow_duplicates)
media, album_from_folder, mode, allowDuplicate=allow_duplicates)
if dest_path:
log.all('%s -> %s' % (_file, dest_path))
if trash:
@ -65,6 +71,7 @@ def import_file(_file, destination, db, album_from_folder, action, trash, allow_
return dest_path or None
@click.option('--debug', default=False, is_flag=True,
help='Override the value in with True.')
@ -101,9 +108,9 @@ def _import(destination, source, file, album_from_folder, trash,
"""Import files or directories by reading their EXIF and organizing them accordingly.
if dry_run:
action = 'dry_run'
mode = 'dry_run'
action = 'copy'
mode = 'copy'
constants.debug = debug
has_errors = False
@ -142,7 +149,7 @@ def _import(destination, source, file, album_from_folder, trash,
for current_file in files:
dest_path = import_file(current_file, destination, db,
album_from_folder, action, trash, allow_duplicates)
album_from_folder, mode, trash, allow_duplicates)
result.append((current_file, dest_path))
has_errors = has_errors is True or not dest_path
@ -154,6 +161,97 @@ def _import(destination, source, file, album_from_folder, trash,
if has_errors:
# recursive : bool
# True if you want src_dir to be searched recursively for files (False to search only in top-level of src_dir)
@click.option('--debug', default=False, is_flag=True,
help='Override the value in with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--destination', '-d', type=click.Path(file_okay=False),
default=None, help='Sort files into this directory.')
@click.option('--copy', '-c', default=False, is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved')
@click.option('--day-begins', '-b', default=0,
help='What hour of the day you want the day to begin (only for\
classification purposes). Defaults at 0 as midnight. Can be\
used to group early morning photos with the previous day. Must\
be a number between 0-23')
@click.option('--exclude-regex', '-e', default=set(), multiple=True,
help='Regular expression for directories or files to exclude.')
@click.option('--filter-by-ext', '-f', default=False, help='''Use filename
extension to filter files for sorting. If used without argument, use
common media file extension for filtering. Ignored files remain in
the same directory structure''' )
@click.option('--ignore-tags', '-i', default=set(), multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--remove-duplicates', '-r', default=False, is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(debug, dry_run, destination, copy, day_begins, exclude_regex, filter_by_ext, ignore_tags,
remove_duplicates, verbose, paths):
"""Sort files or directories by reading their EXIF and organizing them
according to config.ini preferences.
if copy:
mode = 'copy'
mode = 'move'
if debug:
constants.debug = logging.DEBUG
elif verbose:
constants.debug = logging.INFO
constants.debug = logging.ERROR
logger = logging.getLogger('elodie')
if not destination and paths:
destination = paths[-1]
paths = paths[0:-1]
paths = set(paths)
destination = _decode(destination)
destination = os.path.abspath(os.path.expanduser(destination))
if not os.path.exists(destination):
logger.error(f'Directory {destination} does not exist')
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
config = load_config(constants.CONFIG_FILE)
if 'Exclusions' in config:
exclude_regex = [value for key, value in config.items('Exclusions')]
exclude_regex_list = set(exclude_regex)
# Initialize Db
db = Db(destination)
filesystem = FileSystem(mode, dry_run, exclude_regex_list, logger)
summary, has_errors = filesystem.sort_files(paths, destination, db, remove_duplicates)
if verbose or debug:
if has_errors:
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@ -183,6 +281,7 @@ def _generate_db(path, debug):
log.progress('', True)
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@ -298,7 +397,7 @@ def _update(album, location, time, title, paths, debug):
db = Db(destination)
media = Media.get_class_by_file(current_file, get_all_subclasses())
if not media:
if media is None:
updated = False
@ -345,7 +444,7 @@ def _update(album, location, time, title, paths, debug):
original_base_name.replace('-%s' % original_title, ''))
dest_path = FILESYSTEM.process_file(current_file, destination, db,
updated_media, False, action='move', allowDuplicate=True)
updated_media, False, mode='move', allowDuplicate=True)'%s -> %s' % (current_file, dest_path))
log.all('{"source":"%s", "destination":"%s"}' % (current_file,
@ -373,6 +472,7 @@ def main():

View File

@ -5,6 +5,9 @@ General file system methods.
from builtins import object
import filecmp
import hashlib
import logging
import os
import re
import shutil
@ -17,13 +20,16 @@ from elodie.config import load_config
from elodie import constants
from elodie.localstorage import Db
from import media
from import get_media_class
from elodie.plugins.plugins import Plugins
from elodie.summary import Summary
class FileSystem(object):
"""A class for interacting with the file system."""
def __init__(self):
def __init__(self, mode='copy', dry_run=False, exclude_regex_list=set(),
# The default folder path is along the lines of 2017-06-17_01-04-14-dsc_1234-some-title.jpg
self.default_file_name_definition = {
'date': '%Y-%m-%d_%H-%M-%S',
@ -45,9 +51,16 @@ class FileSystem(object):
self.whitespace_regex = '[ \t\n\r\f\v]+'
self.dry_run = dry_run
self.exclude_regex_list = exclude_regex_list
self.mode = mode
self.logger = logger
self.summary = Summary()
# Instantiate a plugins object
self.plugins = Plugins()
def create_directory(self, directory_path):
"""Create a directory if it does not already exist.
@ -59,7 +72,9 @@ class FileSystem(object):
if os.path.exists(directory_path):
return True
if not self.dry_run:
os.makedirs(directory_path)'Create {directory_path}')
return True
except OSError:
# OSError is thrown for cases like no permission
@ -592,6 +607,7 @@ class FileSystem(object):
return folder_name
def process_checksum(self, _file, db, allow_duplicate):
checksum = db.checksum(_file)
if(checksum is None):
@ -618,8 +634,184 @@ class FileSystem(object):
return checksum
def checksum(self, file_path, blocksize=65536):
"""Create a hash value for the given file.
:param str file_path: Path to the file to create a hash for.
:param int blocksize: Read blocks of this size from the file when
creating the hash.
:returns: str or None
hasher = hashlib.sha256()
with open(file_path, 'rb') as f:
buf =
while len(buf) > 0:
buf =
return hasher.hexdigest()
return None
def checkcomp(self, src_path, dest_path):
"""Check file.
src_checksum = self.checksum(src_path)
if self.dry_run:
return src_checksum
dest_checksum = self.checksum(dest_path)
if dest_checksum != src_checksum:'Source checksum and destination checksum are not the same')
return False
return src_checksum
def sort_file(self, src_path, dest_path, remove_duplicates=True):
'''Copy or move file to dest_path.'''
mode = self.mode
dry_run = self.dry_run
# check for collisions
if(src_path == dest_path):'File {dest_path} already sorted')
return True
if os.path.isfile(dest_path):'File {dest_path} already exist')
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):'File in source and destination are identical. Duplicate will be ignored.')
if(mode == 'move'):
if not dry_run:
shutil.remove(src_path)'remove: {src_path}')
return True
else: # name is same, but file is different'File in source and destination are different.')
return False
return False
if(mode == 'move'):
if not dry_run:
# Move the processed file into the destination directory
shutil.move(src_path, dest_path)'move: {src_path} -> {dest_path}')
elif mode == 'copy':
if not dry_run:
shutil.copy2(src_path, dest_path)'copy: {src_path} -> {dest_path}')
return True
return False
def check_file(self, src_path, dest_path, db):
# Check if file remain the same
checksum = self.checkcomp(src_path, dest_path)
has_errors = False
if checksum:
if not self.dry_run:
db.add_hash(checksum, dest_path)
if dest_path:'{src_path} -> {dest_path}')
self.summary.append((src_path, dest_path))
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
# sys.exit(1)
self.summary.append((src_path, False))
has_errors = True
return self.summary, has_errors
def get_all_files_in_path(self, path, exclude_regex_list=set()):
files = set()
# some error checking
if not os.path.exists(path):
self.logger.error(f'Directory {path} does not exist')
path = os.path.expanduser(path)
if os.path.isdir(path):
files.update(self.get_all_files(path, False, exclude_regex_list))
if not self.should_exclude(path, self.exclude_regex_list, True):
return files
def sort_files(self, paths, destination, db, remove_duplicates=False):
has_errors = False
for path in paths:
files = self.get_all_files_in_path(path, self.exclude_regex_list)
num_files = len(files)
conflict_file_list = set()
for src_path in files:
# Process files
media = get_media_class(src_path)
if media:
metadata = media.get_metadata()
# Get the destination path according to metadata
directory_name = self.get_folder_path(metadata, db)
file_name = self.get_file_name(metadata)
# Keep same directory structure
directory_name = os.path.dirname(os.path.relpath(src_path,
file_name = os.path.basename(src_path)
dest_directory = os.path.join(destination, directory_name)
dest_path = os.path.join(dest_directory, file_name)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.check_file(src_path, dest_path, db)
# There is conflict files
conflict_file_list.add((src_path, dest_path))
for src_path, dest_path in conflict_file_list:
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
conflict_file_list.remove((src_path, dest_path))
n = 1
while not result:
# Add appendix to the name
pre, ext = os.path.splitext(dest_path)
dest_path = pre + '_' + str(n) + ext
result = self.sort_file(src_path, dest_path, remove_duplicates)
if n > 100:
self.logger.error(f'{self.mode}: to many append for {dest_path}...')
break'Same name already exists...renaming to: {dest_path}')
if result:
self.summary, has_errors = self.check_file(src_path, dest_path, db)
self.summary.append((src_path, False))
has_errors = True
return self.summary, has_errors
def process_file(self, _file, destination, db, media, album_from_folder,
action, **kwargs):
mode, **kwargs):
allow_duplicate = False
if('allowDuplicate' in kwargs):
allow_duplicate = kwargs['allowDuplicate']
@ -658,22 +850,15 @@ class FileSystem(object):
# exiftool renames the original file by appending '_original' to the
# file name. A new file is written with new tags with the initial file
# name. See exiftool man page for more details.
# Check if the source file was processed by exiftool and an _original
# file was created.
if(action == 'move'):
if(mode == 'move'):
stat = os.stat(_file)
# Move the processed file into the destination directory
shutil.move(_file, dest_path)
elif action == 'copy':
elif mode == 'copy':
shutil.copy2(_file, dest_path)
if action != 'dry_run':
if mode != 'dry_run':
# Set the utime based on what the original file contained
# before we made any changes.
# Then set the utime on the destination file based on metadata.

View File

@ -11,6 +11,7 @@ are used to represent the actual files.
import mimetypes
import os
import six
import logging
# load modules
from elodie import log
@ -236,7 +237,11 @@ class Media():
if(extension in i.extensions):
return i(_file)
return None
exclude_list = ['.DS_Store', '.directory']
if os.path.basename(_file) == '.DS_Store':
return None
return Media(_file)
@ -600,3 +605,18 @@ def get_all_subclasses(cls=None):
return subclasses
def get_media_class(_file):
if not os.path.exists(_file):
logging.warning(f'Could not find {_file}')
logging.error(f'Could not find {_file}')
return False
media = Media.get_class_by_file(_file, get_all_subclasses())
if not media:
logging.warning(f'File{_file} is not supported')
logging.error(f'File {_file} can\'t be imported')
return False
return media

View File

@ -29,12 +29,16 @@ def test_get_all_subclasses():
expected = {Media, Photo, Video, Audio}
assert subclasses == expected, subclasses
# def test_get_media_class(_file):
# pass
def test_get_class_by_file_without_extension():
base_file = helper.get_file('withoutextension')
cls = Media.get_class_by_file(base_file, [Audio, Photo, Video])
assert cls is None, cls
assert cls is not None, cls
def test_get_original_name():
temporary_folder, folder = helper.create_working_folder()
@ -156,12 +160,12 @@ def test_get_class_by_file_video():
def test_get_class_by_file_unsupported():
media = Media.get_class_by_file(helper.get_file('text.txt'), [Photo, Video])
assert media is None
assert media is not None, media
def test_get_class_by_file_ds_store():
media = Media.get_class_by_file(helper.get_file('.DS_Store'),
[Photo, Video, Audio])
assert media is None
assert media is None, media
def test_get_class_by_file_invalid_type():
media = Media.get_class_by_file(None,

View File

@ -364,6 +364,11 @@ def test_import_file_with_multiple_config_exclude():
assert 'Success 0' in result.output, result.output
assert 'Error 0' in result.output, result.output
def test_get_all_files_in_paths():
def test_update_location_on_audio():
temporary_folder, folder = helper.create_working_folder()
temporary_folder_destination, folder_destination = helper.create_working_folder()
@ -656,7 +661,6 @@ def test_cli_batch_plugin_googlephotos():
@unittest.skip('to fix')
def test_cli_debug_import():
import ipdb; ipdb.set_trace()
runner = CliRunner()
# import
result = runner.invoke(elodie._import, ['--destination', '/does/not/exist', '/does/not/exist'])

View File

@ -713,6 +713,62 @@ def test_parse_folder_name_multiple_keys_not_found():
assert path == 'United States of America', path
def test_checkcomp():
filesystem = FileSystem()
temporary_folder, folder = helper.create_working_folder()
orig = helper.get_file('photo.png')
src_path1 = os.path.join(folder,'photo.png')
src_path2 = os.path.join(folder,'plain.jpg')
shutil.copyfile(helper.get_file('photo.png'), src_path1)
shutil.copyfile(helper.get_file('plain.jpg'), src_path2)
dest_path = os.path.join(folder,'photo_copy.jpg')
shutil.copyfile(src_path1, dest_path)
checksum1 = filesystem.checksum(src_path1)
checksum2 = filesystem.checksum(src_path2)
valid_checksum = filesystem.checkcomp(dest_path, checksum1)
invalid_checksum = filesystem.checkcomp(dest_path, checksum2)
assert valid_checksum
assert not invalid_checksum
def test_sort_file():
filesystem = FileSystem()
temporary_folder, folder = helper.create_working_folder()
src_path = os.path.join(folder,'photo.png')
shutil.copyfile(helper.get_file('photo.png'), src_path)
dest_path1 = os.path.join(folder,'photo_copy.jpg')
checksum1 = filesystem.checksum(src_path)
result_copy = filesystem.sort_file(src_path, dest_path1)
assert result_copy
assert filesystem.checkcomp(dest_path1, checksum1)
dest_path2 = os.path.join(folder,'photo_move.jpg')
checksum2 = filesystem.checksum(src_path)
result_move = filesystem.sort_file(src_path, dest_path2)
assert result_move
assert filesystem.checkcomp(dest_path2, checksum2)
def test_sort_files():
temporary_folder, folder = helper.create_working_folder()
temporary_folder_destination, folder_destination = helper.create_working_folder()
db = Db(folder)
filesystem = FileSystem()
filenames = ['photo.png', 'plain.jpg', 'text.txt', 'withoutextension']
for src_file in filenames:
origin = os.path.join(folder, src_file)
shutil.copyfile(helper.get_file(src_file), origin)
summary, has_errors = filesystem.sort_files([folder], folder_destination, db)
assert summary, summary
assert not has_errors, has_errors
def test_process_file_invalid():
filesystem = FileSystem()
temporary_folder, folder = helper.create_working_folder()