Refactoring config and geolocation methods

This commit is contained in:
Cédric Leporcq 2021-08-24 17:23:51 +02:00
parent 67f3cd471a
commit 016329e044
8 changed files with 214 additions and 200 deletions

View File

@ -15,3 +15,4 @@ name2=.DS_Store
[Geolocation]
geocoder=Nominatim
prefer_english_names=False
# timeout=1

View File

@ -7,11 +7,12 @@ from datetime import datetime
import click
from ordigi import config
from ordigi.config import Config
from ordigi import constants
from ordigi import log
from ordigi.database import Db
from ordigi.filesystem import FileSystem
from ordigi.geolocation import GeoLocation
from ordigi.media import Media, get_all_subclasses
from ordigi.summary import Summary
@ -98,29 +99,24 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
if not os.path.exists(destination):
logger.error(f'Directory {destination} does not exist')
conf = config.load_config(constants.CONFIG_FILE)
path_format = config.get_path_definition(conf)
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
# if no exclude list was passed in we check if there's a config
if len(exclude_regex) == 0:
if 'Exclusions' in conf:
exclude_regex = [value for key, value in conf.items('Exclusions')]
exclude_regex = opt['exclude_regex']
exclude_regex_list = set(exclude_regex)
# Initialize Db
db = Db(destination)
if 'Path' in conf and 'day_begins' in conf['Path']:
config_directory = conf['Path']
day_begins = int(config_directory['day_begins'])
else:
day_begins = 0
filesystem = FileSystem(cache, day_begins, dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, path_format)
filesystem = FileSystem(cache, opt['day_begins'], dry_run, exclude_regex_list,
filter_by_ext, logger, max_deep, mode, opt['path_format'])
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'], opt['timeout'])
summary, has_errors = filesystem.sort_files(paths, destination, db,
remove_duplicates, ignore_tags)
loc, remove_duplicates, ignore_tags)
if clean:
remove_empty_folders(destination, logger)

View File

@ -1,47 +1,92 @@
"""Load config file as a singleton."""
from configparser import RawConfigParser
from os import path
from ordigi import constants
from geopy.geocoders import options as gopt
def write(conf_file, config):
with open(conf_file, 'w') as conf_file:
config.write(conf_file)
return True
class Config:
"""Manage config file"""
return False
def __init__(self, conf_path=None, conf={}):
self.conf_path = conf_path
if conf_path == None:
self.conf = conf
else:
self.conf = self.load_config()
def load_config(file):
if not path.exists(file):
return {}
def write(self, conf):
with open(self.conf_path, 'w') as conf_path:
conf.write(conf_path)
return True
config = RawConfigParser()
config.read(file)
return config
return False
def get_path_definition(config):
"""Returns a list of folder definitions.
def load_config(self):
if not path.exists(self.conf_path):
return {}
Each element in the list represents a folder.
Fallback folders are supported and are nested lists.
conf = RawConfigParser()
conf.read(self.conf_path)
return conf
:returns: string
"""
def get_option(self, option, section):
if 'Path' in config:
if 'format' in config['Path']:
return config['Path']['format']
elif 'dirs_path' and 'name' in config['Path']:
return config['Path']['dirs_path'] + '/' + config['Path']['name']
if section in self.conf and option in self.conf[section]:
return self.conf[section][option]
return constants.default_path + '/' + constants.default_name
return False
def get_geocoder():
config = load_config(constants.CONFIG_FILE)
if 'Geolocation' in config and 'geocoder' in config['Geolocation']:
geocoder = config['Geolocation']['geocoder']
if geocoder in ('Nominatim', ):
return geocoder
def get_path_definition(self):
"""Returns a list of folder definitions.
return constants.default_geocoder
Each element in the list represents a folder.
Fallback folders are supported and are nested lists.
:returns: string
"""
if 'Path' in self.conf:
if 'format' in self.conf['Path']:
return self.conf['Path']['format']
elif 'dirs_path' and 'name' in self.conf['Path']:
return self.conf['Path']['dirs_path'] + '/' + self.conf['Path']['name']
return constants.default_path + '/' + constants.default_name
def get_options(self):
    """Collect every supported setting into a plain dict.

    Each key is always present in the result: values missing from the
    configuration are replaced by a default, so callers can index the
    dict without checking for membership first.

    :returns: dict with the keys ``geocoder``, ``prefer_english_names``,
        ``timeout``, ``path_format``, ``day_begins`` and ``exclude_regex``
    :raises ValueError: if ``timeout`` or ``day_begins`` is set to a
        value that cannot be parsed as a number
    """
    options = {}

    # Only known geocoders are accepted; anything else uses the default.
    geocoder = self.get_option('geocoder', 'Geolocation')
    if geocoder and geocoder in ('Nominatim', ):
        options['geocoder'] = geocoder
    else:
        options['geocoder'] = constants.default_geocoder

    # Values read from a config file are strings, and bool('False') is
    # True, so the text has to be compared against the accepted "true"
    # spellings instead of being cast with bool().
    prefer_english_names = self.get_option('prefer_english_names', 'Geolocation')
    options['prefer_english_names'] = (
        str(prefer_english_names).strip().lower() in ('1', 'yes', 'true', 'on')
    )

    # The timeout is handed to the geocoder as a number of seconds, so the
    # raw config string is converted here.
    timeout = self.get_option('timeout', 'Geolocation')
    if timeout:
        options['timeout'] = float(timeout)
    else:
        options['timeout'] = gopt.default_timeout

    options['path_format'] = self.get_path_definition()

    if 'Path' in self.conf and 'day_begins' in self.conf['Path']:
        config_directory = self.conf['Path']
        options['day_begins'] = int(config_directory['day_begins'])
    else:
        options['day_begins'] = 0

    # Always provide the key so a config without an [Exclusions] section
    # yields an empty list instead of a KeyError in the caller.
    if 'Exclusions' in self.conf:
        options['exclude_regex'] = [value for _, value in self.conf.items('Exclusions')]
    else:
        options['exclude_regex'] = []

    return options

