Refactoring Media class

This commit is contained in:
Cédric Leporcq 2021-11-08 07:02:21 +01:00
parent ad14604648
commit a693e6018a
3 changed files with 317 additions and 133 deletions

View File

@ -15,7 +15,7 @@ from pathlib import Path, PurePath
import inquirer
from ordigi.database import Sqlite
from ordigi.media import Media
from ordigi.media import Media, Medias
from ordigi.images import Image, Images
from ordigi import request
from ordigi.summary import Summary
@ -440,98 +440,6 @@ class Paths:
return self.paths_list
class Medias:
    """Build Media objects, with their metadata, for files of a collection
    or of a source path.
    """

    def __init__(
        self,
        paths,
        root,
        album_from_folder=False,
        cache=False,
        db=None,
        interactive=False,
        ignore_tags=None,
        logger=logging.getLogger(),
        use_date_filename=False,
        use_file_dates=False,
    ):
        """
        :param paths: helper exposing ``check()`` and ``get_paths_list()``
        :param root: root path of the collection
        :param album_from_folder: forwarded to every Media created
        :param cache: forwarded to ``Media.get_metadata``
        :param db: database wrapper; its ``sqlite`` attribute is used
        :param interactive: forwarded to every Media created
        :param ignore_tags: forwarded to every Media created
        :param logger: parent logger; a child named after the class is used
        :param use_date_filename: forwarded to every Media created
        :param use_file_dates: forwarded to every Media created
        """
        # Collaborators
        self.paths = paths
        self.db = db

        # Collection root
        self.root = root

        # Behaviour switches
        self.album_from_folder = album_from_folder
        self.cache = cache
        self.interactive = interactive
        self.ignore_tags = ignore_tags
        self.use_date_filename = use_date_filename
        self.use_file_dates = use_file_dates
        self.logger = logger.getChild(self.__class__.__name__)

        # State: medias queued for processing
        self.datas = []
        self.theme = request.load_theme()

    def get_media(self, file_path, src_dir, loc=None):
        """Create the Media for ``file_path`` and load its metadata.

        :returns: Media
        """
        new_media = Media(
            file_path,
            src_dir,
            self.album_from_folder,
            self.ignore_tags,
            self.interactive,
            self.logger,
            self.use_date_filename,
            self.use_file_dates,
        )
        new_media.get_metadata(self.root, loc, self.db.sqlite, self.cache)

        return new_media

    def get_medias(self, src_dirs, imp=False, loc=None):
        """Yield one Media per file found under each of ``src_dirs``.

        Exits the program when a file is outside the collection root and
        ``imp`` is falsy.
        """
        for directory in src_dirs:
            checked_dir = self.paths.check(directory)

            for src_path in self.paths.get_paths_list(checked_dir):
                outside_root = self.root not in src_path.parents
                if outside_root and not imp:
                    self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
                    sys.exit(1)

                yield self.get_media(src_path, checked_dir, loc)

    def update_exif_data(self, media):
        """Write album and original name to the media when required.

        :returns: bool, True when at least one value has been written
        """
        changed = False
        use_folder = self.album_from_folder

        if use_folder:
            media.set_album_from_folder()
            changed = True

        if media.metadata['original_name'] in (False, ''):
            media.set_value('original_name', media.metadata['filename'])
            changed = True

        if use_folder:
            album_name = media.metadata['album']
            if album_name and album_name != '':
                media.set_value('album', album_name)
                changed = True

        return changed
