Support user defined folder structure (#5) (#160)

* Custom date directories
* Add config module to parse ini files
* Add method/tests to get a customizable folder path
* Cache path definition
* Initial working version of custom paths
* Add Directory values in sample config
* Refactoring location parsing logic and adding tests
* Remove unused functions
* Add documentation to parse_mask_for_location
* Remove commented out code
* Add tests for migrating old location db to new
This commit is contained in:
Terence Eden 2017-01-03 04:58:52 +00:00 committed by Jaisen Mathai
parent 328aed6dfd
commit cd5ba92b0b
12 changed files with 511 additions and 106 deletions

View File

@ -290,6 +290,27 @@ cp config.ini-sample ~/.elodie/config.ini
# now you're ready to add your MapQuest key # now you're ready to add your MapQuest key
``` ```
## Custom folder structured
OK, so what if you don't like the folders being named "2016-01-Jan"? No problem!
You can add a custom, date based folder structure by editing `~/.elodie/config.ini`
By default, you'll see:
```
[Directory]
dir=%Y-%m-%b
```
### Examples
* To have just `201601`, use `dir=%Y%m`
* For `Sunday, 01 January 2016`, use `dir=%A, %d %B %Y`
* Python also has some pre-built formats. So you can get `Sun Jan 01 12:34:56 2016`, by using `%c`
You can use any of [the standard Python time directives](https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior) to create your ideal structure.
## Questions, comments or concerns? ## Questions, comments or concerns?
The best ways to provide feedback is by reaching out on Twitter at [@getelodie](https://twitter.com/getelodie), opening a [GitHub issue](https://github.com/jmathai/elodie/issues) or emailing me at [jaisen@jmathai.com](mailto:jaisen@jmathai.com). The best ways to provide feedback is by reaching out on Twitter at [@getelodie](https://twitter.com/getelodie), opening a [GitHub issue](https://github.com/jmathai/elodie/issues) or emailing me at [jaisen@jmathai.com](mailto:jaisen@jmathai.com).

View File

@ -1,2 +1,8 @@
[MapQuest] [MapQuest]
key=your-api-key-goes-here key=your-api-key-goes-here
[Directory]
date=%Y-%m-%b
location=%city
full_path=%date/%location

19
elodie/config.py Normal file
View File

@ -0,0 +1,19 @@
"""Load config file as a singleton."""
from configparser import RawConfigParser
from os import path
from elodie import constants
config_file = '%s/config.ini' % constants.application_directory
def load_config():
if hasattr(load_config, "config"):
return load_config.config
if not path.exists(config_file):
return {}
load_config.config = RawConfigParser()
load_config.config.read(config_file)
return load_config.config

View File

@ -13,14 +13,21 @@ import time
from elodie import geolocation from elodie import geolocation
from elodie import log from elodie import log
from elodie.config import load_config
from elodie.localstorage import Db from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses from elodie.media.base import Base, get_all_subclasses
class FileSystem(object): class FileSystem(object):
"""A class for interacting with the file system.""" """A class for interacting with the file system."""
def __init__(self):
# The default folder path is along the lines of 2015-01-Jan/Chicago
self.default_folder_path_definition = [
('date', '%Y-%m-%b'), ('location', '%city')
]
self.cached_folder_path_definition = None
def create_directory(self, directory_path): def create_directory(self, directory_path):
"""Create a directory if it does not already exist. """Create a directory if it does not already exist.
@ -134,36 +141,74 @@ class FileSystem(object):
metadata['extension']) metadata['extension'])
return file_name.lower() return file_name.lower()
def get_folder_name_by_date(self, time_obj): def get_folder_path_definition(self):
"""Get date based folder name. # If we've done this already then return it immediately without
# incurring any extra work
if self.cached_folder_path_definition is not None:
return self.cached_folder_path_definition
:param time time_obj: Time object to be used to determine folder name. config = load_config()
:returns: str
""" # If Directory is in the config we assume full_path and its
return time.strftime('%Y-%m-%b', time_obj) # corresponding values (date, location) are also present
if('Directory' not in config):
return self.default_folder_path_definition
config_directory = config['Directory']
path_parts = re.search(
'\%([^/]+)\/\%([^/]+)',
config_directory['full_path']
)
if not path_parts or len(path_parts.groups()) != 2:
return self.default_folder_path_definition
path_part_groups = path_parts.groups()
self.cached_folder_path_definition = [
(path_part_groups[0], config_directory[path_part_groups[0]]),
(path_part_groups[1], config_directory[path_part_groups[1]]),
]
return self.cached_folder_path_definition
def get_folder_path(self, metadata): def get_folder_path(self, metadata):
"""Get folder path by various parameters. """Get folder path by various parameters.
:param time time_obj: Time object to be used to determine folder name. :param metadata dict: Metadata dictionary.
:returns: str :returns: str
""" """
path_parts = self.get_folder_path_definition()
path = [] path = []
if(metadata['date_taken'] is not None): for path_part in path_parts:
path.append(time.strftime('%Y-%m-%b', metadata['date_taken'])) part, mask = path_part
if part == 'date':
path.append(time.strftime(mask, metadata['date_taken']))
elif part == 'location':
if(
metadata['latitude'] is not None and
metadata['longitude'] is not None
):
place_name = geolocation.place_name(
metadata['latitude'],
metadata['longitude']
)
if(place_name is not None):
location_parts = re.findall('(%[^%]+)', mask)
parsed_folder_name = self.parse_mask_for_location(
mask,
location_parts,
place_name,
)
path.append(parsed_folder_name)
# For now we always make the leaf folder an album if it's in the EXIF.
# This is to preserve backwards compatability until we figure out how
# to include %album in the config.ini syntax.
if(metadata['album'] is not None): if(metadata['album'] is not None):
path.append(metadata['album']) if(len(path) == 1):
elif( path.append(metadata['album'])
metadata['latitude'] is not None and elif(len(path) == 2):
metadata['longitude'] is not None path[1] = metadata['album']
):
place_name = geolocation.place_name(
metadata['latitude'],
metadata['longitude']
)
if(place_name is not None):
path.append(place_name)
# if we don't have a 2nd level directory we use 'Unknown Location' # if we don't have a 2nd level directory we use 'Unknown Location'
if(len(path) < 2): if(len(path) < 2):
@ -172,6 +217,66 @@ class FileSystem(object):
# return '/'.join(path[::-1]) # return '/'.join(path[::-1])
return os.path.join(*path) return os.path.join(*path)
def parse_mask_for_location(self, mask, location_parts, place_name):
"""Takes a mask for a location and interpolates the actual place names.
Given these parameters here are the outputs.
mask=%city
location_parts=[('%city','%city','city')]
place_name={'city': u'Sunnyvale'}
output=Sunnyvale
mask=%city-%state
location_parts=[('%city-','%city','city'), ('%state','%state','state')]
place_name={'city': u'Sunnyvale', 'state': u'California'}
output=Sunnyvale-California
mask=%country
location_parts=[('%country','%country','country')]
place_name={'default': u'Sunnyvale', 'city': u'Sunnyvale'}
output=Sunnyvale
:param str mask: The location mask in the form of %city-%state, etc
:param list location_parts: A list of tuples in the form of
[('%city-', '%city', 'city'), ('%state', '%state', 'state')]
:param dict place_name: A dictionary of place keywords and names like
{'default': u'California', 'state': u'California'}
:returns: str
"""
found = False
folder_name = mask
for loc_part in location_parts:
# We assume the search returns a tuple of length 2.
# If not then it's a bad mask in config.ini.
# loc_part = '%country-random'
# component_full = '%country-random'
# component = '%country'
# key = 'country
component_full, component, key = re.search(
'((%([a-z]+))[^%]*)',
loc_part
).groups()
if(key in place_name):
found = True
replace_target = component
replace_with = place_name[key]
else:
replace_target = component_full
replace_with = ''
folder_name = folder_name.replace(
replace_target,
replace_with,
)
if(not found and folder_name == ''):
folder_name = place_name['default']
return folder_name
def process_file(self, _file, destination, media, **kwargs): def process_file(self, _file, destination, media, **kwargs):
move = False move = False
if('move' in kwargs): if('move' in kwargs):

