diff --git a/ordigi.conf b/ordigi.conf index 3cf8e0a..8481f0c 100644 --- a/ordigi.conf +++ b/ordigi.conf @@ -15,3 +15,4 @@ name2=.DS_Store [Geolocation] geocoder=Nominatim prefer_english_names=False +# timeout=1 diff --git a/ordigi.py b/ordigi.py index 6f0b4bd..0aa511d 100755 --- a/ordigi.py +++ b/ordigi.py @@ -7,11 +7,12 @@ from datetime import datetime import click -from ordigi import config +from ordigi.config import Config from ordigi import constants from ordigi import log from ordigi.database import Db from ordigi.filesystem import FileSystem +from ordigi.geolocation import GeoLocation from ordigi.media import Media, get_all_subclasses from ordigi.summary import Summary @@ -98,29 +99,24 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext if not os.path.exists(destination): logger.error(f'Directory {destination} does not exist') - conf = config.load_config(constants.CONFIG_FILE) - path_format = config.get_path_definition(conf) + config = Config(constants.CONFIG_FILE) + opt = config.get_options() # if no exclude list was passed in we check if there's a config if len(exclude_regex) == 0: - if 'Exclusions' in conf: - exclude_regex = [value for key, value in conf.items('Exclusions')] - + exclude_regex = opt['exclude_regex'] exclude_regex_list = set(exclude_regex) # Initialize Db db = Db(destination) - if 'Path' in conf and 'day_begins' in conf['Path']: - config_directory = conf['Path'] - day_begins = int(config_directory['day_begins']) - else: - day_begins = 0 - filesystem = FileSystem(cache, day_begins, dry_run, exclude_regex_list, - filter_by_ext, logger, max_deep, mode, path_format) + filesystem = FileSystem(cache, opt['day_begins'], dry_run, exclude_regex_list, + filter_by_ext, logger, max_deep, mode, opt['path_format']) + + loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout']) summary, has_errors = filesystem.sort_files(paths, destination, db, - remove_duplicates, ignore_tags) + loc, remove_duplicates, ignore_tags) if clean: remove_empty_folders(destination, logger) diff --git a/ordigi/config.py b/ordigi/config.py index 855e6e8..a2e1c3b 100644 --- a/ordigi/config.py +++ b/ordigi/config.py @@ -1,47 +1,92 @@ -"""Load config file as a singleton.""" from configparser import RawConfigParser from os import path from ordigi import constants +from geopy.geocoders import options as gopt -def write(conf_file, config): - with open(conf_file, 'w') as conf_file: - config.write(conf_file) - return True +class Config: + """Manage config file""" - return False + def __init__(self, conf_path=None, conf={}): + self.conf_path = conf_path + if conf_path == None: + self.conf = conf + else: + self.conf = self.load_config() -def load_config(file): - if not path.exists(file): - return {} + def write(self, conf): + with open(self.conf_path, 'w') as conf_path: + conf.write(conf_path) + return True - config = RawConfigParser() - config.read(file) - return config + return False -def get_path_definition(config): - """Returns a list of folder definitions. + def load_config(self): + if not path.exists(self.conf_path): + return {} - Each element in the list represents a folder. - Fallback folders are supported and are nested lists. + conf = RawConfigParser() + conf.read(self.conf_path) + return conf - :returns: string - """ + def get_option(self, option, section): - if 'Path' in config: - if 'format' in config['Path']: - return config['Path']['format'] - elif 'dirs_path' and 'name' in config['Path']: - return config['Path']['dirs_path'] + '/' + config['Path']['name'] + if section in self.conf and option in self.conf[section]: + return self.conf[section][option] - return constants.default_path + '/' + constants.default_name + return False -def get_geocoder(): - config = load_config(constants.CONFIG_FILE) - if 'Geolocation' in config and 'geocoder' in config['Geolocation']: - geocoder = config['Geolocation']['geocoder'] - if geocoder in ('Nominatim', ): - return geocoder + def get_path_definition(self): + """Returns a list of folder definitions. - return constants.default_geocoder + Each element in the list represents a folder. + Fallback folders are supported and are nested lists. + + :returns: string + """ + + if 'Path' in self.conf: + if 'format' in self.conf['Path']: + return self.conf['Path']['format'] + elif 'dirs_path' and 'name' in self.conf['Path']: + return self.conf['Path']['dirs_path'] + '/' + self.conf['Path']['name'] + + return constants.default_path + '/' + constants.default_name + + def get_options(self): + """Get config options + :returns: dict + """ + + options = {} + geocoder = self.get_option('geocoder', 'Geolocation') + if geocoder and geocoder in ('Nominatim', ): + options['geocoder'] = geocoder + else: + options['geocoder'] = constants.default_geocoder + + prefer_english_names = self.get_option('prefer_english_names', 'Geolocation') + if prefer_english_names: + options['prefer_english_names'] = bool(prefer_english_names) + else: + options['prefer_english_names'] = False + + timeout = self.get_option('timeout', 'Geolocation') + if timeout: + options['timeout'] = timeout + else: + options['timeout'] = gopt.default_timeout + + options['path_format'] = self.get_path_definition() + + if 'Path' in self.conf and 'day_begins' in self.conf['Path']: + config_directory = self.conf['Path'] + options['day_begins'] = int(config_directory['day_begins']) + else: + options['day_begins'] = 0 + + if 'Exclusions' in self.conf: + options['exclude_regex'] = [value for key, value in self.conf.items('Exclusions')] + + return options diff --git a/ordigi/filesystem.py b/ordigi/filesystem.py index 4bebe27..52f7417 100644 --- a/ordigi/filesystem.py +++ b/ordigi/filesystem.py @@ -15,7 +15,6 @@ import time from datetime import datetime, timedelta from ordigi import constants -from ordigi import geolocation from ordigi import media from ordigi.media import Media, get_all_subclasses @@ -189,7 +188,7 @@ class FileSystem(object): return folder_name - def get_part(self, item, mask, metadata, db, subdirs): + def get_part(self, item, mask, metadata, db, subdirs, loc): """Parse a specific folder's name given a mask and metadata. :param item: Name of the item as defined in the path (i.e. date from %date) @@ -215,7 +214,7 @@ class FileSystem(object): if date is not None: part = date.strftime(mask) elif item in ('location', 'city', 'state', 'country'): - place_name = geolocation.place_name( + place_name = loc.place_name( metadata['latitude'], metadata['longitude'], db, @@ -251,7 +250,7 @@ class FileSystem(object): return part - def get_path(self, metadata, db, subdirs='', whitespace_sub='_'): + def get_path(self, metadata, db, loc, subdirs='', whitespace_sub='_'): """path_format: {%Y-%d-%m}/%u{city}/{album} Returns file path. @@ -272,7 +271,7 @@ class FileSystem(object): # parts = re.split(mask, this_part) # parts = this_part.split('%')[1:] part = self.get_part(item, matched.group()[1:-1], metadata, db, - subdirs) + subdirs, loc) part = part.strip() @@ -570,7 +569,7 @@ class FileSystem(object): return result - def sort_files(self, paths, destination, db, remove_duplicates=False, + def sort_files(self, paths, destination, db, loc, remove_duplicates=False, ignore_tags=set()): """ Sort files into appropriate folder @@ -592,7 +591,7 @@ class FileSystem(object): if media: metadata = media.get_metadata() # Get the destination path according to metadata - file_path = self.get_path(metadata, db, subdirs=subdirs) + file_path = self.get_path(metadata, db, loc, subdirs=subdirs) else: # Keep same directory structure file_path = os.path.relpath(src_path, path) diff --git a/ordigi/geolocation.py b/ordigi/geolocation.py index b595f3f..9cdcd1e 100644 --- a/ordigi/geolocation.py +++ b/ordigi/geolocation.py @@ -1,153 +1,119 @@ -"""Look up geolocation information for media objects.""" from os import path import geopy -from geopy.geocoders import Nominatim +from geopy.geocoders import Nominatim, options import logging -from ordigi import constants -from ordigi.config import load_config, get_geocoder +from ordigi import config __KEY__ = None __DEFAULT_LOCATION__ = 'Unknown Location' -__PREFER_ENGLISH_NAMES__ = None -def coordinates_by_name(name, db): - # Try to get cached location first - cached_coordinates = db.get_location_coordinates(name) - if(cached_coordinates is not None): - return { - 'latitude': cached_coordinates[0], - 'longitude': cached_coordinates[1] - } +class GeoLocation: + """Look up geolocation information for media objects.""" - # If the name is not cached then we go ahead with an API lookup - geocoder = get_geocoder() - if geocoder == 'Nominatim': - locator = Nominatim(user_agent='myGeocoder') - geolocation_info = locator.geocode(name) - if geolocation_info is not None: + def __init__(self, geocoder='Nominatim', prefer_english_names=False, timeout=options.default_timeout): + self.geocoder = geocoder + self.prefer_english_names = prefer_english_names + self.timeout = timeout + + def coordinates_by_name(self, name, db, timeout=options.default_timeout): + # Try to get cached location first + cached_coordinates = db.get_location_coordinates(name) + if(cached_coordinates is not None): return { - 'latitude': geolocation_info.latitude, - 'longitude': geolocation_info.longitude + 'latitude': cached_coordinates[0], + 'longitude': cached_coordinates[1] } - else: - raise NameError(geocoder) - return None - - -def decimal_to_dms(decimal): - decimal = float(decimal) - decimal_abs = abs(decimal) - minutes, seconds = divmod(decimal_abs*3600, 60) - degrees, minutes = divmod(minutes, 60) - degrees = degrees - sign = 1 if decimal >= 0 else -1 - return (degrees, minutes, seconds, sign) - - -def dms_to_decimal(degrees, minutes, seconds, direction=' '): - sign = 1 - if direction[0] in 'WSws': - sign = -1 - - return (degrees + minutes / 60 + seconds / 3600) * sign - - -def dms_string(decimal, type='latitude'): - # Example string -> 38 deg 14' 27.82" S - dms = decimal_to_dms(decimal) - if type == 'latitude': - direction = 'N' if decimal >= 0 else 'S' - elif type == 'longitude': - direction = 'E' if decimal >= 0 else 'W' - return '{} deg {}\' {}" {}'.format(dms[0], dms[1], dms[2], direction) - - -def get_prefer_english_names(): - global __PREFER_ENGLISH_NAMES__ - if __PREFER_ENGLISH_NAMES__ is not None: - return __PREFER_ENGLISH_NAMES__ - - config = load_config(constants.CONFIG_FILE) - if('prefer_english_names' not in config['Geolocation']): - return False - - __PREFER_ENGLISH_NAMES__ = bool(config['Geolocation']['prefer_english_names']) - return __PREFER_ENGLISH_NAMES__ - - -def place_name(lat, lon, db, cache=True, logger=logging.getLogger()): - lookup_place_name_default = {'default': __DEFAULT_LOCATION__} - if(lat is None or lon is None): - return lookup_place_name_default - - # Convert lat/lon to floats - if(not isinstance(lat, float)): - lat = float(lat) - if(not isinstance(lon, float)): - lon = float(lon) - - # Try to get cached location first - # 3km distace radious for a match - cached_place_name = None - if cache: - cached_place_name = db.get_location_name(lat, lon, 3000) - # We check that it's a dict to coerce an upgrade of the location - # db from a string location to a dictionary. See gh-160. - if(isinstance(cached_place_name, dict)): - return cached_place_name - - lookup_place_name = {} - geocoder = get_geocoder() - if geocoder == 'Nominatim': - geolocation_info = lookup_osm(lat, lon, logger) - else: - raise NameError(geocoder) - - if(geolocation_info is not None and 'address' in geolocation_info): - address = geolocation_info['address'] - # gh-386 adds support for town - # taking precedence after city for backwards compatability - for loc in ['city', 'town', 'village', 'state', 'country']: - if(loc in address): - lookup_place_name[loc] = address[loc] - # In many cases the desired key is not available so we - # set the most specific as the default. - if('default' not in lookup_place_name): - lookup_place_name['default'] = address[loc] - - if(lookup_place_name): - db.add_location(lat, lon, lookup_place_name) - # TODO: Maybe this should only be done on exit and not for every write. - db.update_location_db() - - if('default' not in lookup_place_name): - lookup_place_name = lookup_place_name_default - - return lookup_place_name - - -def lookup_osm(lat, lon, logger=logging.getLogger()): - - prefer_english_names = get_prefer_english_names() - try: - locator = Nominatim(user_agent='myGeocoder') - coords = (lat, lon) - if(prefer_english_names): - lang='en' + # If the name is not cached then we go ahead with an API lookup + geocoder = self.geocoder + if geocoder == 'Nominatim': + locator = Nominatim(user_agent='myGeocoder', timeout=timeout) + geolocation_info = locator.geocode(name) + if geolocation_info is not None: + return { + 'latitude': geolocation_info.latitude, + 'longitude': geolocation_info.longitude + } else: - lang='local' - return locator.reverse(coords, language=lang).raw - except geopy.exc.GeocoderUnavailable as e: - logger.error(e) - return None - # Fix *** TypeError: `address` must not be None - except (TypeError, ValueError) as e: - logger.error(e) + raise NameError(geocoder) + return None + def place_name(self, lat, lon, db, cache=True, logger=logging.getLogger(), timeout=options.default_timeout): + lookup_place_name_default = {'default': __DEFAULT_LOCATION__} + if(lat is None or lon is None): + return lookup_place_name_default + + # Convert lat/lon to floats + if(not isinstance(lat, float)): + lat = float(lat) + if(not isinstance(lon, float)): + lon = float(lon) + + # Try to get cached location first + # 3km distace radious for a match + cached_place_name = None + if cache: + cached_place_name = db.get_location_name(lat, lon, 3000) + # We check that it's a dict to coerce an upgrade of the location + # db from a string location to a dictionary. See gh-160. + if(isinstance(cached_place_name, dict)): + return cached_place_name + + lookup_place_name = {} + geocoder = self.geocoder + if geocoder == 'Nominatim': + geolocation_info = self.lookup_osm(lat, lon, logger, timeout) + else: + raise NameError(geocoder) + + if(geolocation_info is not None and 'address' in geolocation_info): + address = geolocation_info['address'] + # gh-386 adds support for town + # taking precedence after city for backwards compatability + for loc in ['city', 'town', 'village', 'state', 'country']: + if(loc in address): + lookup_place_name[loc] = address[loc] + # In many cases the desired key is not available so we + # set the most specific as the default. + if('default' not in lookup_place_name): + lookup_place_name['default'] = address[loc] + + if(lookup_place_name): + db.add_location(lat, lon, lookup_place_name) + # TODO: Maybe this should only be done on exit and not for every write. + db.update_location_db() + + if('default' not in lookup_place_name): + lookup_place_name = lookup_place_name_default + + return lookup_place_name + + + def lookup_osm(self, lat, lon, logger=logging.getLogger(), timeout=options.default_timeout): + + try: + locator = Nominatim(user_agent='myGeocoder', timeout=timeout) + coords = (lat, lon) + if(self.prefer_english_names): + lang='en' + else: + lang='local' + locator_reverse = locator.reverse(coords, language=lang) + if locator_reverse is not None: + return locator_reverse.raw + else: + return None + except geopy.exc.GeocoderUnavailable or geopy.exc.GeocoderServiceError as e: + logger.error(e) + return None + # Fix *** TypeError: `address` must not be None + except (TypeError, ValueError) as e: + logger.error(e) + return None + diff --git a/tests/conftest.py b/tests/conftest.py index e2bc519..f6a7e07 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ from pathlib import Path import shutil import tempfile -from ordigi import config +from ordigi.config import Config from ordigi.exiftool import _ExifToolProc ORDIGI_PATH = Path(__file__).parent.parent @@ -41,7 +41,8 @@ def conf_path(): 'geocoder': 'Nominatium' } conf_path = Path(tmp_path, "ordigi.conf") - config.write(conf_path, conf) + config = Config(conf_path) + config.write(conf) yield conf_path diff --git a/tests/test_config.py b/tests/test_config.py index 6ea7a87..72f2f46 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -4,7 +4,7 @@ import shutil import tempfile from unittest import mock -from ordigi import config +from ordigi.config import Config # Helpers import random @@ -21,7 +21,8 @@ class TestConfig: @pytest.fixture(scope="module") def conf(self, conf_path): - return config.load_config(conf_path) + config = Config(conf_path) + return config.load_config() def test_write(self, conf_path): assert conf_path.is_file() @@ -38,20 +39,21 @@ class TestConfig: def test_load_config_no_exist(self): # test file not exist - conf = config.load_config('filename') - assert conf == {} + config = Config('filename') + assert config.conf == {} def test_load_config_invalid(self, conf_path): # test invalid config write_random_file(conf_path) with pytest.raises(Exception) as e: - config.load_config(conf_path) + config = Config(conf_path) assert e.typename == 'MissingSectionHeaderError' def test_get_path_definition(self, conf): """ Get path definition from config """ - path = config.get_path_definition(conf) + config = Config(conf=conf) + path = config.get_path_definition() assert path == '%u{%Y-%m}/{city}|{city}-{%Y}/{folders[:1]}/{folder}/{%Y-%m-%b-%H-%M-%S}-{basename}.%l{ext}' diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index cda0707..f55e1d4 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -10,9 +10,10 @@ from time import sleep from .conftest import copy_sample_files from ordigi import constants from ordigi.database import Db -from ordigi.filesystem import FileSystem -from ordigi.media import Media from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool +from ordigi.filesystem import FileSystem +from ordigi.geolocation import GeoLocation +from ordigi.media import Media @pytest.mark.skip() @@ -67,12 +68,13 @@ class TestFilesystem: exif_data = ExifToolCaching(str(file_path)).asdict() metadata = media.get_metadata() + loc = GeoLocation() for item, regex in items.items(): for mask in masks: matched = re.search(regex, mask) if matched: part = filesystem.get_part(item, mask[1:-1], - metadata, Db(tmp_path), subdirs) + metadata, Db(tmp_path), subdirs, loc) # check if part is correct assert isinstance(part, str), file_path if item == 'basename': @@ -138,8 +140,9 @@ class TestFilesystem: def test_sort_files(self, tmp_path): db = Db(tmp_path) filesystem = FileSystem(path_format=self.path_format) - - summary, has_errors = filesystem.sort_files([self.src_paths], tmp_path, db) + loc = GeoLocation() + summary, has_errors = filesystem.sort_files([self.src_paths], + tmp_path, db, loc) # Summary is created and there is no errors assert summary, summary @@ -154,7 +157,8 @@ class TestFilesystem: filesystem = FileSystem(path_format=self.path_format, mode=mode) # copy mode src_path = Path(self.src_paths, 'photo.png') - dest_path = Path(tmp_path,'photo_copy.png') + name = 'photo_' + mode + '.png' + dest_path = Path(tmp_path, name) src_checksum = filesystem.checksum(src_path) result_copy = filesystem.sort_file(src_path, dest_path) assert result_copy