Refactoring media class

This commit is contained in:
Cédric Leporcq 2021-08-14 21:31:37 +02:00
parent 11ad41da09
commit 08aaa0c5c4
8 changed files with 8 additions and 592 deletions

View File

@ -9,11 +9,12 @@ from datetime import datetime
import click
from send2trash import send2trash
from ordigi import constants
from ordigi import config
from ordigi.filesystem import FileSystem
from ordigi import constants
from ordigi import log
from ordigi.database import Db
from ordigi.media.media import Media, get_all_subclasses
from ordigi.filesystem import FileSystem
from ordigi.media import Media, get_all_subclasses
from ordigi.summary import Summary
FILESYSTEM = FileSystem()

View File

@ -83,7 +83,7 @@ class Db(object):
# structure might be needed. Some speed up ideas:
# - Sort it and inter-half method can be used
# - Use integer part of long or lat as key to get a lower search list
# - Cache a small number of lookups, photos are likely to be taken in
# - Cache a small number of lookups, images are likely to be taken in
# clusters around a spot during import.
def add_location(self, latitude, longitude, place, write=False):
"""Add a location to the database.

View File

@ -17,8 +17,9 @@ from datetime import datetime, timedelta
from ordigi import constants
from ordigi import geolocation
from ordigi.media.media import get_media_class, get_all_subclasses
from ordigi.media.photo import Photo
from ordigi import media
from ordigi.media import Media, get_all_subclasses
from ordigi.images import Images
from ordigi.summary import Summary

View File

@ -1,36 +0,0 @@
"""
The audio module contains classes specifically for dealing with audio files.
The :class:`Audio` class inherits from the :class:`~ordigi.media.Media`
class.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import os
from .media import Media
class Audio(Media):
"""An audio object.
:param str source: The fully qualified path to the audio file.
"""
__name__ = 'Audio'
#: Valid extensions for audio files.
extensions = ('m4a',)
def __init__(self, source=None, ignore_tags=set()):
super().__init__(source, ignore_tags=set())
def is_valid(self):
"""Check the file extension against valid file extensions.
The list of valid file extensions come from self.extensions.
:returns: bool
"""
source = self.source
return os.path.splitext(source)[1][1:].lower() in self.extensions

View File

