Move elodie code base to dozo and various fix

This commit is contained in:
Cédric Leporcq 2021-07-30 07:41:02 +02:00
parent 9f6eb52ebc
commit b2b6a9c170
34 changed files with 318 additions and 2359 deletions

261
dozo.py Executable file
View File

@ -0,0 +1,261 @@
#!/usr/bin/env python
import os
import re
import sys
import logging
from datetime import datetime
import click
from send2trash import send2trash
from dozo import constants
from dozo import config
from dozo.filesystem import FileSystem
from dozo.database import Db
from dozo.media.media import Media, get_all_subclasses
from dozo.summary import Summary
FILESYSTEM = FileSystem()
def print_help(command):
click.echo(command.get_help(click.Context(sort)))
@click.command('batch')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _batch(debug):
"""Run batch() for all plugins.
"""
constants.debug = debug
plugins = Plugins()
plugins.run_batch()
@click.command('sort')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--destination', '-d', type=click.Path(file_okay=False),
default=None, help='Sort files into this directory.')
@click.option('--copy', '-c', default=False, is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved')
@click.option('--exclude-regex', '-e', default=set(), multiple=True,
help='Regular expression for directories or files to exclude.')
@click.option('--filter-by-ext', '-f', default=set(), multiple=True, help='''Use filename
extension to filter files for sorting. If value is '*', use
common media file extension for filtering. Ignored files remain in
the same directory structure''' )
@click.option('--ignore-tags', '-i', default=set(), multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@click.option('--remove-duplicates', '-r', default=False, is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash')
@click.option('--reset-cache', '-R', default=False, is_flag=True,
help='Regenerate the hash.json and location.json database ')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignore_tags,
max_deep, remove_duplicates, reset_cache, verbose, paths):
"""Sort files or directories by reading their EXIF and organizing them
according to config.ini preferences.
"""
if copy:
mode = 'copy'
else:
mode = 'move'
if debug:
constants.debug = logging.DEBUG
elif verbose:
constants.debug = logging.INFO
else:
constants.debug = logging.ERROR
if max_deep is not None:
max_deep = int(max_deep)
logger = logging.getLogger('dozo')
logger.setLevel(constants.debug)
cache = True
if reset_cache:
cache = False
if not destination and paths:
destination = paths[-1]
paths = paths[0:-1]
else:
sys.exit(1)
paths = set(paths)
destination = os.path.abspath(os.path.expanduser(destination))
if not os.path.exists(destination):
logger.error(f'Directory {destination} does not exist')
conf = config.load_config(constants.CONFIG_FILE)
path_format = config.get_path_definition(conf)
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
if 'Exclusions' in conf:
exclude_regex = [value for key, value in conf.items('Exclusions')]
exclude_regex_list = set(exclude_regex)
# Initialize Db
db = Db(destination)
if 'Directory' in conf and 'day_begins' in conf['Directory']:
config_directory = conf['Directory']
day_begins = config_directory['day_begins']
else:
day_begins = 0
filesystem = FileSystem(cache, day_begins, dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, path_format)
summary, has_errors = filesystem.sort_files(paths, destination, db,
remove_duplicates, ignore_tags)
if verbose or debug:
summary.write()
if has_errors:
sys.exit(1)
@click.command('generate-db')
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _generate_db(path, debug):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files. The hash.json file is located at ~/.dozo/.
"""
constants.debug = debug
result = Result()
path = os.path.abspath(os.path.expanduser(path))
if not os.path.isdir(path):
log.error('path is not a valid directory %s' % path)
sys.exit(1)
db = Db(path)
db.backup_hash_db()
db.reset_hash_db()
for current_file in FILESYSTEM.get_all_files(path):
result.append((current_file, True))
db.add_hash(db.checksum(current_file), current_file)
log.progress()
db.update_hash_db()
log.progress('', True)
result.write()
@click.command('verify')
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _verify(path, debug):
constants.debug = debug
result = Result()
db = Db(path)
for checksum, file_path in db.all():
if not os.path.isfile(file_path):
result.append((file_path, False))
log.progress('x')
continue
actual_checksum = db.checksum(file_path)
if checksum == actual_checksum:
result.append((file_path, True))
log.progress()
else:
result.append((file_path, False))
log.progress('x')
log.progress('', True)
result.write()
@click.command('compare')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--find-duplicates', '-f', default=False, is_flag=True)
@click.option('--output-dir', '-o', default=False, is_flag=True, help='output\
dir')
@click.option('--remove-duplicates', '-r', default=False, is_flag=True)
@click.option('--revert-compare', '-R', default=False, is_flag=True, help='Revert\
compare')
@click.option('--similar-to', '-s', default=False, help='Similar to given\
image')
@click.option('--similarity', '-S', default=80, help='Similarity level for\
images')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('path', nargs=1, required=True)
def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
revert_compare, similar_to, similarity, verbose, path):
'''Compare files in directories'''
logger = logging.getLogger('dozo')
if debug:
logger.setLevel(logging.DEBUG)
elif verbose:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.ERROR)
# Initialize Db
db = Db(path)
filesystem = FileSystem(mode='move', dry_run=dry_run, logger=logger)
if revert_compare:
summary, has_errors = filesystem.revert_compare(path, db, dry_run)
else:
summary, has_errors = filesystem.sort_similar_images(path, db,
similarity, dry_run)
if verbose or debug:
summary.write()
if has_errors:
sys.exit(1)
@click.group()
def main():
pass
main.add_command(_compare)
main.add_command(_sort)
main.add_command(_generate_db)
main.add_command(_verify)
main.add_command(_batch)
if __name__ == '__main__':
#Initialize ExifTool Subprocess
exiftool_addedargs = [
u'-config',
u'"{}"'.format(constants.exiftool_config)
]
with ExifTool(executable_=get_exiftool(), addedargs=exiftool_addedargs) as et:
main()

View File

@ -1,7 +1,7 @@
"""Load config file as a singleton."""
from configparser import RawConfigParser
from os import path
from elodie import constants
from dozo import constants
def load_config(file):

View File

@ -1,5 +1,5 @@
"""
Settings used by Elodie.
Settings.
"""
from os import environ, path
@ -8,8 +8,8 @@ from sys import version_info
#: If True, debug messages will be printed.
debug = False
#: Directory in which to store Elodie settings.
application_directory = '{}/.elodie'.format(path.expanduser('~'))
#: Directory in which to store Dozo settings.
application_directory = '{}/.dozo'.format(path.expanduser('~'))
if (
'ELODIE_APPLICATION_DIRECTORY' in environ and
path.isdir(environ['ELODIE_APPLICATION_DIRECTORY'])
@ -18,21 +18,21 @@ if (
default_path = '{%Y-%m-%b}/{album}|{city}|{"Unknown Location"}'
default_name = '{%Y-%m-%d_%H-%M-%S}-{original_name}-{title}.{ext}'
#: File in which to store details about media Elodie has seen.
default_geocoder = 'Nominatim'
# Checksum storage file.
hash_db = 'hash.json'
# TODO will be removed eventualy later
# hash_db = '{}/hash.json'.format(application_directory)
#: File in which to store geolocation details about media Elodie has seen.
# Geolocation details file.
location_db = 'location.json'
# TODO will be removed eventualy later
# location_db = '{}/location.json'.format(application_directory)
#: Elodie installation directory.
# Dozo installation directory.
script_directory = path.dirname(path.dirname(path.abspath(__file__)))
#: Path to Elodie's ExifTool config file.
#: Path to ExifTool config file.
exiftool_config = path.join(script_directory, 'configs', 'ExifTool_config')
#: Path to MapQuest base URL

View File

@ -1,5 +1,5 @@
"""
Methods for interacting with information Elodie caches about stored media.
Methods for interacting with information Dozo caches about stored media.
"""
from builtins import map
from builtins import object
@ -13,21 +13,21 @@ from math import radians, cos, sqrt
from shutil import copyfile
from time import strftime
from elodie import constants
from dozo import constants
class Db(object):
"""A class for interacting with the JSON files created by Elodie."""
"""A class for interacting with the JSON files created by Dozo."""
def __init__(self, target_dir):
# verify that the application directory (~/.elodie) exists,
# verify that the application directory (~/.dozo) exists,
# else create it
# if not os.path.exists(constants.application_directory):
# os.makedirs(constants.application_directory)
# Create dir for target database
dirname = os.path.join(target_dir, '.elodie')
dirname = os.path.join(target_dir, '.dozo')
# Legacy dir
# dirname = constants.application_directory

View File

@ -1,7 +1,5 @@
"""
General file system methods.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
from builtins import object
@ -16,43 +14,20 @@ import shutil
import time
from datetime import datetime, timedelta
from elodie import geolocation
from elodie import log
from elodie.config import load_config
from elodie import constants
from dozo import constants
from dozo import geolocation
from elodie.localstorage import Db
from elodie.media.media import get_media_class, get_all_subclasses
from elodie.media.photo import CompareImages
from elodie.plugins.plugins import Plugins
from elodie.summary import Summary
from dozo.media.media import get_media_class, get_all_subclasses
from dozo.media.photo import CompareImages
from dozo.summary import Summary
class FileSystem(object):
"""A class for interacting with the file system."""
def __init__(self, day_begins=0, dry_run=False, exclude_regex_list=set(),
def __init__(self, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=(), logger=logging.getLogger(), max_deep=None,
mode='copy', path_format=None):
# The default folder path is along the lines of 2017-06-17_01-04-14-dsc_1234-some-title.jpg
self.default_file_name_definition = {
'date': '%Y-%m-%d_%H-%M-%S',
'name': '%date-%original_name-%title.%extension',
}
# The default folder path is along the lines of 2015-01-Jan/Chicago
self.default_folder_path_definition = {
'date': '%Y-%m-%b',
'location': '%city',
'full_path': '%date/%album|%location|"{}"'.format(
geolocation.__DEFAULT_LOCATION__
),
}
self.cached_file_name_definition = None
self.cached_folder_path_definition = None
# Python3 treats the regex \s differently than Python2.
# It captures some additional characters like the unicode checkmark \u2713.
# See build failures in Python3 here.
# https://travis-ci.org/jmathai/elodie/builds/483012902
self.items = {
'album': '{album}',
@ -73,24 +48,23 @@ class FileSystem(object):
'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}' # search for date format string
}
self.whitespace_regex = '[ \t\n\r\f\v]+'
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
self.exclude_regex_list = exclude_regex_list
self.mode = mode
self.logger = logger
self.summary = Summary()
self.day_begins = day_begins
self.filter_by_ext = filter_by_ext
self.logger = logger
self.max_deep = max_deep
self.mode = mode
# TODO have to be removed
if path_format:
self.path_format = path_format
else:
self.path_format = os.path.join(constants.default_path,
constants.default_name)
# Instantiate a plugins object
self.plugins = Plugins()
self.summary = Summary()
self.whitespace_regex = '[ \t\n\r\f\v]+'
def create_directory(self, directory_path):
@ -114,23 +88,6 @@ class FileSystem(object):
return False
def delete_directory_if_empty(self, directory_path):
"""Delete a directory only if it's empty.
Instead of checking first using `len([name for name in
os.listdir(directory_path)]) == 0`, we catch the OSError exception.
:param str directory_name: A fully qualified path of the directory
to delete.
"""
try:
os.rmdir(directory_path)
return True
except OSError:
pass
return False
def walklevel(self, src_path, maxlevel=None):
"""
@ -169,7 +126,7 @@ class FileSystem(object):
# Create a list of compiled regular expressions to match against the file path
compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list]
for dirname, dirnames, filenames in os.walk(path):
if dirname == os.path.join(path, '.elodie'):
if dirname == os.path.join(path, '.dozo'):
continue
for filename in filenames:
# If file extension is in `extensions`
@ -183,13 +140,6 @@ class FileSystem(object):
):
yield filename_path
def get_current_directory(self):
"""Get the current working directory.
:returns: str
"""
return os.getcwd()
def check_for_early_morning_photos(self, date):
"""check for early hour photos to be grouped with previous day"""
@ -202,269 +152,6 @@ class FileSystem(object):
return date
def get_file_name(self, metadata):
"""Generate file name for a photo or video using its metadata.
Originally we hardcoded the file name to include an ISO date format.
We use an ISO8601-like format for the file name prefix. Instead of
colons as the separator for hours, minutes and seconds we use a hyphen.
https://en.wikipedia.org/wiki/ISO_8601#General_principles
PR #225 made the file name customizable and fixed issues #107 #110 #111.
https://github.com/jmathai/elodie/pull/225
:param media: A Photo or Video instance
:type media: :class:`~elodie.media.photo.Photo` or
:class:`~elodie.media.video.Video`
:returns: str or None for non-photo or non-videos
"""
if(metadata is None):
return None
# Get the name template and definition.
# Name template is in the form %date-%original_name-%title.%extension
# Definition is in the form
# [
# [('date', '%Y-%m-%d_%H-%M-%S')],
# [('original_name', '')], [('title', '')], // contains a fallback
# [('ext', '')]
# ]
name_template, definition = self.get_file_name_definition()
name = name_template
for parts in definition:
this_value = None
for this_part in parts:
part, mask = this_part
if part in ('date', 'day', 'month', 'year'):
date = self.get_date_taken(metadata)
if date is not None:
this_value = date.strftime(mask)
else:
this_value=''
break
elif part in ('location', 'city', 'state', 'country'):
place_name = geolocation.place_name(
metadata['latitude'],
metadata['longitude'],
db
)
location_parts = re.findall('(%[^%]+)', mask)
this_value = self.parse_mask_for_location(
mask,
location_parts,
place_name,
)
break
elif part in ('album', 'extension', 'title'):
key = part
if part == 'extension':
key = 'ext'
if metadata[key]:
this_value = re.sub(self.whitespace_regex, '-',
metadata[key].strip())
break
elif part in ('original_name'):
# First we check if we have metadata['original_name'].
# We have to do this for backwards compatibility because
# we original did not store this back into EXIF.
if metadata[part]:
this_value = os.path.splitext(metadata['original_name'])[0]
else:
# We didn't always store original_name so this is
# for backwards compatability.
# We want to remove the hardcoded date prefix we used
# to add to the name.
# This helps when re-running the program on file
# which were already processed.
this_value = re.sub(
'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-',
'',
metadata['base_name']
)
if(len(this_value) == 0):
this_value = metadata['base_name']
# Lastly we want to sanitize the name
this_value = re.sub(self.whitespace_regex, '-', this_value.strip())
elif part.startswith('"') and part.endswith('"'):
this_value = part[1:-1]
break
# Here we replace the placeholder with it's corresponding value.
# Check if this_value was not set so that the placeholder
# can be removed completely.
# For example, %title- will be replaced with ''
# Else replace the placeholder (i.e. %title) with the value.
if this_value is None:
name = re.sub(
#'[^a-z_]+%{}'.format(part),
'[^a-zA-Z0-9_]+%{}'.format(part),
'',
name,
)
else:
name = re.sub(
'%{}'.format(part),
this_value,
name,
)
config = load_config(constants.CONFIG_FILE)
if('File' in config and 'capitalization' in config['File'] and config['File']['capitalization'] == 'upper'):
return name.upper()
else:
return name.lower()
def get_file_name_definition(self):
"""Returns a list of folder definitions.
Each element in the list represents a folder.
Fallback folders are supported and are nested lists.
Return values take the following form.
[
('date', '%Y-%m-%d'),
[
('default', '%city'),
('album', ''),
('"Unknown Location", '')
]
]
:returns: list
"""
# If we've done this already then return it immediately without
# incurring any extra work
if self.cached_file_name_definition is not None:
return self.cached_file_name_definition
config = load_config(constants.CONFIG_FILE)
# If File is in the config we assume name and its
# corresponding values are also present
config_file = self.default_file_name_definition
if('File' in config):
config_file = config['File']
# Find all subpatterns of name that map to the components of the file's
# name.
# I.e. %date-%original_name-%title.%extension => ['date', 'original_name', 'title', 'ext'] #noqa
path_parts = re.findall(
'(\%[a-z_]+)',
config_file['name']
)
if not path_parts or len(path_parts) == 0:
return (config_file['name'], self.default_file_name_definition)
self.cached_file_name_definition = []
for part in path_parts:
if part in config_file:
part = part[1:]
self.cached_file_name_definition.append(
[(part, config_file[part])]
)
else:
this_part = []
for p in part.split('|'):
p = p[1:]
this_part.append(
(p, config_file[p] if p in config_file else '')
)
self.cached_file_name_definition.append(this_part)
self.cached_file_name_definition = (config_file['name'], self.cached_file_name_definition)
return self.cached_file_name_definition
def get_folder_path_definition(self):
"""Returns a list of folder definitions.
Each element in the list represents a folder.
Fallback folders are supported and are nested lists.
Return values take the following form.
[
('date', '%Y-%m-%d'),
[
('default', '%city'),
('album', ''),
('"Unknown Location", '')
]
]
:returns: list
"""
# If we've done this already then return it immediately without
# incurring any extra work
if self.cached_folder_path_definition is not None:
return self.cached_folder_path_definition
config = load_config(constants.CONFIG_FILE)
# If Directory is in the config we assume full_path and its
# corresponding values (date, location) are also present
config_directory = self.default_folder_path_definition
if 'Directory' in config:
if 'full_path' in config['Directory']:
config_directory = config['Directory']
# Find all subpatterns of full_path that map to directories.
# I.e. %foo/%bar => ['foo', 'bar']
# I.e. %foo/%bar|%example|"something" => ['foo', 'bar|example|"something"']
path_parts = re.findall(
'(\%[^/]+)',
config_directory['full_path']
)
if not path_parts or len(path_parts) == 0:
return self.default_folder_path_definition
self.cached_folder_path_definition = []
for part in path_parts:
part = part.replace('%', '')
if part in config_directory:
self.cached_folder_path_definition.append(
[(part, config_directory[part])]
)
else:
this_part = []
for p in part.split('|'):
this_part.append(
(p, config_directory[p] if p in config_directory else '')
)
self.cached_folder_path_definition.append(this_part)
return self.cached_folder_path_definition
def get_folder_path(self, metadata, db, path_parts=None):
"""Given a media's metadata this function returns the folder path as a string.
:param dict metadata: Metadata dictionary.
:returns: str
"""
if path_parts is None:
path_parts = self.get_folder_path_definition()
path = []
for path_part in path_parts:
# We support fallback values so that
# 'album|city|"Unknown Location"
# %album|%city|"Unknown Location" results in
# My Album - when an album exists
# Sunnyvale - when no album exists but a city exists
# Unknown Location - when neither an album nor location exist
for this_part in path_part:
part, mask = this_part
this_path = self.get_dynamic_path(part, mask, metadata, db)
if this_path:
path.append(this_path.strip())
# We break as soon as we have a value to append
# Else we continue for fallbacks
break
return os.path.join(*path)
def get_location_part(self, mask, part, place_name):
"""Takes a mask for a location and interpolates the actual place names.
@ -525,7 +212,9 @@ class FileSystem(object):
place_name = geolocation.place_name(
metadata['latitude'],
metadata['longitude'],
db
db,
self.cache,
self.logger
)
if item == 'location':
mask = 'default'
@ -584,7 +273,6 @@ class FileSystem(object):
# if self.path_format is None:
# path_format = self.get_path_definition()
path_format = self.path_format
# self.cached_folder_path_definition = []
path = []
path_parts = path_format.split('/')
for path_part in path_parts:
@ -717,159 +405,6 @@ class FileSystem(object):
elif metadata['date_modified'] is not None:
return metadata['date_modified']
def get_dynamic_path(self, part, mask, metadata, db):
"""Parse a specific folder's name given a mask and metadata.
:param part: Name of the part as defined in the path (i.e. date from %date)
:param mask: Mask representing the template for the path (i.e. %city %state
:param metadata: Metadata dictionary.
:returns: str
"""
# Each part has its own custom logic and we evaluate a single part and return
# the evaluated string.
if part in ('date'):
# If Directory is in the config we assume full_path and its
# corresponding values (date, location) are also present
config_directory = self.default_folder_path_definition
config = load_config(constants.CONFIG_FILE)
if('Directory' in config):
if 'full_path' in config['Directory']:
config_directory = config['Directory']
# Get date mask from config
mask = ''
if 'date' in config_directory:
mask = config_directory['date']
if part in ('custom'):
custom_parts = re.findall('(%[a-z_]+)', mask)
folder = mask
for i in custom_parts:
folder = folder.replace(
i,
self.get_dynamic_path(i[1:], i, metadata, db)
)
return folder
elif part in ('date', 'day', 'month', 'year'):
date = self.get_date_taken(metadata)
# early morning photos can be grouped with previous day
date = self.check_for_early_morning_photos(date)
if date is not None:
return date.strftime(mask)
else:
return ''
elif part in ('location', 'city', 'state', 'country'):
place_name = geolocation.place_name(
metadata['latitude'],
metadata['longitude'],
db
)
location_parts = re.findall('(%[^%]+)', mask)
parsed_folder_name = self.parse_mask_for_location(
mask,
location_parts,
place_name,
)
return parsed_folder_name
elif part in ('folder'):
return os.path.basename(metadata['directory_path'])
elif part in ('album', 'camera_make', 'camera_model'):
if metadata[part]:
return metadata[part]
elif part.startswith('"') and part.endswith('"'):
# Fallback string
return part[1:-1]
return ''
def parse_mask_for_location(self, mask, location_parts, place_name):
"""Takes a mask for a location and interpolates the actual place names.
Given these parameters here are the outputs.
mask=%city
location_parts=[('%city','%city','city')]
place_name={'city': u'Sunnyvale'}
output=Sunnyvale
mask=%city-%state
location_parts=[('%city-','%city','city'), ('%state','%state','state')]
place_name={'city': u'Sunnyvale', 'state': u'California'}
output=Sunnyvale-California
mask=%country
location_parts=[('%country','%country','country')]
place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
output=Sunnyvale
:param str mask: The location mask in the form of %city-%state, etc
:param list location_parts: A list of tuples in the form of
[('%city-', '%city', 'city'), ('%state', '%state', 'state')]
:param dict place_name: A dictionary of place keywords and names like
{'default': u'California', 'state': u'California'}
:returns: str
"""
found = False
folder_name = mask
for loc_part in location_parts:
# We assume the search returns a tuple of length 2.
# If not then it's a bad mask in config.ini.
# loc_part = '%country-random'
# component_full = '%country-random'
# component = '%country'
# key = 'country
component_full, component, key = re.search(
'((%([a-z]+))[^%]*)',
loc_part
).groups()
if(key in place_name):
found = True
replace_target = component
replace_with = place_name[key]
else:
replace_target = component_full
replace_with = ''
folder_name = folder_name.replace(
replace_target,
replace_with,
)
if(not found and folder_name == ''):
folder_name = place_name['default']
return folder_name
def process_checksum(self, _file, db, allow_duplicate):
checksum = db.checksum(_file)
if(checksum is None):
log.info('Could not get checksum for %s.' % _file)
return None
# If duplicates are not allowed then we check if we've seen this file
# before via checksum. We also check that the file exists at the
# location we believe it to be.
# If we find a checksum match but the file doesn't exist where we
# believe it to be then we write a debug log and proceed to import.
checksum_file = db.get_hash(checksum)
# BUG: inconsistency if file removed manually without update db
if(allow_duplicate is False and checksum_file is not None):
if(os.path.isfile(checksum_file)):
log.info('%s already at %s.' % (
_file,
checksum_file
))
return None
else:
log.info('%s matched checksum but file not found at %s.' % ( # noqa
_file,
checksum_file
))
return checksum
def checksum(self, file_path, blocksize=65536):
"""Create a hash value for the given file.
@ -926,7 +461,7 @@ class FileSystem(object):
self.logger.info(f'File in source and destination are identical. Duplicate will be ignored.')
if(mode == 'move'):
if not dry_run:
shutil.remove(src_path)
os.remove(src_path)
self.logger.info(f'remove: {src_path}')
return True
else: # name is same, but file is different
@ -1001,7 +536,7 @@ class FileSystem(object):
subdirs = ''
for dirname, dirnames, filenames, level in self.walklevel(path,
self.max_deep):
if dirname == os.path.join(path, '.elodie'):
if dirname == os.path.join(path, '.dozo'):
continue
subdirs = os.path.join(subdirs, os.path.basename(dirname))
@ -1135,7 +670,7 @@ class FileSystem(object):
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.elodie'):
if dirname == os.path.join(path, '.dozo'):
continue
if dirname.find('similar_to') == 0:
continue
@ -1198,7 +733,7 @@ class FileSystem(object):
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.elodie'):
if dirname == os.path.join(path, '.dozo'):
continue
if dirname.find('similar_to') == 0:
continue
@ -1224,78 +759,6 @@ class FileSystem(object):
return self.summary, has_errors
def process_file(self, _file, destination, db, media, album_from_folder,
mode, **kwargs):
allow_duplicate = False
if('allowDuplicate' in kwargs):
allow_duplicate = kwargs['allowDuplicate']
stat_info_original = os.stat(_file)
metadata = media.get_metadata(album_from_folder)
if(not media.is_valid()):
print('%s is not a valid media file. Skipping...' % _file)
return
checksum = self.process_checksum(_file, db, allow_duplicate)
if(checksum is None):
log.info('Original checksum returned None for %s. Skipping...' %
_file)
return
# Run `before()` for every loaded plugin and if any of them raise an exception
# then we skip importing the file and log a message.
plugins_run_before_status = self.plugins.run_all_before(_file, destination)
if(plugins_run_before_status == False):
log.warn('At least one plugin pre-run failed for %s' % _file)
return
directory_name = self.get_folder_path(metadata, db)
dest_directory = os.path.join(destination, directory_name)
file_name = self.get_file_name(metadata)
dest_path = os.path.join(dest_directory, file_name)
# If source and destination are identical then
# we should not write the file. gh-210
# TODO Allow this for update?
if(_file == dest_path):
print('Final source and destination path should not be identical')
return
self.create_directory(dest_directory)
if(mode == 'move'):
stat = os.stat(_file)
# Move the processed file into the destination directory
shutil.move(_file, dest_path)
elif mode == 'copy':
shutil.copy2(_file, dest_path)
if mode != 'dry_run':
# Set the utime based on what the original file contained
# before we made any changes.
# Then set the utime on the destination file based on metadata.
date_taken = self.get_date_taken(metadata)
self.set_utime_from_metadata(date_taken, dest_path)
media.set_original_name(dest_path)
if album_from_folder:
media.set_album_from_folder(dest_path)
# get checksum of dest file
dest_checksum = db.checksum(dest_path)
db.add_hash(dest_checksum, dest_path)
db.update_hash_db()
# Run `after()` for every loaded plugin and if any of them raise an exception
# then we skip importing the file and log a message.
plugins_run_after_status = self.plugins.run_all_after(_file, destination, dest_path, metadata)
if(plugins_run_after_status == False):
log.warn('At least one plugin pre-run failed for %s' % _file)
return
return dest_path
def set_utime_from_metadata(self, date_taken, file_path):
""" Set the modification time on the file based on the file name.
@ -1303,8 +766,6 @@ class FileSystem(object):
# Initialize date taken to what's returned from the metadata function.
os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp())))
# If the folder and file name follow a time format of
# YYYY-MM-DD_HH-MM-SS-IMG_0001.JPG then we override the date_taken
def should_exclude(self, path, regex_list=set(), needs_compiled=False):

View File

@ -11,9 +11,8 @@ import urllib.error
import geopy
from geopy.geocoders import Nominatim
from elodie.config import load_config
from elodie import constants
from elodie import log
from dozo import constants
from dozo.config import load_config
__KEY__ = None
__DEFAULT_LOCATION__ = 'Unknown Location'
@ -147,7 +146,7 @@ def get_prefer_english_names():
__PREFER_ENGLISH_NAMES__ = bool(config['Geolocation']['prefer_english_names'])
return __PREFER_ENGLISH_NAMES__
def place_name(lat, lon, db):
def place_name(lat, lon, db, cache=True, logger=logging.getLogger()):
lookup_place_name_default = {'default': __DEFAULT_LOCATION__}
if(lat is None or lon is None):
return lookup_place_name_default
@ -160,7 +159,9 @@ def place_name(lat, lon, db):
# Try to get cached location first
# 3km distace radious for a match
cached_place_name = db.get_location_name(lat, lon, 3000)
cached_place_name = None
if cache:
cached_place_name = db.get_location_name(lat, lon, 3000)
# We check that it's a dict to coerce an upgrade of the location
# db from a string location to a dictionary. See gh-160.
if(isinstance(cached_place_name, dict)):
@ -197,7 +198,7 @@ def place_name(lat, lon, db):
return lookup_place_name
def lookup_osm(lat, lon):
def lookup_osm(lat, lon, logger=logging.getLogger()):
prefer_english_names = get_prefer_english_names()
from geopy.geocoders import Nominatim
@ -210,10 +211,10 @@ def lookup_osm(lat, lon):
lang='local'
return locator.reverse(coords, language=lang).raw
except geopy.exc.GeocoderUnavailable as e:
log.error(e)
logger.error(e)
return None
except ValueError as e:
log.error(e)
logger.error(e)
return None

View File

@ -1,6 +1,6 @@
"""
The audio module contains classes specifically for dealing with audio files.
The :class:`Audio` class inherits from the :class:`~elodie.media.Media`
The :class:`Audio` class inherits from the :class:`~dozo.media.Media`
class.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>

View File

@ -1,11 +1,7 @@
"""
The media module provides a base :class:`Media` class for media objects that
are tracked by Elodie. The Media class provides some base functionality used
by all the media types. Its sub-classes (:class:`~elodie.media.Audio`,
:class:`~elodie.media.Photo`, and :class:`~elodie.media.Video`)
are used to represent the actual files.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
Base :class:`Media` class for media objects that are tracked by Dozo.
The Media class provides some base functionality used by all the media types.
Sub-classes (:class:`~dozo.media.Audio`, :class:`~dozo.media.Photo`, and :class:`~dozo.media.Video`).
"""
import mimetypes
@ -14,18 +10,10 @@ import six
import logging
# load modules
from elodie import log
from dateutil.parser import parse
import re
from elodie.external.pyexiftool import ExifTool
# TODO remove
# try: # Py3k compatibility
# basestring
# except NameError:
# basestring = (bytes, str)
class Media():
"""The media class for all media objects.
@ -329,7 +317,7 @@ class Media():
#Cache exif metadata results and use if already exists for media
if(self.exif_metadata is None):
self.exif_metadata = ExifTool().get_metadata(source)
self.exif_metadata = ExifToolCaching(source, logger=self.logger).asdict()
for tag_regex in self.ignore_tags:
ignored_tags = set()
for tag in self.exif_metadata:
@ -589,6 +577,8 @@ class Media():
status = ''
status = ExifTool().set_tags(tags, path)
for tag, value in tags.items():
status = ExifToolCaching(path, self.logger).setvalue(tag, value)
if status.decode().find('unchanged') != -1 or status == '':
return False
if status.decode().find('error') != -1:

View File

@ -1,7 +1,7 @@
from tabulate import tabulate
class Result(object):
class Summary(object):
def __init__(self):
self.records = []
@ -25,7 +25,7 @@ class Result(object):
for id in self.error_items:
error_result.append([id])
print("****** ERROR DETAILS ******")
print('Errors details:')
print(tabulate(error_result, headers=error_headers))
print("\n")
@ -35,5 +35,6 @@ class Result(object):
["Error", self.error],
]
print("****** SUMMARY ******")
print(tabulate(result, headers=headers))
print()
print('Summary:')
print(tabulate(result, tablefmt="plain"))

545
elodie.py
View File

@ -1,545 +0,0 @@
#!/usr/bin/env python
import os
import re
import sys
import logging
from datetime import datetime
import click
from send2trash import send2trash
# Verify that external dependencies are present first, so the user gets a
# more user-friendly error instead of an ImportError traceback.
from elodie.dependencies import verify_dependencies
if not verify_dependencies():
sys.exit(1)
from elodie import constants
from elodie import geolocation
from elodie import log
from elodie.compatability import _decode
from elodie import config
from elodie.config import load_config
from elodie.filesystem import FileSystem
from elodie.gui import CompareImageApp
from elodie.localstorage import Db
from elodie.media.media import Media, get_all_subclasses
from elodie.media.audio import Audio
from elodie.media.photo import Photo
from elodie.media.video import Video
from elodie.plugins.plugins import Plugins
from elodie.result import Result
from elodie.summary import Summary
from elodie.external.pyexiftool import ExifTool
from elodie.dependencies import get_exiftool
from elodie import constants
FILESYSTEM = FileSystem()
def print_help(command):
click.echo(command.get_help(click.Context(sort)))
def import_file(_file, destination, db, album_from_folder, mode, trash, allow_duplicates):
"""Set file metadata and move it to destination.
"""
if not os.path.exists(_file):
log.warn('Could not find %s' % _file)
log.all('{"source":"%s", "error_msg":"Could not find %s"}' %
(_file, _file))
return
# Check if the source, _file, is a child folder within destination
elif destination.startswith(os.path.abspath(os.path.dirname(_file))+os.sep):
log.all('{"source": "%s", "destination": "%s", "error_msg": "Source cannot be in destination"}' % (
_file, destination))
return
media = Media.get_class_by_file(_file, get_all_subclasses())
if not media:
log.warn('Not a supported file (%s)' % _file)
log.all('{"source":"%s", "error_msg":"Not a supported file"}' % _file)
return
dest_path = FILESYSTEM.process_file(_file, destination, db,
media, album_from_folder, mode, allowDuplicate=allow_duplicates)
if dest_path:
log.all('%s -> %s' % (_file, dest_path))
if trash:
send2trash(_file)
return dest_path or None
@click.command('batch')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _batch(debug):
"""Run batch() for all plugins.
"""
constants.debug = debug
plugins = Plugins()
plugins.run_batch()
@click.command('import')
@click.option('--destination', type=click.Path(file_okay=False),
required=True, help='Copy imported files into this directory.')
@click.option('--source', type=click.Path(file_okay=False),
help='Import files from this directory, if specified.')
@click.option('--file', type=click.Path(dir_okay=False),
help='Import this file, if specified.')
@click.option('--album-from-folder', default=False, is_flag=True,
help="Use images' folders as their album names.")
@click.option('--trash', default=False, is_flag=True,
help='After copying files, move the old files to the trash.')
@click.option('--allow-duplicates', default=False, is_flag=True,
help='Import the file even if it\'s already been imported.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--exclude-regex', default=set(), multiple=True,
help='Regular expression for directories or files to exclude.')
@click.argument('paths', nargs=-1, type=click.Path())
def _import(destination, source, file, album_from_folder, trash,
allow_duplicates, debug, dry_run, exclude_regex, paths):
"""Import files or directories by reading their EXIF and organizing them accordingly.
"""
if dry_run:
mode = 'dry_run'
else:
mode = 'copy'
constants.debug = debug
has_errors = False
result = Result()
destination = _decode(destination)
destination = os.path.abspath(os.path.expanduser(destination))
files = set()
paths = set(paths)
if source:
source = _decode(source)
paths.add(source)
if file:
paths.add(file)
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
config = load_config(constants.CONFIG_FILE)
if 'Exclusions' in config:
exclude_regex = [value for key, value in config.items('Exclusions')]
exclude_regex_list = set(exclude_regex)
for path in paths:
path = os.path.expanduser(path)
if os.path.isdir(path):
files.update(FILESYSTEM.get_all_files(path, False, exclude_regex_list))
else:
if not FILESYSTEM.should_exclude(path, exclude_regex_list, True):
files.add(path)
# Initialize Db
if os.path.exists(destination):
db = Db(destination)
for current_file in files:
dest_path = import_file(current_file, destination, db,
album_from_folder, mode, trash, allow_duplicates)
result.append((current_file, dest_path))
has_errors = has_errors is True or not dest_path
else:
result.append((destination, False))
has_errors = True
result.write()
if has_errors:
sys.exit(1)
@click.command('sort')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--destination', '-d', type=click.Path(file_okay=False),
default=None, help='Sort files into this directory.')
@click.option('--copy', '-c', default=False, is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved')
@click.option('--exclude-regex', '-e', default=set(), multiple=True,
help='Regular expression for directories or files to exclude.')
@click.option('--filter-by-ext', '-f', default=set(), multiple=True, help='''Use filename
extension to filter files for sorting. If value is '*', use
common media file extension for filtering. Ignored files remain in
the same directory structure''' )
@click.option('--ignore-tags', '-i', default=set(), multiple=True,
help='Specific tags or group that will be ignored when\
searching for file data. Example \'File:FileModifyDate\' or \'Filename\'' )
@click.option('--max-deep', '-m', default=None,
help='Maximum level to proceed. Number from 0 to desired level.')
@click.option('--remove-duplicates', '-r', default=False, is_flag=True,
help='True to remove files that are exactly the same in name\
and a file hash')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignore_tags,
max_deep, remove_duplicates, verbose, paths):
"""Sort files or directories by reading their EXIF and organizing them
according to config.ini preferences.
"""
if copy:
mode = 'copy'
else:
mode = 'move'
if debug:
constants.debug = logging.DEBUG
elif verbose:
constants.debug = logging.INFO
else:
constants.debug = logging.ERROR
if max_deep is not None:
max_deep = int(max_deep)
logger = logging.getLogger('elodie')
logger.setLevel(constants.debug)
if not destination and paths:
destination = paths[-1]
paths = paths[0:-1]
else:
sys.exit(1)
paths = set(paths)
destination = _decode(destination)
destination = os.path.abspath(os.path.expanduser(destination))
if not os.path.exists(destination):
logger.error(f'Directory {destination} does not exist')
conf = config.load_config(constants.CONFIG_FILE)
path_format = config.get_path_definition(conf)
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
if 'Exclusions' in conf:
exclude_regex = [value for key, value in conf.items('Exclusions')]
exclude_regex_list = set(exclude_regex)
# Initialize Db
db = Db(destination)
if 'Directory' in conf and 'day_begins' in conf['Directory']:
config_directory = conf['Directory']
day_begins = config_directory['day_begins']
else:
day_begins = 0
filesystem = FileSystem(day_begins, dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, path_format)
summary, has_errors = filesystem.sort_files(paths, destination, db,
remove_duplicates, ignore_tags)
if verbose or debug:
summary.write()
if has_errors:
sys.exit(1)
@click.command('generate-db')
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _generate_db(path, debug):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files. The hash.json file is located at ~/.elodie/.
"""
constants.debug = debug
result = Result()
path = os.path.abspath(os.path.expanduser(path))
if not os.path.isdir(path):
log.error('path is not a valid directory %s' % path)
sys.exit(1)
db = Db(path)
db.backup_hash_db()
db.reset_hash_db()
for current_file in FILESYSTEM.get_all_files(path):
result.append((current_file, True))
db.add_hash(db.checksum(current_file), current_file)
log.progress()
db.update_hash_db()
log.progress('', True)
result.write()
@click.command('verify')
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _verify(path, debug):
constants.debug = debug
result = Result()
db = Db(path)
for checksum, file_path in db.all():
if not os.path.isfile(file_path):
result.append((file_path, False))
log.progress('x')
continue
actual_checksum = db.checksum(file_path)
if checksum == actual_checksum:
result.append((file_path, True))
log.progress()
else:
result.append((file_path, False))
log.progress('x')
log.progress('', True)
result.write()
def update_location(media, file_path, location_name, db):
"""Update location exif metadata of media.
"""
location_coords = geolocation.coordinates_by_name(location_name, db)
if location_coords and 'latitude' in location_coords and \
'longitude' in location_coords:
location_status = media.set_location(location_coords[
'latitude'], location_coords['longitude'], file_path)
if not location_status:
log.error('Failed to update location')
log.all(('{"source":"%s",' % file_path,
'"error_msg":"Failed to update location"}'))
sys.exit(1)
return True
def update_time(media, file_path, time_string):
"""Update time exif metadata of media.
"""
time_format = '%Y-%m-%d %H:%M:%S'
if re.match(r'^\d{4}-\d{2}-\d{2}$', time_string):
time_string = '%s 00:00:00' % time_string
elif re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}\d{2}$', time_string):
msg = ('Invalid time format. Use YYYY-mm-dd hh:ii:ss or YYYY-mm-dd')
log.error(msg)
log.all('{"source":"%s", "error_msg":"%s"}' % (file_path, msg))
sys.exit(1)
time = datetime.strptime(time_string, time_format)
media.set_date_original(time, file_path)
return True
@click.command('update')
@click.option('--album', help='Update the image album.')
@click.option('--location', help=('Update the image location. Location '
'should be the name of a place, like "Las '
'Vegas, NV".'))
@click.option('--time', help=('Update the image time. Time should be in '
'YYYY-mm-dd hh:ii:ss or YYYY-mm-dd format.'))
@click.option('--title', help='Update the image title.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.argument('paths', nargs=-1,
required=True)
def _update(album, location, time, title, paths, debug):
"""Update a file's EXIF. Automatically modifies the file's location and file name accordingly.
"""
constants.debug = debug
has_errors = False
result = Result()
files = set()
for path in paths:
path = os.path.expanduser(path)
if os.path.isdir(path):
files.update(FILESYSTEM.get_all_files(path, False))
else:
files.add(path)
for current_file in files:
if not os.path.exists(current_file):
has_errors = True
result.append((current_file, False))
log.warn('Could not find %s' % current_file)
log.all('{"source":"%s", "error_msg":"Could not find %s"}' %
(current_file, current_file))
continue
current_file = os.path.expanduser(current_file)
# The destination folder structure could contain any number of levels
# So we calculate that and traverse up the tree.
# '/path/to/file/photo.jpg' -> '/path/to/file' ->
# ['path','to','file'] -> ['path','to'] -> '/path/to'
current_directory = os.path.dirname(current_file)
destination_depth = -1 * len(FILESYSTEM.get_folder_path_definition())
destination = os.sep.join(
os.path.normpath(
current_directory
).split(os.sep)[:destination_depth]
)
# Initialize Db
db = Db(destination)
media = Media.get_class_by_file(current_file, get_all_subclasses())
if media is None:
continue
updated = False
if location:
update_location(media, current_file, location, db)
updated = True
if time:
update_time(media, current_file, time)
updated = True
if album:
media.set_album(album, current_file)
updated = True
# Updating a title can be problematic when doing it 2+ times on a file.
# You would end up with img_001.jpg -> img_001-first-title.jpg ->
# img_001-first-title-second-title.jpg.
# To resolve that we have to track the prior title (if there was one.
# Then we massage the updated_media's metadata['base_name'] to remove
# the old title.
# Since FileSystem.get_file_name() relies on base_name it will properly
# rename the file by updating the title instead of appending it.
remove_old_title_from_name = False
if title:
# We call get_metadata() to cache it before making any changes
metadata = media.get_metadata()
title_update_status = media.set_title(title)
original_title = metadata['title']
if title_update_status and original_title:
# @TODO: We should move this to a shared method since
# FileSystem.get_file_name() does it too.
original_title = re.sub(r'\W+', '-', original_title.lower())
original_base_name = metadata['base_name']
remove_old_title_from_name = True
updated = True
if updated:
updated_media = Media.get_class_by_file(current_file,
get_all_subclasses())
# See comments above on why we have to do this when titles
# get updated.
if remove_old_title_from_name and len(original_title) > 0:
updated_media.get_metadata()
updated_media.set_metadata_basename(
original_base_name.replace('-%s' % original_title, ''))
dest_path = FILESYSTEM.process_file(current_file, destination, db,
updated_media, False, mode='move', allowDuplicate=True)
log.info(u'%s -> %s' % (current_file, dest_path))
log.all('{"source":"%s", "destination":"%s"}' % (current_file,
dest_path))
# If the folder we moved the file out of or its parent are empty
# we delete it.
FILESYSTEM.delete_directory_if_empty(os.path.dirname(current_file))
FILESYSTEM.delete_directory_if_empty(
os.path.dirname(os.path.dirname(current_file)))
result.append((current_file, dest_path))
# Trip has_errors to False if it's already False or dest_path is.
has_errors = has_errors is True or not dest_path
else:
has_errors = False
result.append((current_file, False))
result.write()
if has_errors:
sys.exit(1)
@click.command('compare')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--dry-run', default=False, is_flag=True,
help='Dry run only, no change made to the filesystem.')
@click.option('--find-duplicates', '-f', default=False, is_flag=True)
@click.option('--output-dir', '-o', default=False, is_flag=True, help='output\
dir')
@click.option('--remove-duplicates', '-r', default=False, is_flag=True)
@click.option('--revert-compare', '-R', default=False, is_flag=True, help='Revert\
compare')
@click.option('--similar-to', '-s', default=False, help='Similar to given\
image')
@click.option('--similarity', '-S', default=80, help='Similarity level for\
images')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('path', nargs=1, required=True)
def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
revert_compare, similar_to, similarity, verbose, path):
'''Compare files in directories'''
logger = logging.getLogger('elodie')
if debug:
logger.setLevel(logging.DEBUG)
elif verbose:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.ERROR)
# Initialize Db
db = Db(path)
filesystem = FileSystem(mode='move', dry_run=dry_run, logger=logger)
if revert_compare:
summary, has_errors = filesystem.revert_compare(path, db, dry_run)
else:
summary, has_errors = filesystem.sort_similar_images(path, db,
similarity, dry_run)
if verbose or debug:
summary.write()
if has_errors:
sys.exit(1)
@click.group()
def main():
pass
main.add_command(_compare)
main.add_command(_import)
main.add_command(_sort)
main.add_command(_update)
main.add_command(_generate_db)
main.add_command(_verify)
main.add_command(_batch)
if __name__ == '__main__':
#Initialize ExifTool Subprocess
exiftool_addedargs = [
u'-config',
u'"{}"'.format(constants.exiftool_config)
]
with ExifTool(executable_=get_exiftool(), addedargs=exiftool_addedargs) as et:
main()

View File

@ -1,32 +0,0 @@
import os
import shutil
import sys
from elodie import constants
def _decode(string, encoding=sys.getfilesystemencoding()):
"""Return a utf8 encoded unicode string.
Python2 and Python3 differ in how they handle strings.
So we do a few checks to see if the string is ascii or unicode.
Then we decode it if needed.
"""
if hasattr(string, 'decode'):
# If the string is already unicode we return it.
try:
if isinstance(string, unicode):
return string
except NameError:
pass
return string.decode(encoding)
return string
def _bytes(string):
if constants.python_version == 3:
return bytes(string, 'utf8')
else:
return bytes(string)

View File

@ -1,49 +0,0 @@
"""
Helpers for checking for an interacting with external dependencies. These are
things that Elodie requires, but aren't installed automatically for the user.
"""
import os
import sys
from distutils.spawn import find_executable
#: Error to print when exiftool can't be found.
EXIFTOOL_ERROR = u"""
It looks like you don't have exiftool installed, which Elodie requires.
Please take a look at the installation steps in the readme:
https://github.com/jmathai/elodie#install-everything-you-need
""".lstrip()
def get_exiftool():
"""Get path to executable exiftool binary.
We wrap this since we call it in a few places and we do a fallback.
:returns: str or None
"""
path = find_executable('exiftool')
# If exiftool wasn't found we try to brute force the homebrew location
if path is None:
path = '/usr/local/bin/exiftool'
if not os.path.isfile(path) or not os.access(path, os.X_OK):
return None
return path
def verify_dependencies():
"""Verify that external dependencies are installed.
Prints a message to stderr and returns False if any dependencies are
missing.
:returns: bool
"""
exiftool = get_exiftool()
if exiftool is None:
print(EXIFTOOL_ERROR, file=sys.stderr)
return False
return True

View File

@ -1,485 +0,0 @@
# -*- coding: utf-8 -*-
# PyExifTool <http://github.com/smarnach/pyexiftool>
# Copyright 2012 Sven Marnach. Enhancements by Leo Broska
# This file is part of PyExifTool.
#
# PyExifTool is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyExifTool is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyExifTool. If not, see <http://www.gnu.org/licenses/>.
"""
PyExifTool is a Python library to communicate with an instance of Phil
Harvey's excellent ExifTool_ command-line application. The library
provides the class :py:class:`ExifTool` that runs the command-line
tool in batch mode and features methods to send commands to that
program, including methods to extract meta-information from one or
more image files. Since ``exiftool`` is run in batch mode, only a
single instance needs to be launched and can be reused for many
queries. This is much more efficient than launching a separate
process for every single query.
.. _ExifTool: http://www.sno.phy.queensu.ca/~phil/exiftool/
The source code can be checked out from the github repository with
::
git clone git://github.com/smarnach/pyexiftool.git
Alternatively, you can download a tarball_. There haven't been any
releases yet.
.. _tarball: https://github.com/smarnach/pyexiftool/tarball/master
PyExifTool is licenced under GNU GPL version 3 or later.
Example usage::
import exiftool
files = ["a.jpg", "b.png", "c.tif"]
with exiftool.ExifTool() as et:
metadata = et.get_metadata_batch(files)
for d in metadata:
print("{:20.20} {:20.20}".format(d["SourceFile"],
d["EXIF:DateTimeOriginal"]))
"""
import sys
import subprocess
import os
import json
import warnings
import logging
import codecs
try: # Py3k compatibility
basestring
except NameError:
basestring = (bytes, str)
executable = "exiftool"
"""The name of the executable to run.
If the executable is not located in one of the paths listed in the
``PATH`` environment variable, the full path should be given here.
"""
# Sentinel indicating the end of the output of a sequence of commands.
# The standard value should be fine.
sentinel = b"{ready}"
# The block size when reading from exiftool. The standard value
# should be fine, though other values might give better performance in
# some cases.
block_size = 4096
# constants related to keywords manipulations
KW_TAGNAME = "IPTC:Keywords"
KW_REPLACE, KW_ADD, KW_REMOVE = range(3)
# This code has been adapted from Lib/os.py in the Python source tree
# (sha1 265e36e277f3)
def _fscodec():
encoding = sys.getfilesystemencoding()
errors = "strict"
if encoding != "mbcs":
try:
codecs.lookup_error("surrogateescape")
except LookupError:
pass
else:
errors = "surrogateescape"
def fsencode(filename):
"""
Encode filename to the filesystem encoding with 'surrogateescape' error
handler, return bytes unchanged. On Windows, use 'strict' error handler if
the file system encoding is 'mbcs' (which is the default encoding).
"""
if isinstance(filename, bytes):
return filename
else:
return filename.encode(encoding, errors)
return fsencode
fsencode = _fscodec()
del _fscodec
#string helper
def strip_nl (s):
return ' '.join(s.splitlines())
# Error checking function
# Note: They are quite fragile, beacsue teh just parse the output text from exiftool
def check_ok (result):
"""Evaluates the output from a exiftool write operation (e.g. `set_tags`)
The argument is the result from the execute method.
The result is True or False.
"""
return not result is None and (not "due to errors" in result)
def format_error (result):
"""Evaluates the output from a exiftool write operation (e.g. `set_tags`)
The argument is the result from the execute method.
The result is a human readable one-line string.
"""
if check_ok (result):
return 'exiftool finished probably properly. ("%s")' % strip_nl(result)
else:
if result is None:
return "exiftool operation can't be evaluated: No result given"
else:
return 'exiftool finished with error: "%s"' % strip_nl(result)
class Singleton(type):
"""Metaclass to use the singleton [anti-]pattern"""
instance = None
def __call__(cls, *args, **kwargs):
if cls.instance is None:
cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls.instance
class ExifTool(object, metaclass=Singleton):
"""Run the `exiftool` command-line tool and communicate to it.
You can pass two arguments to the constructor:
- ``addedargs`` (list of strings): contains additional paramaters for
the stay-open instance of exiftool
- ``executable`` (string): file name of the ``exiftool`` executable.
The default value ``exiftool`` will only work if the executable
is in your ``PATH``
Most methods of this class are only available after calling
:py:meth:`start()`, which will actually launch the subprocess. To
avoid leaving the subprocess running, make sure to call
:py:meth:`terminate()` method when finished using the instance.
This method will also be implicitly called when the instance is
garbage collected, but there are circumstance when this won't ever
happen, so you should not rely on the implicit process
termination. Subprocesses won't be automatically terminated if
the parent process exits, so a leaked subprocess will stay around
until manually killed.
A convenient way to make sure that the subprocess is terminated is
to use the :py:class:`ExifTool` instance as a context manager::
with ExifTool() as et:
...
.. warning:: Note that there is no error handling. Nonsensical
options will be silently ignored by exiftool, so there's not
much that can be done in that regard. You should avoid passing
non-existent files to any of the methods, since this will lead
to undefied behaviour.
.. py:attribute:: running
A Boolean value indicating whether this instance is currently
associated with a running subprocess.
"""
def __init__(self, executable_=None, addedargs=None):
if executable_ is None:
self.executable = executable
else:
self.executable = executable_
if addedargs is None:
self.addedargs = []
elif type(addedargs) is list:
self.addedargs = addedargs
else:
raise TypeError("addedargs not a list of strings")
self.running = False
def start(self):
"""Start an ``exiftool`` process in batch mode for this instance.
This method will issue a ``UserWarning`` if the subprocess is
already running. The process is started with the ``-G`` and
``-n`` as common arguments, which are automatically included
in every command you run with :py:meth:`execute()`.
"""
if self.running:
warnings.warn("ExifTool already running; doing nothing.")
return
with open(os.devnull, "w") as devnull:
procargs = [self.executable, "-stay_open", "True", "-@", "-",
"-common_args", "-G", "-n"];
procargs.extend(self.addedargs)
logging.debug(procargs)
self._process = subprocess.Popen(
procargs,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=devnull)
self.running = True
def terminate(self):
"""Terminate the ``exiftool`` process of this instance.
If the subprocess isn't running, this method will do nothing.
"""
if not self.running:
return
self._process.stdin.write(b"-stay_open\nFalse\n")
self._process.stdin.flush()
self._process.communicate()
del self._process
self.running = False
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
def __del__(self):
self.terminate()
def execute(self, *params):
"""Execute the given batch of parameters with ``exiftool``.
This method accepts any number of parameters and sends them to
the attached ``exiftool`` process. The process must be
running, otherwise ``ValueError`` is raised. The final
``-execute`` necessary to actually run the batch is appended
automatically; see the documentation of :py:meth:`start()` for
the common options. The ``exiftool`` output is read up to the
end-of-output sentinel and returned as a raw ``bytes`` object,
excluding the sentinel.
The parameters must also be raw ``bytes``, in whatever
encoding exiftool accepts. For filenames, this should be the
system's filesystem encoding.
.. note:: This is considered a low-level method, and should
rarely be needed by application developers.
"""
if not self.running:
raise ValueError("ExifTool instance not running.")
self._process.stdin.write(b"\n".join(params + (b"-execute\n",)))
self._process.stdin.flush()
output = b""
fd = self._process.stdout.fileno()
while not output[-32:].strip().endswith(sentinel):
output += os.read(fd, block_size)
return output.strip()[:-len(sentinel)]
def execute_json(self, *params):
"""Execute the given batch of parameters and parse the JSON output.
This method is similar to :py:meth:`execute()`. It
automatically adds the parameter ``-j`` to request JSON output
from ``exiftool`` and parses the output. The return value is
a list of dictionaries, mapping tag names to the corresponding
values. All keys are Unicode strings with the tag names
including the ExifTool group name in the format <group>:<tag>.
The values can have multiple types. All strings occurring as
values will be Unicode strings. Each dictionary contains the
name of the file it corresponds to in the key ``"SourceFile"``.
The parameters to this function must be either raw strings
(type ``str`` in Python 2.x, type ``bytes`` in Python 3.x) or
Unicode strings (type ``unicode`` in Python 2.x, type ``str``
in Python 3.x). Unicode strings will be encoded using
system's filesystem encoding. This behaviour means you can
pass in filenames according to the convention of the
respective Python version as raw strings in Python 2.x and
as Unicode strings in Python 3.x.
"""
params = map(fsencode, params)
# Some latin bytes won't decode to utf-8.
# Try utf-8 and fallback to latin.
# http://stackoverflow.com/a/5552623/1318758
# https://github.com/jmathai/elodie/issues/127
try:
return json.loads(self.execute(b"-j", *params).decode("utf-8"))
except UnicodeDecodeError as e:
return json.loads(self.execute(b"-j", *params).decode("latin-1"))
def get_metadata_batch(self, filenames):
"""Return all meta-data for the given files.
The return value will have the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.execute_json(*filenames)
def get_metadata(self, filename):
"""Return meta-data for a single file.
The returned dictionary has the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.execute_json(filename)[0]
def get_tags_batch(self, tags, filenames):
"""Return only specified tags for the given files.
The first argument is an iterable of tags. The tag names may
include group names, as usual in the format <group>:<tag>.
The second argument is an iterable of file names.
The format of the return value is the same as for
:py:meth:`execute_json()`.
"""
# Explicitly ruling out strings here because passing in a
# string would lead to strange and hard-to-find errors
if isinstance(tags, basestring):
raise TypeError("The argument 'tags' must be "
"an iterable of strings")
if isinstance(filenames, basestring):
raise TypeError("The argument 'filenames' must be "
"an iterable of strings")
params = ["-" + t for t in tags]
params.extend(filenames)
return self.execute_json(*params)
def get_tags(self, tags, filename):
"""Return only specified tags for a single file.
The returned dictionary has the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.get_tags_batch(tags, [filename])[0]
def get_tag_batch(self, tag, filenames):
"""Extract a single tag from the given files.
The first argument is a single tag name, as usual in the
format <group>:<tag>.
The second argument is an iterable of file names.
The return value is a list of tag values or ``None`` for
non-existent tags, in the same order as ``filenames``.
"""
data = self.get_tags_batch([tag], filenames)
result = []
for d in data:
d.pop("SourceFile")
result.append(next(iter(d.values()), None))
return result
def get_tag(self, tag, filename):
"""Extract a single tag from a single file.
The return value is the value of the specified tag, or
``None`` if this tag was not found in the file.
"""
return self.get_tag_batch(tag, [filename])[0]
def set_tags_batch(self, tags, filenames, overwrite=True):
"""Writes the values of the specified tags for the given files.
The first argument is a dictionary of tags and values. The tag names may
include group names, as usual in the format <group>:<tag>.
The second argument is an iterable of file names.
The format of the return value is the same as for
:py:meth:`execute()`.
It can be passed into `check_ok()` and `format_error()`.
"""
# Explicitly ruling out strings here because passing in a
# string would lead to strange and hard-to-find errors
if isinstance(tags, basestring):
raise TypeError("The argument 'tags' must be dictionary "
"of strings")
if isinstance(filenames, basestring):
raise TypeError("The argument 'filenames' must be "
"an iterable of strings")
params = []
params_utf8 = []
for tag, value in tags.items():
params.append(u'-%s=%s' % (tag, value))
if overwrite:
params.append('-overwrite_original')
params.extend(filenames)
params_utf8 = [x.encode('utf-8') for x in params]
return self.execute(*params_utf8)
def set_tags(self, tags, filename):
"""Writes the values of the specified tags for the given file.
This is a convenience function derived from `set_tags_batch()`.
Only difference is that it takes as last arugemnt only one file name
as a string.
"""
return self.set_tags_batch(tags, [filename])
def set_keywords_batch(self, mode, keywords, filenames):
"""Modifies the keywords tag for the given files.
The first argument is the operation mode:
KW_REPLACE: Replace (i.e. set) the full keywords tag with `keywords`.
KW_ADD: Add `keywords` to the keywords tag.
If a keyword is present, just keep it.
KW_REMOVE: Remove `keywords` from the keywords tag.
If a keyword wasn't present, just leave it.
The second argument is an iterable of key words.
The third argument is an iterable of file names.
The format of the return value is the same as for
:py:meth:`execute()`.
It can be passed into `check_ok()` and `format_error()`.
"""
# Explicitly ruling out strings here because passing in a
# string would lead to strange and hard-to-find errors
if isinstance(keywords, basestring):
raise TypeError("The argument 'keywords' must be "
"an iterable of strings")
if isinstance(filenames, basestring):
raise TypeError("The argument 'filenames' must be "
"an iterable of strings")
params = []
kw_operation = {KW_REPLACE:"-%s=%s",
KW_ADD:"-%s+=%s",
KW_REMOVE:"-%s-=%s"}[mode]
kw_params = [ kw_operation % (KW_TAGNAME, w) for w in keywords ]
params.extend(kw_params)
params.extend(filenames)
logging.debug (params)
return self.execute(*params)
def set_keywords(self, mode, keywords, filename):
"""Modifies the keywords tag for the given file.
This is a convenience function derived from `set_keywords_batch()`.
Only difference is that it takes as last argument only one file name
as a string.
"""
return self.set_keywords_batch(mode, keywords, [filename])