View File

@ -7,13 +7,13 @@ from past.utils import old_div
standard_library.install_aliases() # noqa standard_library.install_aliases() # noqa
from os import path from os import path
from configparser import ConfigParser
import requests import requests
import urllib.request import urllib.request
import urllib.parse import urllib.parse
import urllib.error import urllib.error
from elodie.config import load_config
from elodie import constants from elodie import constants
from elodie import log from elodie import log
from elodie.localstorage import Db from elodie.localstorage import Db
@ -106,12 +106,11 @@ def get_key():
if not path.exists(config_file): if not path.exists(config_file):
return None return None
config = ConfigParser() config = load_config()
config.read(config_file) if('MapQuest' not in config):
if('MapQuest' not in config.sections()):
return None return None
__KEY__ = config.get('MapQuest', 'key') __KEY__ = config['MapQuest']['key']
return __KEY__ return __KEY__
@ -126,22 +125,28 @@ def place_name(lat, lon):
db = Db() db = Db()
# 3km distace radious for a match # 3km distace radious for a match
cached_place_name = db.get_location_name(lat, lon, 3000) cached_place_name = db.get_location_name(lat, lon, 3000)
if(cached_place_name is not None): # We check that it's a dict to coerce an upgrade of the location
# db from a string location to a dictionary. See gh-160.
if(isinstance(cached_place_name, dict)):
return cached_place_name return cached_place_name
lookup_place_name = None lookup_place_name = {}
geolocation_info = lookup(lat=lat, lon=lon) geolocation_info = lookup(lat=lat, lon=lon)
if(geolocation_info is not None): if(geolocation_info is not None):
if('address' in geolocation_info): if('address' in geolocation_info):
address = geolocation_info['address'] address = geolocation_info['address']
if('city' in address): for loc in ['city', 'state', 'country']:
lookup_place_name = address['city'] if(loc in address):
elif('state' in address): lookup_place_name[loc] = address[loc]
lookup_place_name = address['state'] # In many cases the desired key is not available so we
elif('country' in address): # set the most specific as the default.
lookup_place_name = address['country'] if('default' not in lookup_place_name):
lookup_place_name['default'] = address[loc]
if(lookup_place_name is not None): if('default' not in lookup_place_name):
lookup_place_name = 'Unknown Location'
if(lookup_place_name is not {}):
db.add_location(lat, lon, lookup_place_name) db.add_location(lat, lon, lookup_place_name)
# TODO: Maybe this should only be done on exit and not for every write. # TODO: Maybe this should only be done on exit and not for every write.
db.update_location_db() db.update_location_db()

