Add --album-from-folder option to sort

This commit is contained in:
Cédric Leporcq 2021-09-12 07:39:37 +02:00
parent cc958cf53b
commit db74342f21
6 changed files with 226 additions and 153 deletions

View File

@ -21,6 +21,8 @@ def print_help(command):
@click.command('sort')
@click.option('--album-from-folder', default=False, is_flag=True,
help="Use images' folders as their album names.")
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
@ -41,6 +43,8 @@ def print_help(command):
@click.option('--ignore-tags', '-i', default=set(), multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--interactive', default=False, is_flag=True,
help="Interactive mode")
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@click.option('--remove-duplicates', '-R', default=False, is_flag=True,
@ -51,7 +55,8 @@ def print_help(command):
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext, ignore_tags,
def _sort(album_from_folder, debug, dry_run, destination, clean, copy,
exclude_regex, interactive, filter_by_ext, ignore_tags,
max_deep, remove_duplicates, reset_cache, verbose, paths):
"""Sort files or directories by reading their EXIF and organizing them
according to ordigi.conf preferences.
@ -94,8 +99,9 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
exclude_regex = opt['exclude_regex']
exclude_regex_list = set(exclude_regex)
collection = Collection(destination, opt['path_format'], cache,
opt['day_begins'], dry_run, exclude_regex_list, filter_by_ext,
collection = Collection(destination, opt['path_format'],
album_from_folder, cache, opt['day_begins'], dry_run,
exclude_regex_list, filter_by_ext, interactive,
logger, max_deep, mode)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'],

View File

@ -23,9 +23,10 @@ from ordigi.summary import Summary
class Collection(object):
"""Class of the media collection."""
def __init__(self, root, path_format, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=set(), logger=logging.getLogger(), max_deep=None,
mode='copy'):
def __init__(self, root, path_format, album_from_folder=False,
cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=set(), interactive=False, logger=logging.getLogger(),
max_deep=None, mode='copy'):
# Attributes
self.root = Path(root).expanduser().absolute()
@ -37,6 +38,7 @@ class Collection(object):
self.db = Sqlite(self.root)
# Options
self.album_from_folder = album_from_folder
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
@ -49,6 +51,7 @@ class Collection(object):
self.filter_by_ext = filter_by_ext
self.items = self.get_items()
self.interactive = interactive
self.logger = logger
self.max_deep = max_deep
self.mode = mode
@ -77,27 +80,6 @@ class Collection(object):
'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}' # search for date format string
}
def get_date_regex(self, string, user_regex=None):
    """Yield ``(key, compiled_pattern)`` pairs used to find a date in a name.

    :param string: Name being searched. It does not influence which
        patterns are produced; the parameter is kept so existing callers
        keep working.
    :param user_regex: Optional pattern (str or already compiled). When
        given, it is the only pattern yielded, under the key ``'user'``.
    :returns: generator of ``(str, re.Pattern)`` tuples.
    """
    if user_regex is not None:
        # This branch used to leave `regex` unbound, so iterating the
        # generator raised UnboundLocalError as soon as a user pattern
        # was supplied.
        regex = {'user': re.compile(user_regex)}
    else:
        regex = {
            # regex to match date format type %Y%m%d, %y%m%d, %d%m%Y,
            # etc...
            # 'a': year, month, day, hour, minute, second
            'a': re.compile(
                r'.*[_-]?(?P<year>\d{4})[_-]?(?P<month>\d{2})[_-]?(?P<day>\d{2})[_-]?(?P<hour>\d{2})[_-]?(?P<minute>\d{2})[_-]?(?P<second>\d{2})'),
            # 'b': 4-digit year, month, day enclosed by separators
            'b': re.compile(
                r'[-_./](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
            # 'c': 2-digit year, month, day -- not very accurate
            'c': re.compile(
                r'[-_./](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
            # 'd': day, month, 4-digit year
            'd': re.compile(
                r'[-_./](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})[-_./]')
        }

    for i, rx in regex.items():
        yield i, rx
def check_for_early_morning_photos(self, date):
"""check for early hour photos to be grouped with previous day"""
@ -120,15 +102,18 @@ class Collection(object):
# Each item has its own custom logic and we evaluate a single item and return
# the evaluated string.
part = ''
basename = os.path.splitext(metadata['filename'])[0]
if item == 'basename':
part = os.path.basename(metadata['base_name'])
part = basename
elif item == 'ext':
part = os.path.splitext(metadata['filename'])[1][1:]
elif item == 'name':
# Remove date prefix added to the name.
part = metadata['base_name']
for i, rx in self.get_date_regex(metadata['base_name']):
part = basename
for i, rx in get_date_regex(basename):
part = re.sub(rx, '', part)
elif item == 'date':
date = self.get_date_taken(metadata)
date = metadata['date_taken']
# early morning photos can be grouped with previous day
date = self.check_for_early_morning_photos(date)
if date is not None:
@ -142,7 +127,7 @@ class Collection(object):
part = os.path.join(*folders)
elif item in ('album','camera_make', 'camera_model', 'city', 'country', 'ext',
elif item in ('album','camera_make', 'camera_model', 'city', 'country',
'location', 'original_name', 'state', 'title'):
if item == 'location':
mask = 'default'
@ -210,8 +195,8 @@ class Collection(object):
break
# Else we continue for fallbacks
if(len(path[-1]) == 0):
path[-1] = metadata['base_name']
if len(path[-1]) == 0 or re.match(r'^\..*', path[-1]):
path[-1] = metadata['filename']
path_string = os.path.join(*path)
@ -221,80 +206,8 @@ class Collection(object):
return path_string
def get_date_from_string(self, string, user_regex=None):
# If missing datetime from EXIF data check if filename is in datetime format.
# For this use a user provided regex if possible.
# Otherwise assume a filename such as IMG_20160915_123456.jpg as default.
matches = []
for i, rx in self.get_date_regex(string, user_regex):
match = re.findall(rx, string)
if match != []:
if i == 'c':
match = [('20' + match[0][0], match[0][1], match[0][2])]
elif i == 'd':
# reorder items
match = [(match[0][2], match[0][1], match[0][0])]
# matches = match + matches
if len(match) != 1:
# The time string is not uniq
continue
matches.append((match[0], rx))
# We want only the first match for the moment
break
# check if there is only one result
if len(set(matches)) == 1:
try:
# Convert str to int
date_object = tuple(map(int, matches[0][0]))
time = False
if len(date_object) > 3:
time = True
date = datetime(*date_object)
except (KeyError, ValueError):
return None
return date
return None
def get_date_taken(self, metadata):
'''
Get the date taken from metadata or filename
:returns: datetime or None.
'''
if metadata is None:
return None
basename = metadata['base_name']
date_original = metadata['date_original']
if metadata['original_name'] is not None:
date_filename = self.get_date_from_string(metadata['original_name'])
else:
date_filename = self.get_date_from_string(basename)
date_created = metadata['date_created']
if metadata['date_original'] is not None:
if (date_filename is not None and
date_filename != date_original):
self.logger.warn(f"{basename} time mark is different from {date_original}")
# TODO ask for keep date taken, filename time, or neither
return metadata['date_original']
elif True:
if date_filename is not None:
if date_created is not None and date_filename > date_created:
self.logger.warn(f"{basename} time mark is more recent than {date_created}")
return date_filename
if True:
# TODO warm and ask for confirmation
if date_created is not None:
return date_created
elif metadata['date_modified'] is not None:
return metadata['date_modified']
def checksum(self, file_path, blocksize=65536):
"""Create a hash value for the given file.
@ -346,6 +259,16 @@ class Collection(object):
self.db.add_file_data(dest_path_rel, checksum, *file_values)
def record_file(self, src_path, dest_path, src_checksum, metadata):
def _update_exif_data(self, dest_path, media):
if self.album_from_folder:
media.file_path = dest_path
media.set_album_from_folder()
return True
return False
def record_file(self, src_path, dest_path, src_checksum, media):
"""Check file and record the file to db"""
# Check if file remain the same
checksum = self.checkcomp(dest_path, src_checksum)
@ -353,6 +276,10 @@ class Collection(object):
if checksum:
if not self.dry_run:
self._add_db_data(dest_path, metadata, checksum)
updated = self._update_exif_data(dest_path, media)
if updated:
dest_checksum = self.checksum(dest_path)
self.summary.append((src_path, dest_path))
@ -364,16 +291,10 @@ class Collection(object):
return self.summary, has_errors
def should_exclude(self, path, regex_list=set(), needs_compiled=False):
def should_exclude(self, path, regex_list=set()):
if(len(regex_list) == 0):
return False
if(needs_compiled):
compiled_list = []
for regex in regex_list:
compiled_list.append(re.compile(regex))
regex_list = compiled_list
return any(regex.search(path) for regex in regex_list)
def walklevel(self, src_path, maxlevel=None):
@ -432,7 +353,7 @@ class Collection(object):
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
def solve_conflicts(self, conflict_file_list, metadata, remove_duplicates):
def _solve_conflicts(self, conflict_file_list, media, remove_duplicates):
has_errors = False
unresolved_conflicts = []
while conflict_file_list != []:
@ -465,7 +386,7 @@ class Collection(object):
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
dest_path, src_checksum, media)
if has_errors:
return False
@ -505,8 +426,7 @@ class Collection(object):
"""
file_list = set()
if os.path.isfile(path):
if not self.should_exclude(path, self.exclude_regex_list, True):
file_list.add((path, ''))
file_list.add((path, ''))
# Create a list of compiled regular expressions to match against the file path
compiled_regex_list = [re.compile(regex) for regex in self.exclude_regex_list]
@ -514,10 +434,12 @@ class Collection(object):
subdirs = ''
for dirname, dirnames, filenames, level in self.walklevel(path,
self.max_deep):
if dirname == os.path.join(path, '.ordigi'):
should_exclude_dir = self.should_exclude(dirname, compiled_regex_list)
if dirname == os.path.join(path, '.ordigi') or should_exclude_dir:
continue
subdirs = os.path.join(subdirs, os.path.basename(dirname))
if level > 0:
subdirs = os.path.join(subdirs, os.path.basename(dirname))
for filename in filenames:
# If file extension is in `extensions`
@ -527,9 +449,9 @@ class Collection(object):
if (
extensions == set()
or os.path.splitext(filename)[1][1:].lower() in extensions
and not self.should_exclude(filename_path, compiled_regex_list, False)
and not self.should_exclude(filename, compiled_regex_list)
):
file_list.add((filename_path, subdirs))
file_list.add((filename, subdirs))
return file_list
@ -592,7 +514,8 @@ class Collection(object):
]
conflict_file_list = []
for src_path, _ in self.get_files_in_path(path):
for filename, subdirs in self.get_files_in_path(path):
file_path = os.path.join(path, subdirs, filename)
src_checksum = self.checksum(src_path)
file_path = Path(src_path).relative_to(self.root)
path_parts = file_path.parts
@ -615,14 +538,14 @@ class Collection(object):
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
dest_path, src_checksum, media)
elif result is False:
# There is conflict files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
if conflict_file_list != []:
result = self.solve_conflicts(conflict_file_list, metadata, remove_duplicates)
result = self._solve_conflicts(conflict_file_list, media, remove_duplicates)
if not result:
has_errors = True
@ -638,11 +561,13 @@ class Collection(object):
for path in paths:
path = self.check_path(path)
conflict_file_list = []
for src_path, subdirs in self.get_files_in_path(path,
for filename, subdirs in self.get_files_in_path(path,
extensions=self.filter_by_ext):
src_path = os.path.join(path, subdirs, filename)
# Process files
src_checksum = self.checksum(src_path)
media = Media(src_path, ignore_tags, self.logger)
media = Media(path, subdirs, filename, self.album_from_folder, ignore_tags,
self.interactive, self.logger)
if media:
metadata = media.get_metadata(loc, self.db, self.cache)
# Get the destination path according to metadata
@ -661,14 +586,14 @@ class Collection(object):
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
dest_path, src_checksum, media)
elif result is False:
# There is conflict files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
if conflict_file_list != []:
result = self.solve_conflicts(conflict_file_list, metadata,
result = self._solve_conflicts(conflict_file_list, media,
remove_duplicates)
if not result:
@ -804,3 +729,4 @@ class Collection(object):
return self.summary, has_errors

View File

@ -10,6 +10,7 @@ import os
from dateutil.parser import parse
import re
from ordigi.exiftool import ExifTool, ExifToolCaching
from ordigi.utils import get_date_from_string
class Media():
@ -29,11 +30,18 @@ class Media():
extensions = PHOTO + AUDIO + VIDEO
def __init__(self, file_path, ignore_tags=set(), logger=logging.getLogger()):
self.file_path = file_path
def __init__(self, path, subdirs, filename, album_from_folder=False, ignore_tags=set(),
interactive=False, logger=logging.getLogger()):
self.path = path
self.subdirs = subdirs
self.filename = filename
self.file_path = os.path.join(path, subdirs, filename)
self.album_from_folder = album_from_folder
self.ignore_tags = ignore_tags
self.tags_keys = self.get_tags()
self.exif_metadata = None
self.interactive = interactive
self.metadata = None
self.logger = logger
@ -122,13 +130,12 @@ class Media():
:returns: str or None
"""
exiftool_attributes = self.get_exiftool_attributes()
if exiftool_attributes is None:
if self.exif_metadata is None:
return None
if(tag not in exiftool_attributes):
if(tag not in self.exif_metadata):
return None
return exiftool_attributes[tag]
return self.exif_metadata[tag]
def get_date_format(self, value):
"""Formate date attribute.
@ -186,16 +193,52 @@ class Media():
return None
def get_date_taken(self):
    '''
    Pick the best available "date taken" from self.metadata or filename.

    Order of preference:

    1. ``self.metadata['date_original']``
    2. a date parsed from ``self.metadata['original_name']`` (or from the
       extension-less ``self.metadata['filename']`` when there is no
       original name)
    3. ``self.metadata['date_created']``
    4. ``self.metadata['date_modified']``

    A warning is logged when the date found in the name disagrees with
    ``date_original`` or is more recent than ``date_created``.

    :returns: datetime or None.
    '''
    if self.metadata is None:
        return None

    basename = os.path.splitext(self.metadata['filename'])[0]
    date_original = self.metadata['date_original']
    if self.metadata['original_name'] is not None:
        date_filename = get_date_from_string(self.metadata['original_name'])
    else:
        date_filename = get_date_from_string(basename)

    date_created = self.metadata['date_created']
    if date_original is not None:
        if date_filename is not None and date_filename != date_original:
            # Logger.warn is a deprecated alias; use warning().
            self.logger.warning(f"{basename} time mark is different from {date_original}")
            # TODO ask for keep date taken, filename time, or neither
        return date_original

    if date_filename is not None:
        if date_created is not None and date_filename > date_created:
            self.logger.warning(f"{basename} time mark is more recent than {date_created}")
        return date_filename

    # TODO warn and ask for confirmation
    if date_created is not None:
        return date_created
    if self.metadata['date_modified'] is not None:
        return self.metadata['date_modified']

    return None
def get_exif_metadata(self):
    """Load the exiftool metadata of ``self.file_path`` into ``self.exif_metadata``."""
    exiftool = ExifToolCaching(self.file_path, logger=self.logger)
    self.exif_metadata = exiftool.asdict()
def get_metadata(self, loc=None, db=None, cache=False):
"""Get a dictionary of metadata from exif.
All keys will be present and have a value of None if not obtained.
:returns: dict
"""
# Get metadata from exiftool.
self.exif_metadata = ExifToolCaching(self.file_path, logger=self.logger).asdict()
self.get_exif_metadata()
# TODO to be removed
self.metadata = {}
# Retrieve selected metadata to dict
if not self.exif_metadata:
@ -219,14 +262,35 @@ class Media():
self.metadata[key] = formated_data
self.metadata['base_name'] = os.path.basename(os.path.splitext(self.file_path)[0])
self.metadata['directory_path'] = os.path.dirname(self.file_path)
self.metadata['ext'] = os.path.splitext(self.file_path)[1][1:]
self.metadata['src_path'] = self.path
self.metadata['subdirs'] = self.subdirs
self.metadata['filename'] = self.filename
self.metadata['date_taken'] = self.get_date_taken()
if self.album_from_folder:
album = self.metadata['album']
folder = os.path.basename(self.subdirs)
if album and album != '':
if self.interactive:
print(f"Conflict for file: {self.file_path}")
print(f"Exif album is already set to '{album}'', folder='{folder}'")
i = f"Choice for 'album': (a) '{album}', (f) '{folder}', (c) custom ?\n"
answer = input(i)
if answer == 'c':
self.metadata['album'] = input('album=')
self.set_value('album', folder)
if answer == 'a':
self.metadata['album'] = album
elif answer == 'f':
self.metadata['album'] = folder
if not album or album == '':
self.metadata['album'] = folder
loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
location_id = None
if cache and db:
location_id = db.get_file_data(self.file_path, 'LocationId')
location_id = db.get_metadata_data(self.file_path, 'LocationId')
if location_id:
for key in loc_keys:
@ -287,7 +351,7 @@ class Media():
:returns: value (str)
"""
return ExifToolCaching(self.file_path, self.logger).setvalue(tag, value)
return ExifTool(self.file_path, self.logger).setvalue(tag, value)
def set_date_taken(self, date_key, time):
"""Set the date/time a photo was taken.
@ -331,7 +395,7 @@ class Media():
else:
return False
def set_album_from_folder(self, path):
def set_album_from_folder(self):
"""Set the album attribute based on the leaf folder name
:returns: bool