@ -1,349 +0,0 @@
"""
Base :class:`Media` class for media objects
The Media class provides some base functionality used by all the media types.
Sub-classes (:class:`~ordigi.media.Audio`, :class:`~ordigi.media.Photo`, and :class:`~ordigi.media.Video`).
"""
import mimetypes
import os
import six
import logging
# load modules
from dateutil.parser import parse
import re
from ordigi.exiftool import ExifTool, ExifToolCaching
class Media():
"""The media class for all media objects.
:param str source: The fully qualified path to the video file.
"""
__name__ = 'Media'
d_coordinates = {
'latitude': 'latitude_ref',
'longitude': 'longitude_ref'
}
PHOTO = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
AUDIO = ('m4a',)
VIDEO = ('avi', 'm4v', 'mov', 'mp4', 'mpg', 'mpeg', '3gp', 'mts')
extensions = PHOTO + AUDIO + VIDEO
def __init__(self, sources=None, ignore_tags=set(), logger=logging.getLogger()):
self.source = sources
self.ignore_tags = ignore_tags
self.tags_keys = self.get_tags()
self.exif_metadata = None
self.metadata = None
self.logger = logger
def get_tags(self):
tags_keys = {}
tags_keys['date_original'] = [
'EXIF:DateTimeOriginal',
'H264:DateTimeOriginal',
'QuickTime:ContentCreateDate'
]
tags_keys['date_created'] = [
'EXIF:CreateDate',
'QuickTime:CreationDate',
'QuickTime:CreateDate',
'QuickTime:CreationDate-und-US',
'QuickTime:MediaCreateDate'
]
tags_keys['date_modified'] = [
'File:FileModifyDate',
'QuickTime:ModifyDate'
]
tags_keys['camera_make'] = ['EXIF:Make', 'QuickTime:Make']
tags_keys['camera_model'] = ['EXIF:Model', 'QuickTime:Model']
tags_keys['album'] = ['XMP-xmpDM:Album', 'XMP:Album']
tags_keys['title'] = ['XMP:Title', 'XMP:DisplayName']
tags_keys['latitude'] = [
'EXIF:GPSLatitude',
'XMP:GPSLatitude',
# 'QuickTime:GPSLatitude',
'Composite:GPSLatitude'
]
tags_keys['longitude'] = [
'EXIF:GPSLongitude',
'XMP:GPSLongitude',
# 'QuickTime:GPSLongitude',
'Composite:GPSLongitude'
]
tags_keys['latitude_ref'] = ['EXIF:GPSLatitudeRef']
tags_keys['longitude_ref'] = ['EXIF:GPSLongitudeRef']
tags_keys['original_name'] = ['XMP:OriginalFileName']
# Remove ignored tag from list
for tag_regex in self.ignore_tags:
ignored_tags = set()
for key, tags in tags_keys.items():
for n, tag in enumerate(tags):
if re.match(tag_regex, tag):
del(tags_keys[key][n])
return tags_keys
def _del_ignored_tags(self, exif_metadata):
for tag_regex in self.ignore_tags:
ignored_tags = set()
for tag in exif_metadata:
if re.search(tag_regex, tag) is not None:
ignored_tags.add(tag)
for ignored_tag in ignored_tags:
del exif_metadata[ignored_tag]
def get_mimetype(self):
"""Get the mimetype of the file.
:returns: str or None
"""
mimetype = mimetypes.guess_type(self.source)
if(mimetype is None):
return None
return mimetype[0]
def _get_key_values(self, key):
"""Get the first value of a tag set
:returns: str or None if no exif tag
"""
if self.exif_metadata is None:
return None
for tag in self.tags_keys[key]:
if tag in self.exif_metadata:
yield self.exif_metadata[tag]
def get_value(self, tag):
"""Get given value from EXIF.
:returns: str or None
"""
exiftool_attributes = self.get_exiftool_attributes()
if exiftool_attributes is None:
return None
if(tag not in exiftool_attributes):
return None
return exiftool_attributes[tag]
def get_date_format(self, value):
"""Formate date attribute.
:returns: datetime object or None
"""
# We need to parse a string to datetime format.
# EXIF DateTimeOriginal and EXIF DateTime are both stored
# in %Y:%m:%d %H:%M:%S format
if value is None:
return None
try:
# correct nasty formated date
regex = re.compile(r'(\d{4}):(\d{2}):(\d{2})')
if(re.match(regex , value) is not None): # noqa
value = re.sub(regex , r'\g<1>-\g<2>-\g<3>', value)
return parse(value)
except BaseException or dateutil.parser._parser.ParserError as e:
self.logger.error(e)
return None
def get_coordinates(self, key, value):
"""Get latitude or longitude value
:param str key: Type of coordinate to get. Either "latitude" or
"longitude".
:returns: float or None
"""
if value is None:
return None
if isinstance(value, str) and len(value) == 0:
# If exiftool GPS output is empty, the data returned will be a str
# with 0 length.
# https://github.com/jmathai/elodie/issues/354
return None
# Cast coordinate to a float due to a bug in exiftool's
# -json output format.
# https://github.com/jmathai/elodie/issues/171
# http://u88.n24.queensu.ca/exiftool/forum/index.php/topic,7952.0.html # noqa
this_coordinate = float(value)
direction_multiplier = 1.0
# when self.set_gps_ref != True
if key == 'latitude':
if 'EXIF:GPSLatitudeRef' in self.exif_metadata:
if self.exif_metadata['EXIF:GPSLatitudeRef'] == 'S':
direction_multiplier = -1.0
elif key == 'longitude':
if 'EXIF:GPSLongitudeRef' in self.exif_metadata:
if self.exif_metadata['EXIF:GPSLongitudeRef'] == 'W':
direction_multiplier = -1.0
return this_coordinate * direction_multiplier
return None
def get_metadata(self):
"""Get a dictionary of metadata from exif.
All keys will be present and have a value of None if not obtained.
:returns: dict
"""
# Get metadata from exiftool.
self.exif_metadata = ExifToolCaching(self.source, logger=self.logger).asdict()
# TODO to be removed
self.metadata = {}
# Retrieve selected metadata to dict
if not self.exif_metadata:
return self.metadata
for key in self.tags_keys:
formated_data = None
for value in self._get_key_values(key):
if 'date' in key:
formated_data = self.get_date_format(value)
elif key in ('latitude', 'longitude'):
formated_data = self.get_coordinates(key, value)
else:
if value is not None and value != '':
formated_data = value
else:
formated_data = None
if formated_data:
# Use this data and break
break
self.metadata[key] = formated_data
self.metadata['base_name'] = os.path.basename(os.path.splitext(self.source)[0])
self.metadata['ext'] = os.path.splitext(self.source)[1][1:]
self.metadata['directory_path'] = os.path.dirname(self.source)
return self.metadata
def has_exif_data(self):
"""Check if file has metadata, date original"""
if not self.metadata:
return False
if 'date_original' in self.metadata:
if self.metadata['date_original'] != None:
return True
return False
@classmethod
def get_class_by_file(cls, _file, classes, ignore_tags=set(), logger=logging.getLogger()):
"""Static method to get a media object by file.
"""
basestring = (bytes, str)
if not isinstance(_file, basestring) or not os.path.isfile(_file):
return None
extension = os.path.splitext(_file)[1][1:].lower()
if len(extension) > 0:
for i in classes:
if(extension in i.extensions):
return i(_file, ignore_tags=ignore_tags)
exclude_list = ['.DS_Store', '.directory']
if os.path.basename(_file) == '.DS_Store':
return None
else:
return Media(_file, ignore_tags=ignore_tags, logger=logger)
def set_date_taken(self, date_key, time):
"""Set the date/time a photo was taken.
:param datetime time: datetime object of when the photo was taken
:returns: bool
"""
if(time is None):
return False
formatted_time = time.strftime('%Y:%m:%d %H:%M:%S')
status = self.set_value('date_original', formatted_time)
if status == False:
# exif attribute date_original d'ont exist
status = self.set_value('date_created', formatted_time)
return status
def set_coordinates(self, latitude, longitude):
status = []
if self.metadata['latitude_ref']:
latitude = abs(latitude)
if latitude > 0:
status.append(self.set_value('latitude_ref', 'N'))
else:
status.append(self.set_value('latitude_ref', 'S'))
status.append(self.set_value('latitude', latitude))
if self.metadata['longitude_ref']:
longitude = abs(longitude)
if longitude > 0:
status.append(self.set_value('latitude_ref', 'E'))
else:
status.append(self.set_value('longitude_ref', 'W'))
status.append(self.set_value('longitude', longitude))
if all(status):
return True
else:
return False
def set_album_from_folder(self, path):
"""Set the album attribute based on the leaf folder name
:returns: bool
"""
folder = os.path.basename(os.path.dirname(self.source))
return set_value(self, 'album', folder)
def get_all_subclasses(cls=None):
"""Module method to get all subclasses of Media.
"""
subclasses = set()
this_class = Media
if cls is not None:
this_class = cls
subclasses.add(this_class)
this_class_subclasses = this_class.__subclasses__()
for child_class in this_class_subclasses:
subclasses.update(get_all_subclasses(child_class))
return subclasses
def get_media_class(_file, ignore_tags=set(), logger=logging.getLogger()):
if not os.path.exists(_file):
logger.warning(f'Could not find {_file}')
logger.error(f'Could not find {_file}')
return False
media = Media.get_class_by_file(_file, get_all_subclasses(),
ignore_tags=set(), logger=logger)
if not media:
logger.warning(f'File{_file} is not supported')
logger.error(f'File {_file} can\'t be imported')
return False
return media

