Compare commits

...

10 Commits

25 changed files with 927 additions and 349 deletions

View File

@ -1,36 +0,0 @@
"""
The audio module contains classes specifically for dealing with audio files.
The :class:`Audio` class inherits from the :class:`~dozo.media.Media`
class.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import os
from .media import Media
class Audio(Media):
"""An audio object.
:param str source: The fully qualified path to the audio file.
"""
__name__ = 'Audio'
#: Valid extensions for audio files.
extensions = ('m4a',)
def __init__(self, source=None, ignore_tags=set()):
super().__init__(source, ignore_tags=set())
def is_valid(self):
"""Check the file extension against valid file extensions.
The list of valid file extensions comes from self.extensions.
:returns: bool
"""
source = self.source
return os.path.splitext(source)[1][1:].lower() in self.extensions

View File

@ -1,43 +0,0 @@
"""
The video module contains the :class:`Video` class, which represents video
objects (AVI, MOV, etc.).
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
# load modules
from datetime import datetime
import os
import re
import time
from .media import Media
class Video(Media):
"""A video object.
:param str source: The fully qualified path to the video file.
"""
__name__ = 'Video'
#: Valid extensions for video files.
extensions = ('avi', 'm4v', 'mov', 'mp4', 'mpg', 'mpeg', '3gp', 'mts')
def __init__(self, source=None, ignore_tags=set()):
super().__init__(source, ignore_tags=set())
# self.set_gps_ref = False
def is_valid(self):
"""Check the file extension against valid file extensions.
The list of valid file extensions comes from self.extensions.
:returns: bool
"""
source = self.source
return os.path.splitext(source)[1][1:].lower() in self.extensions

View File

@ -5,8 +5,8 @@
# be a number between 0-23')
day_begins=4
dirs_path=%u{%Y-%m}/{city}|{city}-{%Y}/{folders[:1]}/{folder}
name={%Y-%m-%b-%H-%M-%S}-{basename}.%l{ext}
dirs_path={%Y}/{%m-%b}-{city}-{folder}
name={%Y%m%d-%H%M%S}-%u{original_name}.%l{ext}
[Exclusions]
name1=.directory
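As an aside, here is a minimal sketch of how the [Path] templates above could expand. The date, city, folder and file name are made up for illustration; the real substitution is performed by ordigi's FileSystem class, not by this snippet.
from datetime import datetime

date = datetime(2021, 3, 9, 10, 58, 42)
city, folder, original_name, ext = 'Lyon', 'holidays', 'IMG_1234', 'JPG'

# dirs_path={%Y}/{%m-%b}-{city}-{folder}
dirs_path = f"{date:%Y}/{date:%m-%b}-{city}-{folder}"
# name={%Y%m%d-%H%M%S}-%u{original_name}.%l{ext}  (%u upper-cases, %l lower-cases)
name = f"{date:%Y%m%d-%H%M%S}-{original_name.upper()}.{ext.lower()}"

print(f"{dirs_path}/{name}")  # 2021/03-Mar-Lyon-holidays/20210309-105842-IMG_1234.jpg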

View File

@ -3,18 +3,17 @@
import os
import re
import sys
import logging
from datetime import datetime
import click
from send2trash import send2trash
from dozo import constants
from dozo import config
from dozo.filesystem import FileSystem
from dozo.database import Db
from dozo.media.media import Media, get_all_subclasses
from dozo.summary import Summary
from ordigi import config
from ordigi import constants
from ordigi import log
from ordigi.database import Db
from ordigi.filesystem import FileSystem
from ordigi.media import Media, get_all_subclasses
from ordigi.summary import Summary
FILESYSTEM = FileSystem()
@ -34,22 +33,6 @@ def _batch(debug):
plugins.run_batch()
def get_logger(verbose, debug):
if debug:
level = logging.DEBUG
elif verbose:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(format='%(levelname)s:%(message)s', level=level)
logging.debug('This message should appear on the console')
logging.info('So should this')
logging.getLogger('asyncio').setLevel(level)
logger = logging.getLogger('dozo')
logger.level = level
return logger
@click.command('sort')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@ -57,6 +40,8 @@ def get_logger(verbose, debug):
help='Dry run only, no change made to the filesystem.')
@click.option('--destination', '-d', type=click.Path(file_okay=False),
default=None, help='Sort files into this directory.')
@click.option('--clean', '-C', default=False, is_flag=True,
help='Clean empty folders')
@click.option('--copy', '-c', default=False, is_flag=True,
help='True if you want files to be copied over from src_dir to\
dest_dir rather than moved')
@ -79,10 +64,10 @@ def get_logger(verbose, debug):
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('paths', required=True, nargs=-1, type=click.Path())
def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignore_tags,
def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext, ignore_tags,
max_deep, remove_duplicates, reset_cache, verbose, paths):
"""Sort files or directories by reading their EXIF and organizing them
according to config.ini preferences.
according to ordigi.conf preferences.
"""
if copy:
@ -90,7 +75,7 @@ def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignor
else:
mode = 'move'
logger = get_logger(verbose, debug)
logger = log.get_logger(verbose, debug)
if max_deep is not None:
max_deep = int(max_deep)
@ -106,6 +91,8 @@ def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignor
sys.exit(1)
paths = set(paths)
filter_by_ext = set(filter_by_ext)
destination = os.path.abspath(os.path.expanduser(destination))
if not os.path.exists(destination):
@ -124,17 +111,21 @@ def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignor
# Initialize Db
db = Db(destination)
if 'Directory' in conf and 'day_begins' in conf['Directory']:
config_directory = conf['Directory']
day_begins = config_directory['day_begins']
if 'Path' in conf and 'day_begins' in conf['Path']:
config_directory = conf['Path']
day_begins = int(config_directory['day_begins'])
else:
day_begins = 0
filesystem = FileSystem(cache, day_begins, dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, path_format)
import ipdb; ipdb.set_trace()
summary, has_errors = filesystem.sort_files(paths, destination, db,
remove_duplicates, ignore_tags)
if clean:
remove_empty_folders(destination, logger)
if verbose or debug:
summary.write()
@ -142,13 +133,49 @@ def _sort(debug, dry_run, destination, copy, exclude_regex, filter_by_ext, ignor
sys.exit(1)
def remove_empty_folders(path, logger, remove_root=True):
'Function to remove empty folders'
if not os.path.isdir(path):
return
# remove empty subfolders
files = os.listdir(path)
if len(files):
for f in files:
fullpath = os.path.join(path, f)
if os.path.isdir(fullpath):
remove_empty_folders(fullpath, logger)
# if folder empty, delete it
files = os.listdir(path)
if len(files) == 0 and remove_root:
logger.info(f"Removing empty folder: {path}")
os.rmdir(path)
@click.command('clean')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
@click.option('--verbose', '-v', default=False, is_flag=True,
help='True if you want to see details of file processing')
@click.argument('path', required=True, nargs=1, type=click.Path())
def _clean(debug, verbose, path):
"""Remove empty folders
Usage: clean [--verbose|--debug] directory [removeRoot]"""
logger = log.get_logger(verbose, debug)
remove_empty_folders(path, logger)
@click.command('generate-db')
@click.option('--path', type=click.Path(file_okay=False),
required=True, help='Path of your photo library.')
@click.option('--debug', default=False, is_flag=True,
help='Override the value in constants.py with True.')
def _generate_db(path, debug):
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files. The hash.json file is located at ~/.dozo/.
"""Regenerate the hash.json database which contains all of the sha256 signatures of media files.
"""
constants.debug = debug
result = Result()
@ -221,7 +248,7 @@ def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
revert_compare, similar_to, similarity, verbose, path):
'''Compare files in directories'''
logger = get_logger(verbose, debug)
logger = log.get_logger(verbose, debug)
# Initialize Db
db = Db(path)
@ -247,6 +274,7 @@ def main():
pass
main.add_command(_clean)
main.add_command(_compare)
main.add_command(_sort)
main.add_command(_generate_db)
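A quick sketch of exercising the new clean sub-command from Python via click's test runner. The ordigi.cli module path and the /tmp/photos argument are assumptions for the example; main is taken to be the click group defined above.
from click.testing import CliRunner
from ordigi.cli import main  # assumed module path for the command group above

runner = CliRunner()
result = runner.invoke(main, ['clean', '--verbose', '/tmp/photos'])
print(result.exit_code, result.output)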

View File

@ -1,7 +1,7 @@
"""Load config file as a singleton."""
from configparser import RawConfigParser
from os import path
from dozo import constants
from ordigi import constants
def write(conf_file, config):

View File

@ -8,8 +8,15 @@ from sys import version_info
#: If True, debug messages will be printed.
debug = False
#: Directory in which to store Dozo settings.
application_directory = '{}/.dozo'.format(path.expanduser('~'))
# Ordigi settings directory.
if 'XDG_CONFIG_HOME' in environ:
confighome = environ['XDG_CONFIG_HOME']
elif 'APPDATA' in environ:
confighome = environ['APPDATA']
else:
confighome = path.join(environ['HOME'], '.config')
application_directory = path.join(confighome, 'ordigi')
default_path = '{%Y-%m-%b}/{album}|{city}|{"Unknown Location"}'
default_name = '{%Y-%m-%d_%H-%M-%S}-{name}-{title}.%l{ext}'
default_geocoder = 'Nominatim'
@ -23,7 +30,7 @@ location_db = 'location.json'
# TODO will be removed eventually
# location_db = '{}/location.json'.format(application_directory)
# Dozo installation directory.
# Ordigi installation directory.
script_directory = path.dirname(path.dirname(path.abspath(__file__)))
#: Accepted language in responses from MapQuest
@ -32,4 +39,4 @@ accepted_language = 'en'
# check python version, required in filesystem.py to trigger appropriate method
python_version = version_info.major
CONFIG_FILE = '%s/config.ini' % application_directory
CONFIG_FILE = f'{application_directory}/ordigi.conf'
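For reference, a minimal sketch (assumed environment) of the lookup above: XDG_CONFIG_HOME wins, then APPDATA, then ~/.config, so on a typical Linux setup CONFIG_FILE resolves to ~/.config/ordigi/ordigi.conf.
from os import environ, path

confighome = (environ.get('XDG_CONFIG_HOME')
              or environ.get('APPDATA')
              or path.join(environ['HOME'], '.config'))
print(path.join(confighome, 'ordigi', 'ordigi.conf'))
# e.g. /home/user/.config/ordigi/ordigi.conf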

View File

@ -1,5 +1,5 @@
"""
Methods for interacting with information Dozo caches about stored media.
Methods for interacting with database files
"""
from builtins import map
from builtins import object
@ -12,23 +12,17 @@ from math import radians, cos, sqrt
from shutil import copyfile
from time import strftime
from dozo import constants
from ordigi import constants
class Db(object):
"""A class for interacting with the JSON files created by Dozo."""
"""A class for interacting with the JSON files database."""
def __init__(self, target_dir):
# verify that the application directory (~/.dozo) exists,
# else create it
# if not os.path.exists(constants.application_directory):
# os.makedirs(constants.application_directory)
# Create dir for target database
dirname = os.path.join(target_dir, '.dozo')
# Legacy dir
# dirname = constants.application_directory
dirname = os.path.join(target_dir, '.ordigi')
if not os.path.exists(dirname):
try:
@ -89,7 +83,7 @@ class Db(object):
# structure might be needed. Some speed up ideas:
# - Sort it and inter-half method can be used
# - Use integer part of long or lat as key to get a lower search list
# - Cache a small number of lookups, photos are likely to be taken in
# - Cache a small number of lookups, images are likely to be taken in
# clusters around a spot during import.
def add_location(self, latitude, longitude, place, write=False):
"""Add a location to the database.

View File

@ -14,25 +14,32 @@ import shutil
import time
from datetime import datetime, timedelta
from dozo import constants
from dozo import geolocation
from ordigi import constants
from ordigi import geolocation
from dozo.media.media import get_media_class, get_all_subclasses
from dozo.media.photo import Photo
from dozo.summary import Summary
from ordigi import media
from ordigi.media import Media, get_all_subclasses
from ordigi.images import Images
from ordigi.summary import Summary
class FileSystem(object):
"""A class for interacting with the file system."""
def __init__(self, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=(), logger=logging.getLogger(), max_deep=None,
filter_by_ext=set(), logger=logging.getLogger(), max_deep=None,
mode='copy', path_format=None):
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
self.exclude_regex_list = exclude_regex_list
self.filter_by_ext = filter_by_ext
if '%media' in filter_by_ext:
filter_by_ext.remove('%media')
self.filter_by_ext = filter_by_ext.union(media.extensions)
else:
self.filter_by_ext = filter_by_ext
self.items = self.get_items()
self.logger = logger
self.max_deep = max_deep
@ -69,7 +76,6 @@ class FileSystem(object):
return False
def get_items(self):
return {
'album': '{album}',
@ -91,7 +97,6 @@ class FileSystem(object):
'date': '{(%[a-zA-Z][^a-zA-Z]*){1,8}}' # search for date format string
}
def walklevel(self, src_path, maxlevel=None):
"""
Walk into input directory recursively until desired maxlevel
@ -108,7 +113,6 @@ class FileSystem(object):
if maxlevel is not None and level >= maxlevel:
del dirs[:]
def get_all_files(self, path, extensions=False, exclude_regex_list=set()):
"""Recursively get all files which match a path and extension.
@ -129,7 +133,7 @@ class FileSystem(object):
# Create a list of compiled regular expressions to match against the file path
compiled_regex_list = [re.compile(regex) for regex in exclude_regex_list]
for dirname, dirnames, filenames in os.walk(path):
if dirname == os.path.join(path, '.dozo'):
if dirname == os.path.join(path, '.ordigi'):
continue
for filename in filenames:
# If file extension is in `extensions`
@ -143,7 +147,6 @@ class FileSystem(object):
):
yield filename_path
def check_for_early_morning_photos(self, date):
"""check for early hour photos to be grouped with previous day"""
@ -154,7 +157,6 @@ class FileSystem(object):
return date
def get_location_part(self, mask, part, place_name):
"""Takes a mask for a location and interpolates the actual place names.
@ -187,7 +189,6 @@ class FileSystem(object):
return folder_name
def get_part(self, item, mask, metadata, db, subdirs):
"""Parse a specific folder's name given a mask and metadata.
@ -275,15 +276,20 @@ class FileSystem(object):
part = part.strip()
# Capitalization
u_regex = '%u' + regex
l_regex = '%l' + regex
if re.search(u_regex, this_part):
this_part = re.sub(u_regex, part.upper(), this_part)
elif re.search(l_regex, this_part):
this_part = re.sub(l_regex, part.lower(), this_part)
else:
if part == '':
# delete separator if any
regex = '[-_ .]?(%[ul])?' + regex
this_part = re.sub(regex, part, this_part)
else:
# Capitalization
u_regex = '%u' + regex
l_regex = '%l' + regex
if re.search(u_regex, this_part):
this_part = re.sub(u_regex, part.upper(), this_part)
elif re.search(l_regex, this_part):
this_part = re.sub(l_regex, part.lower(), this_part)
else:
this_part = re.sub(regex, part, this_part)
if this_part:
@ -404,7 +410,6 @@ class FileSystem(object):
elif metadata['date_modified'] is not None:
return metadata['date_modified']
def checksum(self, file_path, blocksize=65536):
"""Create a hash value for the given file.
@ -425,7 +430,6 @@ class FileSystem(object):
return hasher.hexdigest()
return None
def checkcomp(self, dest_path, src_checksum):
"""Check file.
"""
@ -442,7 +446,6 @@ class FileSystem(object):
return src_checksum
def sort_file(self, src_path, dest_path, remove_duplicates=True):
'''Copy or move file to dest_path.'''
@ -452,8 +455,8 @@ class FileSystem(object):
# check for collisions
if(src_path == dest_path):
self.logger.info(f'File {dest_path} already sorted')
return True
if os.path.isfile(dest_path):
return None
elif os.path.isfile(dest_path):
self.logger.info(f'File {dest_path} already exist')
if remove_duplicates:
if filecmp.cmp(src_path, dest_path):
@ -462,7 +465,7 @@ class FileSystem(object):
if not dry_run:
os.remove(src_path)
self.logger.info(f'remove: {src_path}')
return True
return None
else: # name is same, but file is different
self.logger.info('Source and destination files are different.')
return False
@ -480,9 +483,6 @@ class FileSystem(object):
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
return False
def check_file(self, src_path, dest_path, src_checksum, db):
# Check if file remain the same
@ -493,9 +493,6 @@ class FileSystem(object):
db.add_hash(checksum, dest_path)
db.update_hash_db()
if dest_path:
self.logger.info(f'{src_path} -> {dest_path}')
self.summary.append((src_path, dest_path))
else:
@ -506,24 +503,13 @@ class FileSystem(object):
return self.summary, has_errors
def get_files_in_path(self, path, extensions=False):
def get_files_in_path(self, path, extensions=set()):
"""Recursively get files which match a path and extension.
:param str path string: Path to start recursive file listing
:param tuple(str) extensions: File extensions to include (whitelist)
:returns: file_path, subdirs
"""
if self.filter_by_ext != () and not extensions:
# Filtering files by extensions.
if '%media' in self.filter_by_ext:
extensions = set()
subclasses = get_all_subclasses()
for cls in subclasses:
extensions.update(cls.extensions)
else:
extensions = self.filter_by_ext
file_list = set()
if os.path.isfile(path):
if not self.should_exclude(path, self.exclude_regex_list, True):
@ -535,7 +521,7 @@ class FileSystem(object):
subdirs = ''
for dirname, dirnames, filenames, level in self.walklevel(path,
self.max_deep):
if dirname == os.path.join(path, '.dozo'):
if dirname == os.path.join(path, '.ordigi'):
continue
subdirs = os.path.join(subdirs, os.path.basename(dirname))
@ -546,7 +532,7 @@ class FileSystem(object):
# Then append to the list
filename_path = os.path.join(dirname, filename)
if (
extensions == False
extensions == set()
or os.path.splitext(filename)[1][1:].lower() in extensions
and not self.should_exclude(filename_path, compiled_regex_list, False)
):
@ -554,6 +540,35 @@ class FileSystem(object):
return file_list
def _conflict_solved(self, conflict_file_list, item, dest_path):
self.logger.warning(f'Same name already exists...renaming to: {dest_path}')
del(conflict_file_list[item])
def solve_conflicts(self, conflict_file_list, remove_duplicates):
file_list = conflict_file_list.copy()
for item, file_paths in enumerate(file_list):
src_path = file_paths['src_path']
dest_path = file_paths['dest_path']
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
# remove from conflict file list if file has been successfully copied or ignored
if result is True or result is None:
self._conflict_solved(conflict_file_list, item, dest_path)
else:
n = 1
while result is False:
if n > 100:
self.logger.warning(f'{self.mode}: too many appends for {dest_path}...')
break
# Add appendix to the name
pre, ext = os.path.splitext(dest_path)
dest_path = pre + '_' + str(n) + ext
conflict_file_list[item]['dest_path'] = dest_path
result = self.sort_file(src_path, dest_path, remove_duplicates)
else:
self._conflict_solved(conflict_file_list, item, dest_path)
return result
def sort_files(self, paths, destination, db, remove_duplicates=False,
ignore_tags=set()):
@ -568,11 +583,12 @@ class FileSystem(object):
path = os.path.expanduser(path)
conflict_file_list = set()
for src_path, subdirs in self.get_files_in_path(path):
conflict_file_list = []
for src_path, subdirs in self.get_files_in_path(path,
extensions=self.filter_by_ext):
# Process files
src_checksum = self.checksum(src_path)
media = get_media_class(src_path, ignore_tags, self.logger)
media = Media(src_path, ignore_tags, self.logger)
if media:
metadata = media.get_metadata()
# Get the destination path according to metadata
@ -587,40 +603,23 @@ class FileSystem(object):
self.create_directory(dest_directory)
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.check_file(src_path,
dest_path, src_checksum, db)
else:
if result is False:
# There is conflict files
conflict_file_list.add((src_path, dest_path))
conflict_file_list.append({'src_path': src_path, 'dest_path': dest_path})
result = self.solve_conflicts(conflict_file_list, remove_duplicates)
for src_path, dest_path in conflict_file_list:
# Try to sort the file
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
conflict_file_list.remove((src_path, dest_path))
else:
n = 1
while not result:
# Add appendix to the name
pre, ext = os.path.splitext(dest_path)
dest_path = pre + '_' + str(n) + ext
result = self.sort_file(src_path, dest_path, remove_duplicates)
if n > 100:
self.logger.error(f'{self.mode}: to many append for {dest_path}...')
break
self.logger.info(f'Same name already exists...renaming to: {dest_path}')
if result:
if result is True:
self.summary, has_errors = self.check_file(src_path,
dest_path, src_checksum, db)
elif result is None:
has_errors = False
else:
self.summary.append((src_path, False))
has_errors = True
return self.summary, has_errors
def check_path(self, path):
path = os.path.abspath(os.path.expanduser(path))
@ -631,7 +630,6 @@ class FileSystem(object):
return path
def set_hash(self, result, src_path, dest_path, src_checksum, db):
if result:
# Check if file remain the same
@ -658,7 +656,6 @@ class FileSystem(object):
return has_errors
def move_file(self, img_path, dest_path, checksum, db):
if not self.dry_run:
try:
@ -669,13 +666,12 @@ class FileSystem(object):
self.logger.info(f'move: {img_path} -> {dest_path}')
return self.set_hash(True, img_path, dest_path, checksum, db)
def sort_similar_images(self, path, db, similarity=80):
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.dozo'):
if dirname == os.path.join(path, '.ordigi'):
continue
if dirname.find('similar_to') == 0:
continue
@ -684,21 +680,21 @@ class FileSystem(object):
for filename in filenames:
file_paths.add(os.path.join(dirname, filename))
photo = Photo(logger=self.logger)
i = Images(file_paths, logger=self.logger)
images = set([ i for i in photo.get_images(file_paths) ])
images = set([ i for i in i.get_images() ])
for image in images:
if not os.path.isfile(image):
continue
checksum1 = self.checksum(image)
# Process files
# media = get_media_class(src_path, False, self.logger)
# media = Media(src_path, False, self.logger)
# TODO compare metadata
# if media:
# metadata = media.get_metadata()
similar = False
moved_imgs = set()
for img_path in photo.find_similar(image, file_paths, similarity):
for img_path in i.find_similar(image, similarity):
similar = True
checksum2 = self.checksum(img_path)
# move image into directory
@ -732,13 +728,12 @@ class FileSystem(object):
return self.summary, has_errors
def revert_compare(self, path, db):
has_errors = False
path = self.check_path(path)
for dirname, dirnames, filenames, level in self.walklevel(path, None):
if dirname == os.path.join(path, '.dozo'):
if dirname == os.path.join(path, '.ordigi'):
continue
if dirname.find('similar_to') == 0:
continue
@ -764,7 +759,6 @@ class FileSystem(object):
return self.summary, has_errors
def set_utime_from_metadata(self, date_taken, file_path):
""" Set the modification time on the file based on the file name.
"""
@ -772,7 +766,6 @@ class FileSystem(object):
# Initialize date taken to what's returned from the metadata function.
os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp())))
def should_exclude(self, path, regex_list=set(), needs_compiled=False):
if(len(regex_list) == 0):
return False
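The %media shortcut handled in FileSystem.__init__ above can be pictured with this small sketch; the stand-in extension set is assumed, since the real list comes from ordigi's media.extensions.
filter_by_ext = {'%media', 'pdf'}
media_extensions = {'jpg', 'png', 'mp4', 'm4a'}  # stand-in for media.extensions
if '%media' in filter_by_ext:
    filter_by_ext.remove('%media')
    filter_by_ext = filter_by_ext.union(media_extensions)
print(sorted(filter_by_ext))  # ['jpg', 'm4a', 'mp4', 'pdf', 'png']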

View File

@ -1,6 +1,4 @@
"""Look up geolocation information for media objects."""
from past.utils import old_div
from os import path
@ -8,8 +6,8 @@ import geopy
from geopy.geocoders import Nominatim
import logging
from dozo import constants
from dozo.config import load_config, get_geocoder
from ordigi import constants
from ordigi.config import load_config, get_geocoder
__KEY__ = None
__DEFAULT_LOCATION__ = 'Unknown Location'
@ -28,7 +26,9 @@ def coordinates_by_name(name, db):
# If the name is not cached then we go ahead with an API lookup
geocoder = get_geocoder()
if geocoder == 'Nominatim':
locator = Nominatim(user_agent='myGeocoder')
# timeout = DEFAULT_SENTINEL
timeout = 10
locator = Nominatim(user_agent='myGeocoder', timeout=timeout)
geolocation_info = locator.geocode(name)
if geolocation_info is not None:
return {
@ -53,12 +53,10 @@ def decimal_to_dms(decimal):
def dms_to_decimal(degrees, minutes, seconds, direction=' '):
sign = 1
if(direction[0] in 'WSws'):
if direction[0] in 'WSws':
sign = -1
return (
float(degrees) + old_div(float(minutes), 60) +
old_div(float(seconds), 3600)
) * sign
return (degrees + minutes / 60 + seconds / 3600) * sign
def dms_string(decimal, type='latitude'):
@ -139,14 +137,19 @@ def lookup_osm(lat, lon, logger=logging.getLogger()):
prefer_english_names = get_prefer_english_names()
try:
locator = Nominatim(user_agent='myGeocoder')
timeout = 10
locator = Nominatim(user_agent='myGeocoder', timeout=timeout)
coords = (lat, lon)
if(prefer_english_names):
lang='en'
else:
lang='local'
return locator.reverse(coords, language=lang).raw
except geopy.exc.GeocoderUnavailable as e:
locator_reverse = locator.reverse(coords, language=lang)
if locator_reverse is not None:
return locator_reverse.raw
else:
return None
except (geopy.exc.GeocoderUnavailable, geopy.exc.GeocoderServiceError) as e:
logger.error(e)
return None
# Fix *** TypeError: `address` must not be None
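A worked check (illustrative, not part of the diff) of the simplified dms_to_decimal above:
# 45 deg 35 min 0.2 sec with direction 'N' keeps a positive sign:
print(round(45 + 35 / 60 + 0.2 / 3600, 5))   # 45.58339
# the same coordinates with direction 'S' or 'W' would be negated.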

View File

@ -1,5 +1,5 @@
"""
The photo module contains the :class:`Photo` class, which is used to track
The image module contains the :class:`Images` class, which is used to track
image objects (JPG, DNG, etc.).
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
@ -10,50 +10,36 @@ import imghdr
import logging
import numpy as np
import os
from PIL import Image, UnidentifiedImageError
from PIL import Image as img
from PIL import UnidentifiedImageError
import time
from .media import Media
# HEIC extension support (experimental, not tested)
PYHEIF = False
try:
from pyheif_pillow_opener import register_heif_opener
PYHEIF = True
# Allow to open HEIF/HEIC image from pillow
register_heif_opener()
except ImportError as e:
logging.info(e)
class Photo(Media):
class Image():
"""A photo object.
:param str source: The fully qualified path to the photo file
"""
__name__ = 'Photo'
#: Valid extensions for photo files.
extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
def __init__(self, source=None, hash_size=8, ignore_tags=set(),
logger=logging.getLogger()):
super().__init__(source, ignore_tags)
def __init__(self, img_path, hash_size=8):
self.img_path = img_path
self.hash_size = hash_size
self.logger = logger
logger.setLevel(logging.INFO)
# HEIC extension support (experimental, not tested)
self.pyheif = False
try:
from pyheif_pillow_opener import register_heif_opener
self.pyheif = True
# Allow to open HEIF/HEIC images from pillow
register_heif_opener()
except ImportError as e:
self.logger.info(e)
def is_image(self, img_path):
def is_image(self):
"""Check whether the file is an image.
:returns: bool
"""
# gh-4 This checks if the source file is an image.
# gh-4 This checks if the file is an image.
# It doesn't validate against the list of supported types.
# We check with imghdr and pillow.
if imghdr.what(img_path) is None:
if imghdr.what(self.img_path) is None:
# Pillow is used as a fallback
# imghdr won't detect all variants of images (https://bugs.python.org/issue28591)
# see https://github.com/jmathai/elodie/issues/281
@ -65,7 +51,7 @@ class Photo(Media):
# things like mode, size, and other properties required to decode the file,
# but the rest of the file is not processed until later.
try:
im = Image.open(img_path)
im = img.open(self.img_path)
except (IOError, UnidentifiedImageError):
return False
@ -74,26 +60,48 @@ class Photo(Media):
return True
def get_images(self, file_paths):
def get_hash(self):
with img.open(self.img_path) as img_path:
return imagehash.average_hash(img_path, self.hash_size).hash
class Images():
"""A image object.
:param str img_path: The fully qualified path to the image file
"""
#: Valid extensions for image files.
extensions = ('arw', 'cr2', 'dng', 'gif', 'heic', 'jpeg', 'jpg', 'nef', 'png', 'rw2')
def __init__(self, file_paths=None, hash_size=8, logger=logging.getLogger()):
self.file_paths = file_paths
self.hash_size = hash_size
self.duplicates = []
self.logger = logger
def get_images(self):
''':returns: img_path generator
'''
:returns: img_path generator
'''
for img_path in file_paths:
if self.is_image(img_path):
for img_path in self.file_paths:
image = Image(img_path)
if image.is_image():
yield img_path
def get_images_hashes(self, file_paths):
def get_images_hashes(self):
"""Get image hashes"""
hashes = {}
duplicates = []
# Searching for duplicates.
for img_path in self.get_images(file_paths):
with Image.open(img_path) as img:
for img_path in self.get_images():
with img.open(img_path) as image:
yield imagehash.average_hash(image, self.hash_size)
def find_duplicates(self, file_paths):
def find_duplicates(self, img_path):
"""Find duplicates"""
for temp_hash in get_images_hashes(file_paths):
duplicates = []
for temp_hash in get_images_hashes(self.file_paths):
if temp_hash in hashes:
self.logger.info("Duplicate {} \nfound for image {}\n".format(img_path, hashes[temp_hash]))
duplicates.append(img_path)
@ -118,10 +126,6 @@ class Photo(Media):
else:
self.logger.info("No duplicates found")
def get_hash(self, img_path):
with Image.open(img_path) as img:
return imagehash.average_hash(img, self.hash_size).hash
def diff(self, hash1, hash2):
return np.count_nonzero(hash1 != hash2)
@ -131,24 +135,25 @@ class Photo(Media):
return similarity_img
def find_similar(self, image, file_paths, similarity=80):
def find_similar(self, image, similarity=80):
'''
Find similar images
:returns: img_path generator
'''
hash1 = ''
if self.is_image(image):
hash1 = self.get_hash(image)
image_obj = Image(image)
if image_obj.is_image():
hash1 = image_obj.get_hash()
self.logger.info(f'Finding similar images to {image}')
threshold = 1 - similarity/100
diff_limit = int(threshold*(self.hash_size**2))
for img_path in self.get_images(file_paths):
for img_path in self.get_images():
if img_path == image:
continue
hash2 = self.get_hash(img_path)
hash2 = Image(img_path).get_hash()
img_diff = self.diff(hash1, hash2)
if img_diff <= diff_limit:
similarity_img = self.similarity(img_diff)
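To make the thresholding in find_similar above concrete, a small illustrative calculation (default hash_size=8, i.e. a 64-bit average hash):
similarity, hash_size = 80, 8
threshold = 1 - similarity / 100            # 0.2
diff_limit = int(threshold * (hash_size ** 2))
print(diff_limit)  # 12 -> images differing in at most 12 of 64 hash bits count as similar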

16
ordigi/log.py Normal file
View File

@ -0,0 +1,16 @@
import logging
def get_logger(verbose, debug):
if debug:
level = logging.DEBUG
elif verbose:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(format='%(levelname)s:%(message)s', level=level)
logging.getLogger('asyncio').setLevel(level)
logger = logging.getLogger('ordigi')
logger.level = level
return logger
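Usage sketch for the new ordigi/log.py helper (call sites assumed, mirroring how cli.py now uses it):
from ordigi import log

logger = log.get_logger(verbose=True, debug=False)   # INFO level
logger.info('sorting files...')
logger.debug('only shown when debug is True')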

View File

@ -1,28 +1,24 @@
"""
Base :class:`Media` class for media objects that are tracked by Dozo.
The Media class provides some base functionality used by all the media types.
Sub-classes (:class:`~dozo.media.Audio`, :class:`~dozo.media.Photo`, and :class:`~dozo.media.Video`).
The :class:`Media` class provides access to file metadata.
"""
import logging
import mimetypes
import os
import six
import logging
# load modules
from dateutil.parser import parse
import re
from dozo.exiftool import ExifTool, ExifToolCaching
from ordigi.exiftool import ExifTool, ExifToolCaching
class Media():
"""The media class for all media objects.
:param str source: The fully qualified path to the video file.
:param str file_path: The fully qualified path to the media file.
"""
__name__ = 'Media'
d_coordinates = {
'latitude': 'latitude_ref',
'longitude': 'longitude_ref'
@ -34,8 +30,8 @@ class Media():
extensions = PHOTO + AUDIO + VIDEO
def __init__(self, sources=None, ignore_tags=set(), logger=logging.getLogger()):
self.source = sources
def __init__(self, file_path, ignore_tags=set(), logger=logging.getLogger()):
self.file_path = file_path
self.ignore_tags = ignore_tags
self.tags_keys = self.get_tags()
self.exif_metadata = None
@ -104,7 +100,7 @@ class Media():
:returns: str or None
"""
mimetype = mimetypes.guess_type(self.source)
mimetype = mimetypes.guess_type(self.file_path)
if(mimetype is None):
return None
@ -152,7 +148,8 @@ class Media():
value = re.sub(regex , r'\g<1>-\g<2>-\g<3>', value)
return parse(value)
except BaseException or dateutil.parser._parser.ParserError as e:
self.logger.error(e)
self.logger.error(e, value)
import ipdb; ipdb.set_trace()
return None
def get_coordinates(self, key, value):
@ -198,7 +195,7 @@ class Media():
:returns: dict
"""
# Get metadata from exiftool.
self.exif_metadata = ExifToolCaching(self.source, logger=self.logger).asdict()
self.exif_metadata = ExifToolCaching(self.file_path, logger=self.logger).asdict()
# TODO to be removed
self.metadata = {}
@ -224,9 +221,9 @@ class Media():
self.metadata[key] = formated_data
self.metadata['base_name'] = os.path.basename(os.path.splitext(self.source)[0])
self.metadata['ext'] = os.path.splitext(self.source)[1][1:]
self.metadata['directory_path'] = os.path.dirname(self.source)
self.metadata['base_name'] = os.path.basename(os.path.splitext(self.file_path)[0])
self.metadata['ext'] = os.path.splitext(self.file_path)[1][1:]
self.metadata['directory_path'] = os.path.dirname(self.file_path)
return self.metadata
@ -245,8 +242,7 @@ class Media():
def get_class_by_file(cls, _file, classes, ignore_tags=set(), logger=logging.getLogger()):
"""Static method to get a media object by file.
"""
basestring = (bytes, str)
if not isinstance(_file, basestring) or not os.path.isfile(_file):
if not os.path.isfile(_file):
return None
extension = os.path.splitext(_file)[1][1:].lower()
@ -254,13 +250,9 @@ class Media():
if len(extension) > 0:
for i in classes:
if(extension in i.extensions):
return i(_file, ignore_tags=ignore_tags)
return i(_file, ignore_tags=ignore_tags, logger=logger)
exclude_list = ['.DS_Store', '.directory']
if os.path.basename(_file) == '.DS_Store':
return None
else:
return Media(_file, ignore_tags=ignore_tags, logger=logger)
return Media(_file, ignore_tags=ignore_tags, logger=logger)
def set_date_taken(self, date_key, time):
"""Set the date/time a photo was taken.
@ -309,7 +301,7 @@ class Media():
:returns: bool
"""
folder = os.path.basename(os.path.dirname(self.source))
folder = os.path.basename(os.path.dirname(self.file_path))
return set_value(self, 'album', folder)
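A minimal usage sketch for the renamed Media class; the file path is hypothetical and only keys shown in the diff are read.
from ordigi.media import Media

media = Media('/tmp/photos/IMG_1234.jpg')
metadata = media.get_metadata()
print(metadata['base_name'], metadata['ext'], metadata['directory_path'])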

7
pytest.ini Normal file
View File

@ -0,0 +1,7 @@
[pytest]
addopts = --ignore=old_tests -s
# collect_ignore = ["old_test"]
[pycodestyle]
# ignore = old_test/* ALL

View File

@ -5,4 +5,5 @@ Send2Trash==1.3.0
configparser==3.5.0
tabulate==0.7.7
Pillow==8.0
pyheif_pillow_opener==0.1
six==1.9

View File

@ -6,10 +6,10 @@ from pathlib import Path
import shutil
import tempfile
from dozo import config
from dozo.exiftool import _ExifToolProc
from ordigi import config
from ordigi.exiftool import _ExifToolProc
DOZO_PATH = Path(__file__).parent.parent
ORDIGI_PATH = Path(__file__).parent.parent
@pytest.fixture(autouse=True)
def reset_singletons():
@ -18,8 +18,8 @@ def reset_singletons():
def copy_sample_files():
src_path = tempfile.mkdtemp(prefix='dozo-src')
paths = Path(DOZO_PATH, 'samples/test_exif').glob('*')
src_path = tempfile.mkdtemp(prefix='ordigi-src')
paths = Path(ORDIGI_PATH, 'samples/test_exif').glob('*')
file_paths = [x for x in paths if x.is_file()]
for file_path in file_paths:
source_path = Path(src_path, file_path.name)
@ -30,7 +30,7 @@ def copy_sample_files():
@pytest.fixture(scope="module")
def conf_path():
tmp_path = tempfile.mkdtemp(prefix='dozo-')
tmp_path = tempfile.mkdtemp(prefix='ordigi-')
conf = RawConfigParser()
conf['Path'] = {
'day_begins': '4',
@ -40,7 +40,7 @@ def conf_path():
conf['Geolocation'] = {
'geocoder': 'Nominatium'
}
conf_path = Path(tmp_path, "dozo.conf")
conf_path = Path(tmp_path, "ordigi.conf")
config.write(conf_path, conf)
yield conf_path

View File

@ -4,7 +4,7 @@ import shutil
import tempfile
from unittest import mock
from dozo import config
from ordigi import config
# Helpers
import random

View File

@ -2,7 +2,7 @@ import pytest
CONTENT = "content"
class TestDozo:
class TestOrdigi:
@pytest.mark.skip()
def test__sort(self):
assert 0

View File

@ -1,8 +1,8 @@
import json
import pytest
import dozo.exiftool
from dozo.exiftool import get_exiftool_path
import ordigi.exiftool
from ordigi.exiftool import get_exiftool_path
TEST_FILE_ONE_KEYWORD = "samples/images/wedding.jpg"
TEST_FILE_BAD_IMAGE = "samples/images/badimage.jpeg"
@ -103,86 +103,86 @@ if exiftool is None:
def test_get_exiftool_path():
exiftool = dozo.exiftool.get_exiftool_path()
exiftool = ordigi.exiftool.get_exiftool_path()
assert exiftool is not None
def test_version():
exif = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif.version is not None
assert isinstance(exif.version, str)
def test_read():
exif = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif.data["File:MIMEType"] == "image/jpeg"
assert exif.data["EXIF:ISO"] == 160
assert exif.data["IPTC:Keywords"] == "wedding"
def test_singleton():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif2 = dozo.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif2 = ordigi.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)
assert exif1._process.pid == exif2._process.pid
def test_pid():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1.pid == exif1._process.pid
def test_exiftoolproc_process():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1._exiftoolproc.process is not None
def test_exiftoolproc_exiftool():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1._exiftoolproc.exiftool == dozo.exiftool.get_exiftool_path()
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif1._exiftoolproc.exiftool == ordigi.exiftool.get_exiftool_path()
def test_as_dict():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exifdata = exif1.asdict()
assert exifdata["XMP:TagsList"] == "wedding"
def test_as_dict_normalized():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exifdata = exif1.asdict(normalized=True)
assert exifdata["xmp:tagslist"] == "wedding"
assert "XMP:TagsList" not in exifdata
def test_as_dict_no_tag_groups():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exifdata = exif1.asdict(tag_groups=False)
assert exifdata["TagsList"] == "wedding"
def test_json():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exifdata = json.loads(exif1.json())
assert exifdata[0]["XMP:TagsList"] == "wedding"
def test_str():
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert "file: " in str(exif1)
assert "exiftool: " in str(exif1)
def test_exiftool_terminate():
""" Test that exiftool process is terminated when exiftool.terminate() is called """
exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif1 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert dozo.exiftool.exiftool_is_running()
assert ordigi.exiftool.exiftool_is_running()
dozo.exiftool.terminate_exiftool()
ordigi.exiftool.terminate_exiftool()
assert not dozo.exiftool.exiftool_is_running()
assert not ordigi.exiftool.exiftool_is_running()
# verify we can create a new instance after termination
exif2 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
exif2 = ordigi.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
assert exif2.asdict()["IPTC:Keywords"] == "wedding"

View File

@ -8,11 +8,11 @@ from sys import platform
from time import sleep
from .conftest import copy_sample_files
from dozo import constants
from dozo.database import Db
from dozo.filesystem import FileSystem
from dozo.media.media import Media
from dozo.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi import constants
from ordigi.database import Db
from ordigi.filesystem import FileSystem
from ordigi.media import Media
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
@pytest.mark.skip()
@ -153,6 +153,7 @@ class TestFilesystem:
for mode in 'copy', 'move':
filesystem = FileSystem(path_format=self.path_format, mode=mode)
# copy mode
import ipdb; ipdb.set_trace()
src_path = Path(self.src_paths, 'photo.png')
dest_path = Path(tmp_path,'photo_copy.png')
src_checksum = filesystem.checksum(src_path)

View File

@ -6,14 +6,12 @@ import shutil
import tempfile
from .conftest import copy_sample_files
from dozo import constants
from dozo.media.media import Media
from dozo.media.audio import Audio
from dozo.media.photo import Photo
from dozo.media.video import Video
from dozo.exiftool import ExifTool, ExifToolCaching
from ordigi import constants
from ordigi.media import Media
from ordigi.images import Images
from ordigi.exiftool import ExifTool, ExifToolCaching
DOZO_PATH = Path(__file__).parent.parent
ORDIGI_PATH = Path(__file__).parent.parent
CACHING = True
class TestMetadata:

612
todo.md Executable file
View File

@ -0,0 +1,612 @@
# NOW
# Media:
- rewrite set_date...
# Test:
- finish filesystem
- date_taken
- geolocation
move elodie to dozo
check for early morning photos: add test
add --folder-path option %Y-%d-%m/%city/%album
datetime.today().strftime('%Y-%m-%d')
add %filename
add edit_exif command?
Add update command
# enhancement
- accept Path in get_exiftool
- Use get_exiftool instead of get metadata:
try to do it in get_date_taken...
media class:
- Add self.file_path
-
## Album form folder
- move to filesystem
# TODO implement album from folder here?
# folder = os.path.basename(os.path.dirname(source))
# album = self.metadata['album']
# if album_from_folder and (album is None or album == ''):
# album = folder
# Update
use pathlib instead of os.path
Allow update in sort command in same dir if path is the dest dir
Enhancement: swap hash db key value: for checking file integrity
https://github.com/JohannesBuchner/imagehash
https://github.com/cw-somil/Duplicate-Remover
https://leons.im/posts/a-python-implementation-of-simhash-algorithm/
Visually check similar images
https://www.pluralsight.com/guides/importing-image-data-into-numpy-arrays
https://stackoverflow.com/questions/56056054/add-check-boxes-to-scrollable-image-in-python
https://wellsr.com/python/python-image-manipulation-with-pillow-library/
kitty grid image?
https://fr.wikibooks.org/wiki/PyQt/PyQt_versus_wxPython
https://docs.python.org/3/faq/gui.html
https://docs.opencv.org/3.4/d3/df2/tutorial_py_basic_ops.html
https://stackoverflow.com/questions/52727332/python-tkinter-create-checkbox-list-from-listbox
Image grid method:
matplot
https://gist.github.com/lebedov/7018889ba47668c64bcf96aee82caec0
Tkinter
https://python-forum.io/thread-22700.html
https://stackoverflow.com/questions/43326282/how-can-i-use-images-in-a-tkinter-grid
wxwidget
https://wxpython.org/Phoenix/docs/html/wx.lib.agw.thumbnailctrl.html
Ability to change metadata to selection
Enhancement: Option to keep existing directory structure
Fix: change version number to 0.x99
Fix: README
Refactoring: elodie update: update metadata of destination
Fix: update: fix move files...
Refactoring: Move exiftool config
Checksum:
FIX: test if checksum remain the same for all files (global check)
FIX: if dest file already here and checksums don't match change name to
prevent overwriting to file with same dest path
Enhancement: media file, do not filter files, only to prevent error when copying
fix: Valid file: check for open file error
Enhancement: Add %base_name string key
Refactoring: class get_metadata
check if has EXIF, check EXIF type...
Interface: show error and warning
interface: less verbose when no error
interface: Move default setting to config?
Behavior: Move only by default without changing metadata and filename...
Refactoring: check one time media is valid?
Refactoring: Unify source and path
Enhancement: allow nested dir
Fix: check exclusion for file
Refactoring: Import perl as submodule?
Enhancement: # setup arguments to exiftool
https://github.com/andrewning/sortphotos/blob/master/src/sortphotos.py
# AFTER
Enhancement: add walklevel function
Enhancement: change early morning date sort
# TODO
Fix: date, make correction in filename if needed
Check: date from filename
Options:
--update-cache|-u
--date-from-filename
--location --time
# --date from folder
# --date from file
# -f overwrite metadata
Add get tag function
Add --copy alternative
--auto|-a: a set of options: geolocation, best match date, rename, album
from folder...
default: only move
# --keep-folder option
# --rename
-- no cache mode!!
--confirm unsure operation
--interactive
# TEST
# lat='45.58339'
# lon='4.79823'
# coordinates ='53.480837, -2.244914'
# Alger
# coords=(36.752887, 3.042048)
https://www.gitmemory.com/issue/pallets/click/843/634305917
https://github.com/pallets/click/issues/843
# import unittest
# import pytest
# from thing.__main__ import cli
# class TestCli(unittest.TestCase):
# @pytest.fixture(autouse=True)
# def capsys(self, capsys):
# self.capsys = capsys
# def test_cli(self):
# with pytest.raises(SystemExit) as ex:
# cli(["create", "--name", "test"])
# self.assertEqual(ex.value.code, 0)
# out, err = self.capsys.readouterr()
# self.assertEqual(out, "Succesfully created test\n")
# dev
# mode ~/.elodie ~/.config/elodie
# location selection buggy
# TODO:
# /home/cedric/src/elodie/elodie/media/photo.py(86)get_date_taken()
# 85 # TODO potential bu for old photo below 1970...
# ---> 86 if(seconds_since_epoch == 0):
# 87 return None
import os
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
if num_sep + level <= num_sep_this:
del dirs[:]
49/2: y=walklevel('/home/cedric', level=1)
49/3: next(y)
49/4: next(y)
49/5: next(y)
49/6: next(y)
49/7: next(y)
49/8: y=walklevel('/home/cedric', level=0)
49/9: next(y)
49/10: next(y)
49/11: y=walklevel('/home/cedric/.test/Nexcloud/', level=0)
49/12:
import os
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
if num_sep + level <= num_sep_this:
print dirs, files
49/13:
import os
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
if num_sep + level <= num_sep_this:
print(dirs, files)
49/14: y=walklevel('/home/cedric/.test/Nexcloud/', level=0)
49/15: next(y)
49/16: next(y)
49/17: y=walklevel('/home/cedric/.test/Nexcloud/', level=0)
49/18:
import os
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
49/19: y=walklevel('/home/cedric/.test/Nexcloud/', level=0)
49/20: next(y)
49/21: next(y)
49/22: y=walklevel('/home/cedric/.test/Nexcloud/', level=2)
49/23: next(y)
49/24: next(y)
49/25: y=walklevel('/home/cedric/.test/las canarias 2012/', level=2)
49/26: next(y)
49/27: next(y)
49/28: next(y)
49/29: next(y)
49/30: y=walklevel('/home/cedric/.test/las canarias 2012/', level=0)
49/31: next(y)
49/32: next(y)
49/33: next(y)
49/34:
import os
def walklevel(some_dir, level=1):
some_dir = some_dir.rstrip(os.path.sep)
assert os.path.isdir(some_dir)
num_sep = some_dir.count(os.path.sep)
for root, dirs, files in os.walk(some_dir):
yield root, dirs, files
num_sep_this = root.count(os.path.sep)
if num_sep + level <= num_sep_this:
print('fuck')
49/35: y=walklevel('/home/cedric/.test/las canarias 2012/', level=0)
49/36: next(y)
49/37: next(y)
49/38: next(y)
64/1: a=os.walk('/home/cedric/.test/las canarias 2012')
64/2: import os
64/3: a=os.walk('/home/cedric/.test/las canarias 2012')
64/4: next(a)
64/5: next(a)
64/6: os.path.sep
64/7: os.path.relpath('/home/cedric/.test/las canarias 2012/private', 'private')
64/8: os.path.relpath('/home/cedric/.test/las canarias 2012', 'private')
64/9: os.path.relpath('/home/cedric/.test/las canarias 2012/private', '/home/cedric/.test/las canarias 2012')
64/10: b='test'
64/11: a='private'
64/12: a+b
64/13: os.path.join(a,b,b)
64/14: !True
64/15: not True
64/16: a=TRue
64/17: a=True
64/18: not a
77/1:
import os
import requests
def get_location(geotags):
coords = get_coordinates(geotags)
uri = 'https://revgeocode.search.hereapi.com/v1/revgeocode'
headers = {}
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
response = requests.get(uri, headers=headers, params=params)
try:
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
print(str(e))
return {}
77/2: cd ~/.test/
77/3: ls
77/4: cd 2021-02-Feb/
77/5: ls
77/6: cd Villeurbanne/
77/7: ls
77/8: ls -l
77/9: exif = get_exif('2021-02-24_09-33-29-20210305_081001_01.mp4')
77/10:
from PIL import Image
def get_exif(filename):
image = Image.open(filename)
image.verify()
return image._getexif()
77/11: exif = get_exif('2021-02-24_09-33-29-20210305_081001_01.mp4')
77/12: ..
77/13: cd ..
77/14: ls
77/15: cd ..
77/16: ls
77/17: cd 2021-03-Mar/
77/18: cd Villeurbanne/
77/19: ls
77/20: exif = get_exif('2021-03-09_09-58-42-img_20210309_105842.jpg')
77/21: exif
77/22:
def get_geotagging(exif):
if not exif:
raise ValueError("No EXIF metadata found")
geotagging = {}
for (idx, tag) in TAGS.items():
if tag == 'GPSInfo':
if idx not in exif:
raise ValueError("No EXIF geotagging found")
for (key, val) in GPSTAGS.items():
if key in exif[idx]:
geotagging[val] = exif[idx][key]
return geotagging
77/23: get_geotagging(exif)
77/24: from PIL.ExifTags import TAGS
77/25:
def get_labeled_exif(exif):
labeled = {}
for (key, val) in exif.items():
labeled[TAGS.get(key)] = val
return labeled
77/26: get_geotagging(exif)
77/27: from PIL.ExifTags import GPSTAGS
77/28: get_geotagging(exif)
77/29: geotags = get_geotagging(exif)
77/30: get_location(geotags)
77/31:
def get_decimal_from_dms(dms, ref):
degrees = dms[0][0] / dms[0][1]
minutes = dms[1][0] / dms[1][1] / 60.0
seconds = dms[2][0] / dms[2][1] / 3600.0
if ref in ['S', 'W']:
degrees = -degrees
minutes = -minutes
seconds = -seconds
return round(degrees + minutes + seconds, 5)
def get_coordinates(geotags):
lat = get_decimal_from_dms(geotags['GPSLatitude'], geotags['GPSLatitudeRef'])
lon = get_decimal_from_dms(geotags['GPSLongitude'], geotags['GPSLongitudeRef'])
return (lat,lon)
77/32: get_geotagging(exif)
77/33: get_location(geotags)
77/34: from geopy.geocoders import Here
78/1: from geopy.geocoders import Here
78/3:
78/4: get_exif
78/5: ls
78/6: cd ~/.test
78/7: ls
78/8: cd 2021-03-Mar/
78/9: ls
78/10: cd Villeurbanne/
78/11: get_exif('2021-03-04_11-50-32-img_20210304_125032.jpg')
78/12: exif=get_exif('2021-03-04_11-50-32-img_20210304_125032.jpg')
78/13: get_geotagging(exif)
78/14:
from PIL.ExifTags import GPSTAGS
def get_geotagging(exif):
if not exif:
raise ValueError("No EXIF metadata found")
geotagging = {}
for (idx, tag) in TAGS.items():
if tag == 'GPSInfo':
if idx not in exif:
raise ValueError("No EXIF geotagging found")
for (key, val) in GPSTAGS.items():
if key in exif[idx]:
geotagging[val] = exif[idx][key]
return geotagging
78/15: geotags = get_geotagging(exif)
78/17: geotags = get_geotagging(exif)
78/18: get_coordinates(geotags)
78/19:
78/23: get_location(geotags)
78/24:
78/25: get_location(geotags)
78/26:
def get_decimal_from_dms(dms, ref):
degrees = dms[0][0] / dms[0][1]
minutes = dms[1][0] / dms[1][1] / 60.0
seconds = dms[2][0] / dms[2][1] / 3600.0
if ref in ['S', 'W']:
degrees = -degrees
minutes = -minutes
seconds = -seconds
return round(degrees + minutes + seconds, 5)
78/27: get_location(geotags)
78/28:
def get_decimal_from_dms(dms, ref):
degrees = dms[0]
minutes = dms[1] / 60.0
seconds = dms[2] / 3600.0
if ref in ['S', 'W']:
degrees = -degrees
minutes = -minutes
seconds = -seconds
return round(degrees + minutes + seconds, 5)
78/29: get_location(geotags)
78/30: exif
78/31: get_geotagging(exif)
78/32: geotags = get_geotagging(exif)
78/33: get_coordinates(geotags)
78/34: geotags = get_geotagging(exif)
78/35: get_location(geotags)
78/36: get_coordinates(geotags)
78/37: coords = get_coordinates(geotags)
78/38: coords
78/39: uri = 'https://revgeocode.search.hereapi.com/v1/revgeocode'
78/40:
headers = {}
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/41: headers = {}
78/42:
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/43:
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/44: API_KEY=m5aGo8xGe4LLhxeKZYpHr2MPXGN2aDhe
78/45: API_KEY='m5aGo8xGe4LLhxeKZYpHr2MPXGN2aDhe'
78/46:
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/47: API_KEY='m5aGo8xGe4LLhxeKZYpHr2MPXGN2aDhe'
78/48:
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/49:
params = {
'apiKey': os.environ['m5aGo8xGe4LLhxeKZYpHr2MPXGN2aDhe'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
78/50: %load_ext autotime
78/51:
import pandas as pd
import geopandas as gpd
import geopy
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import matplotlib.pyplot as plt
import plotly_express as px
import tqdm
from tqdm._tqdm_notebook import tqdm_notebook
78/52:
import pandas as pd
import geopandas as gpd
import geopy
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import matplotlib.pyplot as plt
import plotly_express as px
import pandas as pd
import geopandas as gpd
from PIL import Image
filename='2021-02-24_09-33-29-20210305_081001_01.mp4'
def get_exif(filename):
image = Image.open(filename)
image.verify()
return image._getexif()
exif=get_exif(filename)
from PIL.ExifTags import TAGS
from PIL.ExifTags import GPSTAGS
def get_geotagging(exif):
if not exif:
raise ValueError("No EXIF metadata found")
geotagging = {}
for (idx, tag) in TAGS.items():
if tag == 'GPSInfo':
if idx not in exif:
raise ValueError("No EXIF geotagging found")
for (key, val) in GPSTAGS.items():
if key in exif[idx]:
geotagging[val] = exif[idx][key]
return geotagging
geotags = get_geotagging(exif)
import os
import requests
def get_location(geotags):
coords = get_coordinates(geotags)
uri = 'https://revgeocode.search.hereapi.com/v1/revgeocode'
headers = {}
params = {
'apiKey': os.environ['API_KEY'],
'at': "%s,%s" % coords,
'lang': 'en-US',
'limit': 1,
}
response = requests.get(uri, headers=headers, params=params)
try:
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
print(str(e))
return {}
def get_coordinates(geotags):
lat = get_decimal_from_dms(geotags['GPSLatitude'], geotags['GPSLatitudeRef'])
lon = get_decimal_from_dms(geotags['GPSLongitude'], geotags['GPSLongitudeRef'])
return (lat,lon)
coords = get_coordinates(geotags)
import geopy
from geopy.geocoders import Nominatim
locator = Nominatim(user_agent='myGeocoder')
# coordinates ='53.480837, -2.244914'
lat='45.58339'
lon='4.79823'
coords = lat + ',' + lon
locator.reverse(coords)
location =locator.reverse(coords)
location.address.split(',')
city=location.address.split(',')[1].strip()
country=location.address.split(',')[7].strip()
location.raw
rint
country=location.raw['address']['country']
city=location.raw['address']['village']