Add FPath class

This commit is contained in:
Cédric Leporcq 2021-10-17 12:33:14 +02:00
parent 4442c18570
commit 5c255093e3
2 changed files with 109 additions and 78 deletions

View File

@ -24,69 +24,16 @@ from ordigi.summary import Summary
from ordigi import utils from ordigi import utils
class Collection: class FPath:
"""Class of the media collection.""" """Featured path object"""
def __init__( def __init__(self, path_format, day_begins=0, logger=logging.getLogger()):
self,
root,
path_format,
album_from_folder=False,
cache=False,
day_begins=0,
dry_run=False,
exclude=set(),
filter_by_ext=set(),
glob='**/*',
interactive=False,
logger=logging.getLogger(),
max_deep=None,
mode='copy',
use_date_filename=False,
use_file_dates=False,
):
# Attributes
self.root = Path(root).expanduser().absolute()
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
self.path_format = path_format
self.db = Sqlite(self.root)
# Options
self.album_from_folder = album_from_folder
self.cache = cache
self.day_begins = day_begins self.day_begins = day_begins
self.dry_run = dry_run
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.items = self.get_items() self.items = self.get_items()
self.interactive = interactive self.logger = logger
self.logger = logger.getChild(self.__class__.__name__) self.path_format = path_format
self.max_deep = max_deep
self.mode = mode
# List to store media metadata
self.medias = []
self.summary = Summary()
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.whitespace_regex = '[ \t\n\r\f\v]+' self.whitespace_regex = '[ \t\n\r\f\v]+'
self.src_list = []
self.dest_list = []
# Constants
self.theme = request.load_theme()
def get_items(self): def get_items(self):
return { return {
'album': '{album}', 'album': '{album}',
@ -96,7 +43,7 @@ class Collection:
'city': '{city}', 'city': '{city}',
'custom': '{".*"}', 'custom': '{".*"}',
'country': '{country}', 'country': '{country}',
# 'folder': '{folder[<>]?[-+]?[1-9]?}', 'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}', # search for date format string
'ext': '{ext}', 'ext': '{ext}',
'folder': '{folder}', 'folder': '{folder}',
'folders': r'{folders(\[[0-9:]{0,3}\])?}', 'folders': r'{folders(\[[0-9:]{0,3}\])?}',
@ -105,19 +52,25 @@ class Collection:
'original_name': '{original_name}', 'original_name': '{original_name}',
'state': '{state}', 'state': '{state}',
'title': '{title}', 'title': '{title}',
'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}', # search for date format string
} }
def _check_for_early_morning_photos(self, date): def get_early_morning_photos_date(self, date, mask):
"""check for early hour photos to be grouped with previous day""" """check for early hour photos to be grouped with previous day"""
for m in '%H', '%M', '%S','%I', '%p', '%f':
if m in mask:
# D'ont change date format if datestring contain hour, minutes or seconds...
return date.strftime(mask)
if date.hour < self.day_begins: if date.hour < self.day_begins:
self.logger.info( self.logger.info(
"moving this photo to the previous day for classification purposes" "moving this photo to the previous day for classification purposes"
) )
# push it to the day before for classification purposes # push it to the day before for classification purposes
date = date - timedelta(hours=date.hour + 1) date = date - timedelta(hours=date.hour + 1)
return date return date.strftime(mask)
def _get_folders(self, folders, mask): def _get_folders(self, folders, mask):
""" """
@ -186,8 +139,7 @@ class Collection:
date = metadata['date_media'] date = metadata['date_media']
# early morning photos can be grouped with previous day # early morning photos can be grouped with previous day
if date is not None: if date is not None:
date = self._check_for_early_morning_photos(date) part = self.get_early_morning_photos_date(date, mask)
part = date.strftime(mask)
elif item == 'folder': elif item == 'folder':
part = os.path.basename(metadata['subdirs']) part = os.path.basename(metadata['subdirs'])
@ -295,6 +247,68 @@ class Collection:
return None return None
class Collection:
"""Class of the media collection."""
def __init__(
self,
root,
path_format,
album_from_folder=False,
cache=False,
day_begins=0,
dry_run=False,
exclude=set(),
filter_by_ext=set(),
glob='**/*',
interactive=False,
logger=logging.getLogger(),
max_deep=None,
mode='copy',
use_date_filename=False,
use_file_dates=False,
):
# Attributes
self.root = Path(root).expanduser().absolute()
if not self.root.exists():
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
self.path_format = path_format
self.db = Sqlite(self.root)
# Options
self.album_from_folder = album_from_folder
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
self.exclude = exclude
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.glob = glob
self.interactive = interactive
self.logger = logger.getChild(self.__class__.__name__)
self.max_deep = max_deep
self.mode = mode
# List to store media metadata
self.medias = []
self.summary = Summary()
self.use_date_filename = use_date_filename
self.use_file_dates = use_file_dates
self.src_list = []
self.dest_list = []
# Constants
self.theme = request.load_theme()
def _checkcomp(self, dest_path, src_checksum): def _checkcomp(self, dest_path, src_checksum):
"""Check file.""" """Check file."""
if self.dry_run: if self.dry_run:
@ -845,7 +859,8 @@ class Collection:
) )
metadata = media.get_metadata(self.root, loc, self.db, self.cache) metadata = media.get_metadata(self.root, loc, self.db, self.cache)
# Get the destination path according to metadata # Get the destination path according to metadata
relpath = Path(self.get_path(metadata)) fpath = FPath(self.path_format, self.day_begins, self.logger)
relpath = Path(fpath.get_path(metadata))
files_data.append((copy(media), relpath)) files_data.append((copy(media), relpath))
@ -1031,3 +1046,5 @@ class Collection:
result = self.check_db() result = self.check_db()
return self.summary, result return self.summary, result