View File

@ -0,0 +1,31 @@
from __future__ import absolute_import
# Project imports
import os
import sys
import unittest
from mock import patch
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))))
from elodie import constants
from elodie.config import load_config
BASE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
@patch('elodie.config.config_file', '%s/config.ini-sample' % BASE_PATH)
def test_load_config_singleton_success():
config = load_config()
assert config['MapQuest']['key'] == 'your-api-key-goes-here', config.get('MapQuest', 'key')
config.set('MapQuest', 'key', 'new-value')
config = load_config()
assert config['MapQuest']['key'] == 'new-value', config.get('MapQuest', 'key')
del load_config.config
@patch('elodie.config.config_file', '%s/config.ini-does-not-exist' % BASE_PATH)
def test_load_config_singleton_no_file():
config = load_config()
assert config == {}, config

View File

@ -15,7 +15,6 @@ sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirna
import helper import helper
elodie = load_source('elodie', os.path.abspath('{}/../../elodie.py'.format(os.path.dirname(os.path.realpath(__file__))))) elodie = load_source('elodie', os.path.abspath('{}/../../elodie.py'.format(os.path.dirname(os.path.realpath(__file__)))))
from elodie import constants
from elodie.localstorage import Db from elodie.localstorage import Db
from elodie.media.audio import Audio from elodie.media.audio import Audio
from elodie.media.photo import Photo from elodie.media.photo import Photo
@ -31,14 +30,14 @@ def test_import_file_text():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
dest_path = elodie.import_file(origin, folder_destination, False, False, False) dest_path = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
assert helper.path_tz_fix(os.path.join('2016-04-Apr','Unknown Location','2016-04-07_11-15-26-valid-sample-title.txt')) in dest_path, dest_path assert helper.path_tz_fix(os.path.join('2016-04-Apr','London','2016-04-07_11-15-26-valid-sample-title.txt')) in dest_path, dest_path
def test_import_file_audio(): def test_import_file_audio():
temporary_folder, folder = helper.create_working_folder() temporary_folder, folder = helper.create_working_folder()
@ -47,9 +46,9 @@ def test_import_file_audio():
origin = '%s/audio.m4a' % folder origin = '%s/audio.m4a' % folder
shutil.copyfile(helper.get_file('audio.m4a'), origin) shutil.copyfile(helper.get_file('audio.m4a'), origin)
reset_hash_db() helper.reset_dbs()
dest_path = elodie.import_file(origin, folder_destination, False, False, False) dest_path = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
@ -63,9 +62,9 @@ def test_import_file_photo():
origin = '%s/plain.jpg' % folder origin = '%s/plain.jpg' % folder
shutil.copyfile(helper.get_file('plain.jpg'), origin) shutil.copyfile(helper.get_file('plain.jpg'), origin)
reset_hash_db() helper.reset_dbs()
dest_path = elodie.import_file(origin, folder_destination, False, False, False) dest_path = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
@ -79,9 +78,9 @@ def test_import_file_video():
origin = '%s/video.mov' % folder origin = '%s/video.mov' % folder
shutil.copyfile(helper.get_file('video.mov'), origin) shutil.copyfile(helper.get_file('video.mov'), origin)
reset_hash_db() helper.reset_dbs()
dest_path = elodie.import_file(origin, folder_destination, False, False, False) dest_path = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
@ -113,10 +112,10 @@ def test_import_file_allow_duplicate_false():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
dest_path1 = elodie.import_file(origin, folder_destination, False, False, False) dest_path1 = elodie.import_file(origin, folder_destination, False, False, False)
dest_path2 = elodie.import_file(origin, folder_destination, False, False, False) dest_path2 = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
@ -131,10 +130,10 @@ def test_import_file_allow_duplicate_true():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
dest_path1 = elodie.import_file(origin, folder_destination, False, False, True) dest_path1 = elodie.import_file(origin, folder_destination, False, False, True)
dest_path2 = elodie.import_file(origin, folder_destination, False, False, True) dest_path2 = elodie.import_file(origin, folder_destination, False, False, True)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
shutil.rmtree(folder_destination) shutil.rmtree(folder_destination)
@ -185,9 +184,9 @@ def test_import_destination_in_source():
origin = '%s/video.mov' % folder origin = '%s/video.mov' % folder
shutil.copyfile(helper.get_file('video.mov'), origin) shutil.copyfile(helper.get_file('video.mov'), origin)
reset_hash_db() helper.reset_dbs()
dest_path = elodie.import_file(origin, folder_destination, False, False, False) dest_path = elodie.import_file(origin, folder_destination, False, False, False)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
@ -203,9 +202,9 @@ def test_update_location_on_audio():
audio = Audio(origin) audio = Audio(origin)
metadata = audio.get_metadata() metadata = audio.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_location(audio, origin, 'Sunnyvale, CA') status = elodie.update_location(audio, origin, 'Sunnyvale, CA')
restore_hash_db() helper.restore_dbs()
audio_processed = Audio(origin) audio_processed = Audio(origin)
metadata_processed = audio_processed.get_metadata() metadata_processed = audio_processed.get_metadata()
@ -228,9 +227,9 @@ def test_update_location_on_photo():
photo = Photo(origin) photo = Photo(origin)
metadata = photo.get_metadata() metadata = photo.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_location(photo, origin, 'Sunnyvale, CA') status = elodie.update_location(photo, origin, 'Sunnyvale, CA')
restore_hash_db() helper.restore_dbs()
photo_processed = Photo(origin) photo_processed = Photo(origin)
metadata_processed = photo_processed.get_metadata() metadata_processed = photo_processed.get_metadata()
@ -253,9 +252,9 @@ def test_update_location_on_text():
text = Text(origin) text = Text(origin)
metadata = text.get_metadata() metadata = text.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_location(text, origin, 'Sunnyvale, CA') status = elodie.update_location(text, origin, 'Sunnyvale, CA')
restore_hash_db() helper.restore_dbs()
text_processed = Text(origin) text_processed = Text(origin)
metadata_processed = text_processed.get_metadata() metadata_processed = text_processed.get_metadata()
@ -278,9 +277,9 @@ def test_update_location_on_video():
video = Video(origin) video = Video(origin)
metadata = video.get_metadata() metadata = video.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_location(video, origin, 'Sunnyvale, CA') status = elodie.update_location(video, origin, 'Sunnyvale, CA')
restore_hash_db() helper.restore_dbs()
video_processed = Video(origin) video_processed = Video(origin)
metadata_processed = video_processed.get_metadata() metadata_processed = video_processed.get_metadata()
@ -303,9 +302,9 @@ def test_update_time_on_audio():
audio = Audio(origin) audio = Audio(origin)
metadata = audio.get_metadata() metadata = audio.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_time(audio, origin, '2000-01-01 12:00:00') status = elodie.update_time(audio, origin, '2000-01-01 12:00:00')
restore_hash_db() helper.restore_dbs()
audio_processed = Audio(origin) audio_processed = Audio(origin)
metadata_processed = audio_processed.get_metadata() metadata_processed = audio_processed.get_metadata()
@ -327,9 +326,9 @@ def test_update_time_on_photo():
photo = Photo(origin) photo = Photo(origin)
metadata = photo.get_metadata() metadata = photo.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_time(photo, origin, '2000-01-01 12:00:00') status = elodie.update_time(photo, origin, '2000-01-01 12:00:00')
restore_hash_db() helper.restore_dbs()
photo_processed = Photo(origin) photo_processed = Photo(origin)
metadata_processed = photo_processed.get_metadata() metadata_processed = photo_processed.get_metadata()
@ -351,9 +350,9 @@ def test_update_time_on_text():
text = Text(origin) text = Text(origin)
metadata = text.get_metadata() metadata = text.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_time(text, origin, '2000-01-01 12:00:00') status = elodie.update_time(text, origin, '2000-01-01 12:00:00')
restore_hash_db() helper.restore_dbs()
text_processed = Text(origin) text_processed = Text(origin)
metadata_processed = text_processed.get_metadata() metadata_processed = text_processed.get_metadata()
@ -375,9 +374,9 @@ def test_update_time_on_video():
video = Video(origin) video = Video(origin)
metadata = video.get_metadata() metadata = video.get_metadata()
reset_hash_db() helper.reset_dbs()
status = elodie.update_time(video, origin, '2000-01-01 12:00:00') status = elodie.update_time(video, origin, '2000-01-01 12:00:00')
restore_hash_db() helper.restore_dbs()
video_processed = Video(origin) video_processed = Video(origin)
metadata_processed = video_processed.get_metadata() metadata_processed = video_processed.get_metadata()
@ -400,16 +399,16 @@ def test_regenerate_valid_source():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
runner = CliRunner() runner = CliRunner()
result = runner.invoke(elodie._generate_db, ['--source', folder]) result = runner.invoke(elodie._generate_db, ['--source', folder])
db = Db() db = Db()
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
assert result.exit_code == 0, result.exit_code assert result.exit_code == 0, result.exit_code
assert 'bde2dc0b839a5d20b0b4c1f57605f84e0e2a4562aaebc1c362de6cb7cc02eeb3' in db.hash_db, db.hash_db assert '3c19a5d751cf19e093b7447297731124d9cc987d3f91a9d1872c3b1c1b15639a' in db.hash_db, db.hash_db
def test_regenerate_valid_source_with_invalid_files(): def test_regenerate_valid_source_with_invalid_files():
temporary_folder, folder = helper.create_working_folder() temporary_folder, folder = helper.create_working_folder()
@ -419,16 +418,16 @@ def test_regenerate_valid_source_with_invalid_files():
origin_invalid = '%s/invalid.invalid' % folder origin_invalid = '%s/invalid.invalid' % folder
shutil.copyfile(helper.get_file('invalid.invalid'), origin_invalid) shutil.copyfile(helper.get_file('invalid.invalid'), origin_invalid)
reset_hash_db() helper.reset_dbs()
runner = CliRunner() runner = CliRunner()
result = runner.invoke(elodie._generate_db, ['--source', folder]) result = runner.invoke(elodie._generate_db, ['--source', folder])
db = Db() db = Db()
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
assert result.exit_code == 0, result.exit_code assert result.exit_code == 0, result.exit_code
assert 'bde2dc0b839a5d20b0b4c1f57605f84e0e2a4562aaebc1c362de6cb7cc02eeb3' in db.hash_db, db.hash_db assert '3c19a5d751cf19e093b7447297731124d9cc987d3f91a9d1872c3b1c1b15639a' in db.hash_db, db.hash_db
assert 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' not in db.hash_db, db.hash_db assert 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' not in db.hash_db, db.hash_db
def test_verify_ok(): def test_verify_ok():
@ -437,11 +436,11 @@ def test_verify_ok():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
runner = CliRunner() runner = CliRunner()
runner.invoke(elodie._generate_db, ['--source', folder]) runner.invoke(elodie._generate_db, ['--source', folder])
result = runner.invoke(elodie._verify) result = runner.invoke(elodie._verify)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
@ -454,25 +453,15 @@ def test_verify_error():
origin = '%s/valid.txt' % folder origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin) shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db() helper.reset_dbs()
runner = CliRunner() runner = CliRunner()
runner.invoke(elodie._generate_db, ['--source', folder]) runner.invoke(elodie._generate_db, ['--source', folder])
with open(origin, 'w') as f: with open(origin, 'w') as f:
f.write('changed text') f.write('changed text')
result = runner.invoke(elodie._verify) result = runner.invoke(elodie._verify)
restore_hash_db() helper.restore_dbs()
shutil.rmtree(folder) shutil.rmtree(folder)
assert origin in result.output, result.output assert origin in result.output, result.output
assert 'Error 1' in result.output, result.output assert 'Error 1' in result.output, result.output
def reset_hash_db():
hash_db = constants.hash_db
if os.path.isfile(hash_db):
os.rename(hash_db, '{}-test'.format(hash_db))
def restore_hash_db():
hash_db = '{}-test'.format(constants.hash_db)
if os.path.isfile(hash_db):
os.rename(hash_db, hash_db.replace('-test', ''))

