Merge pull request #158 from jmathai/regen-stats

Add command to check all files for bit rot
This commit is contained in:
Jaisen Mathai 2016-12-15 01:31:34 -08:00 committed by GitHub
commit a7c9a5ffbd
5 changed files with 149 additions and 9 deletions

View File

@ -30,7 +30,6 @@ from elodie.result import Result
FILESYSTEM = FileSystem()
RESULT = Result()
def import_file(_file, destination, album_from_folder, trash, allow_duplicates):
@ -83,6 +82,7 @@ def import_file(_file, destination, album_from_folder, trash, allow_duplicates):
def _import(destination, source, file, album_from_folder, trash, paths, allow_duplicates):
"""Import files or directories by reading their EXIF and organizing them accordingly.
"""
result = Result()
destination = os.path.abspath(os.path.expanduser(destination))
files = set()
@ -101,9 +101,9 @@ def _import(destination, source, file, album_from_folder, trash, paths, allow_du
for current_file in files:
dest_path = import_file(current_file, destination, album_from_folder,
trash, allow_duplicates)
RESULT.append((current_file, dest_path))
result.append((current_file, dest_path))
RESULT.write()
result.write()
@click.command('generate-db')
@ -112,6 +112,7 @@ def _import(destination, source, file, album_from_folder, trash, paths, allow_du
def _generate_db(source):
"""Regenerate the hash.json database which contains all of the sha1 signatures of media files.
"""
result = Result()
source = os.path.abspath(os.path.expanduser(source))
extensions = set()
@ -135,11 +136,31 @@ def _generate_db(source):
for current_file in all_files:
if os.path.splitext(current_file)[1][1:].lower() not in extensions:
log.info('Skipping invalid file %s' % current_file)
result.append((current_file, False))
continue
result.append((current_file, True))
db.add_hash(db.checksum(current_file), current_file)
db.update_hash_db()
result.write()
@click.command('verify')
def _verify():
    # Walk every entry in the hash database and re-checksum the file on
    # disk; a missing file or a checksum mismatch counts as an error.
    result = Result()
    db = Db()
    for checksum, file_path in db.all():
        if not os.path.isfile(file_path):
            # File has disappeared since it was hashed.
            result.append((file_path, False))
            continue

        # True only when the stored checksum still matches the file.
        result.append((file_path, checksum == db.checksum(file_path)))

    result.write()
def update_location(media, file_path, location_name):
@ -189,6 +210,7 @@ def update_time(media, file_path, time_string):
def _update(album, location, time, title, files):
"""Update a file's EXIF. Automatically modifies the file's location and file name accordingly.
"""
result = Result()
for current_file in files:
if not os.path.exists(current_file):
if constants.debug:
@ -258,11 +280,11 @@ def _update(album, location, time, title, files):
FILESYSTEM.delete_directory_if_empty(os.path.dirname(current_file))
FILESYSTEM.delete_directory_if_empty(
os.path.dirname(os.path.dirname(current_file)))
RESULT.append((current_file, dest_path))
result.append((current_file, dest_path))
else:
RESULT.append((current_file, None))
result.append((current_file, False))
RESULT.write()
result.write()
@click.group()
@ -273,6 +295,7 @@ def main():
main.add_command(_import)
main.add_command(_update)
main.add_command(_generate_db)
main.add_command(_verify)
if __name__ == '__main__':

View File

@ -183,6 +183,14 @@ class Db(object):
return None
def all(self):
    """Generator yielding every (checksum, path) entry in self.hash_db.

    :returns tuple(string)
    """
    for entry in self.hash_db.items():
        yield entry
def reset_hash_db(self):
    """Discard all in-memory hash entries, leaving an empty database."""
    self.hash_db = dict()

View File

@ -379,6 +379,42 @@ def test_regenerate_valid_source_with_invalid_files():
assert 'bde2dc0b839a5d20b0b4c1f57605f84e0e2a4562aaebc1c362de6cb7cc02eeb3' in db.hash_db, db.hash_db
assert 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' not in db.hash_db, db.hash_db
def test_verify_ok():
    # An untouched file should verify cleanly: 1 success, 0 errors.
    _, working_dir = helper.create_working_folder()

    source_file = '%s/valid.txt' % working_dir
    shutil.copyfile(helper.get_file('valid.txt'), source_file)

    reset_hash_db()
    cli_runner = CliRunner()
    cli_runner.invoke(elodie._generate_db, ['--source', working_dir])
    verify_result = cli_runner.invoke(elodie._verify)
    restore_hash_db()

    shutil.rmtree(working_dir)

    assert 'Success 1' in verify_result.output, verify_result.output
    assert 'Error 0' in verify_result.output, verify_result.output
def test_verify_error():
    # A file modified after hashing should be reported as an error.
    _, working_dir = helper.create_working_folder()

    source_file = '%s/valid.txt' % working_dir
    shutil.copyfile(helper.get_file('valid.txt'), source_file)

    reset_hash_db()
    cli_runner = CliRunner()
    cli_runner.invoke(elodie._generate_db, ['--source', working_dir])

    # Corrupt the file after its checksum was recorded.
    with open(source_file, 'w') as handle:
        handle.write('changed text')

    verify_result = cli_runner.invoke(elodie._verify)
    restore_hash_db()

    shutil.rmtree(working_dir)

    assert source_file in verify_result.output, verify_result.output
    assert 'Error 1' in verify_result.output, verify_result.output
def reset_hash_db():
hash_db = constants.hash_db
if os.path.isfile(hash_db):

View File

@ -107,6 +107,50 @@ def test_get_hash_does_not_exist():
assert db.get_hash(random_key) is None, 'Lookup for hash that should not exist did not return None'
def test_get_all():
    # Every hash added should come back out of Db.all().
    db = Db()
    db.reset_hash_db()

    keys = []
    values = []
    for _ in range(10):
        keys.append(helper.random_string(10))
        values.append(helper.random_string(12))
        db.add_hash(keys[-1], values[-1], False)

    count = 0
    for key, value in db.all():
        assert key in keys, key
        assert value in values, value
        count += 1

    assert count == 10, count
def test_get_all_empty():
    # Db.all() over an empty database should yield nothing at all.
    db = Db()
    db.reset_hash_db()

    count = sum(1 for _ in db.all())
    assert count == 0, count
def test_reset_hash_db():
    # reset_hash_db should drop every previously stored key.
    db = Db()
    key = helper.random_string(10)
    value = helper.random_string(12)

    # add_hash called with an explicit False third argument.
    db.add_hash(key, value, False)
    assert key in db.hash_db, key

    db.reset_hash_db()
    assert key not in db.hash_db, key
def test_update_hash_db():
db = Db()

View File

@ -24,11 +24,40 @@ def call_result_and_assert(result, expected):
sys.stdout = out
result.write()
output = out.getvalue().strip()
assert output == expected, expected
assert output == expected, output
finally:
sys.stdout = saved_stdout
def test_add_multiple_rows():
def test_add_multiple_rows_with_success():
    # Two successful rows: summary shows Success 2 / Error 0 and no
    # error-details section is printed.
    expected = """****** SUMMARY ******
Metric Count
-------- -------
Success 2
Error 0"""

    res = Result()
    for row in (('id1', '/some/path/1'), ('id2', '/some/path/2')):
        res.append(row)

    call_result_and_assert(res, expected)
def test_add_multiple_rows_with_failure():
    # Two failed rows: both files are listed in the error details and the
    # summary shows Success 0 / Error 2.
    expected = """****** ERROR DETAILS ******
File
------
id1
id2
****** SUMMARY ******
Metric Count
-------- -------
Success 0
Error 2"""

    res = Result()
    for identifier in ('id1', 'id2'):
        res.append((identifier, False))

    call_result_and_assert(res, expected)
def test_add_multiple_rows_with_failure_and_success():
expected = """****** ERROR DETAILS ******
File
------
@ -41,6 +70,6 @@ Metric Count
Success 1
Error 1"""
result = Result()
result.append(('id1', None))
result.append(('id1', False))
result.append(('id2', '/some/path'))
call_result_and_assert(result, expected)