ordigi/ordigi/database.py

351 lines
11 KiB
Python

from datetime import datetime
import os
from pathlib import Path
import sqlite3
import sys
from ordigi import LOG
from ordigi.utils import distance_between_two_points
class Sqlite:
"""Methods for interacting with Sqlite database"""
def __init__(self, target_dir):
# Create dir for target database
db_dir = Path(target_dir, '.ordigi')
if not db_dir.exists():
try:
db_dir.mkdir()
except OSError:
pass
self.db_type = 'SQLite format 3'
self.log = LOG.getChild(self.__class__.__name__)
self.types = {'text': (str, datetime), 'integer': (int,), 'real': (float,)}
self.filename = Path(db_dir, 'collection.db')
self.con = sqlite3.connect(self.filename)
# Allow selecting column by name
self.con.row_factory = sqlite3.Row
self.cur = self.con.cursor()
metadata_header = {
'FilePath': 'text not null',
'Checksum': 'text',
'Album': 'text',
'Title': 'text',
'LocationId': 'integer',
'DateMedia': 'text',
'DateOriginal': 'text',
'DateCreated': 'text',
'DateModified': 'text',
'FileModifyDate': 'text',
'CameraMake': 'text',
'CameraModel': 'text',
'OriginalName': 'text',
'SrcDir': 'text',
'Subdirs': 'text',
'Filename': 'text',
}
location_header = {
'Latitude': 'real not null',
'Longitude': 'real not null',
'LatitudeRef': 'text',
'LongitudeRef': 'text',
'City': 'text',
'State': 'text',
'Country': 'text',
'Location': 'text',
}
self.tables = {
'metadata': {'header': metadata_header},
'location': {'header': location_header},
}
# Create tables
for table, d in self.tables.items():
if not self.is_table(table):
if table == 'metadata':
# https://www.quackit.com/sqlite/tutorial/create_a_relationship.cfm
self.create_table(
table, d['header'],
(
"unique('FilePath')",
"foreign key(LocationId) references location(Id)",
),
)
elif table == 'location':
self.create_table(
table, d['header'],
("unique('Latitude', 'Longitude')",),
)
def is_Sqlite3(self, filename):
if not os.path.isfile(filename):
return False
if os.path.getsize(filename) < 100: # SQLite database file header is 100 bytes
return False
with open(filename, 'rb') as fd:
header = fd.read(100)
return header[:16] == self.db_type + '\x00'
def is_table(self, table):
"""Check if table exist"""
try:
# get the count of tables with the name
self.cur.execute(
f"select count(name) from sqlite_master where type='table' and name='{table}'"
)
except sqlite3.DatabaseError as e:
# raise type(e)(e.message + ' :{self.filename} %s' % arg1)
raise sqlite3.DatabaseError(f"{self.filename} is not valid database")
# if the count is 1, then table exists
if self.cur.fetchone()[0] == 1:
return True
return False
def get_rows(self, table):
"""Cycle through rows in table
:params: str
:return: iter
"""
self.cur.execute(f'select * from {table}')
for row in self.cur:
yield row
def is_empty(self, table):
if [x for x in self.get_rows(table)] == []:
return True
return False
def _run(self, query, n=0):
self.log.debug(f"Sqlite run '{query}'")
try:
result = self.cur.execute(query).fetchone()
except sqlite3.DatabaseError as e:
self.log.error(e)
result = False
if result:
return result[n]
else:
return False
def _run_many(self, query, table_list):
self.cur.executemany(query, table_list)
if self.cur.fetchone()[0] != 1:
return False
self.con.commit()
return True
def create_table(self, table, header, statements=None):
"""
:params: row data (dict), primary_key (tuple)
:returns: bool
"""
fieldset = []
fieldset.append("Id integer primary key autoincrement")
for col, definition in header.items():
fieldset.append(f"{col} {definition}")
# https://stackoverflow.com/questions/11719073/sqlite-insert-or-update-without-changing-rowid-value
if statements:
for statement in statements:
fieldset.append(statement)
if len(fieldset) > 0:
query = "create table {0} ({1})".format(table, ", ".join(fieldset))
self.cur.execute(query)
self.tables[table]['header'] = header
return True
return False
def check_row(self, table, row_data):
header = self.tables[table]['header']
if len(row_data) != len(header):
raise ValueError(
f"""Table {table} length mismatch: row_data
{row_data}, header {header}"""
)
columns = ', '.join(row_data.keys())
placeholders = ', '.join('?' * len(row_data))
return columns, placeholders
def update_query(self, table, row_id, columns, placeholders):
"""
:returns: query (str)
"""
return f"""replace into {table} (Id, {columns})
values ((select id from {table} where id={row_id}), {placeholders})"""
def insert_query(self, table, columns, placeholders):
"""
:returns: query (str)
"""
return f"insert into {table} ({columns}) values ({placeholders})"
def upsert_row(self, table, row_data, columns, placeholders, row_id=None):
"""
:returns: lastrowid (int)
https://www.sqlitetutorial.net/sqlite-replace-statement/
https://www.sqlite.org/lang_UPSERT.html
"""
if row_id:
query = self.update_query(table, row_id, columns, placeholders)
else:
query = self.insert_query(table, columns, placeholders)
values = []
for key, value in row_data.items():
if isinstance(value, bool):
values.append(int(value))
else:
values.append(value)
self.cur.execute(query, values)
self.con.commit()
return self.cur.lastrowid
def upsert_location(self, row_data):
# Check if row already exist
row_id = self.get_location(row_data['Latitude'], row_data['Longitude'], 'Id')
columns, placeholders = self.check_row('location', row_data)
return self.upsert_row('location', row_data, columns, placeholders, row_id)
def upsert_metadata(self, row_data):
# Check if row already exist
row_id = self.get_metadata(row_data['FilePath'], 'Id')
columns, placeholders = self.check_row('metadata', row_data)
return self.upsert_row('metadata', row_data, columns, placeholders, row_id)
def get_header(self, row_data):
"""
:params: row data (dict)
:returns: header
"""
sql_table = {}
for key, value in row_data.items():
for sql_type, t in self.types.items():
# Find corresponding sql_type from python type
if type(value) in t:
sql_table[key] = sql_type
return sql_table
def build_table(self, table, row_data, statements=None):
header = self.get_header(row_data)
return self.create_table(table, header, statements=None)
def check_table(self, table, row_data):
"""
:params: row data (dict), primary_key (tuple)
:returns: bool
"""
if not self.tables[table]['header']:
self.log.error(f"Table {table} do not exist")
return False
return True
def escape_quote(self, string):
return string.translate(str.maketrans({"'": r"''"}))
def get_checksum(self, file_path):
file_path_e = self.escape_quote(str(file_path))
query = f"select Checksum from metadata where FilePath='{file_path_e}'"
return self._run(query)
def get_metadata(self, file_path, column):
file_path_e = self.escape_quote(str(file_path))
query = f"select {column} from metadata where FilePath='{file_path_e}'"
return self._run(query)
def match_location(self, latitude, longitude):
query = f"""select 1 from location where Latitude='{latitude}'
and Longitude='{longitude}'"""
return self._run(query)
def get_location_data(self, location_id, data):
query = f"select {data} from location where Id='{location_id}'"
return self._run(query)
def get_location(self, latitude, longitude, column):
query = f"""select {column} from location where Latitude='{latitude}'
and Longitude='{longitude}'"""
return self._run(query)
def _get_table(self, table):
self.cur.execute(f'SELECT * FROM {table}').fetchall()
def get_location_nearby(self, latitude, longitude, Column, threshold_m=3000):
"""
Find a name for a location in the database.
:param float latitude: Latitude of the location.
:param float longitude: Longitude of the location.
:param int threshold_m: Location in the database must be this close to
the given latitude and longitude.
:returns: str, or None if a matching location couldn't be found.
"""
shorter_distance = sys.maxsize
value = None
self.cur.execute('SELECT * FROM location')
for row in self.cur:
distance = distance_between_two_points(
latitude, longitude, row['Latitude'], row['Longitude']
)
# Use if closer then threshold_km reuse lookup
if distance < shorter_distance and distance <= threshold_m:
shorter_distance = distance
value = row[Column]
return value
def delete_row(self, table, column, value):
"""
Delete a row by row id in table
:param table: database table
:param id: id of the row
:return:
"""
sql = f'delete from {table} where {column}=?'
self.cur.execute(sql, (value,))
self.con.commit()
def delete_filepath(self, value):
self.delete_row('metadata', 'FilePath', value)
def delete_all_rows(self, table):
"""
Delete all row in table
:param table: database table
:return:
"""
sql = f'delete from {table}'
self.cur.execute(sql)
self.con.commit()
def len(self, table):
sql = f'select count() from {table}'
return self._run(sql)