View File

@ -1,3 +1,3 @@
{"date_taken":1460027726.0,"latitude":"123.456","longitude":"234.567","title":"sample title"} {"date_taken":1460027726.0,"latitude":"51.521435","longitude":"0.162714","title":"sample title"}
This file has a valid header. This file has a valid header.

View File

@ -1,18 +1,19 @@
from __future__ import absolute_import from __future__ import absolute_import
# Project imports # Project imports
import mock
import os import os
import re import re
import shutil import shutil
import time
import sys import sys
import time
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
from tempfile import gettempdir
import mock
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))))) sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))))
from . import helper from . import helper
from elodie.config import load_config
from elodie.filesystem import FileSystem from elodie.filesystem import FileSystem
from elodie.media.text import Text from elodie.media.text import Text
from elodie.media.media import Media from elodie.media.media import Media
@ -190,18 +191,6 @@ def test_get_file_name_with_title():
assert file_name == helper.path_tz_fix('2015-12-05_00-59-26-with-title-some-title.jpg'), file_name assert file_name == helper.path_tz_fix('2015-12-05_00-59-26-with-title-some-title.jpg'), file_name
def test_get_folder_name_by_date():
filesystem = FileSystem()
time_tuple = (2010, 4, 15, 1, 2, 3, 0, 0, 0)
folder_name = filesystem.get_folder_name_by_date(time_tuple)
assert folder_name == '2010-04-Apr', folder_name
time_tuple = (2010, 9, 15, 1, 2, 3, 0, 0, 0)
folder_name = filesystem.get_folder_name_by_date(time_tuple)
assert folder_name == '2010-09-Sep', folder_name
def test_get_folder_path_plain(): def test_get_folder_path_plain():
filesystem = FileSystem() filesystem = FileSystem()
media = Photo(helper.get_file('plain.jpg')) media = Photo(helper.get_file('plain.jpg'))
@ -223,6 +212,28 @@ def test_get_folder_path_with_location():
assert path == os.path.join('2015-12-Dec','Sunnyvale'), path assert path == os.path.join('2015-12-Dec','Sunnyvale'), path
@mock.patch('elodie.config.config_file', '%s/config.ini-custom-path' % gettempdir())
def test_get_folder_path_with_custom_path():
with open('%s/config.ini-custom-path' % gettempdir(), 'w') as f:
f.write("""
[MapQuest]
key=czjNKTtFjLydLteUBwdgKAIC8OAbGLUx
[Directory]
date=%Y-%m-%d
location=%country-%state-%city
full_path=%date/%location
""")
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
media = Photo(helper.get_file('with-location.jpg'))
path = filesystem.get_folder_path(media.get_metadata())
if hasattr(load_config, 'config'):
del load_config.config
assert path == os.path.join('2015-12-05','United States of America-California-Sunnyvale'), path
def test_get_folder_path_with_location_and_title(): def test_get_folder_path_with_location_and_title():
filesystem = FileSystem() filesystem = FileSystem()
media = Photo(helper.get_file('with-location-and-title.jpg')) media = Photo(helper.get_file('with-location-and-title.jpg'))
@ -230,6 +241,84 @@ def test_get_folder_path_with_location_and_title():
assert path == os.path.join('2015-12-Dec','Sunnyvale'), path assert path == os.path.join('2015-12-Dec','Sunnyvale'), path
def test_parse_folder_name_default():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'California', 'country': u'United States of America', 'state': u'California', 'city': u'Sunnyvale'}
mask = '%city'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'Sunnyvale', path
def test_parse_folder_name_multiple():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'California', 'country': u'United States of America', 'state': u'California', 'city': u'Sunnyvale'}
mask = '%city-%state-%country'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'Sunnyvale-California-United States of America', path
def test_parse_folder_name_static_chars():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'California', 'country': u'United States of America', 'state': u'California', 'city': u'Sunnyvale'}
mask = '%city-is-the-city'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'Sunnyvale-is-the-city', path
def test_parse_folder_name_key_not_found():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'California', 'country': u'United States of America', 'state': u'California'}
mask = '%city'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'California', path
def test_parse_folder_name_key_not_found_with_static_chars():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'California', 'country': u'United States of America', 'state': u'California'}
mask = '%city-is-not-found'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'California', path
def test_parse_folder_name_multiple_keys_not_found():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
place_name = {'default': u'United States of America', 'country': u'United States of America'}
mask = '%city-%state'
location_parts = re.findall('(%[^%]+)', mask)
path = filesystem.parse_mask_for_location(mask, location_parts, place_name)
if hasattr(load_config, 'config'):
del load_config.config
assert path == 'United States of America', path
def test_process_file_invalid(): def test_process_file_invalid():
filesystem = FileSystem() filesystem = FileSystem()
temporary_folder, folder = helper.create_working_folder() temporary_folder, folder = helper.create_working_folder()
@ -463,3 +552,95 @@ def test_set_utime_without_exif_date():
assert initial_time == final_stat.st_mtime assert initial_time == final_stat.st_mtime
assert final_stat.st_mtime == time.mktime(metadata_final['date_taken']), (final_stat.st_mtime, time.mktime(metadata_final['date_taken'])) assert final_stat.st_mtime == time.mktime(metadata_final['date_taken']), (final_stat.st_mtime, time.mktime(metadata_final['date_taken']))
assert initial_checksum == final_checksum assert initial_checksum == final_checksum
@mock.patch('elodie.config.config_file', '%s/config.ini-does-not-exist' % gettempdir())
def test_get_folder_path_definition_default():
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
path_definition = filesystem.get_folder_path_definition()
if hasattr(load_config, 'config'):
del load_config.config
assert path_definition == filesystem.default_folder_path_definition, path_definition
@mock.patch('elodie.config.config_file', '%s/config.ini-date-location' % gettempdir())
def test_get_folder_path_definition_date_location():
with open('%s/config.ini-date-location' % gettempdir(), 'w') as f:
f.write("""
[Directory]
date=%Y-%m-%d
location=%country
full_path=%date/%location
""")
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
path_definition = filesystem.get_folder_path_definition()
expected = [
('date', '%Y-%m-%d'), ('location', '%country')
]
if hasattr(load_config, 'config'):
del load_config.config
assert path_definition == expected, path_definition
@mock.patch('elodie.config.config_file', '%s/config.ini-location-date' % gettempdir())
def test_get_folder_path_definition_location_date():
with open('%s/config.ini-location-date' % gettempdir(), 'w') as f:
f.write("""
[Directory]
date=%Y-%m-%d
location=%country
full_path=%location/%date
""")
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
path_definition = filesystem.get_folder_path_definition()
expected = [
('location', '%country'), ('date', '%Y-%m-%d')
]
if hasattr(load_config, 'config'):
del load_config.config
assert path_definition == expected, path_definition
@mock.patch('elodie.config.config_file', '%s/config.ini-cached' % gettempdir())
def test_get_folder_path_definition_cached():
with open('%s/config.ini-cached' % gettempdir(), 'w') as f:
f.write("""
[Directory]
date=%Y-%m-%d
location=%country
full_path=%date/%location
""")
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
path_definition = filesystem.get_folder_path_definition()
expected = [
('date', '%Y-%m-%d'), ('location', '%country')
]
assert path_definition == expected, path_definition
with open('%s/config.ini-cached' % gettempdir(), 'w') as f:
f.write("""
[Directory]
date=%uncached
location=%uncached
full_path=%date/%location
""")
if hasattr(load_config, 'config'):
del load_config.config
filesystem = FileSystem()
path_definition = filesystem.get_folder_path_definition()
expected = [
('date', '%Y-%m-%d'), ('location', '%country')
]
if hasattr(load_config, 'config'):
del load_config.config