class SortMedias:
"""Sort medias in collection"""
@ -576,25 +484,24 @@ class SortMedias:
return True
def _record_file(self, src_path, dest_path, media, imp=False):
def _record_file(self, src_path, dest_path, metadata, imp=False):
"""Check file and record the file to db"""
# Check if file remain the same
checksum = media.metadata['checksum']
checksum = metadata['checksum']
if not self._checkcomp(dest_path, checksum):
self.logger.error(f'Files {src_path} and {dest_path} are not identical')
self.summary.append('check', False, src_path, dest_path)
return False
# change media file_path to dest_path
media.file_path = dest_path
if not self.dry_run:
updated = self.medias.update_exif_data(media)
updated = self.medias.update_exif_data(metadata)
if updated:
checksum = utils.checksum(dest_path)
media.metadata['checksum'] = checksum
metadata['checksum'] = checksum
if not self.dry_run:
self.db.add_file_data(media.metadata)
self.db.add_file_data(metadata)
if imp != 'copy' and self.root in src_path.parents:
self.db.sqlite.delete_filepath(str(src_path.relative_to(self.root)))
@ -613,7 +520,7 @@ class SortMedias:
self.summary.append('sort', False, src_path, dest_path)
def sort_file(self, src_path, dest_path, media, imp=False):
def sort_file(self, src_path, dest_path, metadata, imp=False):
"""Sort file and register it to db"""
if imp == 'copy':
self.fileio.copy(src_path, dest_path)
@ -622,7 +529,7 @@ class SortMedias:
if self.db:
result = self._record_file(
src_path, dest_path, media, imp=imp
src_path, dest_path, metadata, imp=imp
)
else:
result = True
@ -637,8 +544,8 @@ class SortMedias:
:param Path: A fully qualified path of the to create.
:returns: bool
"""
for media in self.medias.datas:
relpath = os.path.dirname(media.metadata['file_path'])
for file_path, metadata in self.medias.datas.items():
relpath = os.path.dirname(metadata['file_path'])
directory_path = self.root / relpath
parts = directory_path.relative_to(self.root).parts
for i, _ in enumerate(parts):
@ -661,10 +568,9 @@ class SortMedias:
self.logger.warning(f'Renaming {dir_path} to {file_path}')
if not self.dry_run:
shutil.move(dir_path, file_path)
for med in self.medias.datas:
if med.file_path == dir_path:
med.file_path = file_path
break
metadata = self.medias.datas[dir_path]
self.medias.datas[file_path] = metadata
del(self.medias.datas[dir_path])
if not self.dry_run:
directory_path.mkdir(parents=True, exist_ok=True)
@ -706,7 +612,7 @@ class SortMedias:
def _solve_conflicts(self, conflicts, remove_duplicates):
unresolved_conflicts = []
while conflicts != []:
src_path, dest_path, media = conflicts.pop()
src_path, dest_path, metadata = conflicts.pop()
# Check for conflict status again in case is has changed
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
@ -726,12 +632,12 @@ class SortMedias:
if conflict == 1:
# i = 100:
unresolved_conflicts.append((src_path, dest_path, media))
unresolved_conflicts.append((src_path, dest_path, metadata))
self.logger.error(f"Too many appends for {dest_path}")
media.metadata['file_path'] = os.path.relpath(dest_path, self.root)
metadata['file_path'] = os.path.relpath(dest_path, self.root)
yield (src_path, dest_path, media), conflict
yield (src_path, dest_path, metadata), conflict
def sort_medias(self, imp=False, remove_duplicates=False):
"""
@ -741,19 +647,18 @@ class SortMedias:
self._create_directories()
conflicts = []
for media in self.medias.datas:
src_path = media.file_path
dest_path = self.root / media.metadata['file_path']
for src_path, metadata in self.medias.datas.items():
dest_path = self.root / metadata['file_path']
conflict = self.check_conflicts(src_path, dest_path, remove_duplicates)
if not conflict:
self.sort_file(
src_path, dest_path, media, imp=imp
src_path, dest_path, metadata, imp=imp
)
elif conflict == 1:
# There is conflict and file are different
conflicts.append((src_path, dest_path, media))
conflicts.append((src_path, dest_path, metadata))
elif conflict == 3:
# Same file checksum
if imp == 'move':
@ -766,10 +671,10 @@ class SortMedias:
for files_data, conflict in self._solve_conflicts(conflicts,
remove_duplicates):
src_path, dest_path, media = files_data
src_path, dest_path, metadata = files_data
if not conflict:
self.sort_file(
src_path, dest_path, media, imp=imp
src_path, dest_path, metadata, imp=imp
)
elif conflict == 1:
# There is unresolved conflict
@ -787,6 +692,7 @@ class SortMedias:
class Collection(SortMedias):
"""Class of the media collection."""
def __init__(
self,
root,
@ -1047,7 +953,8 @@ class Collection(SortMedias):
media.metadata['file_path'] = fpath.get_path(media.metadata)
subdirs.add(media.file_path.parent)
self.medias.datas.append(copy(media))
src_path = media.file_path
self.medias.datas[src_path] = copy(media.metadata)
# Sort files and solve conflicts
self.summary = self.sort_medias(imp, remove_duplicates)
@ -1099,7 +1006,8 @@ class Collection(SortMedias):
dedup_path.append(''.join(filtered_items))
media.metadata['file_path'] = os.path.join(*dedup_path)
self.medias.datas.append(copy(media))
src_path = media.file_path
self.medias.datas[src_path] = copy(media.metadata)
# Sort files and solve conflicts
self.sort_medias(remove_duplicates=remove_duplicates)
@ -1120,9 +1028,10 @@ class Collection(SortMedias):
self.paths.paths_list.append(img_path)
media = self.medias.get_media(img_path, path)
relpath = os.path.join(directory_name, image.img_path.name)
relpath = os.path.join(directory_name, img_path.name)
media.metadata['file_path'] = relpath
self.medias.datas.append(copy(media))
file_path = media.file_path
self.medias.datas[file_path] = copy(media.metadata)
if self.medias.datas:
# Found similar images to image
@ -1130,7 +1039,8 @@ class Collection(SortMedias):
media = self.medias.get_media(image.img_path, path)
relpath = os.path.join(directory_name, image.img_path.name)
media.metadata['file_path'] = relpath
self.medias.datas.insert(0, copy(media))
file_path = media.file_path
self.medias.datas[file_path] = copy(media.metadata)
return True
@ -1146,6 +1056,8 @@ class Collection(SortMedias):
images = Images(images_paths, logger=self.logger)
nb_row_ini = self.db.sqlite.len('metadata')
for image in images_paths:
# Clear datas in every loops
self.medias.datas = {}
similar_images = self._find_similar_images(
image, images, path, dest_dir, similarity
)

View File

@ -16,10 +16,185 @@ from ordigi import utils
from ordigi import request
class Metadata:
    """Base class mapping metadata keys to the EXIF tags that may hold them.

    ``tags_keys`` maps each metadata key (``date_original``, ``album``...)
    to an ordered list of candidate EXIF tag names, minus the tags matching
    one of the ``ignore_tags`` regular expressions.
    """

    def __init__(self, ignore_tags=None):
        """
        :param ignore_tags: iterable of regular expressions; matching tags
            are dropped from ``tags_keys`` and by ``_del_ignored_tags``.
            ``None`` means nothing is ignored.
        """
        # Options
        if ignore_tags is None:
            ignore_tags = set()

        self.exif_metadata = []
        self.metadata = {}
        self.ignore_tags = ignore_tags

        # Attributes
        self.tags_keys = self.get_tags()

    def get_tags(self) -> dict:
        """Get exif tags groups in dict

        :returns: dict of metadata key -> list of EXIF tag names, without
            the tags whose name matches (``re.match``) an ignored pattern
        """
        tags_keys = {}
        tags_keys['date_original'] = [
            'EXIF:DateTimeOriginal',
            'H264:DateTimeOriginal',
            'QuickTime:ContentCreateDate',
        ]
        tags_keys['date_created'] = [
            'EXIF:CreateDate',
            'QuickTime:CreationDate',
            'QuickTime:CreateDate',
            'QuickTime:CreationDate-und-US',
            'QuickTime:MediaCreateDate',
        ]
        tags_keys['date_modified'] = ['File:FileModifyDate', 'QuickTime:ModifyDate']
        tags_keys['camera_make'] = ['EXIF:Make', 'QuickTime:Make']
        tags_keys['camera_model'] = ['EXIF:Model', 'QuickTime:Model']
        tags_keys['album'] = ['XMP-xmpDM:Album', 'XMP:Album']
        tags_keys['title'] = ['XMP:Title', 'XMP:DisplayName']
        tags_keys['latitude'] = [
            'EXIF:GPSLatitude',
            'XMP:GPSLatitude',
            # 'QuickTime:GPSLatitude',
            'Composite:GPSLatitude',
        ]
        tags_keys['longitude'] = [
            'EXIF:GPSLongitude',
            'XMP:GPSLongitude',
            # 'QuickTime:GPSLongitude',
            'Composite:GPSLongitude',
        ]
        tags_keys['latitude_ref'] = ['EXIF:GPSLatitudeRef']
        tags_keys['longitude_ref'] = ['EXIF:GPSLongitudeRef']
        tags_keys['original_name'] = ['XMP:OriginalFileName']

        # Remove ignored tags. Each list is rebuilt instead of deleting
        # entries while enumerating it: in-place deletion shifts the
        # remaining items and makes the loop skip the tag that follows
        # every removed one.
        patterns = tuple(self.ignore_tags)
        if not patterns:
            return tags_keys

        return {
            key: [
                tag
                for tag in tags
                if not any(re.match(regex, tag) for regex in patterns)
            ]
            for key, tags in tags_keys.items()
        }

    def _del_ignored_tags(self):
        """Remove from ``exif_metadata`` every tag matching (``re.search``)
        one of the ignored patterns.

        The container is modified in place, whether it is a dict keyed by
        tag name or a list of tag names. Nothing is done when it is empty
        or None.
        """
        if not self.exif_metadata:
            return

        patterns = tuple(self.ignore_tags)
        ignored_tags = {
            tag
            for tag in self.exif_metadata
            if any(re.search(regex, tag) is not None for regex in patterns)
        }
        if not ignored_tags:
            return

        if isinstance(self.exif_metadata, dict):
            for ignored_tag in ignored_tags:
                del self.exif_metadata[ignored_tag]
        else:
            # Sequence of tag names (the default container is a list):
            # ``del seq[tag]`` would raise, so filter it in place instead.
            self.exif_metadata[:] = [
                tag for tag in self.exif_metadata if tag not in ignored_tags
            ]
class WriteExif(Metadata):
    """Write metadata values to a media file through ExifTool."""

    def __init__(
        self,
        file_path,
        metadata,
        exif_metadata=None,
        ignore_tags=None,
        logger=logging.getLogger(),
    ):
        """
        :param file_path: path of the file to write to
        :param metadata: metadata dict of the file; ``set_coordinates``
            reads its ``latitude_ref`` and ``longitude_ref`` entries
        :param exif_metadata: EXIF tags known for the file; any falsy
            value is replaced by an empty list
        :param ignore_tags: regular expressions of tags to ignore
        :param logger: parent logger; a child named after the class is used
        """
        super().__init__(ignore_tags)

        self.file_path = file_path
        self.metadata = metadata

        if not exif_metadata:
            exif_metadata = []

        self.exif_metadata = exif_metadata
        self.logger = logger.getChild(self.__class__.__name__)

    def set_value(self, tag, value):
        """Set value of a tag.

        :returns: the result of ``ExifTool.setvalue`` for this file
        """
        return ExifTool(self.file_path, logger=self.logger).setvalue(tag, value)

    def set_key_values(self, key, value):
        """Set tags values for given key

        Only the tags of ``tags_keys[key]`` present in ``exif_metadata``
        are written.

        :returns: bool, False when ``exif_metadata`` is None or when one
            of the writes failed
        """
        if self.exif_metadata is None:
            return False

        status = True
        for tag in self.tags_keys[key]:
            if tag in self.exif_metadata and not self.set_value(tag, value):
                status = False

        return status

    def set_date_media(self, time):
        """
        Set the date/time a photo was taken.

        :param datetime time: datetime object of when the photo was taken
        :returns: False when ``time`` is None, otherwise the status of the
            last write attempted
        """
        if time is None:
            return False

        formatted_time = time.strftime('%Y:%m:%d %H:%M:%S')
        # NOTE(review): 'date_original' and 'date_created' are keys of
        # tags_keys rather than EXIF tag names; confirm that set_value is
        # the intended call here and not set_key_values.
        status = self.set_value('date_original', formatted_time)
        if status == False:
            # exif attribute date_original doesn't exist
            status = self.set_value('date_created', formatted_time)

        return status

    def set_coordinates(self, latitude, longitude):
        """Write signed decimal coordinates.

        When the file metadata has a reference entry for an axis, the
        hemisphere letter is written to that reference and the coordinate
        itself is written as an absolute value.

        :param latitude: signed latitude
        :param longitude: signed longitude
        :returns: bool, True when every write succeeded
        """
        status = []
        if self.metadata['latitude_ref']:
            # The hemisphere has to be derived from the sign *before* the
            # sign is dropped, otherwise it is always reported as 'N'.
            reference = 'N' if latitude > 0 else 'S'
            status.append(self.set_value('latitude_ref', reference))
            latitude = abs(latitude)

        status.append(self.set_value('latitude', latitude))

        if self.metadata['longitude_ref']:
            # Both hemispheres go to 'longitude_ref'; the east one used to
            # be written to 'latitude_ref'.
            reference = 'E' if longitude > 0 else 'W'
            status.append(self.set_value('longitude_ref', reference))
            longitude = abs(longitude)

        status.append(self.set_value('longitude', longitude))

        return all(status)

    def set_album_from_folder(self):
        """Set the album attribute based on the leaf folder name

        :returns: the result of ``set_value``
        """
        return self.set_value('album', self.file_path.parent.name)
class ReadExif(Metadata):
    """Reading counterpart of the EXIF helpers.

    For now only the tags mapping of the base class is initialised: every
    argument except ``ignore_tags`` is accepted but not stored.
    """

    def __init__(
        self,
        file_path,
        src_dir,
        album_from_folder=False,
        ignore_tags=None,
        interactive=False,
        logger=logging.getLogger(),
        use_date_filename=False,
        use_file_dates=False,
    ):
        """
        :param ignore_tags: regular expressions of tags to ignore, handed
            to the base class
        """
        super().__init__(ignore_tags=ignore_tags)
class Media:
"""
The media class for all media objects.
The fully qualified path to the media file.
Extract matadatas from exiftool and sort them to dict structure
"""
d_coordinates = {'latitude': 'latitude_ref', 'longitude': 'longitude_ref'}
@ -128,8 +303,8 @@ class Media:
return mimetype[0]
def _get_key_values(self, key):
"""Get the first value of a tag set
"""
Get the first value of a tag set
:returns: str or None if no exif tag
"""
if self.exif_metadata is None:
@ -140,8 +315,8 @@ class Media:
yield self.exif_metadata[tag]
def get_value(self, tag):
"""Get given value from EXIF.
"""
Get given value from EXIF.
:returns: str or None
"""
if self.exif_metadata is None:
@ -152,7 +327,7 @@ class Media:
return self.exif_metadata[tag]
def get_date_format(self, value):
"""Formate date attribute.
"""Formatting date attribute.
:returns: datetime object or None
"""
# We need to parse a string to datetime format.
@ -587,3 +762,99 @@ class Media:
return self.set_value('album', self.file_path.parent.name)
class Medias:
    """Get media data in collection or source path"""

    def __init__(
        self,
        paths,
        root,
        album_from_folder=False,
        cache=False,
        db=None,
        interactive=False,
        ignore_tags=None,
        logger=logging.getLogger(),
        use_date_filename=False,
        use_file_dates=False,
    ):
        """
        :param paths: helper exposing ``check()`` and ``get_paths_list()``
        :param root: root path of the collection
        :param album_from_folder: write the album from the folder name
        :param cache: forwarded to ``Media.get_metadata``
        :param db: database wrapper; its ``sqlite`` attribute is used
        :param interactive: forwarded to every Media created
        :param ignore_tags: regular expressions of EXIF tags to ignore
        :param logger: parent logger; a child named after the class is used
        :param use_date_filename: forwarded to every Media created
        :param use_file_dates: forwarded to every Media created
        """
        # Modules
        self.db = db
        self.paths = paths

        # Arguments
        self.root = root

        # Options
        self.cache = cache
        self.album_from_folder = album_from_folder
        self.ignore_tags = ignore_tags
        self.interactive = interactive
        self.logger = logger.getChild(self.__class__.__name__)
        self.use_date_filename = use_date_filename
        self.use_file_dates = use_file_dates

        # Attributes
        # Dict to store medias metadata
        self.datas = {}
        self.theme = request.load_theme()

    def get_media(self, file_path, src_dir, loc=None):
        """Create the Media of ``file_path`` and load its metadata.

        :returns: Media
        """
        media = Media(
            file_path,
            src_dir,
            self.album_from_folder,
            self.ignore_tags,
            self.interactive,
            self.logger,
            self.use_date_filename,
            self.use_file_dates,
        )
        media.get_metadata(self.root, loc, self.db.sqlite, self.cache)

        return media

    def get_medias(self, src_dirs, imp=False, loc=None):
        """Get medias data

        Yield one Media per file found under each of ``src_dirs``. The
        program exits when a file is outside the collection root and
        ``imp`` is falsy.
        """
        for src_dir in src_dirs:
            src_dir = self.paths.check(src_dir)
            paths = self.paths.get_paths_list(src_dir)

            # Get medias and src_dirs
            for src_path in paths:
                if self.root not in src_path.parents:
                    if not imp:
                        self.logger.error(f"""{src_path} not in {self.root}
collection, use `ordigi import`""")
                        sys.exit(1)

                # Get file metadata
                media = self.get_media(src_path, src_dir, loc)

                yield media

    def update_exif_data(self, metadata):
        """Write album and original name to the file when required.

        :param metadata: metadata dict of the file; ``file_path`` is
            joined to the collection root to locate the file
        :returns: bool, True when at least one value has been written
        """
        file_path = self.root / metadata['file_path']
        # ignore_tags must be given by keyword: the third positional
        # parameter of WriteExif is exif_metadata.
        exif = WriteExif(file_path, metadata, ignore_tags=self.ignore_tags)

        updated = False
        if self.album_from_folder:
            exif.set_album_from_folder()
            updated = True
        if metadata['original_name'] in (False, ''):
            exif.set_value('original_name', metadata['filename'])
            updated = True
        if self.album_from_folder:
            album = metadata['album']
            if album:
                exif.set_value('album', album)
                updated = True

        return updated

View File

@ -206,13 +206,14 @@ class TestCollection:
# copy mode
src_path = Path(self.src_path, 'test_exif', 'photo.png')
media = Media(src_path, self.src_path)
metadata = media.get_metadata(tmp_path)
media.get_metadata(tmp_path)
name = 'photo_' + str(imp) + '.png'
media.metadata['file_path'] = name
dest_path = Path(tmp_path, name)
src_checksum = utils.checksum(src_path)
summary = collection.sort_file(src_path, dest_path, media,
imp=imp)
summary = collection.sort_file(
src_path, dest_path, media.metadata, imp=imp
)
assert not summary.errors
# Ensure files remain the same
assert collection._checkcomp(dest_path, src_checksum)