From 1e673dde4459fc25ff6e49737b9e759b3f435033 Mon Sep 17 00:00:00 2001
From: Cedric Leporcq
Date: Sun, 12 Sep 2021 07:41:44 +0200
Subject: [PATCH] Add functions to insert Sqlite tables dynamically

---
 ordigi/collection.py   |  32 ++++---
 ordigi/database.py     | 189 ++++++++++++++++++++++++++++-------------
 ordigi/utils.py        |  14 +++
 tests/test_database.py |  61 +++++++++----
 4 files changed, 206 insertions(+), 90 deletions(-)

diff --git a/ordigi/collection.py b/ordigi/collection.py
index a5703f2..a554b60 100644
--- a/ordigi/collection.py
+++ b/ordigi/collection.py
@@ -18,6 +18,7 @@ from ordigi.database import Sqlite
 from ordigi.media import Media, get_all_subclasses
 from ordigi.images import Images
 from ordigi.summary import Summary
+from ordigi.utils import get_date_regex, camel2snake
 
 
 class Collection(object):
@@ -244,21 +245,21 @@ class Collection(object):
 
         return src_checksum
 
-    def _add_db_data(self, dest_path, metadata, checksum):
-        loc_keys = ('latitude', 'longitude', 'city', 'state', 'country', 'default')
-        loc_values = []
-        for key in loc_keys:
-            loc_values.append(metadata[key])
-        metadata['location_id'] = self.db.add_location(*loc_values)
+    def _get_row_data(self, table, metadata):
+        row_data = {}
+        for title in self.db.tables[table]['header']:
+            key = camel2snake(title)
+            row_data[title] = metadata[key]
 
-        file_keys = ('original_name', 'date_original', 'album', 'location_id')
-        file_values = []
-        for key in file_keys:
-            file_values.append(metadata[key])
-        dest_path_rel = os.path.relpath(dest_path, self.root)
-        self.db.add_file_data(dest_path_rel, checksum, *file_values)
+        return row_data
+
+    def _add_db_data(self, dest_path, metadata):
+        loc_values = self._get_row_data('location', metadata)
+        metadata['location_id'] = self.db.add_row('location', loc_values)
+
+        row_data = self._get_row_data('metadata', metadata)
+        self.db.add_row('metadata', row_data)
 
-    def record_file(self, src_path, dest_path, src_checksum, metadata):
     def _update_exif_data(self, dest_path, media):
         if self.album_from_folder:
             media.file_path = dest_path
@@ -275,11 +276,14 @@ class Collection(object):
 
         has_errors = False
         if checksum:
             if not self.dry_run:
-                self._add_db_data(dest_path, metadata, checksum)
                 updated = self._update_exif_data(dest_path, media)
                 if updated:
                     dest_checksum = self.checksum(dest_path)
+                media.metadata['file_path'] = os.path.relpath(dest_path,
+                                                              self.root)
+                media.metadata['checksum'] = checksum
+                self._add_db_data(dest_path, media.metadata)
 
             self.summary.append((src_path, dest_path))
 
diff --git a/ordigi/database.py b/ordigi/database.py
index 290d017..444cee8 100644
--- a/ordigi/database.py
+++ b/ordigi/database.py
@@ -1,4 +1,5 @@
+from datetime import datetime
 import json
 import os
 from pathlib import Path
@@ -28,20 +29,64 @@ class Sqlite:
             pass
 
         self.db_type = 'SQLite format 3'
+        self.types = {
+            'text': (str, datetime),
+            'integer': (int,),
+            'real': (float,)
+        }
+
         self.filename = Path(db_dir, target_dir.name + '.db')
         self.con = sqlite3.connect(self.filename)
         # Allow selecting column by name
         self.con.row_factory = sqlite3.Row
         self.cur = self.con.cursor()
 
+        metadata_header = {
+            'FilePath': 'text not null',
+            'Checksum': 'text',
+            'Album': 'text',
+            'LocationId': 'integer',
+            'DateTaken': 'text',
+            'DateOriginal': 'text',
+            'DateCreated': 'text',
+            'DateModified': 'text',
+            'CameraMake': 'text',
+            'CameraModel': 'text',
+            'SrcPath': 'text',
+            'Subdirs': 'text',
+            'Filename': 'text'
+        }
+
+        location_header = {
+            'Latitude': 'real not null',
+            'Longitude': 'real not null',
+            'LatitudeRef': 'text',
+            'LongitudeRef': 'text',
+            'City': 'text',
+            'State': 'text',
+            'Country': 'text',
+            'Default': 'text'
+        }
+
+        self.tables = {
+            'metadata': {
+                'header': metadata_header,
+                'primary_keys': ('FilePath',)
+            },
+            'location': {
+                'header': location_header,
+                'primary_keys': ('Latitude', 'Longitude')
+            }
+        }
+
+        self.primary_metadata_keys = self.tables['metadata']['primary_keys']
+        self.primary_location_keys = self.tables['location']['primary_keys']
 
         # Create tables
-        if not self.is_table('file'):
-            self.create_file_table()
-        if not self.is_table('location'):
-            self.create_location_table()
+        for table, d in self.tables.items():
+            if not self.is_table(table):
+                self.create_table(table, d['header'], d['primary_keys'])
 
     def is_Sqlite3(self, filename):
-        import ipdb; ipdb.set_trace()
         if not os.path.isfile(filename):
             return False
         if os.path.getsize(filename) < 100: # SQLite database file header is 100 bytes
@@ -57,7 +102,7 @@ class Sqlite:
 
         try:
             # get the count of tables with the name
-            self.cur.execute(f"SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{table}'")
+            self.cur.execute(f"select count(name) from sqlite_master where type='table' and name='{table}'")
         except sqlite3.DatabaseError as e:
             # raise type(e)(e.message + ' :{self.filename} %s' % arg1)
             raise sqlite3.DatabaseError(f"{self.filename} is not valid database")
@@ -84,77 +129,101 @@ class Sqlite:
         self.con.commit()
         return True
 
-    def create_file_table(self):
-        query = """create table file (
-                           FilePath text not null primary key,
-                           Checksum text,
-                           OriginalName text,
-                           DateOriginal text,
-                           Album text,
-                           LocationId integer)
+    def create_table(self, table, header, primary_keys):
         """
-        self.cur.execute(query)
+        :params: table (str), header (dict), primary_keys (tuple)
+        :returns: bool
+        """
+        fieldset = []
+        for col, definition in header.items():
+            fieldset.append(f"'{col}' {definition}")
+        items = ', '.join(primary_keys)
+        fieldset.append(f"primary key ({items})")
 
-    def add_file_data(self, FilePath, Checksum, OriginalName, DateOriginal,
-                      Album, LocationId):
-        query =f"""insert into file values
-            ('{FilePath}', '{Checksum}', '{OriginalName}',
-            '{DateOriginal}', '{Album}', '{LocationId}')"""
+        if len(fieldset) > 0:
+            query = "create table {0} ({1})".format(table, ", ".join(fieldset))
+            self.cur.execute(query)
+            self.tables[table]['header'] = header
+            return True
 
-        self.cur.execute(query)
+        return False
+
+    def add_row(self, table, row_data):
+        """
+        :returns: lastrowid (int)
+        """
+        header = self.tables[table]['header']
+        if len(row_data) != len(header):
+            raise ValueError(f'''Table {table} length mismatch: row_data
+                    {row_data}, header {header}''')
+
+        columns = ', '.join(row_data.keys())
+        placeholders = ', '.join('?' * len(row_data))
+        # If the primary key already exists, the row is replaced (updated) with the new values
+        query = f'replace into {table} values ({placeholders})'
+        values = []
+        for key, value in row_data.items():
+            if key in self.tables[table]['primary_keys'] and value is None:
+                # Ignore the entry if the primary key is None
+                return None
+
+            if isinstance(value, bool):
+                values.append(int(value))
+            else:
+                values.append(value)
+
+        self.cur.execute(query, values)
         self.con.commit()
 
-    def add_file_values(self, table_list):
-        query = f"insert into file values (?, ?, ?, ?, ?, ?)"
-        return self._run_many(query)
+        return self.cur.lastrowid
+
+    def get_header(self, row_data):
+        """
+        :params: row data (dict)
+        :returns: header
+        """
+
+        sql_table = {}
+        for key, value in row_data.items():
+            for sql_type, t in self.types.items():
+                # Find the sql_type corresponding to the Python type
+                if type(value) in t:
+                    sql_table[key] = sql_type
+
+        return sql_table
+
+    def build_table(self, table, row_data, primary_keys):
+        header = self.get_header(row_data)
+        return self.create_table(table, header, primary_keys)
+
+    def build_row(self, table, row_data):
+        """
+        :params: table (str), row_data (dict)
+        :returns: lastrowid (int) or False
+        """
+        if not self.tables[table]['header']:
+            result = self.build_table(table, row_data,
+                    self.tables[table]['primary_keys'])
+            if not result:
+                return False
+
+        return self.add_row(table, row_data)
 
     def get_checksum(self, FilePath):
-        query = f"select Checksum from file where FilePath='{FilePath}'"
+        query = f"select Checksum from metadata where FilePath='{FilePath}'"
         return self._run(query)
 
-    def get_file_data(self, FilePath, data):
-        query = f"select {data} from file where FilePath='{FilePath}'"
+    def get_metadata_data(self, FilePath, data):
+        query = f"select {data} from metadata where FilePath='{FilePath}'"
         return self._run(query)
 
-    def create_location_table(self):
-        query = """create table location (
-            Latitude real not null,
-            Longitude real not null,
-            City text,
-            State text,
-            Country text,
-            'Default' text)
-        """
-        self.cur.execute(query)
-
     def match_location(self, Latitude, Longitude):
         query = f"""select 1 from location where Latitude='{Latitude}'
             and Longitude='{Longitude}'"""
         return self._run(query)
 
-    def add_location(self, Latitude, Longitude, City, State, Country, Default):
-        # Check if row with same latitude and longitude have not been already
-        # added
-        location_id = self.get_location(Latitude, Longitude, 'ROWID')
-
-        if not location_id:
-            query = f"""insert into location values
-                ('{Latitude}', '{Longitude}', '{City}', '{State}',
-                '{Country}', '{Default}')
-            """
-            self.cur.execute(query)
-            self.con.commit()
-
-            return self._run('select last_insert_rowid()')
-
-        return location_id
-
-    def add_location_values(self, table_list):
-        query = f"insert into location values (?, ?, ?, ?, ?, ?)"
-        return _insert_many_query(query)
-
     def get_location_data(self, LocationId, data):
-        query = f"select {data} from file where ROWID='{LocationId}'"
+        query = f"select {data} from location where ROWID='{LocationId}'"
         return self._run(query)
 
     def get_location(self, Latitude, Longitude, column):
diff --git a/ordigi/utils.py b/ordigi/utils.py
index 155c9eb..fe674cd 100644
--- a/ordigi/utils.py
+++ b/ordigi/utils.py
@@ -75,3 +75,17 @@ def get_date_from_string(string, user_regex=None):
 
     return date
 
+# Conversion functions
+# source:https://rodic.fr/blog/camelcase-and-snake_case-strings-conversion-with-python/
+
+def snake2camel(name):
+    return re.sub(r'(?:^|_)([a-z])', lambda x: x.group(1).upper(), name)
+
+def snake2camelback(name):
+    return re.sub(r'_([a-z])', lambda x: x.group(1).upper(), name)
+
+def camel2snake(name):
+    return name[0].lower() + re.sub(r'(?!^)[A-Z]', lambda x: '_' + x.group(0).lower(), name[1:])
+
+def camelback2snake(name):
+    return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
diff --git a/tests/test_database.py b/tests/test_database.py
index 7050641..27d48c9 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -1,4 +1,5 @@
+from datetime import datetime
 from pathlib import Path
 import pytest
 import shutil
@@ -12,8 +13,38 @@ class TestSqlite:
     def setup_class(cls, tmp_path):
         cls.test='abs'
         cls.sqlite = Sqlite(tmp_path)
-        cls.sqlite.add_file_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
-        cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'default')
+
+        row_data = {
+            'FilePath': 'file_path',
+            'Checksum': 'checksum',
+            'Album': 'album',
+            'LocationId': 2,
+            'DateTaken': datetime(2012, 3, 27),
+            'DateOriginal': datetime(2013, 3, 27),
+            'DateCreated': 'date_created',
+            'DateModified': 'date_modified',
+            'CameraMake': 'camera_make',
+            'CameraModel': 'camera_model',
+            'SrcPath': 'src_path',
+            'Subdirs': 'subdirs',
+            'Filename': 'filename'
+        }
+
+        location_data = {
+            'Latitude': 24.2,
+            'Longitude': 7.3,
+            'LatitudeRef': 'latitude_ref',
+            'LongitudeRef': 'longitude_ref',
+            'City': 'city',
+            'State': 'state',
+            'Country': 'country',
+            'Default': 'default'
+        }
+
+        cls.sqlite.add_row('metadata', row_data)
+        cls.sqlite.add_row('location', location_data)
+        # cls.sqlite.add_metadata_data('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
+        # cls.sqlite.add_location(24.2, 7.3, 'city', 'state', 'country', 'default')
 
         yield
 
@@ -24,29 +55,27 @@ class TestSqlite:
         assert isinstance(self.sqlite.con, sqlite3.Connection)
         assert isinstance(self.sqlite.cur, sqlite3.Cursor)
 
-    def test_create_file_table(self):
-        assert self.sqlite.is_table('file')
+    def test_create_table(self):
+        assert self.sqlite.is_table('metadata')
+        assert self.sqlite.is_table('location')
 
-    def test_add_file_data(self):
-        result = tuple(self.sqlite.cur.execute("""select * from file where
+    def test_add_metadata_data(self):
+        result = tuple(self.sqlite.cur.execute("""select * from metadata where
                 rowid=1""").fetchone())
-        assert result == ('filename', 'ksinslsdosic', 'original_name', 'date_original', 'album', 1)
+        assert result == ('file_path', 'checksum', 'album', 2, '2012-03-27 00:00:00', '2013-03-27 00:00:00', 'date_created', 'date_modified', 'camera_make', 'camera_model', 'src_path', 'subdirs', 'filename')
 
     def test_get_checksum(self):
-        assert not self.sqlite.get_checksum('checksum')
-        assert self.sqlite.get_checksum('filename') == 'ksinslsdosic'
+        assert not self.sqlite.get_checksum('invalid')
+        assert self.sqlite.get_checksum('file_path') == 'checksum'
 
-    def test_get_file_data(self):
-        assert not self.sqlite.get_file_data('invalid', 'DateOriginal')
-        assert self.sqlite.get_file_data('filename', 'Album') == 'album'
-
-    def test_create_location_table(self):
-        assert self.sqlite.is_table('location')
+    def test_get_metadata_data(self):
+        assert not self.sqlite.get_metadata_data('invalid', 'DateOriginal')
+        assert self.sqlite.get_metadata_data('file_path', 'Album') == 'album'
 
     def test_add_location(self):
         result = tuple(self.sqlite.cur.execute("""select * from location where
                 rowid=1""").fetchone())
-        assert result == (24.2, 7.3, 'city', 'state', 'country', 'default')
+        assert result == (24.2, 7.3, 'latitude_ref', 'longitude_ref',
+                          'city', 'state', 'country', 'default')
 
     @pytest.mark.skip('TODO')
     def test_get_location_data(self, LocationId, data):
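
Usage sketch: the snippet below mirrors the calls exercised in tests/test_database.py to show how the dynamically created tables are filled with add_row(). The collection path, camera values and the single-argument Sqlite(...) call are illustrative assumptions taken from the tests, not something specified by the patch itself.

from datetime import datetime
from pathlib import Path

from ordigi.database import Sqlite

# Opening a collection database creates the 'metadata' and 'location' tables
# from Sqlite.tables when they do not exist yet.
db = Sqlite(Path('/tmp/my_collection'))  # hypothetical, existing directory

row_data = {
    'FilePath': 'photos/2021/IMG_0001.jpg',  # primary key: a duplicate replaces the row
    'Checksum': 'deadbeef',
    'Album': 'holidays',
    'LocationId': 1,
    'DateTaken': datetime(2021, 9, 12),
    'DateOriginal': datetime(2021, 9, 12),
    'DateCreated': None,
    'DateModified': None,
    'CameraMake': 'Canon',
    'CameraModel': 'EOS 550D',
    'SrcPath': '/home/user/import',
    'Subdirs': '2021',
    'Filename': 'IMG_0001.jpg',
}

# add_row() expects exactly one value per column declared in
# Sqlite.tables['metadata']['header'] and returns the last row id.
db.add_row('metadata', row_data)
print(db.get_metadata_data('photos/2021/IMG_0001.jpg', 'Album'))  # -> 'holidays'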