View File

@ -9,6 +9,7 @@ import random
import re import re
import sys import sys
from mock import patch from mock import patch
from tempfile import gettempdir
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))))) sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))))
@ -111,6 +112,33 @@ def test_lookup_with_valid_key():
assert latLng['lat'] == 37.36883, latLng assert latLng['lat'] == 37.36883, latLng
assert latLng['lng'] == -122.03635, latLng assert latLng['lng'] == -122.03635, latLng
@mock.patch('elodie.constants.location_db', '%s/location.json-cached' % gettempdir())
def test_place_name_deprecated_string_cached():
# See gh-160 for backwards compatability needed when a string is stored instead of a dict
helper.reset_dbs()
with open('%s/location.json-cached' % gettempdir(), 'w') as f:
f.write("""
[{"lat": 37.3667027222222, "long": -122.033383611111, "name": "OLDVALUE"}]
"""
)
place_name = geolocation.place_name(37.3667027222222, -122.033383611111)
helper.restore_dbs()
assert place_name['city'] == 'Sunnyvale', place_name
@mock.patch('elodie.constants.location_db', '%s/location.json-cached' % gettempdir())
def test_place_name_cached():
helper.reset_dbs()
with open('%s/location.json-cached' % gettempdir(), 'w') as f:
f.write("""
[{"lat": 37.3667027222222, "long": -122.033383611111, "name": {"city": "UNITTEST"}}]
"""
)
place_name = geolocation.place_name(37.3667027222222, -122.033383611111)
helper.restore_dbs()
assert place_name['city'] == 'UNITTEST', place_name
@mock.patch('elodie.geolocation.__KEY__', 'invalid_key') @mock.patch('elodie.geolocation.__KEY__', 'invalid_key')
def test_lookup_with_invalid_key(): def test_lookup_with_invalid_key():
res = geolocation.lookup(location='Sunnyvale, CA') res = geolocation.lookup(location='Sunnyvale, CA')

