Change json Db to Sqlite

This commit is contained in:
Cédric Leporcq 2021-08-31 16:18:41 +02:00
parent 9b055c88bd
commit cc958cf53b
10 changed files with 389 additions and 313 deletions

View File

@ -10,7 +10,6 @@ import click
from ordigi.config import Config
from ordigi import constants
from ordigi import log
from ordigi.database import Db
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media, get_all_subclasses
@ -87,11 +86,6 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
paths = set(paths)
filter_by_ext = set(filter_by_ext)
destination = os.path.abspath(os.path.expanduser(destination))
if not os.path.exists(destination):
logger.error(f'Directory {destination} does not exist')
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
@ -100,17 +94,14 @@ def _sort(debug, dry_run, destination, clean, copy, exclude_regex, filter_by_ext
exclude_regex = opt['exclude_regex']
exclude_regex_list = set(exclude_regex)
# Initialize Db
db = Db(destination)
collection = Collection(opt['path_format'], destination, cache,
collection = Collection(destination, opt['path_format'], cache,
opt['day_begins'], dry_run, exclude_regex_list, filter_by_ext,
logger, max_deep, mode)
loc = GeoLocation(opt['geocoder'], opt['prefer_english_names'],
opt['timeout'])
summary, has_errors = collection.sort_files(paths, db, loc,
summary, has_errors = collection.sort_files(paths, loc,
remove_duplicates, ignore_tags)
if clean:
@ -176,18 +167,16 @@ def _clean(debug, dedup_regex, dry_run, folders, max_deep, path_string, remove_d
if not root:
root = path
if clean_all or folders:
remove_empty_folders(path, logger)
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
if path_string:
# Initialize Db
db = Db(root)
collection = Collection(opt['path_format'], root, dry_run=dry_run, logger=logger, max_deep=max_deep, mode='move')
collection = Collection(root, opt['path_format'], dry_run=dry_run, logger=logger, max_deep=max_deep, mode='move')
dedup_regex = list(dedup_regex)
summary, has_errors = collection.dedup_regex(path, dedup_regex, db, logger, remove_duplicates)
summary, has_errors = collection.dedup_regex(path, dedup_regex, logger, remove_duplicates)
if clean_all or folders:
remove_empty_folders(path, logger)
if verbose or debug:
summary.write()
@ -251,16 +240,12 @@ def _compare(debug, dry_run, find_duplicates, output_dir, remove_duplicates,
config = Config(constants.CONFIG_FILE)
opt = config.get_options()
# Initialize Db
db = Db(root)
collection = Collection(path_format, root, mode='move', dry_run=dry_run, logger=logger)
collection = Collection(root, None, mode='move', dry_run=dry_run, logger=logger)
if revert_compare:
summary, has_errors = collection.revert_compare(path, db, dry_run)
summary, has_errors = collection.revert_compare(path, dry_run)
else:
summary, has_errors = collection.sort_similar_images(path, db,
similarity)
summary, has_errors = collection.sort_similar_images(path, similarity)
if verbose or debug:
summary.write()

View File

@ -14,6 +14,7 @@ import shutil
from datetime import datetime, timedelta
from ordigi import media
from ordigi.database import Sqlite
from ordigi.media import Media, get_all_subclasses
from ordigi.images import Images
from ordigi.summary import Summary
@ -22,12 +23,20 @@ from ordigi.summary import Summary
class Collection(object):
"""Class of the media collection."""
def __init__(self, path_format, root, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
def __init__(self, root, path_format, cache=False, day_begins=0, dry_run=False, exclude_regex_list=set(),
filter_by_ext=set(), logger=logging.getLogger(), max_deep=None,
mode='copy'):
self.root = root
# Attributes
self.root = Path(root).expanduser().absolute()
if not os.path.exists(self.root):
logger.error(f'Directory {self.root} does not exist')
sys.exit(1)
self.path_format = path_format
self.db = Sqlite(self.root)
# Options
self.cache = cache
self.day_begins = day_begins
self.dry_run = dry_run
@ -43,7 +52,6 @@ class Collection(object):
self.logger = logger
self.max_deep = max_deep
self.mode = mode
self.path_format = path_format
self.summary = Summary()
self.whitespace_regex = '[ \t\n\r\f\v]+'
@ -90,38 +98,6 @@ class Collection(object):
for i, rx in regex.items():
yield i, rx
def get_location_part(self, mask, part, place_name):
"""Takes a mask for a location and interpolates the actual place names.
Given these parameters, here are the outputs:
mask = 'city'
part = 'city-random'
place_name = {'city': u'Sunnyvale'}
return 'Sunnyvale'
mask = 'location'
part = 'location'
place_name = {'default': u'Sunnyvale', 'city': u'Sunnyvale'}
return 'Sunnyvale'
:returns: str
"""
folder_name = part
if(mask in place_name):
replace_target = mask
replace_with = place_name[mask]
else:
replace_target = part
replace_with = ''
folder_name = folder_name.replace(
replace_target,
replace_with,
)
return folder_name
def check_for_early_morning_photos(self, date):
"""check for early hour photos to be grouped with previous day"""
@ -132,7 +108,7 @@ class Collection(object):
return date
def get_part(self, item, mask, metadata, db, subdirs, loc):
def get_part(self, item, mask, metadata, subdirs):
"""Parse a specific folder's name given a mask and metadata.
:param item: Name of the item as defined in the path (i.e. date from %date)
@ -157,18 +133,6 @@ class Collection(object):
date = self.check_for_early_morning_photos(date)
if date is not None:
part = date.strftime(mask)
elif item in ('location', 'city', 'state', 'country'):
place_name = loc.place_name(
metadata['latitude'],
metadata['longitude'],
db,
self.cache,
self.logger
)
if item == 'location':
mask = 'default'
part = self.get_location_part(mask, item, place_name)
elif item == 'folder':
part = os.path.basename(subdirs)
@ -178,24 +142,27 @@ class Collection(object):
part = os.path.join(*folders)
elif item in ('album','camera_make', 'camera_model', 'ext',
'original_name', 'title'):
if metadata[item]:
part = metadata[item]
elif item in ('album','camera_make', 'camera_model', 'city', 'country', 'ext',
'location', 'original_name', 'state', 'title'):
if item == 'location':
mask = 'default'
if metadata[mask]:
part = metadata[mask]
elif item in 'custom':
# Fallback string
part = mask[1:-1]
return part
def get_path_part(self, this_part, metadata, db, subdirs, loc):
def get_path_part(self, this_part, metadata, subdirs):
"""Build path part
:returns: part (string)"""
for item, regex in self.items.items():
matched = re.search(regex, this_part)
if matched:
part = self.get_part(item, matched.group()[1:-1], metadata, db,
subdirs, loc)
part = self.get_part(item, matched.group()[1:-1], metadata,
subdirs)
part = part.strip()
@ -216,7 +183,7 @@ class Collection(object):
return this_part
def get_path(self, metadata, db, loc, subdirs='', whitespace_sub='_'):
def get_path(self, metadata, subdirs='', whitespace_sub='_'):
"""path_format: {%Y-%d-%m}/%u{city}/{album}
Returns file path.
@ -229,7 +196,7 @@ class Collection(object):
for path_part in path_parts:
this_parts = path_part.split('|')
for this_part in this_parts:
this_part = self.get_path_part(this_part, metadata, db, subdirs, loc)
this_part = self.get_path_part(this_part, metadata, subdirs)
if this_part:
# Check if all masks are substituted
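A minimal, self-contained sketch of the substitution flow above; the items regex map and the metadata values are illustrative stand-ins, not ordigi's actual tables:

from datetime import datetime
import re

# Hypothetical stand-ins for Collection.items and a media metadata dict
items = {'date': r'\{(%[a-zA-Z][^}]*)\}', 'album': r'\{album\}', 'city': r'\{city\}'}
metadata = {'date_original': datetime(2021, 8, 31), 'album': None, 'city': 'Lyon'}

def get_part(item, mask):
    # Dates are rendered with strftime; other items come straight from metadata
    if item == 'date':
        return metadata['date_original'].strftime(mask)
    return metadata.get(item) or ''

def get_path(path_format):
    path = []
    for path_part in path_format.split('/'):
        # '|' separates fallbacks: the first alternative rendering non-empty wins
        for this_part in path_part.split('|'):
            for item, regex in items.items():
                matched = re.search(regex, this_part)
                if matched:
                    this_part = this_part.replace(
                        matched.group(), get_part(item, matched.group()[1:-1]))
            if this_part:
                path.append(this_part.strip())
                break
    return '/'.join(path)

print(get_path('{%Y-%m}/{album}|{city}'))  # '2021-08/Lyon', since album is None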
@ -364,15 +331,28 @@ class Collection(object):
return src_checksum
def check_file(self, src_path, dest_path, src_checksum, db):
def _add_db_data(self, dest_path, metadata, checksum):
loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
loc_values = []
for key in loc_keys:
loc_values.append(metadata[key])
metadata['location_id'] = self.db.add_location(*loc_values)
file_keys = ('original_name', 'date_original', 'album', 'location_id')
file_values = []
for key in file_keys:
file_values.append(metadata[key])
dest_path_rel = os.path.relpath(dest_path, self.root)
self.db.add_file_data(dest_path_rel, checksum, *file_values)
def record_file(self, src_path, dest_path, src_checksum, metadata):
# Check if the file remains the same
checksum = self.checkcomp(dest_path, src_checksum)
has_errors = False
if checksum:
if not self.dry_run:
db.add_hash(checksum, dest_path)
db.update_hash_db()
self._add_db_data(dest_path, metadata, checksum)
self.summary.append((src_path, dest_path))
@ -452,7 +432,7 @@ class Collection(object):
self.logger.info(f'copy: {src_path} -> {dest_path}')
return True
def solve_conflicts(self, conflict_file_list, db, remove_duplicates):
def solve_conflicts(self, conflict_file_list, metadata, remove_duplicates):
has_errors = False
unresolved_conflicts = []
while conflict_file_list != []:
@ -484,8 +464,8 @@ class Collection(object):
has_errors = True
if result:
self.summary, has_errors = self.check_file(src_path,
dest_path, src_checksum, db)
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
if has_errors:
return False
@ -591,7 +571,7 @@ class Collection(object):
# Initialize date taken to what's returned from the metadata function.
os.utime(file_path, (int(datetime.now().timestamp()), int(date_taken.timestamp())))
def dedup_regex(self, path, dedup_regex, db, logger, remove_duplicates=False):
def dedup_regex(self, path, dedup_regex, logger, remove_duplicates=False):
# Cycle through files
has_errors = False
path = self.check_path(path)
@ -634,22 +614,22 @@ class Collection(object):
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result:
self.summary, has_errors = self.check_file(src_path,
dest_path, src_checksum, db)
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
elif result is False:
# There are conflicting files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
if conflict_file_list != []:
result = self.solve_conflicts(conflict_file_list, db, remove_duplicates)
result = self.solve_conflicts(conflict_file_list, metadata, remove_duplicates)
if not result:
has_errors = True
return self.summary, has_errors
def sort_files(self, paths, db, loc, remove_duplicates=False,
def sort_files(self, paths, loc, remove_duplicates=False,
ignore_tags=set()):
"""
Sort files into appropriate folder
@ -664,9 +644,9 @@ class Collection(object):
src_checksum = self.checksum(src_path)
media = Media(src_path, ignore_tags, self.logger)
if media:
metadata = media.get_metadata()
metadata = media.get_metadata(loc, self.db, self.cache)
# Get the destination path according to metadata
file_path = self.get_path(metadata, db, loc, subdirs=subdirs)
file_path = self.get_path(metadata, subdirs=subdirs)
else:
# Keep same directory structure
file_path = os.path.relpath(src_path, path)
@ -679,28 +659,31 @@ class Collection(object):
result = self.sort_file(src_path, dest_path, remove_duplicates)
if result is False:
if result:
self.summary, has_errors = self.record_file(src_path,
dest_path, src_checksum, metadata)
elif result is False:
# There are conflicting files
conflict_file_list.append({'src_path': src_path,
'src_checksum': src_checksum, 'dest_path': dest_path})
if conflict_file_list != []:
result = self.solve_conflicts(conflict_file_list, db, remove_duplicates)
result = self.solve_conflicts(conflict_file_list, metadata,
remove_duplicates)
if not result:
has_errors = True
return self.summary, has_errors
def set_hash(self, result, src_path, dest_path, src_checksum, db):
def set_hash(self, result, src_path, dest_path, src_checksum):
if result:
# Check if the file remains the same
result = self.checkcomp(dest_path, src_checksum)
has_errors = False
if result:
if not self.dry_run:
db.add_hash(checksum, dest_path)
db.update_hash_db()
self._add_db_data(dest_path, metadata, src_checksum)
if dest_path:
self.logger.info(f'{src_path} -> {dest_path}')
@ -718,7 +701,7 @@ class Collection(object):
return has_errors
def move_file(self, img_path, dest_path, checksum, db):
def move_file(self, img_path, dest_path, checksum):
if not self.dry_run:
try:
shutil.move(img_path, dest_path)
@ -726,9 +709,9 @@ class Collection(object):
self.logger.error(error)
self.logger.info(f'move: {img_path} -> {dest_path}')
return self.set_hash(True, img_path, dest_path, checksum, db)
return self.set_hash(True, img_path, dest_path, checksum)
def sort_similar_images(self, path, db, similarity=80):
def sort_similar_images(self, path, similarity=80):
has_errors = False
path = self.check_path(path)
@ -769,7 +752,7 @@ class Collection(object):
result = self.create_directory(dest_directory)
# Move the simlars file into the destination directory
if result:
result = self.move_file(img_path, dest_path, checksum2, db)
result = self.move_file(img_path, dest_path, checksum2)
moved_imgs.add(img_path)
if not result:
has_errors = True
@ -780,7 +763,7 @@ class Collection(object):
if similar:
dest_path = os.path.join(dest_directory,
os.path.basename(image))
result = self.move_file(image, dest_path, checksum1, db)
result = self.move_file(image, dest_path, checksum1)
moved_imgs.add(image)
if not result:
has_errors = True
@ -790,7 +773,7 @@ class Collection(object):
return self.summary, has_errors
def revert_compare(self, path, db):
def revert_compare(self, path):
has_errors = False
path = self.check_path(path)
@ -810,7 +793,7 @@ class Collection(object):
continue
checksum = self.checksum(img_path)
dest_path = os.path.join(dirname, os.path.basename(img_path))
result = self.move_file(img_path, dest_path, checksum, db)
result = self.move_file(img_path, dest_path, checksum)
if not result:
has_errors = True
# remove directory

View File

@ -3,7 +3,6 @@ Settings.
"""
from os import environ, path
from sys import version_info
#: If True, debug messages will be printed.
debug = False
@ -17,26 +16,8 @@ else:
confighome = path.join(environ['HOME'], '.config')
application_directory = path.join(confighome, 'ordigi')
default_path = '{%Y-%m-%b}/{album}|{city}|{"Unknown Location"}'
default_path = '{%Y-%m-%b}/{album}|{city}'
default_name = '{%Y-%m-%d_%H-%M-%S}-{name}-{title}.%l{ext}'
default_geocoder = 'Nominatim'
# Checksum storage file.
hash_db = 'hash.json'
# TODO will be removed eventually
# hash_db = '{}/hash.json'.format(application_directory)
# Geolocation details file.
location_db = 'location.json'
# TODO will be removed eventually
# location_db = '{}/location.json'.format(application_directory)
# Ordigi installation directory.
script_directory = path.dirname(path.dirname(path.abspath(__file__)))
#: Accepted language in responses from MapQuest
accepted_language = 'en'
# check python version, required in collection.py to trigger appropriate method
python_version = version_info.major
CONFIG_FILE = f'{application_directory}/ordigi.conf'

View File

@ -1,134 +1,172 @@
"""
Methods for interacting with database files
"""
from builtins import map
from builtins import object
import json
import os
from pathlib import Path
import sqlite3
import sys
from math import radians, cos, sqrt
from shutil import copyfile
from time import strftime
from ordigi import constants
from ordigi.utils import distance_between_two_points
class Db(object):
class Sqlite:
"""A class for interacting with the JSON files database."""
"""Methods for interacting with Sqlite database"""
def __init__(self, target_dir):
# Create dir for target database
dirname = os.path.join(target_dir, '.ordigi')
db_dir = Path(target_dir, '.ordigi')
if not os.path.exists(dirname):
if not db_dir.exists():
try:
os.makedirs(dirname)
db_dir.mkdir()
except OSError:
pass
# self.hash_db = constants.hash_db
self.hash_db_file = os.path.join(dirname, constants.hash_db)
self.check_db(self.hash_db_file)
self.db_type = 'SQLite format 3'
self.filename = Path(db_dir, target_dir.name + '.db')
self.con = sqlite3.connect(self.filename)
# Allow selecting column by name
self.con.row_factory = sqlite3.Row
self.cur = self.con.cursor()
self.hash_db = {}
# Create tables
if not self.is_table('file'):
self.create_file_table()
if not self.is_table('location'):
self.create_location_table()
# We know from above that this file exists so we open it
# for reading only.
with open(self.hash_db_file, 'r') as f:
try:
self.hash_db = json.load(f)
except ValueError:
pass
def is_Sqlite3(self, filename):
if not os.path.isfile(filename):
return False
if os.path.getsize(filename) < 100: # SQLite database file header is 100 bytes
return False
# self.location_db_file = constants.location_db
self.location_db_file = os.path.join(dirname, constants.location_db)
self.check_db(self.location_db_file)
with open(filename, 'rb') as fd:
header = fd.read(100)
self.location_db = []
return header[:16] == (self.db_type + '\x00').encode()
# We know from above that this file exists so we open it
# for reading only.
with open(self.location_db_file, 'r') as f:
try:
self.location_db = json.load(f)
except ValueError:
pass
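The 16-byte magic string tested above can be checked against any freshly created database; a quick sketch (the /tmp path is illustrative):

import sqlite3

con = sqlite3.connect('/tmp/ordigi-example.db')
con.execute('create table if not exists t (x)')
con.commit()
con.close()

with open('/tmp/ordigi-example.db', 'rb') as fd:
    # Every SQLite 3 file begins with b'SQLite format 3\x00'
    assert fd.read(16) == b'SQLite format 3\x00'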
def is_table(self, table):
"""Check if table exist"""
def check_db(self, db_file):
'''Load db from file'''
# If the hash db doesn't exist we create it.
# Otherwise we only open for reading
if not os.path.isfile(db_file):
with open(db_file, 'a'):
os.utime(db_file, None)
try:
# get the count of tables with the name
self.cur.execute(f"SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{table}'")
except sqlite3.DatabaseError as e:
# raise type(e)(e.message + ' :{self.filename} %s' % arg1)
raise sqlite3.DatabaseError(f"{self.filename} is not a valid database")
def add_hash(self, key, value, write=False):
"""Add a hash to the hash db.
# if the count is 1, then table exists
if self.cur.fetchone()[0] == 1:
return True
:param str key:
:param str value:
:param bool write: If true, write the hash db to disk.
return False
def _run(self, query, n=0):
result = self.cur.execute(query).fetchone()
if result:
return result[n]
return None
def _run_many(self, query, table_list):
self.cur.executemany(query, table_list)
if self.cur.rowcount < 1:
return False
self.con.commit()
return True
def create_file_table(self):
query = """create table file (
FilePath text not null primary key,
Checksum text,
OriginalName text,
DateOriginal text,
Album text,
LocationId integer)
"""
self.hash_db[key] = value
if(write is True):
self.update_hash_db()
self.cur.execute(query)
# Location database
# Currently quite simple just a list of long/lat pairs with a name
# If it gets many entries a lookup might take too long and a better
# structure might be needed. Some speed up ideas:
# - Sort it and inter-half method can be used
# - Use integer part of long or lat as key to get a lower search list
# - Cache a small number of lookups, images are likely to be taken in
# clusters around a spot during import.
def add_location(self, latitude, longitude, place, write=False):
"""Add a location to the database.
def add_file_data(self, FilePath, Checksum, OriginalName, DateOriginal,
Album, LocationId):
query =f"""insert into file values
('{FilePath}', '{Checksum}', '{OriginalName}',
'{DateOriginal}', '{Album}', '{LocationId}')"""
:param float latitude: Latitude of the location.
:param float longitude: Longitude of the location.
:param str place: Name for the location.
:param bool write: If true, write the location db to disk.
self.cur.execute(query)
self.con.commit()
def add_file_values(self, table_list):
query = f"insert into file values (?, ?, ?, ?, ?, ?)"
return self._run_many(query)
def get_checksum(self, FilePath):
query = f"select Checksum from file where FilePath='{FilePath}'"
return self._run(query)
def get_file_data(self, FilePath, data):
query = f"select {data} from file where FilePath='{FilePath}'"
return self._run(query)
def create_location_table(self):
query = """create table location (
Latitude real not null,
Longitude real not null,
City text,
State text,
Country text,
'Default' text)
"""
data = {}
data['lat'] = latitude
data['long'] = longitude
data['name'] = place
self.location_db.append(data)
if(write is True):
self.update_location_db()
self.cur.execute(query)
def backup_hash_db(self):
"""Backs up the hash db."""
# TODO
if os.path.isfile(self.hash_db_file):
mask = strftime('%Y-%m-%d_%H-%M-%S')
backup_file_name = '%s-%s' % (self.hash_db_file, mask)
copyfile(self.hash_db_file, backup_file_name)
return backup_file_name
def match_location(self, Latitude, Longitude):
query = f"""select 1 from location where Latitude='{Latitude}'
and Longitude='{Longitude}'"""
return self._run(query)
def check_hash(self, key):
"""Check whether a hash is present for the given key.
def add_location(self, Latitude, Longitude, City, State, Country, Default):
# Check if row with same latitude and longitude have not been already
# added
location_id = self.get_location(Latitude, Longitude, 'ROWID')
:param str key:
:returns: bool
"""
return key in self.hash_db
if not location_id:
query = f"""insert into location values
('{Latitude}', '{Longitude}', '{City}', '{State}',
'{Country}', '{Default}')
"""
self.cur.execute(query)
self.con.commit()
def get_hash(self, key):
"""Get the hash value for a given key.
return self._run('select last_insert_rowid()')
:param str key:
:returns: str or None
"""
if(self.check_hash(key) is True):
return self.hash_db[key]
return None
return location_id
def get_location_name(self, latitude, longitude, threshold_m):
def add_location_values(self, table_list):
query = f"insert into location values (?, ?, ?, ?, ?, ?)"
return _insert_many_query(query)
def get_location_data(self, LocationId, data):
query = f"select {data} from file where ROWID='{LocationId}'"
return self._run(query)
def get_location(self, Latitude, Longitude, column):
query = f"""select {column} from location where Latitude='{Latitude}'
and Longitude='{Longitude}'"""
return self._run(query)
def _get_table(self, table):
return self.cur.execute(f'SELECT * FROM {table}').fetchall()
def get_location_nearby(self, latitude, longitude, Column,
threshold_m=3000):
"""Find a name for a location in the database.
:param float latitude: Latitude of the location.
@ -137,58 +175,36 @@ class Db(object):
the given latitude and longitude.
:returns: str, or None if a matching location couldn't be found.
"""
last_d = sys.maxsize
name = None
for data in self.location_db:
# As threshold is quite small use simple math
# From http://stackoverflow.com/questions/15736995/how-can-i-quickly-estimate-the-distance-between-two-latitude-longitude-points # noqa
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = list(map(
radians,
[longitude, latitude, data['long'], data['lat']]
))
r = 6371000 # radius of the earth in m
x = (lon2 - lon1) * cos(0.5 * (lat2 + lat1))
y = lat2 - lat1
d = r * sqrt(x * x + y * y)
shorter_distance = sys.maxsize
value = None
self.cur.execute('SELECT * FROM location')
for row in self.cur:
distance = distance_between_two_points(latitude, longitude,
row[0], row[1])
# Use if closer than threshold_m; reuse lookup
if(d <= threshold_m and d < last_d):
name = data['name']
last_d = d
if(distance < shorter_distance and distance <= threshold_m):
shorter_distance = distance
value = row[Column]
return name
return value
def get_location_coordinates(self, name):
"""Get the latitude and longitude for a location.
:param str name: Name of the location.
:returns: tuple(float), or None if the location wasn't in the database.
def delete_row(self, table, id):
"""
for data in self.location_db:
if data['name'] == name:
return (data['lat'], data['long'])
return None
def all(self):
"""Generator to get all entries from self.hash_db
:returns tuple(string)
Delete a row by row id in table
:param table: database table
:param id: id of the row
:return:
"""
for checksum, path in self.hash_db.items():
yield (checksum, path)
sql = f'delete from {table} where rowid=?'
self.cur.execute(sql, (id,))
self.con.commit()
def reset_hash_db(self):
self.hash_db = {}
def update_hash_db(self):
"""Write the hash db to disk."""
with open(self.hash_db_file, 'w') as f:
json.dump(self.hash_db, f)
def update_location_db(self):
"""Write the location db to disk."""
with open(self.location_db_file, 'w') as f:
json.dump(self.location_db, f)
def delete_all_rows(self, table):
"""
Delete all row in table
:param table: database table
:return:
"""
sql = f'delete from {table}'
self.cur.execute(sql)
self.con.commit()
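Taken together, a hedged usage sketch of the new Sqlite wrapper; the collection root, checksum, and values are illustrative:

from pathlib import Path
from ordigi.database import Sqlite

# Creates <root>/.ordigi/<root name>.db with the file and location tables
db = Sqlite(Path('~/collection').expanduser())

# add_location returns the ROWID, reusing a row with identical coordinates
location_id = db.add_location(45.76, 4.83, 'Lyon', 'Rhone', 'France', 'Lyon')

# File rows are keyed by path relative to the collection root
db.add_file_data('2021-08/Lyon/photo.jpg', 'deadbeef', 'photo.jpg',
                 '2021-08-31 16:18:41', 'Holidays', location_id)

assert db.get_checksum('2021-08/Lyon/photo.jpg') == 'deadbeef'
assert db.get_file_data('2021-08/Lyon/photo.jpg', 'Album') == 'Holidays'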

View File

@ -8,7 +8,6 @@ import logging
from ordigi import config
__KEY__ = None
__DEFAULT_LOCATION__ = 'Unknown Location'
class GeoLocation:
@ -43,8 +42,8 @@ class GeoLocation:
return None
def place_name(self, lat, lon, db, cache=True, logger=logging.getLogger(), timeout=options.default_timeout):
lookup_place_name_default = {'default': __DEFAULT_LOCATION__}
def place_name(self, lat, lon, logger=logging.getLogger(), timeout=options.default_timeout):
lookup_place_name_default = {'default': None}
if(lat is None or lon is None):
return lookup_place_name_default
@ -54,16 +53,6 @@ class GeoLocation:
if(not isinstance(lon, float)):
lon = float(lon)
# Try to get cached location first
# 3km distace radious for a match
cached_place_name = None
if cache:
cached_place_name = db.get_location_name(lat, lon, 3000)
# We check that it's a dict to coerce an upgrade of the location
# db from a string location to a dictionary. See gh-160.
if(isinstance(cached_place_name, dict)):
return cached_place_name
lookup_place_name = {}
geocoder = self.geocoder
if geocoder == 'Nominatim':
@ -83,11 +72,6 @@ class GeoLocation:
if('default' not in lookup_place_name):
lookup_place_name['default'] = address[loc]
if(lookup_place_name):
db.add_location(lat, lon, lookup_place_name)
# TODO: Maybe this should only be done on exit and not for every write.
db.update_location_db()
if('default' not in lookup_place_name):
lookup_place_name = lookup_place_name_default
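One observable change from dropping __DEFAULT_LOCATION__: missing coordinates now yield {'default': None} rather than 'Unknown Location'. A quick check, with no network needed for this path:

from ordigi.geolocation import GeoLocation

loc = GeoLocation()
assert loc.place_name(None, None) == {'default': None}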

View File

@ -147,7 +147,7 @@ class Media():
value = re.sub(regex , r'\g<1>-\g<2>-\g<3>', value)
return parse(value)
except (BaseException, dateutil.parser._parser.ParserError) as e:
self.logger.error(e, value)
self.logger.warning('%s: %s', e, value)
return None
def get_coordinates(self, key, value):
@ -186,7 +186,7 @@ class Media():
return None
def get_metadata(self):
def get_metadata(self, loc=None, db=None, cache=False):
"""Get a dictionary of metadata from exif.
All keys will be present and have a value of None if not obtained.
@ -220,8 +220,38 @@ class Media():
self.metadata[key] = formated_data
self.metadata['base_name'] = os.path.basename(os.path.splitext(self.file_path)[0])
self.metadata['ext'] = os.path.splitext(self.file_path)[1][1:]
self.metadata['directory_path'] = os.path.dirname(self.file_path)
self.metadata['ext'] = os.path.splitext(self.file_path)[1][1:]
loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
location_id = None
if cache and db:
location_id = db.get_file_data(self.file_path, 'LocationId')
if location_id:
for key in loc_keys:
# use str() to convert non-string data such as latitude and longitude
self.metadata[key] = str(db.get_location_data(location_id, key.capitalize()))
elif loc:
place_name = loc.place_name(
self.metadata['latitude'],
self.metadata['longitude'],
self.logger
)
for key in ('city', 'state', 'country', 'default'):
# mask = 'city'
# place_name = {'default': u'Sunnyvale', 'city-random': u'Sunnyvale'}
if(key in place_name):
self.metadata[key] = place_name[key]
else:
self.metadata[key] = None
else:
for key in loc_keys:
self.metadata[key] = None
self.metadata['location_id'] = location_id
return self.metadata
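A hedged sketch of the three paths get_metadata now supports; the collection root and file path are illustrative:

from pathlib import Path

from ordigi.database import Sqlite
from ordigi.geolocation import GeoLocation
from ordigi.media import Media

loc = GeoLocation()
db = Sqlite(Path('~/collection').expanduser())
media = Media('photo.jpg')

metadata = media.get_metadata()                     # no geocoding: location keys are None
metadata = media.get_metadata(loc)                  # reverse-geocode latitude/longitude
metadata = media.get_metadata(loc, db, cache=True)  # reuse the cached LocationId if present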
@ -252,6 +282,13 @@ class Media():
return Media(_file, ignore_tags=ignore_tags, logger=logger)
def set_value(self, tag, value):
"""Set value of a tag.
:returns: value (str)
"""
return ExifToolCaching(self.file_path, self.logger).setvalue(tag, value)
def set_date_taken(self, date_key, time):
"""Set the date/time a photo was taken.
@ -301,7 +338,7 @@ class Media():
"""
folder = os.path.basename(os.path.dirname(self.file_path))
return set_value(self, 'album', folder)
return self.set_value('album', folder)
def get_all_subclasses(cls=None):

ordigi/utils.py Normal file (+16 lines)
View File

@ -0,0 +1,16 @@
from math import radians, cos, sqrt
def distance_between_two_points(lat1, lon1, lat2, lon2):
# As threshold is quite small use simple math
# From http://stackoverflow.com/questions/15736995/how-can-i-quickly-estimate-the-distance-between-two-latitude-longitude-points # noqa
# convert decimal degrees to radians
lat1, lon1, lat2, lon2 = list(map(
radians,
[lat1, lon1, lat2, lon2]
))
r = 6371000 # radius of the earth in m
x = (lon2 - lon1) * cos(0.5 * (lat2 + lat1))
y = lat2 - lat1
return r * sqrt(x * x + y * y)
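A worked example of the approximation, using two points on the same meridian about 100 m apart:

from ordigi.utils import distance_between_two_points

# 0.0009 degrees of latitude is roughly 100 m: r * radians(0.0009) ~= 100.07
print(round(distance_between_two_points(48.8584, 2.2945, 48.8593, 2.2945)))  # 100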

View File

@ -36,6 +36,9 @@ def randomize_files(dest_dir):
# Get files randomly
paths = Path(dest_dir).glob('*')
for path, subdirs, files in os.walk(dest_dir):
if '.ordigi' in path:
continue
for name in files:
file_path = PurePath(path, name)
if bool(random.getrandbits(1)):
@ -46,6 +49,13 @@ def randomize_files(dest_dir):
shutil.copyfile(file_path, dest_path)
def randomize_db(dest_dir):
# Corrupt the database file with random bytes
file_path = Path(str(dest_dir), '.ordigi', str(dest_dir.name) + '.db')
with open(file_path, 'wb') as fout:
fout.write(os.urandom(random.randrange(128, 2048)))
@pytest.fixture(scope="module")
def conf_path():
conf_dir = tempfile.mkdtemp(prefix='ordigi-')

View File

@ -2,24 +2,21 @@
from datetime import datetime
import os
import pytest
import sqlite3
from pathlib import Path
import re
from sys import platform
from time import sleep
from .conftest import randomize_files
from .conftest import randomize_files, randomize_db
from ordigi import constants
from ordigi.database import Db
from ordigi.database import Sqlite
from ordigi.exiftool import ExifToolCaching, exiftool_is_running, terminate_exiftool
from ordigi.collection import Collection
from ordigi.geolocation import GeoLocation
from ordigi.media import Media
@pytest.mark.skip()
class TestDb:
pass
class TestCollection:
@pytest.fixture(autouse=True)
@ -36,7 +33,7 @@ class TestCollection:
Test all parts
"""
# Item to search for:
collection = Collection(self.path_format, tmp_path)
collection = Collection(tmp_path, self.path_format)
items = collection.get_items()
masks = [
'{album}',
@ -60,7 +57,6 @@ class TestCollection:
]
subdirs = Path('a', 'b', 'c', 'd')
for file_path in self.file_paths:
media = Media(str(file_path))
exif_tags = {}
@ -69,14 +65,14 @@ class TestCollection:
exif_tags[key] = media.tags_keys[key]
exif_data = ExifToolCaching(str(file_path)).asdict()
metadata = media.get_metadata()
loc = GeoLocation()
metadata = media.get_metadata(loc)
for item, regex in items.items():
for mask in masks:
matched = re.search(regex, mask)
if matched:
part = collection.get_part(item, mask[1:-1],
metadata, Db(tmp_path), subdirs, loc)
metadata, subdirs)
# check if part is correct
assert isinstance(part, str), file_path
if item == 'basename':
@ -115,7 +111,7 @@ class TestCollection:
def test_get_date_taken(self, tmp_path):
collection = Collection(self.path_format, tmp_path)
collection = Collection(tmp_path, self.path_format)
for file_path in self.file_paths:
exif_data = ExifToolCaching(str(file_path)).asdict()
media = Media(str(file_path))
@ -140,32 +136,33 @@ class TestCollection:
assert date_taken == media.metadata['date_modified']
def test_sort_files(self, tmp_path):
db = Db(tmp_path)
collection = Collection(self.path_format, tmp_path)
collection = Collection(tmp_path, self.path_format)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_paths],
db, loc)
summary, has_errors = collection.sort_files([self.src_paths], loc)
# Summary is created and there are no errors
assert summary, summary
assert not has_errors, has_errors
randomize_files(tmp_path)
collection = Collection(self.path_format, tmp_path)
loc = GeoLocation()
summary, has_errors = collection.sort_files([self.src_paths],
db, loc)
summary, has_errors = collection.sort_files([self.src_paths], loc)
# Summary is created and there are no errors
assert summary, summary
assert not has_errors, has_errors
# TODO check if path follow path_format
def test_sort_files_invalid_db(self, tmp_path):
collection = Collection(tmp_path, self.path_format)
loc = GeoLocation()
randomize_db(tmp_path)
with pytest.raises(sqlite3.DatabaseError) as e:
summary, has_errors = collection.sort_files([self.src_paths], loc)
def test_sort_file(self, tmp_path):
for mode in 'copy', 'move':
collection = Collection(self.path_format, tmp_path, mode=mode)
collection = Collection(tmp_path, self.path_format, mode=mode)
# copy mode
src_path = Path(self.src_paths, 'photo.png')
name = 'photo_' + mode + '.png'
@ -186,9 +183,6 @@ class TestCollection:
# TODO check date
def test_filter_part():
_filter_part(dedup_regex, path_part, items)
assert
#- Sort similar images into a directory
# collection.sort_similar

tests/test_database.py Normal file (+70 lines)
View File

@ -0,0 +1,70 @@
from pathlib import Path
import pytest
import shutil
import sqlite3
from ordigi.database import Sqlite
class TestSqlite:
@pytest.fixture(autouse=True)
def setup_class(cls, tmp_path):
cls.test='abs'
cls.sqlite = Sqlite(tmp_path)
cls.sqlite.add_file_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'default')
yield
shutil.rmtree(tmp_path)
def test_init(self):
assert isinstance(self.sqlite.filename, Path)
assert isinstance(self.sqlite.con, sqlite3.Connection)
assert isinstance(self.sqlite.cur, sqlite3.Cursor)
def test_create_file_table(self):
assert self.sqlite.is_table('file')
def test_add_file_data(self):
result = tuple(self.sqlite.cur.execute("""select * from file where
rowid=1""").fetchone())
assert result == ('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
def test_get_checksum(self):
assert not self.sqlite.get_checksum('checksum')
assert self.sqlite.get_checksum('filename') == 'ksinslsdosic'
def test_get_file_data(self):
assert not self.sqlite.get_file_data('invalid', 'DateOriginal')
assert self.sqlite.get_file_data('filename', 'Album') == 'album'
def test_create_location_table(self):
assert self.sqlite.is_table('location')
def test_add_location(self):
result = tuple(self.sqlite.cur.execute("""select * from location where
rowid=1""").fetchone())
assert result == (24.2, 7.3, 'city', 'state', 'country', 'default')
@pytest.mark.skip('TODO')
def test_get_location_data(self, LocationId, data):
pass
@pytest.mark.skip('TODO')
def test_get_location(self, Latitude, Longitude, column):
pass
def test_get_location_nearby(self):
value = self.sqlite.get_location_nearby(24.2005, 7.3004, 'Default')
assert value == 'default'
@pytest.mark.skip('TODO')
def test_delete_row(self, table, id):
pass
@pytest.mark.skip('TODO')
def test_delete_all_rows(self, table):
pass