View File

@ -15,7 +15,6 @@ import time
from datetime import datetime, timedelta
from ordigi import constants
from ordigi import geolocation
from ordigi import media
from ordigi.media import Media, get_all_subclasses
@ -189,7 +188,7 @@ class FileSystem(object):
return folder_name
def get_part(self, item, mask, metadata, db, subdirs):
def get_part(self, item, mask, metadata, db, subdirs, loc):
"""Parse a specific folder's name given a mask and metadata.
:param item: Name of the item as defined in the path (i.e. date from %date)
@ -215,7 +214,7 @@ class FileSystem(object):
if date is not None:
part = date.strftime(mask)
elif item in ('location', 'city', 'state', 'country'):
place_name = geolocation.place_name(
place_name = loc.place_name(
metadata['latitude'],
metadata['longitude'],
db,
@ -251,7 +250,7 @@ class FileSystem(object):
return part
def get_path(self, metadata, db, subdirs='', whitespace_sub='_'):
def get_path(self, metadata, db, loc, subdirs='', whitespace_sub='_'):
"""path_format: {%Y-%d-%m}/%u{city}/{album}
Returns file path.
@ -272,7 +271,7 @@ class FileSystem(object):
# parts = re.split(mask, this_part)
# parts = this_part.split('%')[1:]
part = self.get_part(item, matched.group()[1:-1], metadata, db,
subdirs)
subdirs, loc)
part = part.strip()
@ -570,7 +569,7 @@ class FileSystem(object):
return result
def sort_files(self, paths, destination, db, remove_duplicates=False,
def sort_files(self, paths, destination, db, loc, remove_duplicates=False,
ignore_tags=set()):
"""
Sort files into appropriate folder
@ -592,7 +591,7 @@ class FileSystem(object):
if media:
metadata = media.get_metadata()
# Get the destination path according to metadata
file_path = self.get_path(metadata, db, subdirs=subdirs)
file_path = self.get_path(metadata, db, loc, subdirs=subdirs)
else:
# Keep same directory structure
file_path = os.path.relpath(src_path, path)

View File

@ -1,153 +1,119 @@
"""Look up geolocation information for media objects."""
from os import path
import geopy
from geopy.geocoders import Nominatim
from geopy.geocoders import Nominatim, options
import logging
from ordigi import constants
from ordigi.config import load_config, get_geocoder
from ordigi import config
__KEY__ = None
__DEFAULT_LOCATION__ = 'Unknown Location'
__PREFER_ENGLISH_NAMES__ = None
def coordinates_by_name(name, db):
# Try to get cached location first
cached_coordinates = db.get_location_coordinates(name)
if(cached_coordinates is not None):
return {
'latitude': cached_coordinates[0],
'longitude': cached_coordinates[1]
}
class GeoLocation:
"""Look up geolocation information for media objects."""
# If the name is not cached then we go ahead with an API lookup
geocoder = get_geocoder()
if geocoder == 'Nominatim':
locator = Nominatim(user_agent='myGeocoder')
geolocation_info = locator.geocode(name)
if geolocation_info is not None:
def __init__(self, geocoder='Nominatim', prefer_english_names=False, timeout=options.default_timeout):
self.geocoder = geocoder
self.prefer_english_names = prefer_english_names
self.timeout = timeout
def coordinates_by_name(self, name, db, timeout=options.default_timeout):
# Try to get cached location first
cached_coordinates = db.get_location_coordinates(name)
if(cached_coordinates is not None):
return {
'latitude': geolocation_info.latitude,
'longitude': geolocation_info.longitude
'latitude': cached_coordinates[0],
'longitude': cached_coordinates[1]
}
else:
raise NameError(geocoder)
return None
def decimal_to_dms(decimal):
decimal = float(decimal)
decimal_abs = abs(decimal)
minutes, seconds = divmod(decimal_abs*3600, 60)
degrees, minutes = divmod(minutes, 60)
degrees = degrees
sign = 1 if decimal >= 0 else -1
return (degrees, minutes, seconds, sign)
def dms_to_decimal(degrees, minutes, seconds, direction=' '):
sign = 1
if direction[0] in 'WSws':
sign = -1
return (degrees + minutes / 60 + seconds / 3600) * sign
def dms_string(decimal, type='latitude'):
# Example string -> 38 deg 14' 27.82" S
dms = decimal_to_dms(decimal)
if type == 'latitude':
direction = 'N' if decimal >= 0 else 'S'
elif type == 'longitude':
direction = 'E' if decimal >= 0 else 'W'
return '{} deg {}\' {}" {}'.format(dms[0], dms[1], dms[2], direction)
def get_prefer_english_names():
global __PREFER_ENGLISH_NAMES__
if __PREFER_ENGLISH_NAMES__ is not None:
return __PREFER_ENGLISH_NAMES__
config = load_config(constants.CONFIG_FILE)
if('prefer_english_names' not in config['Geolocation']):
return False
__PREFER_ENGLISH_NAMES__ = bool(config['Geolocation']['prefer_english_names'])
return __PREFER_ENGLISH_NAMES__
def place_name(lat, lon, db, cache=True, logger=logging.getLogger()):
lookup_place_name_default = {'default': __DEFAULT_LOCATION__}
if(lat is None or lon is None):
return lookup_place_name_default
# Convert lat/lon to floats
if(not isinstance(lat, float)):
lat = float(lat)
if(not isinstance(lon, float)):
lon = float(lon)
# Try to get cached location first
# 3km distace radious for a match
cached_place_name = None
if cache:
cached_place_name = db.get_location_name(lat, lon, 3000)
# We check that it's a dict to coerce an upgrade of the location
# db from a string location to a dictionary. See gh-160.
if(isinstance(cached_place_name, dict)):
return cached_place_name
lookup_place_name = {}
geocoder = get_geocoder()
if geocoder == 'Nominatim':
geolocation_info = lookup_osm(lat, lon, logger)
else:
raise NameError(geocoder)
if(geolocation_info is not None and 'address' in geolocation_info):
address = geolocation_info['address']
# gh-386 adds support for town
# taking precedence after city for backwards compatability
for loc in ['city', 'town', 'village', 'state', 'country']:
if(loc in address):
lookup_place_name[loc] = address[loc]
# In many cases the desired key is not available so we
# set the most specific as the default.
if('default' not in lookup_place_name):
lookup_place_name['default'] = address[loc]
if(lookup_place_name):
db.add_location(lat, lon, lookup_place_name)
# TODO: Maybe this should only be done on exit and not for every write.
db.update_location_db()
if('default' not in lookup_place_name):
lookup_place_name = lookup_place_name_default
return lookup_place_name
def lookup_osm(lat, lon, logger=logging.getLogger()):
prefer_english_names = get_prefer_english_names()
try:
locator = Nominatim(user_agent='myGeocoder')
coords = (lat, lon)
if(prefer_english_names):
lang='en'
# If the name is not cached then we go ahead with an API lookup
geocoder = self.geocoder
if geocoder == 'Nominatim':
locator = Nominatim(user_agent='myGeocoder', timeout=timeout)
geolocation_info = locator.geocode(name)
if geolocation_info is not None:
return {
'latitude': geolocation_info.latitude,
'longitude': geolocation_info.longitude
}
else:
lang='local'
return locator.reverse(coords, language=lang).raw
except geopy.exc.GeocoderUnavailable as e:
logger.error(e)
return None
# Fix *** TypeError: `address` must not be None
except (TypeError, ValueError) as e:
logger.error(e)
raise NameError(geocoder)
return None
def place_name(self, lat, lon, db, cache=True, logger=logging.getLogger(), timeout=None):
    """Resolve a latitude/longitude pair to a dict of place names.

    The location database is consulted first (when ``cache`` is true);
    on a miss the configured geocoder is queried and any result is
    written back to the database.

    :param lat: latitude, a float or any value ``float()`` accepts
    :param lon: longitude, a float or any value ``float()`` accepts
    :param db: object providing ``get_location_name``, ``add_location``
        and ``update_location_db``
    :param cache: when false, skip the database lookup
    :param logger: logger forwarded to the geocoder lookup
    :param timeout: geocoder timeout; ``None`` (the default) means use
        the timeout this instance was created with
    :returns: dict that always holds a ``'default'`` key and may also hold
        ``city``, ``town``, ``village``, ``state`` and ``country``
    :raises NameError: if the configured geocoder is not supported
    """
    lookup_place_name_default = {'default': __DEFAULT_LOCATION__}
    if lat is None or lon is None:
        return lookup_place_name_default

    # Without an explicit timeout, honour the one given to the
    # constructor; otherwise the configured value would never be used.
    if timeout is None:
        timeout = self.timeout

    # Convert lat/lon to floats
    if not isinstance(lat, float):
        lat = float(lat)
    if not isinstance(lon, float):
        lon = float(lon)

    # Try to get the cached location first,
    # using a 3 km distance radius for a match.
    cached_place_name = None
    if cache:
        cached_place_name = db.get_location_name(lat, lon, 3000)
    # We check that it's a dict to coerce an upgrade of the location
    # db from a string location to a dictionary. See gh-160.
    if isinstance(cached_place_name, dict):
        return cached_place_name

    lookup_place_name = {}
    geocoder = self.geocoder
    if geocoder == 'Nominatim':
        geolocation_info = self.lookup_osm(lat, lon, logger, timeout)
    else:
        raise NameError(geocoder)

    if geolocation_info is not None and 'address' in geolocation_info:
        address = geolocation_info['address']
        # gh-386 adds support for town,
        # taking precedence after city for backwards compatibility.
        for loc in ['city', 'town', 'village', 'state', 'country']:
            if loc in address:
                lookup_place_name[loc] = address[loc]
                # In many cases the desired key is not available so we
                # set the most specific as the default.
                if 'default' not in lookup_place_name:
                    lookup_place_name['default'] = address[loc]

    if lookup_place_name:
        db.add_location(lat, lon, lookup_place_name)
        # TODO: Maybe this should only be done on exit and not for every write.
        db.update_location_db()

    if 'default' not in lookup_place_name:
        lookup_place_name = lookup_place_name_default

    return lookup_place_name
def lookup_osm(self, lat, lon, logger=logging.getLogger(), timeout=options.default_timeout):
    """Reverse-geocode a coordinate pair with Nominatim.

    :param lat: latitude of the point to look up
    :param lon: longitude of the point to look up
    :param logger: logger that receives any lookup error
    :param timeout: timeout handed to the Nominatim client
    :returns: the raw response of the reverse lookup, or ``None`` when
        nothing was found or the lookup failed
    """
    try:
        locator = Nominatim(user_agent='myGeocoder', timeout=timeout)
        coords = (lat, lon)
        if self.prefer_english_names:
            lang = 'en'
        else:
            lang = 'local'
        locator_reverse = locator.reverse(coords, language=lang)
        if locator_reverse is not None:
            return locator_reverse.raw
        return None
    # The exception classes must be given as a tuple: `A or B` evaluates
    # to `A` alone, which would leave GeocoderServiceError uncaught.
    except (geopy.exc.GeocoderUnavailable, geopy.exc.GeocoderServiceError) as e:
        logger.error(e)
        return None
    # Fix *** TypeError: `address` must not be None
    except (TypeError, ValueError) as e:
        logger.error(e)
        return None

View File

@ -6,7 +6,7 @@ from pathlib import Path
import shutil
import tempfile
from ordigi import config
from ordigi.config import Config
from ordigi.exiftool import _ExifToolProc
ORDIGI_PATH = Path(__file__).parent.parent
@ -41,7 +41,8 @@ def conf_path():
'geocoder': 'Nominatium'
}
conf_path = Path(tmp_path, "ordigi.conf")
config.write(conf_path, conf)
config = Config(conf_path)
config.write(conf)
yield conf_path

View File

@ -4,7 +4,7 @@ import shutil
import tempfile
from unittest import mock
from ordigi import config
from ordigi.config import Config
# Helpers
import random
@ -21,7 +21,8 @@ class TestConfig:
@pytest.fixture(scope="module")
def conf(self, conf_path):
return config.load_config(conf_path)
config = Config(conf_path)
return config.load_config()
def test_write(self, conf_path):
assert conf_path.is_file()
@ -38,20 +39,21 @@ class TestConfig:
def test_load_config_no_exist(self):
# test file not exist
conf = config.load_config('filename')
assert conf == {}
config = Config('filename')
assert config.conf == {}
def test_load_config_invalid(self, conf_path):
# test invalid config
write_random_file(conf_path)
with pytest.raises(Exception) as e:
config.load_config(conf_path)
config = Config(conf_path)
assert e.typename == 'MissingSectionHeaderError'
def test_get_path_definition(self, conf):
"""
Get path definition from config
"""
path = config.get_path_definition(conf)
config = Config(conf=conf)
path = config.get_path_definition()
assert path == '%u{%Y-%m}/{city}|{city}-{%Y}/{folders[:1]}/{folder}/{%Y-%m-%b-%H-%M-%S}-{basename}.%l{ext}'

View File

@ -10,9 +10,10 @@ from time import sleep
from .conftest import copy_sample_files
from ordigi import constants
from ordigi.database import Db
from ordigi.filesystem import FileSystem
from ordigi.media import Media
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.filesystem import FileSystem
from ordigi.geolocation import GeoLocation
from ordigi.media import Media
@pytest.mark.skip()
@ -67,12 +68,13 @@ class TestFilesystem:
exif_data = ExifToolCaching(str(file_path)).asdict()
metadata = media.get_metadata()
loc = GeoLocation()
for item, regex in items.items():
for mask in masks:
matched = re.search(regex, mask)
if matched:
part = filesystem.get_part(item, mask[1:-1],
metadata, Db(tmp_path), subdirs)
metadata, Db(tmp_path), subdirs, loc)
# check if part is correct
assert isinstance(part, str), file_path
if item == 'basename':
@ -138,8 +140,9 @@ class TestFilesystem:
def test_sort_files(self, tmp_path):
db = Db(tmp_path)
filesystem = FileSystem(path_format=self.path_format)
summary, has_errors = filesystem.sort_files([self.src_paths], tmp_path, db)
loc = GeoLocation()
summary, has_errors = filesystem.sort_files([self.src_paths],
tmp_path, db, loc)
# Summary is created and there is no errors
assert summary, summary
@ -154,7 +157,8 @@ class TestFilesystem:
filesystem = FileSystem(path_format=self.path_format, mode=mode)
# copy mode
src_path = Path(self.src_paths, 'photo.png')
dest_path = Path(tmp_path,'photo_copy.png')
name = 'photo_' + mode + '.png'
dest_path = Path(tmp_path, name)
src_checksum = filesystem.checksum(src_path)
result_copy = filesystem.sort_file(src_path, dest_path)
assert result_copy