View File

@ -14,6 +14,8 @@ import urllib
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
from elodie import constants
def checksum(file_path, blocksize=65536): def checksum(file_path, blocksize=65536):
hasher = hashlib.sha256() hasher = hashlib.sha256()
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
@ -133,3 +135,21 @@ def isclose(a, b, rel_tol = 1e-8):
diff = abs(a - b) diff = abs(a - b)
return (diff <= abs(rel_tol * a) and return (diff <= abs(rel_tol * a) and
diff <= abs(rel_tol * b)) diff <= abs(rel_tol * b))
def reset_dbs():
hash_db = constants.hash_db
if os.path.isfile(hash_db):
os.rename(hash_db, '{}-test'.format(hash_db))
location_db = constants.location_db
if os.path.isfile(location_db):
os.rename(location_db, '{}-test'.format(location_db))
def restore_dbs():
hash_db = '{}-test'.format(constants.hash_db)
if os.path.isfile(hash_db):
os.rename(hash_db, hash_db.replace('-test', ''))
location_db = '{}-test'.format(constants.location_db)
if os.path.isfile(location_db):
os.rename(location_db, location_db.replace('-test', ''))

View File

@ -37,17 +37,17 @@ def test_get_title():
def test_get_default_coordinate(): def test_get_default_coordinate():
text = Text(helper.get_file('valid.txt')) text = Text(helper.get_file('valid.txt'))
text.get_metadata() text.get_metadata()
assert text.get_coordinate() == '123.456', text.get_coordinate() assert text.get_coordinate() == '51.521435', text.get_coordinate()
def test_get_coordinate_latitude(): def test_get_coordinate_latitude():
text = Text(helper.get_file('valid.txt')) text = Text(helper.get_file('valid.txt'))
text.get_metadata() text.get_metadata()
assert text.get_coordinate('latitude') == '123.456', text.get_coordinate('latitude') assert text.get_coordinate('latitude') == '51.521435', text.get_coordinate('latitude')
def test_get_coordinate_longitude(): def test_get_coordinate_longitude():
text = Text(helper.get_file('valid.txt')) text = Text(helper.get_file('valid.txt'))
text.get_metadata() text.get_metadata()
assert text.get_coordinate('longitude') == '234.567', text.get_coordinate('longitude') assert text.get_coordinate('longitude') == '0.162714', text.get_coordinate('longitude')
def test_get_date_taken(): def test_get_date_taken():
text = Text(helper.get_file('valid.txt')) text = Text(helper.get_file('valid.txt'))