View File

@ -1,65 +0,0 @@
"""
General file system methods.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import sys
from json import dumps
from elodie import constants
def all(message):
_print(message)
def info(message):
_print_debug(message)
def info_json(payload):
_print_debug(dumps(payload))
def progress(message='.', new_line=False):
if not new_line:
print(message, end="")
else:
print(message)
def warn(message):
_print_debug(message)
def warn_json(payload):
_print_debug(dumps(payload))
def error(message):
_print_debug(message)
def error_json(payload):
_print_debug(dumps(payload))
def _print_debug(string):
# Print if debug == True or if running with nosetests
# Commenting out because this causes failures in other tests
# which verify that output is correct.
# Use the line below if you want output printed during tests.
# if(constants.debug is True or 'nose' in sys.modules.keys()):
if(constants.debug is True):
_print(string)
def _print(s):
try:
print(s)
except UnicodeEncodeError:
for c in s:
try:
print(c, end='')
except UnicodeEncodeError:
print('?', end='')

View File

@ -1,19 +0,0 @@
"""
Dummy plugin object used for tests.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
from elodie.plugins.plugins import PluginBase
class Dummy(PluginBase):
__name__ = 'Dummy'
"""A dummy class to execute plugin actions for tests."""
def __init__(self):
self.before_ran = False
def before(self, file_path, destination_folder):
self.before_ran = True