View File

@ -11,7 +11,7 @@ from time import sleep
from .conftest import randomize_files, randomize_db from .conftest import randomize_files, randomize_db
from ordigi import constants from ordigi import constants
from ordigi.collection import Collection from ordigi.collection import Collection, FPath
from ordigi.database import Sqlite from ordigi.database import Sqlite
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.geolocation import GeoLocation from ordigi.geolocation import GeoLocation
@ -20,7 +20,7 @@ from ordigi.media import Media
from ordigi import utils from ordigi import utils
class TestCollection: class TestFPath:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths): def setup_class(cls, sample_files_paths):
@ -28,18 +28,13 @@ class TestCollection:
cls.path_format = constants.default_path + '/' + constants.default_name cls.path_format = constants.default_path + '/' + constants.default_name
cls.logger = log.get_logger(level=10) cls.logger = log.get_logger(level=10)
def teardown_class(self):
terminate_exiftool()
assert not exiftool_is_running()
def test_get_part(self, tmp_path): def test_get_part(self, tmp_path):
""" """
Test all parts Test all parts
""" """
fpath = FPath(self.path_format, 4, self.logger)
# Item to search for: # Item to search for:
collection = Collection(tmp_path, self.path_format, items = fpath.get_items()
use_date_filename=True, use_file_dates=True)
items = collection.get_items()
masks = [ masks = [
'{album}', '{album}',
'{basename}', '{basename}',
@ -77,7 +72,7 @@ class TestCollection:
for mask in masks: for mask in masks:
matched = re.search(regex, mask) matched = re.search(regex, mask)
if matched: if matched:
part = collection.get_part(item, mask[1:-1], metadata) part = fpath.get_part(item, mask[1:-1], metadata)
# check if part is correct # check if part is correct
assert isinstance(part, str), file_path assert isinstance(part, str), file_path
if item == 'basename': if item == 'basename':
@ -113,6 +108,28 @@ class TestCollection:
else: else:
assert part == '', file_path assert part == '', file_path
def test_get_early_morning_photos_date(self):
date = datetime(2021, 10, 16, 2, 20, 40)
fpath = FPath(self.path_format, 4, self.logger)
part = fpath.get_early_morning_photos_date(date, '%Y-%m-%d')
assert part == '2021-10-15'
part = fpath.get_early_morning_photos_date(date, '%Y%m%d-%H%M%S')
assert part == '20211016-022040'
class TestCollection:
@pytest.fixture(autouse=True)
def setup_class(cls, sample_files_paths):
cls.src_path, cls.file_paths = sample_files_paths
cls.path_format = constants.default_path + '/' + constants.default_name
cls.logger = log.get_logger(level=10)
def teardown_class(self):
terminate_exiftool()
assert not exiftool_is_running()
def test_sort_files(self, tmp_path): def test_sort_files(self, tmp_path):
collection = Collection(tmp_path, self.path_format, collection = Collection(tmp_path, self.path_format,
album_from_folder=True, logger=self.logger) album_from_folder=True, logger=self.logger)
@ -182,9 +199,6 @@ class TestCollection:
# TODO check for conflicts # TODO check for conflicts
# TODO check date
def test__get_files_in_path(self, tmp_path): def test__get_files_in_path(self, tmp_path):
collection = Collection(tmp_path, self.path_format, collection = Collection(tmp_path, self.path_format,
exclude={'**/*.dng',}, exclude={'**/*.dng',},