View File

@ -1,158 +0,0 @@
"""
The photo module contains the :class:`Photo` class, which is used to track
image objects (JPG, DNG, etc.).
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import imagehash
import imghdr
import logging
import numpy as np
import os
from PIL import Image, UnidentifiedImageError
import time
from .media import Media
class Photo(Media):
"""A photo object.
:param str source: The fully qualified path to the photo file
"""
__name__ = 'Photo'
#: Valid extensions for photo files.
extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
def __init__(self, source=None, hash_size=8, ignore_tags=set(),
logger=logging.getLogger()):
super().__init__(source, ignore_tags)
self.hash_size = hash_size
self.logger = logger
logger.setLevel(logging.INFO)
# HEIC extension support (experimental, not tested)
self.pyheif = False
try:
from pyheif_pillow_opener import register_heif_opener
self.pyheif = True
# Allow to open HEIF/HEIC images from pillow
register_heif_opener()
except ImportError as e:
self.logger.info(e)
def is_image(self, img_path):
"""Check whether the file is an image.
:returns: bool
"""
# gh-4 This checks if the source file is an image.
# It doesn't validate against the list of supported types.
# We check with imghdr and pillow.
if imghdr.what(img_path) is None:
# Pillow is used as a fallback
# imghdr won't detect all variants of images (https://bugs.python.org/issue28591)
# see https://github.com/jmathai/elodie/issues/281
# before giving up, we use `pillow` imaging library to detect file type
#
# It is important to note that the library doesn't decode or load the
# raster data unless it really has to. When you open a file,
# the file header is read to determine the file format and extract
# things like mode, size, and other properties required to decode the file,
# but the rest of the file is not processed until later.
try:
im = Image.open(img_path)
except (IOError, UnidentifiedImageError):
return False
if(im.format is None):
return False
return True
def get_images(self, file_paths):
'''
:returns: img_path generator
'''
for img_path in file_paths:
if self.is_image(img_path):
yield img_path
def get_images_hashes(self, file_paths):
"""Get image hashes"""
hashes = {}
duplicates = []
# Searching for duplicates.
for img_path in self.get_images(file_paths):
with Image.open(img_path) as img:
yield imagehash.average_hash(img, self.hash_size)
def find_duplicates(self, file_paths):
"""Find duplicates"""
for temp_hash in get_images_hashes(file_paths):
if temp_hash in hashes:
self.logger.info("Duplicate {} \nfound for image {}\n".format(img_path, hashes[temp_hash]))
duplicates.append(img_path)
else:
hashes[temp_hash] = img_path
return duplicates
def remove_duplicates(self, duplicates):
for duplicate in duplicates:
try:
os.remove(duplicate)
except OSError as error:
self.logger.error(error)
def remove_duplicates_interactive(self, duplicates):
if len(duplicates) != 0:
answer = input(f"Do you want to delete these {duplicates} images? Y/n: ")
if(answer.strip().lower() == 'y'):
self.remove_duplicates(duplicates)
self.logger.info(f'{duplicate} deleted successfully!')
else:
self.logger.info("No duplicates found")
def get_hash(self, img_path):
with Image.open(img_path) as img:
return imagehash.average_hash(img, self.hash_size).hash
def diff(self, hash1, hash2):
return np.count_nonzero(hash1 != hash2)
def similarity(self, img_diff):
threshold_img = img_diff / (self.hash_size**2)
similarity_img = round((1 - threshold_img) * 100)
return similarity_img
def find_similar(self, image, file_paths, similarity=80):
'''
Find similar images
:returns: img_path generator
'''
hash1 = ''
if self.is_image(image):
hash1 = self.get_hash(image)
self.logger.info(f'Finding similar images to {image}')
threshold = 1 - similarity/100
diff_limit = int(threshold*(self.hash_size**2))
for img_path in self.get_images(file_paths):
if img_path == image:
continue
hash2 = self.get_hash(img_path)
img_diff = self.diff(hash1, hash2)
if img_diff <= diff_limit:
similarity_img = self.similarity(img_diff)
self.logger.info(f'{img_path} image found {similarity_img}% similar to {image}')
yield img_path

View File

@ -1,43 +0,0 @@
"""
The video module contains the :class:`Video` class, which represents video
objects (AVI, MOV, etc.).
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
# load modules
from datetime import datetime
import os
import re
import time
from .media import Media
class Video(Media):
"""A video object.
:param str source: The fully qualified path to the video file.
"""
__name__ = 'Video'
#: Valid extensions for video files.
extensions = ('avi', 'm4v', 'mov', 'mp4', 'mpg', 'mpeg', '3gp', 'mts')
def __init__(self, source=None, ignore_tags=set()):
super().__init__(source, ignore_tags=set())
# self.set_gps_ref = False
def is_valid(self):
"""Check the file extension against valid file extensions.
The list of valid file extensions come from self.extensions.
:returns: bool
"""
source = self.source
return os.path.splitext(source)[1][1:].lower() in self.extensions