View File

@ -1,63 +0,0 @@
# Google Photos Plugin for Elodie
[![Build Status](https://travis-ci.org/jmathai/elodie.svg?branch=master)](https://travis-ci.org/jmathai/elodie) [![Coverage Status](https://coveralls.io/repos/github/jmathai/elodie/badge.svg?branch=master)](https://coveralls.io/github/jmathai/elodie?branch=master) [![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/jmathai/elodie/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/jmathai/elodie/?branch=master)
This plugin uploads all photos imported using Elodie to Google Photos. It was created after [Google Photos and Google Drive synchronization was deprecated](https://www.blog.google/products/photos/simplifying-google-photos-and-google-drive/). It aims to replicate my [workflow using Google Photos, Google Drive and Elodie](https://artplusmarketing.com/one-year-of-using-an-automated-photo-organization-and-archiving-workflow-89cf9ad7bddf).
I didn't intend on it, but it turned out that with this plugin you can use Google Photos with Google Drive, iCloud Drive, Dropbox or no cloud storage service while still using Google Photos for viewing and experiencing your photo library.
The hardest part of using this plugin is setting it up. Let's get started.
# Installation and Setup
## Google Photos
Let's start by making sure you have a Google Photos account. If you don't, you should start by [creating your Google Photos account](https://photos.google.com/login).
## Google APIs
Once you've got your Google Photos account created we can enable Google Photos' APIs for your account.
In order to enable Google APIs you need what's called a project. Don't worry about what it is, just create one so you can enable the Google Photos API for it.
1. Go to [Google's developer console](https://console.developers.google.com).
2. If you have a project already then you can skip this step.
If you don't already have a project or would like to create one just for this purpose then you should create it now. In the top bar there's a **project selector** which will open a dialog with a button to create a new project.
3. Now you'll need to [enable the Google Photos API for your project](https://console.developers.google.com/apis/library/photoslibrary.googleapis.com). You should be able to follow that link and click the **Enable API** button. Make sure the project from the prior step is selected.
4. Once you've enabled the Google Photos API you will need to [create an OAuth client ID](https://console.developers.google.com/apis/credentials).
1. Select **other** as the type of client.
2. Set up a consent screen if needed. Only you'll be seeing this so put whatever you want into the required fields. Most everything can be left blank.
3. Download the credentials when prompted or click the download icon on the [credentials page](https://console.developers.google.com/apis/credentials).
## Configure the Google Photos Plugin for Elodie
Now that you're set up with your Google Photos account, have enabled the APIs and configured your OAuth client we're ready to enable this plugin for Elodie.
1. Move the credentials file you downloaded to a permanent location and update your `config.ini` file. You'll need to add a `[Plugins]` section.
[Plugins]
plugins=GooglePhotos
[PluginGooglePhotos]
secrets_file=/full/path/to/saved/secrets_file.json
auth_file=/full/path/to/save/auth_file.json
I put `secrets_file.json` (the one you downloaded) in my `~/.elodie` directory. `auth_file.json` will be automatically created so make sure the path is writable by the user running `./elodie.py`.
2. If you did everything exactly correct you should be able to authenticate Elodie to start uploading to Google Photos.
1. Start by importing a new photo by running `./elodie.py import`.
2. Run `./elodie.py batch` which should open your browser.
3. Login and tell Google Photos to allow Elodie the requested permissions to your Google Photos account.
4. At some point you'll likely see a scary warning screen. This is because your OAuth client is not approved but go ahead and click on **Advanced** and **Go to {Your OAuth client name (unsafe)**.
5. Return to your terminal and close your browser tab if you'd like.
Assuming you did not see any errors you can go back to your browser and load up Google Photos. If your photos show up in Google Photos then you got everything to work *a lot* easier than I did.
## Automating It All
I'm not going to go into how you can automate this process but much of it is covered by various blog posts I've done in the past.
* [Understanding My Need for an Automated Photo Workflow](https://medium.com/vantage/understanding-my-need-for-an-automated-photo-workflow-a2ff95b46f8f#.dmwyjlc57)
* [Introducing Elodie; Your Personal EXIF-based Photo and Video Assistant](https://medium.com/@jmathai/introducing-elodie-your-personal-exif-based-photo-and-video-assistant-d92868f302ec)
* [My Automated Photo Workflow using Google Photos and Elodie](https://medium.com/swlh/my-automated-photo-workflow-using-google-photos-and-elodie-afb753b8c724)
* [One Year of Using an Automated Photo Organization and Archiving Workflow](https://artplusmarketing.com/one-year-of-using-an-automated-photo-organization-and-archiving-workflow-89cf9ad7bddf)
## Credits
Elodie is an open source project with many [contributors](https://github.com/jmathai/elodie/graphs/contributors) and [users](https://github.com/jmathai/elodie/stargazers) who have reported lots of [bugs and feature requests](https://github.com/jmathai/elodie/issues?utf8=%E2%9C%93&q=).
Google Photos is an amazing product. Kudos to the team for making it so magical.

View File

@ -1,154 +0,0 @@
"""
Google Photos plugin object.
This plugin will queue imported photos into the plugin's database file.
Using this plugin should have no impact on performance of importing photos.
In order to upload the photos to Google Photos you need to run the following command.
```
./elodie.py batch
```
That command will execute the batch() method on all plugins, including this one.
This plugin's batch() function reads all files from the database file and attempts to
upload them to Google Photos.
This plugin does not aim to keep Google Photos in sync.
Once a photo is uploaded it's removed from the database and no records are kept thereafter.
Upload code adapted from https://github.com/eshmu/gphotos-upload
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import json
from os.path import basename, isfile
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.credentials import Credentials
from elodie.media.photo import Photo
from elodie.media.video import Video
from elodie.plugins.plugins import PluginBase
class GooglePhotos(PluginBase):
"""A class to execute plugin actions.
Requires a config file with the following configurations set.
secrets_file:
The full file path where to find the downloaded secrets.
auth_file:
The full file path where to store authenticated tokens.
"""
__name__ = 'GooglePhotos'
def __init__(self):
super(GooglePhotos, self).__init__()
self.upload_url = 'https://photoslibrary.googleapis.com/v1/uploads'
self.media_create_url = 'https://photoslibrary.googleapis.com/v1/mediaItems:batchCreate'
self.scopes = ['https://www.googleapis.com/auth/photoslibrary.appendonly']
self.secrets_file = None
if('secrets_file' in self.config_for_plugin):
self.secrets_file = self.config_for_plugin['secrets_file']
# 'client_id.json'
self.auth_file = None
if('auth_file' in self.config_for_plugin):
self.auth_file = self.config_for_plugin['auth_file']
self.session = None
def after(self, file_path, destination_folder, final_file_path, metadata):
extension = metadata['ext']
if(extension in Photo.extensions or extension in Video.extensions):
self.log(u'Added {} to db.'.format(final_file_path))
self.db.set(final_file_path, metadata['original_name'])
else:
self.log(u'Skipping {} which is not a supported media type.'.format(final_file_path))
def batch(self):
queue = self.db.get_all()
status = True
count = 0
for key in queue:
this_status = self.upload(key)
if(this_status):
# Remove from queue if successful then increment count
self.db.delete(key)
count = count + 1
self.display('{} uploaded successfully.'.format(key))
else:
status = False
self.display('{} failed to upload.'.format(key))
return (status, count)
def before(self, file_path, destination_folder):
pass
def set_session(self):
# Try to load credentials from an auth file.
# If it doesn't exist or is not valid then catch the
# exception and reauthenticate.
try:
creds = Credentials.from_authorized_user_file(self.auth_file, self.scopes)
except:
try:
flow = InstalledAppFlow.from_client_secrets_file(self.secrets_file, self.scopes)
creds = flow.run_local_server()
cred_dict = {
'token': creds.token,
'refresh_token': creds.refresh_token,
'id_token': creds.id_token,
'scopes': creds.scopes,
'token_uri': creds.token_uri,
'client_id': creds.client_id,
'client_secret': creds.client_secret
}
# Store the returned authentication tokens to the auth_file.
with open(self.auth_file, 'w') as f:
f.write(json.dumps(cred_dict))
except:
return
self.session = AuthorizedSession(creds)
self.session.headers["Content-type"] = "application/octet-stream"
self.session.headers["X-Goog-Upload-Protocol"] = "raw"
def upload(self, path_to_photo):
self.set_session()
if(self.session is None):
self.log('Could not initialize session')
return None
self.session.headers["X-Goog-Upload-File-Name"] = basename(path_to_photo)
if(not isfile(path_to_photo)):
self.log('Could not find file: {}'.format(path_to_photo))
return None
with open(path_to_photo, 'rb') as f:
photo_bytes = f.read()
upload_token = self.session.post(self.upload_url, photo_bytes)
if(upload_token.status_code != 200 or not upload_token.content):
self.log('Uploading media failed: ({}) {}'.format(upload_token.status_code, upload_token.content))
return None
create_body = json.dumps({'newMediaItems':[{'description':'','simpleMediaItem':{'uploadToken':upload_token.content.decode()}}]}, indent=4)
resp = self.session.post(self.media_create_url, create_body).json()
if(
'newMediaItemResults' not in resp or
'status' not in resp['newMediaItemResults'][0] or
'message' not in resp['newMediaItemResults'][0]['status'] or
(
resp['newMediaItemResults'][0]['status']['message'] != 'Success' and # photos
resp['newMediaItemResults'][0]['status']['message'] != 'OK' # videos
)
):
self.log('Creating new media item failed: {}'.format(json.dumps(resp)))
return None
return resp['newMediaItemResults'][0]

View File

@ -1,3 +0,0 @@
google-api-python-client==1.7.9
google-auth-oauthlib==0.4.0
oauth2client==4.1.3

View File

@ -1,219 +0,0 @@
"""
Plugin object.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
from builtins import object
import io
from json import dumps, loads
from importlib import import_module
from os.path import dirname, dirname, isdir, isfile
from os import mkdir
from sys import exc_info
from traceback import format_exc
from elodie.compatability import _bytes
from elodie.config import load_config_for_plugin, load_plugin_config
from elodie import constants as c
from elodie import log
class ElodiePluginError(Exception):
"""Exception which can be thrown by plugins to return failures.
"""
pass
class PluginBase(object):
"""Base class which all plugins should inherit from.
Defines stubs for all methods and exposes logging and database functionality
"""
__name__ = 'PluginBase'
def __init__(self):
# Loads the config for the plugin from config.ini
self.config_for_plugin = load_config_for_plugin(self.__name__, c.CONFIG_FILE)
self.db = PluginDb(self.__name__)
def after(self, file_path, destination_folder, final_file_path, metadata):
pass
def batch(self):
pass
def before(self, file_path, destination_folder):
pass
def log(self, msg):
# Writes an info log not shown unless being run in --debug mode.
log.info(dumps(
{self.__name__: msg}
))
def display(self, msg):
# Writes a log for all modes and will be displayed.
log.all(dumps(
{self.__name__: msg}
))
class PluginDb(object):
"""A database module which provides a simple key/value database.
The database is a JSON file located at %application_directory%/plugins/%pluginname.lower()%.json
"""
def __init__(self, plugin_name):
self.db_file = '{}/plugins/{}.json'.format(
c.application_directory,
plugin_name.lower()
)
# If the plugin db directory does not exist, create it
if(not isdir(dirname(self.db_file))):
mkdir(dirname(self.db_file))
# If the db file does not exist we initialize it
if(not isfile(self.db_file)):
with io.open(self.db_file, 'wb') as f:
f.write(_bytes(dumps({})))
def get(self, key):
with io.open(self.db_file, 'r') as f:
db = loads(f.read())
if(key not in db):
return None
return db[key]
def set(self, key, value):
with io.open(self.db_file, 'r') as f:
data = f.read()
db = loads(data)
db[key] = value
new_content = dumps(db, ensure_ascii=False).encode('utf8')
with io.open(self.db_file, 'wb') as f:
f.write(new_content)
def get_all(self):
with io.open(self.db_file, 'r') as f:
db = loads(f.read())
return db
def delete(self, key):
with io.open(self.db_file, 'r') as f:
db = loads(f.read())
# delete key without throwing an exception
db.pop(key, None)
new_content = dumps(db, ensure_ascii=False).encode('utf8')
with io.open(self.db_file, 'wb') as f:
f.write(new_content)
class Plugins(object):
"""Plugin object which manages all interaction with plugins.
Exposes methods to load plugins and execute their methods.
"""
def __init__(self):
self.plugins = []
self.classes = {}
self.loaded = False
def load(self):
"""Load plugins from config file.
"""
# If plugins have been loaded then return
if self.loaded == True:
return
plugin_list = load_plugin_config(c.CONFIG_FILE)
for plugin in plugin_list:
plugin_lower = plugin.lower()
try:
# We attempt to do the following.
# 1. Load the module of the plugin.
# 2. Instantiate an object of the plugin's class.
# 3. Add the plugin to the list of plugins.
#
# #3 should only happen if #2 doesn't throw an error
this_module = import_module('elodie.plugins.{}.{}'.format(plugin_lower, plugin_lower))
self.classes[plugin] = getattr(this_module, plugin)()
# We only append to self.plugins if we're able to load the class
self.plugins.append(plugin)
except:
log.error('An error occurred initiating plugin {}'.format(plugin))
log.error(format_exc())
self.loaded = True
def run_all_after(self, file_path, destination_folder, final_file_path, metadata):
"""Process `before` methods of each plugin that was loaded.
"""
self.load()
pass_status = True
for cls in self.classes:
this_method = getattr(self.classes[cls], 'after')
# We try to call the plugin's `before()` method.
# If the method explicitly raises an ElodiePluginError we'll fail the import
# by setting pass_status to False.
# If any other error occurs we log the message and proceed as usual.
# By default, plugins don't change behavior.
try:
this_method(file_path, destination_folder, final_file_path, metadata)
log.info('Called after() for {}'.format(cls))
except ElodiePluginError as err:
log.warn('Plugin {} raised an exception in run_all_before: {}'.format(cls, err))
log.error(format_exc())
log.error('false')
pass_status = False
except:
log.error(format_exc())
return pass_status
def run_batch(self):
self.load()
pass_status = True
for cls in self.classes:
this_method = getattr(self.classes[cls], 'batch')
# We try to call the plugin's `before()` method.
# If the method explicitly raises an ElodiePluginError we'll fail the import
# by setting pass_status to False.
# If any other error occurs we log the message and proceed as usual.
# By default, plugins don't change behavior.
try:
this_method()
log.info('Called batch() for {}'.format(cls))
except ElodiePluginError as err:
log.warn('Plugin {} raised an exception in run_batch: {}'.format(cls, err))
log.error(format_exc())
pass_status = False
except:
log.error(format_exc())
return pass_status
def run_all_before(self, file_path, destination_folder):
"""Process `before` methods of each plugin that was loaded.
"""
self.load()
pass_status = True
for cls in self.classes:
this_method = getattr(self.classes[cls], 'before')
# We try to call the plugin's `before()` method.
# If the method explicitly raises an ElodiePluginError we'll fail the import
# by setting pass_status to False.
# If any other error occurs we log the message and proceed as usual.
# By default, plugins don't change behavior.
try:
this_method(file_path, destination_folder)
log.info('Called before() for {}'.format(cls))
except ElodiePluginError as err:
log.warn('Plugin {} raised an exception in run_all_after: {}'.format(cls, err))
log.error(format_exc())
pass_status = False
except:
log.error(format_exc())
return pass_status

View File

@ -1,24 +0,0 @@
"""
RuntimeError plugin object used for tests.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
from elodie.plugins.plugins import PluginBase
class RuntimeError(PluginBase):
__name__ = 'ThrowError'
"""A dummy class to execute plugin actions for tests."""
def __init__(self):
pass
def after(self, file_path, destination_folder, final_file_path, metadata):
print(does_not_exist)
def batch(self):
print(does_not_exist)
def before(self, file_path, destination_folder):
print(does_not_exist)

View File

@ -1,24 +0,0 @@
"""
ThrowError plugin object used for tests.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
from elodie.plugins.plugins import PluginBase, ElodiePluginError
class ThrowError(PluginBase):
__name__ = 'ThrowError'
"""A dummy class to execute plugin actions for tests."""
def __init__(self):
pass
def after(self, file_path, destination_folder, final_file_path, metadata):
raise ElodiePluginError('Sample plugin error for after')
def batch(self):
raise ElodiePluginError('Sample plugin error for batch')
def before(self, file_path, destination_folder):
raise ElodiePluginError('Sample plugin error for before')

View File

@ -1,72 +0,0 @@
import os
import re
import sys
from elodie import constants
from elodie import geolocation
from elodie import log
from elodie.compatability import _decode
from elodie.filesystem import FileSystem
from elodie.localstorage import Db
from elodie.media.media import Media, get_all_subclasses
from elodie.media.audio import Audio
from elodie.media.photo import Photo
from elodie.media.video import Video
from elodie.result import Result
def main(argv):
filesystem = FileSystem()
result = Result()
subclasses = get_all_subclasses()
paths = argv[1:]
for path in paths:
path = os.path.expanduser(path)
if os.path.isdir(path):
for source in filesystem.get_all_files(path, False):
status = add_original_name(source, subclasses)
result.append((_decode(source), status))
else:
status = add_original_name(path, subclasses)
result.append((_decode(path), status))
result.write()
def add_original_name(source, subclasses):
media = Media.get_class_by_file(source, subclasses)
if media is None:
print('{} is not a valid media object'.format(source))
return
metadata = media.get_metadata()
if metadata['original_name'] is not None:
print('{} already has OriginalFileName...Skipping'.format(source))
return
original_name = parse_original_name_from_media(metadata)
return media.set_original_name(original_name)
def parse_original_name_from_media(metadata):
# 2015-07-23_04-31-12-img_9414-test3.jpg
base_name = metadata['base_name']
title = metadata['title']
extension = metadata['extension']
date_regex = r'^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-'
if not re.match(date_regex, base_name):
print("File name did not match date pattern...Skipping")
return
trimmed_base_name = re.sub(date_regex, '', base_name)
if title:
normalized_title = re.sub(r'\W+', '-', title.lower())
trimmed_base_name = trimmed_base_name.replace(
'-{}'.format(normalized_title),
''
)
return '{}.{}'.format(trimmed_base_name, extension)
if __name__ == "__main__":
main(sys.argv)

View File

@ -4,6 +4,5 @@ requests==2.20.0
Send2Trash==1.3.0
configparser==3.5.0
tabulate==0.7.7
Pillow==6.2.2; python_version == '2.7'
Pillow==8.0; python_version >= '3.6'
Pillow==8.0
six==1.9