View File

@ -1,5 +1,6 @@
from math import radians, cos, sqrt
import re
def distance_between_two_points(lat1, lon1, lat2, lon2):
# As threshold is quite small use simple math
@ -14,3 +15,63 @@ def distance_between_two_points(lat1, lon1, lat2, lon2):
x = (lon2 - lon1) * cos(0.5 * (lat2 + lat1))
y = lat2 - lat1
return r * sqrt(x * x + y * y)
def get_date_regex(string, user_regex=None):
    """Yield ``(key, compiled_pattern)`` pairs used to find a date in a name.

    :param string: Name being searched. It does not influence which
        patterns are produced; the parameter is kept so existing callers
        keep working.
    :param user_regex: Optional pattern (str or already compiled). When
        given, it is the only pattern yielded, under the key ``'user'``.
    :returns: generator of ``(str, re.Pattern)`` tuples.
    """
    if user_regex is not None:
        # This branch used to leave `regex` unbound, so iterating the
        # generator raised UnboundLocalError as soon as a user pattern
        # was supplied.
        regex = {'user': re.compile(user_regex)}
    else:
        regex = {
            # regex to match date format type %Y%m%d, %y%m%d, %d%m%Y,
            # etc...
            # 'a': year, month, day, hour, minute, second
            'a': re.compile(
                r'.*[_-]?(?P<year>\d{4})[_-]?(?P<month>\d{2})[_-]?(?P<day>\d{2})[_-]?(?P<hour>\d{2})[_-]?(?P<minute>\d{2})[_-]?(?P<second>\d{2})'),
            # 'b': 4-digit year, month, day enclosed by separators
            'b': re.compile(
                r'[-_./](?P<year>\d{4})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
            # 'c': 2-digit year, month, day -- not very accurate
            'c': re.compile(
                r'[-_./](?P<year>\d{2})[-_.]?(?P<month>\d{2})[-_.]?(?P<day>\d{2})[-_./]'),
            # 'd': day, month, 4-digit year
            'd': re.compile(
                r'[-_./](?P<day>\d{2})[-_.](?P<month>\d{2})[-_.](?P<year>\d{4})[-_./]')
        }

    for i, rx in regex.items():
        yield i, rx
def get_date_from_string(string, user_regex=None):
    """Parse a date (and time, when present) out of ``string``.

    Used when EXIF carries no datetime: check whether the filename is in
    a datetime format. The patterns produced by ``get_date_regex`` are
    tried in order and the first one with a single usable hit wins. A
    filename such as IMG_20160915_123456.jpg is the default expectation.

    :param string: Text to search, typically a file name.
    :param user_regex: Optional pattern forwarded to ``get_date_regex``.
    :returns: datetime, or None when nothing matched or the matched
        digits do not form a valid date.
    """
    # Imported locally: the module header visible for this file imports
    # only `math` and `re`, so `datetime` would otherwise be an undefined
    # name here (a NameError the except clause below does not catch).
    from datetime import datetime

    matches = []
    for i, rx in get_date_regex(string, user_regex):
        match = re.findall(rx, string)
        if match != []:
            if i == 'c':
                # Two-digit year: assume the 2000s.
                match = [('20' + match[0][0], match[0][1], match[0][2])]
            elif i == 'd':
                # reorder items to year, month, day
                match = [(match[0][2], match[0][1], match[0][0])]
            if len(match) != 1:
                # The time string is not unique
                continue
            matches.append((match[0], rx))
            # We want only the first match for the moment
            break

    # check if there is only one result
    if len(set(matches)) == 1:
        try:
            # Convert str to int
            date_object = tuple(map(int, matches[0][0]))
            return datetime(*date_object)
        except (KeyError, ValueError):
            return None

    return None

View File

@ -15,6 +15,7 @@ from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exif
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media
from ordigi.utils import get_date_from_string, get_date_regex
class TestCollection:
@ -58,7 +59,7 @@ class TestCollection:
subdirs = Path('a', 'b', 'c', 'd')
for file_path in self.file_paths:
media = Media(str(file_path))
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path))
exif_tags = {}
for key in ('album', 'camera_make', 'camera_model', 'latitude',
'longitude', 'original_name', 'title'):
@ -90,7 +91,7 @@ class TestCollection:
assert part == file_path.suffix[1:], file_path
elif item == 'name':
expected_part = file_path.stem
for i, rx in collection.get_date_regex(expected_part):
for i, rx in get_date_regex(expected_part):
part = re.sub(rx, '', expected_part)
assert part == expected_part, file_path
elif item == 'custom':
@ -114,17 +115,17 @@ class TestCollection:
collection = Collection(tmp_path, self.path_format)
for file_path in self.file_paths:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(str(file_path))
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path))
metadata = media.get_metadata()
date_taken = collection.get_date_taken(metadata)
date_taken = media.get_date_taken()
date_filename = None
for tag in media.tags_keys['original_name']:
if tag in exif_data:
date_filename = collection.get_date_from_string(exif_data[tag])
date_filename = get_date_from_string(exif_data[tag])
break
if not date_filename:
date_filename = collection.get_date_from_string(file_path.name)
date_filename = get_date_from_string(file_path.name)
if media.metadata['date_original']:
assert date_taken == media.metadata['date_original']
@ -136,7 +137,7 @@ class TestCollection:
assert date_taken == media.metadata['date_modified']
def test_sort_files(self, tmp_path):
collection = Collection(tmp_path, self.path_format)
collection = Collection(tmp_path, self.path_format, album_from_folder=True)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_paths], loc)
@ -144,10 +145,17 @@ class TestCollection:
assert summary, summary
assert not has_errors, has_errors
for file_path in tmp_path.glob('*/**/*.*'):
if '.db' not in str(file_path):
media = Media(os.path.dirname(file_path), '', os.path.basename(file_path), album_from_folder=True)
media.get_exif_metadata()
for value in media._get_key_values('album'):
assert value != '' or None
# test with populated dest dir
randomize_files(tmp_path)
summary, has_errors = collection.sort_files([self.src_paths], loc)
# Summary is created and there is no errors
assert summary, summary
assert not has_errors, has_errors
# TODO check if path follow path_format

View File

@ -1,4 +1,5 @@
from datetime import datetime
import os
import pytest
from pathlib import Path
import re
@ -24,10 +25,10 @@ class TestMetadata:
def get_media(self):
for file_path in self.file_paths:
self.exif_data = ExifTool(str(file_path)).asdict()
yield Media(str(file_path), self.ignore_tags)
yield file_path, Media(os.path.dirname(file_path), '', os.path.basename(file_path), album_from_folder=True, ignore_tags=self.ignore_tags)
def test_get_metadata(self):
for media in self.get_media():
for file_path, media in self.get_media():
result = media.get_metadata()
assert result
assert isinstance(media.metadata, dict), media.metadata
@ -48,6 +49,13 @@ class TestMetadata:
assert isinstance(value, str)
else:
assert value is None
if key == 'album':
if 'with-album' in str(file_path):
assert value == "Test Album"
else:
assert value == file_path.parent.name
# Check if has_exif_data() is True if 'date_original' key is
# present, else check if it's false
has_exif_data = False