ordigi/ordigi/database.py

298 lines
9.1 KiB
Python
Raw Normal View History

2016-01-08 23:49:06 +01:00
from datetime import datetime
import json
import os
2021-08-31 16:18:41 +02:00
from pathlib import Path
import sqlite3
import sys
from shutil import copyfile
from time import strftime
2021-08-13 21:11:24 +02:00
from ordigi import constants
2021-08-31 16:18:41 +02:00
from ordigi.utils import distance_between_two_points
2021-08-31 16:18:41 +02:00
class Sqlite:
2016-01-08 23:49:06 +01:00
2021-08-31 16:18:41 +02:00
"""Methods for interacting with Sqlite database"""
2016-01-08 23:49:06 +01:00
def __init__(self, target_dir):
# Create dir for target database
2021-08-31 16:18:41 +02:00
db_dir = Path(target_dir, '.ordigi')
2021-08-31 16:18:41 +02:00
if not db_dir.exists():
try:
2021-08-31 16:18:41 +02:00
db_dir.mkdir()
except OSError:
pass
2021-08-31 16:18:41 +02:00
self.db_type = 'SQLite format 3'
self.types = {
'text': (str, datetime),
'integer': (int,),
'real': (float,)
}
2021-08-31 16:18:41 +02:00
self.filename = Path(db_dir, target_dir.name + '.db')
self.con = sqlite3.connect(self.filename)
# Allow selecting column by name
self.con.row_factory = sqlite3.Row
self.cur = self.con.cursor()
metadata_header = {
'FilePath': 'text not null',
'Checksum': 'text',
'Album': 'text',
'Title': 'text',
'LocationId': 'integer',
'DateMedia': 'text',
'DateOriginal': 'text',
'DateCreated': 'text',
'DateModified': 'text',
'CameraMake': 'text',
'CameraModel': 'text',
'OriginalName':'text',
'SrcPath': 'text',
'Subdirs': 'text',
'Filename': 'text'
}
location_header = {
'Latitude': 'real not null',
'Longitude': 'real not null',
'LatitudeRef': 'text',
'LongitudeRef': 'text',
'City': 'text',
'State': 'text',
'Country': 'text',
'Default': 'text'
}
self.tables = {
'metadata': {
'header': metadata_header,
'primary_keys': ('FilePath',)
},
'location': {
'header': location_header,
'primary_keys': ('Latitude', 'Longitude')
}
}
self.primary_metadata_keys = self.tables['metadata']['primary_keys']
self.primary_location_keys = self.tables['location']['primary_keys']
2021-08-31 16:18:41 +02:00
# Create tables
for table, d in self.tables.items():
if not self.is_table(table):
self.create_table(table, d['header'], d['primary_keys'])
2021-08-31 16:18:41 +02:00
def is_Sqlite3(self, filename):
if not os.path.isfile(filename):
return False
if os.path.getsize(filename) < 100: # SQLite database file header is 100 bytes
return False
with open(filename, 'rb') as fd:
header = fd.read(100)
return header[:16] == self.db_type + '\x00'
def is_table(self, table):
"""Check if table exist"""
try:
# get the count of tables with the name
self.cur.execute(f"select count(name) from sqlite_master where type='table' and name='{table}'")
2021-08-31 16:18:41 +02:00
except sqlite3.DatabaseError as e:
# raise type(e)(e.message + ' :{self.filename} %s' % arg1)
raise sqlite3.DatabaseError(f"{self.filename} is not valid database")
# if the count is 1, then table exists
if self.cur.fetchone()[0] == 1:
return True
return False
def _run(self, query, n=0):
result = False
2021-08-31 16:18:41 +02:00
result = self.cur.execute(query).fetchone()
if result:
return result[n]
else:
return False
2021-08-31 16:18:41 +02:00
def _run_many(self, query):
self.cur.executemany(query, table_list)
if self.cur.fetchone()[0] != 1:
return False
self.con.commit()
return True
def create_table(self, table, header, primary_keys):
"""
:params: row data (dict), primary_key (tuple)
:returns: bool
2021-08-31 16:18:41 +02:00
"""
fieldset = []
for col, definition in header.items():
fieldset.append(f"'{col}' {definition}")
items = ', '.join(primary_keys)
fieldset.append(f"primary key ({items})")
if len(fieldset) > 0:
query = "create table {0} ({1})".format(table, ", ".join(fieldset))
self.cur.execute(query)
self.tables[table]['header'] = header
return True
2021-08-31 16:18:41 +02:00
return False
2021-08-31 16:18:41 +02:00
def add_row(self, table, row_data):
"""
:returns: lastrowid (int)
"""
header = self.tables[table]['header']
if len(row_data) != len(header):
raise ValueError(f'''Table {table} length mismatch: row_data
{row_data}, header {header}''')
columns = ', '.join(row_data.keys())
placeholders = ', '.join('?' * len(row_data))
# If duplicate primary keys, row is replaced(updated) with new value
query = f'replace into {table} values ({placeholders})'
values = []
for key, value in row_data.items():
if key in self.tables[table]['primary_keys'] and value is None:
# Ignore entry is primary key is None
return None
if isinstance(value, bool):
values.append(int(value))
else:
values.append(value)
self.cur.execute(query, values)
2021-08-31 16:18:41 +02:00
self.con.commit()
return self.cur.lastrowid
def get_header(self, row_data):
"""
:params: row data (dict)
:returns: header
"""
sql_table = {}
for key, value in row_data.items():
for sql_type, t in self.types.items():
# Find corresponding sql_type from python type
if type(value) in t:
sql_table[key] = sql_type
return sql_table
def build_table(self, table, row_data, primary_keys):
header = self.get_header(row_data)
create_table(table, row_data, primary_keys)
def build_row(self, table, row_data):
"""
:params: row data (dict), primary_key (tuple)
:returns: bool
"""
if not self.tables[table]['header']:
result = self.build_table(table, row_data,
self.tables[table]['primary_keys'])
if not result:
return False
return self.add_row(table, row_data)
2021-08-31 16:18:41 +02:00
2021-10-15 06:41:22 +02:00
def get_checksum(self, file_path):
query = f"select Checksum from metadata where FilePath='{file_path}'"
2021-08-31 16:18:41 +02:00
return self._run(query)
2021-10-15 06:41:22 +02:00
def get_metadata_data(self, file_path, data):
query = f"select {data} from metadata where FilePath='{file_path}'"
2021-08-31 16:18:41 +02:00
return self._run(query)
2021-10-15 06:41:22 +02:00
def match_location(self, latitude, longitude):
query = f"""select 1 from location where Latitude='{latitude}'
and Longitude='{longitude}'"""
2021-08-31 16:18:41 +02:00
return self._run(query)
2021-10-15 06:41:22 +02:00
def get_location_data(self, location_id, data):
query = f"select '{data}' from location where ROWID='{location_id}'"
2021-08-31 16:18:41 +02:00
return self._run(query)
2021-10-15 06:41:22 +02:00
def get_location(self, latitude, longitude, column):
query = f"""select {column} from location where Latitude='{latitude}'
and Longitude='{longitude}'"""
2021-08-31 16:18:41 +02:00
return self._run(query)
2016-01-08 23:49:06 +01:00
2021-08-31 16:18:41 +02:00
def _get_table(self, table):
self.cur.execute(f'SELECT * FROM {table}').fetchall()
2021-08-31 16:18:41 +02:00
def get_location_nearby(self, latitude, longitude, Column,
threshold_m=3000):
2016-01-08 23:49:06 +01:00
"""Find a name for a location in the database.
:param float latitude: Latitude of the location.
:param float longitude: Longitude of the location.
:param int threshold_m: Location in the database must be this close to
the given latitude and longitude.
:returns: str, or None if a matching location couldn't be found.
"""
2021-08-31 16:18:41 +02:00
shorter_distance = sys.maxsize
value = None
self.cur.execute('SELECT * FROM location')
for row in self.cur:
distance = distance_between_two_points(latitude, longitude,
row[0], row[1])
# Use if closer then threshold_km reuse lookup
2021-08-31 16:18:41 +02:00
if(distance < shorter_distance and distance <= threshold_m):
shorter_distance = distance
value = row[Column]
2021-08-31 16:18:41 +02:00
return value
def delete_row(self, table, column, value):
2016-01-08 23:49:06 +01:00
"""
2021-08-31 16:18:41 +02:00
Delete a row by row id in table
:param table: database table
:param id: id of the row
:return:
"""
sql = f'delete from {table} where {column}=?'
self.cur.execute(sql, (value,))
2021-08-31 16:18:41 +02:00
self.con.commit()
def delete_filepath(self, value):
self.delete_row('metadata', 'FilePath', value)
2021-08-31 16:18:41 +02:00
def delete_all_rows(self, table):
"""
Delete all row in table
:param table: database table
:return:
"""
sql = f'delete from {table}'
self.cur.execute(sql)
self.con.commit()
2021-10-15 06:41:22 +02:00
def len(self, table):
sql = f'select count() from {table}'
return self._run(sql)
2021-10-15 06:41:22 +02:00
def get_rows(self, table):
"""Cycle through rows in table
:params: str
:return: iter
"""
self.cur.execute(f'select * from {table}')
for row in self.cur:
yield row