Add funcions to insert Sqlite tables dynamically
This commit is contained in:
parent
db74342f21
commit
1e673dde44
|
@ -18,6 +18,7 @@ from ordigi.database import Sqlite
|
|||
from ordigi.media import Media, get_all_subclasses
|
||||
from ordigi.images import Images
|
||||
from ordigi.summary import Summary
|
||||
from ordigi.utils import get_date_regex, camel2snake
|
||||
|
||||
|
||||
class Collection(object):
|
||||
|
@ -244,21 +245,21 @@ class Collection(object):
|
|||
|
||||
return src_checksum
|
||||
|
||||
def _add_db_data(self, dest_path, metadata, checksum):
|
||||
loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
|
||||
loc_values = []
|
||||
for key in loc_keys:
|
||||
loc_values.append(metadata[key])
|
||||
metadata['location_id'] = self.db.add_location(*loc_values)
|
||||
def _get_row_data(self, table, metadata):
|
||||
row_data = {}
|
||||
for title in self.db.tables[table]['header']:
|
||||
key = camel2snake(title)
|
||||
row_data[title] = metadata[key]
|
||||
|
||||
file_keys = ('original_name', 'date_original', 'album', 'location_id')
|
||||
file_values = []
|
||||
for key in file_keys:
|
||||
file_values.append(metadata[key])
|
||||
dest_path_rel = os.path.relpath(dest_path, self.root)
|
||||
self.db.add_file_data(dest_path_rel, checksum, *file_values)
|
||||
return row_data
|
||||
|
||||
def _add_db_data(self, dest_path, metadata):
|
||||
loc_values = self._get_row_data('location', metadata)
|
||||
metadata['location_id'] = self.db.add_row('location', loc_values)
|
||||
|
||||
row_data = self._get_row_data('metadata', metadata)
|
||||
self.db.add_row('metadata', row_data)
|
||||
|
||||
def record_file(self, src_path, dest_path, src_checksum, metadata):
|
||||
def _update_exif_data(self, dest_path, media):
|
||||
if self.album_from_folder:
|
||||
media.file_path = dest_path
|
||||
|
@ -275,11 +276,14 @@ class Collection(object):
|
|||
has_errors = False
|
||||
if checksum:
|
||||
if not self.dry_run:
|
||||
self._add_db_data(dest_path, metadata, checksum)
|
||||
updated = self._update_exif_data(dest_path, media)
|
||||
if updated:
|
||||
dest_checksum = self.checksum(dest_path)
|
||||
|
||||
media.metadata['file_path'] = os.path.relpath(dest_path,
|
||||
self.root)
|
||||
media.metadata['checksum'] = checksum
|
||||
self._add_db_data(dest_path, media.metadata)
|
||||
|
||||
self.summary.append((src_path, dest_path))
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
from datetime import datetime
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
@ -28,20 +29,64 @@ class Sqlite:
|
|||
pass
|
||||
|
||||
self.db_type = 'SQLite format 3'
|
||||
self.types = {
|
||||
'text': (str, datetime),
|
||||
'integer': (int,),
|
||||
'real': (float,)
|
||||
}
|
||||
|
||||
self.filename = Path(db_dir, target_dir.name + '.db')
|
||||
self.con = sqlite3.connect(self.filename)
|
||||
# Allow selecting column by name
|
||||
self.con.row_factory = sqlite3.Row
|
||||
self.cur = self.con.cursor()
|
||||
|
||||
metadata_header = {
|
||||
'FilePath': 'text not null',
|
||||
'Checksum': 'text',
|
||||
'Album': 'text',
|
||||
'LocationId': 'integer',
|
||||
'DateTaken': 'text',
|
||||
'DateOriginal': 'text',
|
||||
'DateCreated': 'text',
|
||||
'DateModified': 'text',
|
||||
'CameraMake': 'text',
|
||||
'CameraModel': 'text',
|
||||
'SrcPath': 'text',
|
||||
'Subdirs': 'text',
|
||||
'Filename': 'text'
|
||||
}
|
||||
|
||||
location_header = {
|
||||
'Latitude': 'real not null',
|
||||
'Longitude': 'real not null',
|
||||
'LatitudeRef': 'text',
|
||||
'LongitudeRef': 'text',
|
||||
'City': 'text',
|
||||
'State': 'text',
|
||||
'Country': 'text',
|
||||
'Default': 'text'
|
||||
}
|
||||
|
||||
self.tables = {
|
||||
'metadata': {
|
||||
'header': metadata_header,
|
||||
'primary_keys': ('FilePath',)
|
||||
},
|
||||
'location': {
|
||||
'header': location_header,
|
||||
'primary_keys': ('Latitude', 'Longitude')
|
||||
}
|
||||
}
|
||||
|
||||
self.primary_metadata_keys = self.tables['metadata']['primary_keys']
|
||||
self.primary_location_keys = self.tables['location']['primary_keys']
|
||||
# Create tables
|
||||
if not self.is_table('file'):
|
||||
self.create_file_table()
|
||||
if not self.is_table('location'):
|
||||
self.create_location_table()
|
||||
for table, d in self.tables.items():
|
||||
if not self.is_table(table):
|
||||
self.create_table(table, d['header'], d['primary_keys'])
|
||||
|
||||
def is_Sqlite3(self, filename):
|
||||
import ipdb; ipdb.set_trace()
|
||||
if not os.path.isfile(filename):
|
||||
return False
|
||||
if os.path.getsize(filename) < 100: # SQLite database file header is 100 bytes
|
||||
|
@ -57,7 +102,7 @@ class Sqlite:
|
|||
|
||||
try:
|
||||
# get the count of tables with the name
|
||||
self.cur.execute(f"SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{table}'")
|
||||
self.cur.execute(f"select count(name) from sqlite_master where type='table' and name='{table}'")
|
||||
except sqlite3.DatabaseError as e:
|
||||
# raise type(e)(e.message + ' :{self.filename} %s' % arg1)
|
||||
raise sqlite3.DatabaseError(f"{self.filename} is not valid database")
|
||||
|
@ -84,77 +129,101 @@ class Sqlite:
|
|||
self.con.commit()
|
||||
return True
|
||||
|
||||
def create_file_table(self):
|
||||
query = """create table file (
|
||||
FilePath text not null primary key,
|
||||
Checksum text,
|
||||
OriginalName text,
|
||||
DateOriginal text,
|
||||
Album text,
|
||||
LocationId integer)
|
||||
def create_table(self, table, header, primary_keys):
|
||||
"""
|
||||
self.cur.execute(query)
|
||||
|
||||
def add_file_data(self, FilePath, Checksum, OriginalName, DateOriginal,
|
||||
Album, LocationId):
|
||||
query =f"""insert into file values
|
||||
('{FilePath}', '{Checksum}', '{OriginalName}',
|
||||
'{DateOriginal}', '{Album}', '{LocationId}')"""
|
||||
:params: row data (dict), primary_key (tuple)
|
||||
:returns: bool
|
||||
"""
|
||||
fieldset = []
|
||||
for col, definition in header.items():
|
||||
fieldset.append(f"'{col}' {definition}")
|
||||
items = ', '.join(primary_keys)
|
||||
fieldset.append(f"primary key ({items})")
|
||||
|
||||
if len(fieldset) > 0:
|
||||
query = "create table {0} ({1})".format(table, ", ".join(fieldset))
|
||||
self.cur.execute(query)
|
||||
self.tables[table]['header'] = header
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def add_row(self, table, row_data):
|
||||
"""
|
||||
:returns: lastrowid (int)
|
||||
"""
|
||||
header = self.tables[table]['header']
|
||||
if len(row_data) != len(header):
|
||||
raise ValueError(f'''Table {table} length mismatch: row_data
|
||||
{row_data}, header {header}''')
|
||||
|
||||
columns = ', '.join(row_data.keys())
|
||||
placeholders = ', '.join('?' * len(row_data))
|
||||
# If duplicate primary keys, row is replaced(updated) with new value
|
||||
query = f'replace into {table} values ({placeholders})'
|
||||
values = []
|
||||
for key, value in row_data.items():
|
||||
if key in self.tables[table]['primary_keys'] and value is None:
|
||||
# Ignore entry is primary key is None
|
||||
return None
|
||||
|
||||
if isinstance(value, bool):
|
||||
values.append(int(value))
|
||||
else:
|
||||
values.append(value)
|
||||
|
||||
self.cur.execute(query, values)
|
||||
self.con.commit()
|
||||
|
||||
def add_file_values(self, table_list):
|
||||
query = f"insert into file values (?, ?, ?, ?, ?, ?)"
|
||||
return self._run_many(query)
|
||||
return self.cur.lastrowid
|
||||
|
||||
def get_header(self, row_data):
|
||||
"""
|
||||
:params: row data (dict)
|
||||
:returns: header
|
||||
"""
|
||||
|
||||
sql_table = {}
|
||||
for key, value in row_data.items():
|
||||
for sql_type, t in self.types.items():
|
||||
# Find corresponding sql_type from python type
|
||||
if type(value) in t:
|
||||
sql_table[key] = sql_type
|
||||
|
||||
return sql_table
|
||||
|
||||
def build_table(self, table, row_data, primary_keys):
|
||||
header = self.get_header(row_data)
|
||||
create_table(table, row_data, primary_keys)
|
||||
|
||||
def build_row(self, table, row_data):
|
||||
"""
|
||||
:params: row data (dict), primary_key (tuple)
|
||||
:returns: bool
|
||||
"""
|
||||
if not self.tables[table]['header']:
|
||||
result = self.build_table(table, row_data,
|
||||
self.tables[table]['primary_keys'])
|
||||
if not result:
|
||||
return False
|
||||
|
||||
return self.add_row(table, row_data)
|
||||
|
||||
def get_checksum(self, FilePath):
|
||||
query = f"select Checksum from file where FilePath='{FilePath}'"
|
||||
query = f"select Checksum from metadata where FilePath='{FilePath}'"
|
||||
return self._run(query)
|
||||
|
||||
def get_file_data(self, FilePath, data):
|
||||
query = f"select {data} from file where FilePath='{FilePath}'"
|
||||
def get_metadata_data(self, FilePath, data):
|
||||
query = f"select {data} from metadata where FilePath='{FilePath}'"
|
||||
return self._run(query)
|
||||
|
||||
def create_location_table(self):
|
||||
query = """create table location (
|
||||
Latitude real not null,
|
||||
Longitude real not null,
|
||||
City text,
|
||||
State text,
|
||||
Country text,
|
||||
'Default' text)
|
||||
"""
|
||||
self.cur.execute(query)
|
||||
|
||||
def match_location(self, Latitude, Longitude):
|
||||
query = f"""select 1 from location where Latitude='{Latitude}'
|
||||
and Longitude='{Longitude}'"""
|
||||
return self._run(query)
|
||||
|
||||
def add_location(self, Latitude, Longitude, City, State, Country, Default):
|
||||
# Check if row with same latitude and longitude have not been already
|
||||
# added
|
||||
location_id = self.get_location(Latitude, Longitude, 'ROWID')
|
||||
|
||||
if not location_id:
|
||||
query = f"""insert into location values
|
||||
('{Latitude}', '{Longitude}', '{City}', '{State}',
|
||||
'{Country}', '{Default}')
|
||||
"""
|
||||
self.cur.execute(query)
|
||||
self.con.commit()
|
||||
|
||||
return self._run('select last_insert_rowid()')
|
||||
|
||||
return location_id
|
||||
|
||||
def add_location_values(self, table_list):
|
||||
query = f"insert into location values (?, ?, ?, ?, ?, ?)"
|
||||
return _insert_many_query(query)
|
||||
|
||||
def get_location_data(self, LocationId, data):
|
||||
query = f"select {data} from file where ROWID='{LocationId}'"
|
||||
query = f"select {data} from location where ROWID='{LocationId}'"
|
||||
return self._run(query)
|
||||
|
||||
def get_location(self, Latitude, Longitude, column):
|
||||
|
|
|
@ -75,3 +75,17 @@ def get_date_from_string(string, user_regex=None):
|
|||
|
||||
return date
|
||||
|
||||
# Conversion functions
|
||||
# source:https://rodic.fr/blog/camelcase-and-snake_case-strings-conversion-with-python/
|
||||
|
||||
def snake2camel(name):
|
||||
return re.sub(r'(?:^|_)([a-z])', lambda x: x.group(1).upper(), name)
|
||||
|
||||
def snake2camelback(name):
|
||||
return re.sub(r'_([a-z])', lambda x: x.group(1).upper(), name)
|
||||
|
||||
def camel2snake(name):
|
||||
return name[0].lower() + re.sub(r'(?!^)[A-Z]', lambda x: '_' + x.group(0).lower(), name[1:])
|
||||
|
||||
def camelback2snake(name):
|
||||
return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
import shutil
|
||||
|
@ -12,8 +13,38 @@ class TestSqlite:
|
|||
def setup_class(cls, tmp_path):
|
||||
cls.test='abs'
|
||||
cls.sqlite = Sqlite(tmp_path)
|
||||
cls.sqlite.add_file_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
|
||||
cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'default')
|
||||
|
||||
row_data = {
|
||||
'FilePath': 'file_path',
|
||||
'Checksum': 'checksum',
|
||||
'Album': 'album',
|
||||
'LocationId': 2,
|
||||
'DateTaken': datetime(2012, 3, 27),
|
||||
'DateOriginal': datetime(2013, 3, 27),
|
||||
'DateCreated': 'date_created',
|
||||
'DateModified': 'date_modified',
|
||||
'CameraMake': 'camera_make',
|
||||
'CameraModel': 'camera_model',
|
||||
'SrcPath': 'src_path',
|
||||
'Subdirs': 'subdirs',
|
||||
'Filename': 'filename'
|
||||
}
|
||||
|
||||
location_data = {
|
||||
'Latitude': 24.2,
|
||||
'Longitude': 7.3,
|
||||
'LatitudeRef': 'latitude_ref',
|
||||
'LongitudeRef': 'longitude_ref',
|
||||
'City': 'city',
|
||||
'State': 'state',
|
||||
'Country': 'country',
|
||||
'Default': 'default'
|
||||
}
|
||||
|
||||
cls.sqlite.add_row('metadata', row_data)
|
||||
cls.sqlite.add_row('location', location_data)
|
||||
# cls.sqlite.add_metadata_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
|
||||
# cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'default')
|
||||
|
||||
yield
|
||||
|
||||
|
@ -24,29 +55,27 @@ class TestSqlite:
|
|||
assert isinstance(self.sqlite.con, sqlite3.Connection)
|
||||
assert isinstance(self.sqlite.cur, sqlite3.Cursor)
|
||||
|
||||
def test_create_file_table(self):
|
||||
assert self.sqlite.is_table('file')
|
||||
def test_create_table(self):
|
||||
assert self.sqlite.is_table('metadata')
|
||||
assert self.sqlite.is_table('location')
|
||||
|
||||
def test_add_file_data(self):
|
||||
result = tuple(self.sqlite.cur.execute("""select * from file where
|
||||
def test_add_metadata_data(self):
|
||||
result = tuple(self.sqlite.cur.execute("""select * from metadata where
|
||||
rowid=1""").fetchone())
|
||||
assert result == ('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
|
||||
assert result == ('file_path', 'checksum', 'album', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'src_path', 'subdirs', 'filename')
|
||||
|
||||
def test_get_checksum(self):
|
||||
assert not self.sqlite.get_checksum('checksum')
|
||||
assert self.sqlite.get_checksum('filename') == 'ksinslsdosic'
|
||||
assert not self.sqlite.get_checksum('invalid')
|
||||
assert self.sqlite.get_checksum('file_path') == 'checksum'
|
||||
|
||||
def test_get_file_data(self):
|
||||
assert not self.sqlite.get_file_data('invalid', 'DateOriginal')
|
||||
assert self.sqlite.get_file_data('filename', 'Album') == 'album'
|
||||
|
||||
def test_create_location_table(self):
|
||||
assert self.sqlite.is_table('location')
|
||||
def test_get_metadata_data(self):
|
||||
assert not self.sqlite.get_metadata_data('invalid', 'DateOriginal')
|
||||
assert self.sqlite.get_metadata_data('file_path', 'Album') == 'album'
|
||||
|
||||
def test_add_location(self):
|
||||
result = tuple(self.sqlite.cur.execute("""select * from location where
|
||||
rowid=1""").fetchone())
|
||||
assert result == (24.2, 7.3, 'city', 'state', 'country', 'default')
|
||||
assert result == (24.2, 7.3, 'latitude_ref', 'longitude_ref', 'city', 'state', 'country', 'default')
|
||||
|
||||
@pytest.mark.skip('TODO')
|
||||
def test_get_location_data(self, LocationId, data):
|
||||
|
|
Loading…
Reference in New Issue