Closes gh-159 Remove RESULT as global variable in elodie.py

This commit is contained in:
Jaisen Mathai 2016-12-15 01:24:04 -08:00
parent 9ad7358198
commit 7c3f323c44
4 changed files with 128 additions and 17 deletions

View File

@ -30,7 +30,6 @@ from elodie.result import Result
FILESYSTEM = FileSystem() FILESYSTEM = FileSystem()
RESULT = Result()
def import_file(_file, destination, album_from_folder, trash, allow_duplicates): def import_file(_file, destination, album_from_folder, trash, allow_duplicates):
@ -83,6 +82,7 @@ def import_file(_file, destination, album_from_folder, trash, allow_duplicates):
def _import(destination, source, file, album_from_folder, trash, paths, allow_duplicates): def _import(destination, source, file, album_from_folder, trash, paths, allow_duplicates):
"""Import files or directories by reading their EXIF and organizing them accordingly. """Import files or directories by reading their EXIF and organizing them accordingly.
""" """
result = Result()
destination = os.path.abspath(os.path.expanduser(destination)) destination = os.path.abspath(os.path.expanduser(destination))
files = set() files = set()
@ -101,9 +101,9 @@ def _import(destination, source, file, album_from_folder, trash, paths, allow_du
for current_file in files: for current_file in files:
dest_path = import_file(current_file, destination, album_from_folder, dest_path = import_file(current_file, destination, album_from_folder,
trash, allow_duplicates) trash, allow_duplicates)
RESULT.append((current_file, dest_path)) result.append((current_file, dest_path))
RESULT.write() result.write()
@click.command('generate-db') @click.command('generate-db')
@ -112,6 +112,7 @@ def _import(destination, source, file, album_from_folder, trash, paths, allow_du
def _generate_db(source): def _generate_db(source):
"""Regenerate the hash.json database which contains all of the sha1 signatures of media files. """Regenerate the hash.json database which contains all of the sha1 signatures of media files.
""" """
result = Result()
source = os.path.abspath(os.path.expanduser(source)) source = os.path.abspath(os.path.expanduser(source))
extensions = set() extensions = set()
@ -135,31 +136,31 @@ def _generate_db(source):
for current_file in all_files: for current_file in all_files:
if os.path.splitext(current_file)[1][1:].lower() not in extensions: if os.path.splitext(current_file)[1][1:].lower() not in extensions:
log.info('Skipping invalid file %s' % current_file) log.info('Skipping invalid file %s' % current_file)
RESULT.append((current_file, None)) result.append((current_file, False))
continue continue
RESULT.append((current_file, True)) result.append((current_file, True))
db.add_hash(db.checksum(current_file), current_file) db.add_hash(db.checksum(current_file), current_file)
db.update_hash_db() db.update_hash_db()
RESULT.write() result.write()
@click.command('verify') @click.command('verify')
def _verify(): def _verify():
result = Result()
db = Db() db = Db()
for checksum, file_path in db.all(): for checksum, file_path in db.all():
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
RESULT.append((file_path, True)) result.append((file_path, False))
continue continue
actual_checksum = db.checksum(file_path) actual_checksum = db.checksum(file_path)
if checksum == actual_checksum: if checksum == actual_checksum:
RESULT.append((file_path, True)) result.append((file_path, True))
else: else:
RESULT.append((file_path, None)) result.append((file_path, False))
RESULT.write()
result.write()
def update_location(media, file_path, location_name): def update_location(media, file_path, location_name):
@ -209,6 +210,7 @@ def update_time(media, file_path, time_string):
def _update(album, location, time, title, files): def _update(album, location, time, title, files):
"""Update a file's EXIF. Automatically modifies the file's location and file name accordingly. """Update a file's EXIF. Automatically modifies the file's location and file name accordingly.
""" """
result = Result()
for current_file in files: for current_file in files:
if not os.path.exists(current_file): if not os.path.exists(current_file):
if constants.debug: if constants.debug:
@ -278,11 +280,11 @@ def _update(album, location, time, title, files):
FILESYSTEM.delete_directory_if_empty(os.path.dirname(current_file)) FILESYSTEM.delete_directory_if_empty(os.path.dirname(current_file))
FILESYSTEM.delete_directory_if_empty( FILESYSTEM.delete_directory_if_empty(
os.path.dirname(os.path.dirname(current_file))) os.path.dirname(os.path.dirname(current_file)))
RESULT.append((current_file, dest_path)) result.append((current_file, dest_path))
else: else:
RESULT.append((current_file, None)) result.append((current_file, False))
RESULT.write() result.write()
@click.group() @click.group()

View File

@ -379,6 +379,42 @@ def test_regenerate_valid_source_with_invalid_files():
assert 'bde2dc0b839a5d20b0b4c1f57605f84e0e2a4562aaebc1c362de6cb7cc02eeb3' in db.hash_db, db.hash_db assert 'bde2dc0b839a5d20b0b4c1f57605f84e0e2a4562aaebc1c362de6cb7cc02eeb3' in db.hash_db, db.hash_db
assert 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' not in db.hash_db, db.hash_db assert 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' not in db.hash_db, db.hash_db
def test_verify_ok():
temporary_folder, folder = helper.create_working_folder()
origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db()
runner = CliRunner()
runner.invoke(elodie._generate_db, ['--source', folder])
result = runner.invoke(elodie._verify)
restore_hash_db()
shutil.rmtree(folder)
assert 'Success 1' in result.output, result.output
assert 'Error 0' in result.output, result.output
def test_verify_error():
temporary_folder, folder = helper.create_working_folder()
origin = '%s/valid.txt' % folder
shutil.copyfile(helper.get_file('valid.txt'), origin)
reset_hash_db()
runner = CliRunner()
runner.invoke(elodie._generate_db, ['--source', folder])
with open(origin, 'w') as f:
f.write('changed text')
result = runner.invoke(elodie._verify)
restore_hash_db()
shutil.rmtree(folder)
assert origin in result.output, result.output
assert 'Error 1' in result.output, result.output
def reset_hash_db(): def reset_hash_db():
hash_db = constants.hash_db hash_db = constants.hash_db
if os.path.isfile(hash_db): if os.path.isfile(hash_db):

View File

@ -107,6 +107,50 @@ def test_get_hash_does_not_exist():
assert db.get_hash(random_key) is None, 'Lookup for hash that should not exist did not return None' assert db.get_hash(random_key) is None, 'Lookup for hash that should not exist did not return None'
def test_get_all():
db = Db()
db.reset_hash_db()
random_keys = []
random_values = []
for _ in range(10):
random_keys.append(helper.random_string(10))
random_values.append(helper.random_string(12))
db.add_hash(random_keys[-1:][0], random_values[-1:][0], False)
counter = 0
for key, value in db.all():
assert key in random_keys, key
assert value in random_values, value
counter += 1
assert counter == 10, counter
def test_get_all_empty():
db = Db()
db.reset_hash_db()
counter = 0
for key, value in db.all():
counter += 1
# there's a final iteration because of the generator
assert counter == 0, counter
def test_reset_hash_db():
db = Db()
random_key = helper.random_string(10)
random_value = helper.random_string(12)
# Test with explicit False value as 3rd param
db.add_hash(random_key, random_value, False)
assert random_key in db.hash_db, random_key
db.reset_hash_db()
assert random_key not in db.hash_db, random_key
def test_update_hash_db(): def test_update_hash_db():
db = Db() db = Db()

View File

@ -24,11 +24,40 @@ def call_result_and_assert(result, expected):
sys.stdout = out sys.stdout = out
result.write() result.write()
output = out.getvalue().strip() output = out.getvalue().strip()
assert output == expected, expected assert output == expected, output
finally: finally:
sys.stdout = saved_stdout sys.stdout = saved_stdout
def test_add_multiple_rows(): def test_add_multiple_rows_with_success():
expected = """****** SUMMARY ******
Metric Count
-------- -------
Success 2
Error 0"""
result = Result()
result.append(('id1', '/some/path/1'))
result.append(('id2', '/some/path/2'))
call_result_and_assert(result, expected)
def test_add_multiple_rows_with_failure():
expected = """****** ERROR DETAILS ******
File
------
id1
id2
****** SUMMARY ******
Metric Count
-------- -------
Success 0
Error 2"""
result = Result()
result.append(('id1', False))
result.append(('id2', False))
call_result_and_assert(result, expected)
def test_add_multiple_rows_with_failure_and_success():
expected = """****** ERROR DETAILS ****** expected = """****** ERROR DETAILS ******
File File
------ ------
@ -41,6 +70,6 @@ Metric Count
Success 1 Success 1
Error 1""" Error 1"""
result = Result() result = Result()
result.append(('id1', None)) result.append(('id1', False))
result.append(('id2', '/some/path')) result.append(('id2', '/some/path'))
call_result_and_assert(result, expected) call_result_and_assert(result, expected)