Merge pull request #101 from jmathai/text-file-support

gh-100 Add support for importing text files and a few initial tests
This commit is contained in:
Jaisen Mathai 2016-04-12 00:14:49 -07:00
commit 9bdb451c86
9 changed files with 629 additions and 154 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ docs/_build
build/** build/**
**/*.nef **/*.nef
**/*.dng **/*.dng
**/*.rw2

View File

@ -16,7 +16,9 @@ if not verify_dependencies():
from elodie import constants from elodie import constants
from elodie import geolocation from elodie import geolocation
from elodie.media.base import Base
from elodie.media.media import Media from elodie.media.media import Media
from elodie.media.text import Text
from elodie.media.audio import Audio from elodie.media.audio import Audio
from elodie.media.photo import Photo from elodie.media.photo import Photo
from elodie.media.video import Video from elodie.media.video import Video
@ -38,7 +40,7 @@ def import_file(_file, destination, album_from_folder, trash):
(_file, _file) (_file, _file)
return return
media = Media.get_class_by_file(_file, [Audio, Photo, Video]) media = Media.get_class_by_file(_file, [Text, Audio, Photo, Video])
if not media: if not media:
if constants.debug: if constants.debug:
print 'Not a supported file (%s)' % _file print 'Not a supported file (%s)' % _file
@ -155,7 +157,7 @@ def _update(album, location, time, title, files):
destination = os.path.expanduser(os.path.dirname(os.path.dirname( destination = os.path.expanduser(os.path.dirname(os.path.dirname(
os.path.dirname(file_path)))) os.path.dirname(file_path))))
media = Media.get_class_by_file(file_path, [Audio, Photo, Video]) media = Media.get_class_by_file(file_path, [Text, Audio, Photo, Video])
if not media: if not media:
continue continue
@ -194,7 +196,7 @@ def _update(album, location, time, title, files):
if updated: if updated:
updated_media = Media.get_class_by_file(file_path, updated_media = Media.get_class_by_file(file_path,
[Audio, Photo, Video]) [Text, Audio, Photo, Video])
# See comments above on why we have to do this when titles # See comments above on why we have to do this when titles
# get updated. # get updated.
if remove_old_title_from_name and len(original_title) > 0: if remove_old_title_from_name and len(original_title) > 0:

200
elodie/media/base.py Normal file
View File

@ -0,0 +1,200 @@
"""
The base module provides a base :class:`Base` class for all objects that
are tracked by Elodie. The Base class provides some base functionality used
by all the media types, but isn't itself used to represent anything. Its
sub-classes (:class:`~elodie.media.audio.Audio`,
:class:`~elodie.media.photo.Photo`, :class:`~elodie.media.video.Video`, and
:class:`~elodie.media.text.Text`)
are used to represent the actual files.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
import mimetypes
import os
class Base(object):

    """The base class for all objects tracked by Elodie.

    It implements everything that only needs the file path (extension
    checks, mimetype lookup, the cached metadata dictionary) and provides
    ``None``-returning defaults for the format specific getters and setters
    so that sub-classes only override what they support.

    :param str source: The fully qualified path to the file.
    """

    __name__ = 'Base'

    #: Valid extensions. Empty on the base class; sub-classes list the
    #: extensions they accept so is_valid() and get_class_by_file() can
    #: match against them.
    extensions = ()

    def __init__(self, source=None):
        self.source = source
        self.reset_cache()

    def format_metadata(self, **kwargs):
        """Placeholder for returning a populated metadata dictionary.

        The base implementation ignores ``kwargs`` and returns nothing.

        :returns: None
        """
        return None

    def get_album(self):
        """Base method for getting an album

        :returns: None
        """
        return None

    def get_file_path(self):
        """Get the full path to the file.

        :returns: string
        """
        return self.source

    def get_coordinate(self, type):
        """Base method for getting a coordinate.

        :param str type: Name of the coordinate; unused by the base class.
        :returns: None
        """
        return None

    def get_date_taken(self):
        """Base method for getting the date a file was taken/created.

        Declared here because get_metadata() calls it on every instance.

        :returns: None
        """
        return None

    def get_extension(self):
        """Get the file extension as a lowercased string.

        :returns: string or None for an invalid file
        """
        if not self.is_valid():
            return None

        return os.path.splitext(self.source)[1][1:].lower()

    def get_metadata(self, update_cache=False):
        """Get a dictionary of metadata for any file.

        All keys will be present and have a value of None if not obtained.
        The dictionary is cached on the instance after the first call.

        :param bool update_cache: Rebuild the dictionary even if cached.
        :returns: dict or None for an invalid file
        """
        if not self.is_valid():
            return None

        if self.metadata is not None and update_cache is False:
            return self.metadata

        source = self.source

        self.metadata = {
            'date_taken': self.get_date_taken(),
            'latitude': self.get_coordinate('latitude'),
            'longitude': self.get_coordinate('longitude'),
            'album': self.get_album(),
            'title': self.get_title(),
            'mime_type': self.get_mimetype(),
            'base_name': os.path.splitext(os.path.basename(source))[0],
            'extension': self.get_extension(),
            'directory_path': os.path.dirname(source)
        }

        return self.metadata

    def get_mimetype(self):
        """Get the mimetype of the file.

        :returns: str, or None for an invalid file or an unknown type
        """
        if not self.is_valid():
            return None

        # guess_type() always returns a (type, encoding) tuple; the type
        # itself is None when it cannot be guessed.
        return mimetypes.guess_type(self.source)[0]

    def get_title(self):
        """Base method for getting the title of a file

        :returns: None
        """
        return None

    def is_valid(self):
        """Check the file extension against valid file extensions.

        The list of valid file extensions come from self.extensions.

        :returns: bool
        """
        source = self.source
        # An instance created without a source has nothing to validate.
        if source is None:
            return False

        return os.path.splitext(source)[1][1:].lower() in self.extensions

    def reset_cache(self):
        """Resets any internal cache
        """
        self.metadata = None

    def set_album(self, name):
        """Base method for setting the album of a file

        :returns: None
        """
        return None

    def set_album_from_folder(self):
        """Set the album attribute based on the leaf folder name

        :returns: bool
        """
        metadata = self.get_metadata()

        # get_metadata() returns None for an invalid file.
        if metadata is None:
            return False

        # If this file has an album already set we do not overwrite it
        if metadata['album'] is not None:
            return False

        folder = os.path.basename(metadata['directory_path'])
        # If folder is empty we skip
        if len(folder) == 0:
            return False

        self.set_album(folder)
        return True

    def set_metadata_basename(self, new_basename):
        """Update the basename attribute in the metadata dict for this instance.

        This is used for when we update the EXIF title of a media file. Since
        that determines the name of a file if we update the title of a file
        more than once it appends to the file name.

        i.e. 2015-12-31_00-00-00-my-first-title-my-second-title.jpg

        :param str new_basename: New basename of file (with the old title
            removed).
        """
        # Populates self.metadata; nothing to update for an invalid file.
        if self.get_metadata() is None:
            return

        self.metadata['base_name'] = new_basename

    def set_metadata(self, **kwargs):
        """Method to manually update attributes in metadata.

        Only keys that already exist in the metadata dictionary are updated.

        :params dict kwargs: Named parameters to update.
        """
        metadata = self.get_metadata()
        if metadata is None:
            return

        for key in kwargs:
            if key in metadata:
                self.metadata[key] = kwargs[key]

    @classmethod
    def get_class_by_file(cls, _file, classes):
        """Instantiate the first class in ``classes`` that accepts ``_file``.

        :param str _file: Path to an existing file.
        :param list classes: Candidate classes exposing ``extensions``.
        :returns: instance of the matching class, or None
        """
        # basestring only exists on Python 2 (it covers str and unicode).
        try:
            string_types = basestring
        except NameError:
            string_types = str

        if not isinstance(_file, string_types) or not os.path.isfile(_file):
            return None

        extension = os.path.splitext(_file)[1][1:].lower()

        for i in classes:
            if extension in i.extensions:
                return i(_file)

        return None

    @classmethod
    def get_valid_extensions(cls):
        """Class method to access the extensions class attribute.

        :returns: tuple(str)
        """
        return cls.extensions

View File

@ -1,5 +1,5 @@
""" """
The media module provides a base :class:`Media` class for all objects that The media module provides a base :class:`Media` class for media objects that
are tracked by Elodie. The Media class provides some base functionality used are tracked by Elodie. The Media class provides some base functionality used
by all the media types, but isn't itself used to represent anything. Its by all the media types, but isn't itself used to represent anything. Its
sub-classes (:class:`~elodie.media.audio.Audio`, sub-classes (:class:`~elodie.media.audio.Audio`,
@ -12,15 +12,15 @@ are used to represent the actual files.
# load modules # load modules
from elodie import constants from elodie import constants
from elodie.dependencies import get_exiftool from elodie.dependencies import get_exiftool
from elodie.media.base import Base
import mimetypes
import os import os
import pyexiv2 import pyexiv2
import re import re
import subprocess import subprocess
class Media(object): class Media(Base):
"""The base class for all media objects. """The base class for all media objects.
@ -35,7 +35,7 @@ class Media(object):
} }
def __init__(self, source=None): def __init__(self, source=None):
self.source = source super(Media, self).__init__(source)
self.exif_map = { self.exif_map = {
'date_taken': ['Exif.Photo.DateTimeOriginal', 'Exif.Image.DateTime', 'Exif.Photo.DateTimeDigitized'], # , 'EXIF FileDateTime'], # noqa 'date_taken': ['Exif.Photo.DateTimeOriginal', 'Exif.Image.DateTime', 'Exif.Photo.DateTimeDigitized'], # , 'EXIF FileDateTime'], # noqa
'latitude': 'Exif.GPSInfo.GPSLatitude', 'latitude': 'Exif.GPSInfo.GPSLatitude',
@ -43,7 +43,6 @@ class Media(object):
'longitude': 'Exif.GPSInfo.GPSLongitude', 'longitude': 'Exif.GPSInfo.GPSLongitude',
'longitude_ref': 'Exif.GPSInfo.GPSLongitudeRef', 'longitude_ref': 'Exif.GPSInfo.GPSLongitudeRef',
} }
self.reset_cache()
def get_album(self): def get_album(self):
"""Get album from EXIF """Get album from EXIF
@ -59,23 +58,6 @@ class Media(object):
return exiftool_attributes['album'] return exiftool_attributes['album']
def get_file_path(self):
"""Get the full path to the video.
:returns: string
"""
return self.source
def is_valid(self):
"""The default is_valid() always returns false.
This should be overridden in a child class to return true if the
source is valid, and false otherwise.
:returns: bool
"""
return False
def get_exif(self): def get_exif(self):
"""Read EXIF from a photo file. """Read EXIF from a photo file.
@ -140,61 +122,6 @@ class Media(object):
return self.exiftool_attributes return self.exiftool_attributes
def get_extension(self):
"""Get the file extension as a lowercased string.
:returns: string or None for a non-video
"""
if(not self.is_valid()):
return None
source = self.source
return os.path.splitext(source)[1][1:].lower()
def get_metadata(self, update_cache=False):
"""Get a dictionary of metadata for a photo.
All keys will be present and have a value of None if not obtained.
:returns: dict or None for non-photo files
"""
if(not self.is_valid()):
return None
if(self.metadata is not None and update_cache is False):
return self.metadata
source = self.source
self.metadata = {
'date_taken': self.get_date_taken(),
'latitude': self.get_coordinate('latitude'),
'longitude': self.get_coordinate('longitude'),
'album': self.get_album(),
'title': self.get_title(),
'mime_type': self.get_mimetype(),
'base_name': os.path.splitext(os.path.basename(source))[0],
'extension': self.get_extension(),
'directory_path': os.path.dirname(source)
}
return self.metadata
def get_mimetype(self):
"""Get the mimetype of the file.
:returns: str or None for a non-video
"""
if(not self.is_valid()):
return None
source = self.source
mimetype = mimetypes.guess_type(source)
if(mimetype is None):
return None
return mimetype[0]
def get_title(self): def get_title(self):
"""Get the title for a photo of video """Get the title for a photo of video
@ -211,8 +138,10 @@ class Media(object):
return exiftool_attributes['title'] return exiftool_attributes['title']
def reset_cache(self): def reset_cache(self):
"""Resets any internal cache
"""
self.exiftool_attributes = None self.exiftool_attributes = None
self.metadata = None super(Media, self).reset_cache()
def set_album(self, name): def set_album(self, name):
"""Set album for a photo """Set album for a photo
@ -252,64 +181,3 @@ class Media(object):
self.set_metadata(album=name) self.set_metadata(album=name)
self.reset_cache() self.reset_cache()
return True return True
def set_album_from_folder(self):
metadata = self.get_metadata()
# If this file has an album already set we do not overwrite EXIF
if(metadata['album'] is not None):
return False
folder = os.path.basename(metadata['directory_path'])
# If folder is empty we skip
if(len(folder) == 0):
return False
self.set_album(folder)
return True
def set_metadata_basename(self, new_basename):
"""Update the basename attribute in the metadata dict for this instance.
This is used for when we update the EXIF title of a media file. Since
that determines the name of a file if we update the title of a file
more than once it appends to the file name.
i.e. 2015-12-31_00-00-00-my-first-title-my-second-title.jpg
:param str new_basename: New basename of file (with the old title
removed).
"""
self.get_metadata()
self.metadata['base_name'] = new_basename
def set_metadata(self, **kwargs):
"""Method to manually update attributes in metadata.
:params dict kwargs: Named parameters to update.
"""
metadata = self.get_metadata()
for key in kwargs:
if(key in metadata):
self.metadata[key] = kwargs[key]
@classmethod
def get_class_by_file(cls, _file, classes):
if not isinstance(_file, basestring) or not os.path.isfile(_file):
return None
extension = os.path.splitext(_file)[1][1:].lower()
for i in classes:
if(extension in i.extensions):
return i(_file)
return None
@classmethod
def get_valid_extensions(cls):
"""Static method to access static extensions variable.
:returns: tuple(str)
"""
return cls.extensions

164
elodie/media/text.py Normal file
View File

@ -0,0 +1,164 @@
"""
The text module provides a base :class:`Text` class for text files that
are tracked by Elodie.
.. moduleauthor:: Jaisen Mathai <jaisen@jmathai.com>
"""
# load modules
import calendar
import os
import time
from json import dumps, loads
from shutil import copyfileobj

from elodie import constants
from elodie.media.base import Base
class Text(Base):

    """The class for all text files.

    Metadata lives inside the file itself: when the first line parses as a
    JSON object it is treated as the metadata header, and everything after
    it is the body of the file.

    :param str source: The fully qualified path to the text file.
    """

    __name__ = 'Text'

    #: Valid extensions for text files. The trailing comma is required;
    #: without it this is the string 'txt' and ``in`` does substring
    #: matching, which also accepts '', 't', 'x', 'tx' and 'xt'.
    extensions = ('txt',)

    def __init__(self, source=None):
        # Base.__init__ calls reset_cache(), which resolves to the override
        # below and therefore also initialises self.metadata_line.
        super(Text, self).__init__(source)

    def get_album(self):
        """Get the album from the metadata header.

        :returns: value stored under 'album', or None if absent
        """
        self.parse_metadata_line()
        if not self.metadata_line or 'album' not in self.metadata_line:
            return None

        return self.metadata_line['album']

    def get_coordinate(self, type='latitude'):
        """Get the latitude or longitude from the metadata header.

        :param str type: 'latitude' or 'longitude'.
        :returns: the stored value, or None if absent, falsy or if ``type``
            is anything else
        """
        self.parse_metadata_line()
        if not self.metadata_line:
            return None

        if type in ('latitude', 'longitude'):
            return self.metadata_line.get(type) or None

        return None

    def get_date_taken(self):
        """Get the date for this file as a UTC ``time.struct_time``.

        Uses 'date_taken' (seconds since the epoch) from the header when
        present, otherwise the older of the file's mtime and ctime.

        :returns: time.struct_time
        """
        source = self.source
        self.parse_metadata_line()

        # We return the value if found in metadata
        if self.metadata_line and 'date_taken' in self.metadata_line:
            return time.gmtime(self.metadata_line['date_taken'])

        # If there's no date_taken in the metadata we return
        #   from the filesystem
        seconds_since_epoch = min(
            os.path.getmtime(source),
            os.path.getctime(source)
        )
        return time.gmtime(seconds_since_epoch)

    def get_metadata(self, update_cache=False):
        """Get the metadata dictionary, see :func:`Base.get_metadata`.

        :param bool update_cache: Re-read the header and rebuild the
            dictionary even if a cached copy exists.
        :returns: dict or None for non-text files
        """
        if update_cache:
            # Forget the parsed header so it is read from disk again.
            self.metadata_line = None

        self.parse_metadata_line()
        return super(Text, self).get_metadata(update_cache)

    def get_title(self):
        """Get the title from the metadata header.

        :returns: value stored under 'title', or None if absent
        """
        self.parse_metadata_line()

        if not self.metadata_line or 'title' not in self.metadata_line:
            return None

        return self.metadata_line['title']

    def reset_cache(self):
        """Resets any internal cache
        """
        self.metadata_line = None
        super(Text, self).reset_cache()

    def set_album(self, name):
        """Store the album in the metadata header.

        :param str name: Name of the album.
        :returns: bool
        """
        status = self.write_metadata(album=name)
        self.reset_cache()
        return status

    def set_location(self, latitude, longitude):
        """Store latitude and longitude in the metadata header.

        :returns: bool
        """
        status = self.write_metadata(latitude=latitude, longitude=longitude)
        self.reset_cache()
        return status

    def set_date_taken(self, passed_in_time):
        """Store the date in the metadata header as seconds since the epoch.

        :param datetime passed_in_time: Date to store.
        :returns: bool, False when no date was passed in
        """
        if passed_in_time is None:
            return False

        # get_date_taken() decodes with time.gmtime(), so encode with its
        # inverse. time.mktime() would interpret the value as local time and
        # shift the stored date by the machine's UTC offset.
        seconds_since_epoch = calendar.timegm(passed_in_time.timetuple())
        status = self.write_metadata(date_taken=seconds_since_epoch)
        self.reset_cache()
        return status

    def parse_metadata_line(self):
        """Parse and cache the JSON metadata header of the file.

        The first line is only accepted when it is a JSON object; any other
        first line (plain text, or JSON such as ``42`` or ``[1]``) means the
        file has no header.

        :returns: dict, or None when there is no source or no header
        """
        if self.metadata_line is not None:
            return self.metadata_line

        source = self.source
        if source is None:
            return None

        with open(source, 'r') as f:
            first_line = f.readline().strip()

        try:
            parsed_json = loads(first_line)
        except ValueError:
            if constants.debug is True:
                print('Could not parse JSON from first line: %s' % first_line)
            return None

        # The getters use ``in`` and indexing, which need a mapping.
        if not isinstance(parsed_json, dict):
            return None

        self.metadata_line = parsed_json
        return self.metadata_line

    def write_metadata(self, **kwargs):
        """Merge ``kwargs`` into the metadata header and rewrite the file.

        An existing header line is replaced; otherwise a new header line is
        prepended. The body of the file is written back unchanged.

        :params dict kwargs: Named values to store in the header.
        :returns: bool, False when nothing was passed in
        """
        if len(kwargs) == 0:
            return False

        source = self.source

        self.parse_metadata_line()

        # Start from the existing header when there is one, else from scratch
        has_metadata = self.metadata_line is not None
        metadata_line = dict(self.metadata_line) if has_metadata else {}
        metadata_line.update(kwargs)

        metadata_as_json = dumps(metadata_line)

        # Read the whole body before reopening for writing: mode 'w'
        # truncates the file immediately, so anything not yet read is lost.
        with open(source, 'r') as f_read:
            if has_metadata:
                # Skip the header line that is about to be replaced
                f_read.readline()
            original_contents = f_read.read()

        with open(source, 'w') as f_write:
            f_write.write("{}\n{}".format(
                metadata_as_json,
                original_contents)
            )

        self.reset_cache()
        return True

View File

@ -19,6 +19,7 @@ import time
from elodie import constants from elodie import constants
from elodie import plist_parser from elodie import plist_parser
from elodie.dependencies import get_exiftool from elodie.dependencies import get_exiftool
from media import Base
from media import Media from media import Media
@ -163,16 +164,6 @@ class Video(Media):
) )
return process_output.stdout.read() return process_output.stdout.read()
def is_valid(self):
"""Check the file extension against valid file extensions.
The list of valid file extensions come from self.extensions.
:returns: bool
"""
source = self.source
return os.path.splitext(source)[1][1:].lower() in self.extensions
def set_date_taken(self, date_taken_as_datetime): def set_date_taken(self, date_taken_as_datetime):
""" """
Set the date/time a photo was taken Set the date/time a photo was taken
@ -377,7 +368,7 @@ class Video(Media):
# Before we do anything destructive we confirm that the # Before we do anything destructive we confirm that the
# file is in tact. # file is in tact.
check_media = Media.get_class_by_file(temp_movie, [self.__class__]) check_media = Base.get_class_by_file(temp_movie, [self.__class__])
check_metadata = check_media.get_metadata() check_metadata = check_media.get_metadata()
if( if(
( (
@ -397,7 +388,7 @@ class Video(Media):
# gh-89 Before we wrap up we check if an album was previously set # gh-89 Before we wrap up we check if an album was previously set
# and if so we re-apply that album because avmetareadwrite # and if so we re-apply that album because avmetareadwrite
# clobbers it # clobbers it
source_media = Media.get_class_by_file(source, [self.__class__]) source_media = Base.get_class_by_file(source, [self.__class__])
source_metadata = source_media.get_metadata() source_metadata = source_media.get_metadata()
if(source_metadata['album'] is not None): if(source_metadata['album'] is not None):
check_media.set_album(source_metadata['album']) check_media.set_album(source_metadata['album'])

View File

@ -0,0 +1 @@
This file has no header.

View File

@ -0,0 +1,3 @@
{"date_taken":1460027726.0,"latitude":"123.456","longitude":"234.567","title":"sample title"}
This file has a valid header.

View File

@ -0,0 +1,245 @@
# -*- coding: utf-8
# Project imports
import os
import sys
from datetime import datetime
import shutil
import tempfile
import time
from nose.plugins.skip import SkipTest
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))))))
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))))
import helper
from elodie.media.base import Base
from elodie.media.text import Text
os.environ['TZ'] = 'GMT'
def test_text_extensions():
    """'txt' is a valid extension and the class accessor agrees."""
    instance_extensions = Text().extensions
    assert 'txt' in instance_extensions

    class_extensions = Text.get_valid_extensions()
    assert instance_extensions == class_extensions, class_extensions
def test_get_title():
    """get_title() returns the title stored in valid.txt."""
    sample = Text(helper.get_file('valid.txt'))
    sample.get_metadata()

    title = sample.get_title()
    assert title == 'sample title', title
def test_get_default_coordinate():
    """get_coordinate() without an argument returns the latitude."""
    sample = Text(helper.get_file('valid.txt'))
    sample.get_metadata()

    coordinate = sample.get_coordinate()
    assert coordinate == '123.456', coordinate
def test_get_coordinate_latitude():
    """get_coordinate('latitude') returns the latitude in valid.txt."""
    sample = Text(helper.get_file('valid.txt'))
    sample.get_metadata()

    latitude = sample.get_coordinate('latitude')
    assert latitude == '123.456', latitude
def test_get_coordinate_longitude():
    """get_coordinate('longitude') returns the longitude in valid.txt."""
    sample = Text(helper.get_file('valid.txt'))
    sample.get_metadata()

    longitude = sample.get_coordinate('longitude')
    assert longitude == '234.567', longitude
def test_get_date_taken():
    """get_date_taken() returns the date stored in valid.txt."""
    sample = Text(helper.get_file('valid.txt'))
    sample.get_metadata()

    date_taken = sample.get_date_taken()
    expected = helper.time_convert((2016, 4, 7, 11, 15, 26, 3, 98, 0))

    assert date_taken == expected, date_taken
def test_set_album():
    """set_album() updates the header of valid.txt and keeps the body."""
    _, folder = helper.create_working_folder()

    target = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid.txt'), target)

    metadata_before = None
    original = Text(target)
    metadata_before = original.get_metadata()

    # The body is everything after the first (header) line.
    with open(target, 'r') as handle:
        handle.readline()
        body_before = handle.read()

    new_album = 'Test Album'
    assert new_album != metadata_before['album']

    status = original.set_album(new_album)
    assert status == True, status

    metadata_after = Text(target).get_metadata()

    with open(target, 'r') as handle:
        handle.readline()
        body_after = handle.read()

    assert body_before == body_after, body_after

    shutil.rmtree(folder)

    assert new_album == metadata_after['album'], metadata_after
def test_set_date_taken():
    """set_date_taken() updates the header of valid.txt and keeps the body."""
    _, folder = helper.create_working_folder()

    target = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid.txt'), target)

    original = Text(target)
    metadata_before = original.get_metadata()

    # The body is everything after the first (header) line.
    with open(target, 'r') as handle:
        handle.readline()
        body_before = handle.read()

    stamp = (2013, 9, 30, 7, 6, 5, 0, 273, 0)
    assert helper.time_convert(stamp) != metadata_before['date_taken'], metadata_before['date_taken']

    status = original.set_date_taken(datetime(2013, 9, 30, 7, 6, 5))
    assert status == True, status

    metadata_after = Text(target).get_metadata()

    with open(target, 'r') as handle:
        handle.readline()
        body_after = handle.read()

    assert body_before == body_after, body_after

    shutil.rmtree(folder)

    assert helper.time_convert(stamp) == metadata_after['date_taken'], metadata_after['date_taken']
def test_set_location():
    """set_location() stores both coordinates and keeps the body intact."""
    temporary_folder, folder = helper.create_working_folder()

    origin = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid.txt'), origin)

    text = Text(origin)
    origin_metadata = text.get_metadata()

    # The body is everything after the first (header) line.
    with open(origin, 'r') as f:
        f.readline()
        contents = f.read()

    # Verify that the original file has different location info than what
    # we will be setting and checking
    assert not helper.isclose(origin_metadata['latitude'], 11.1111111111), origin_metadata['latitude']
    assert not helper.isclose(origin_metadata['longitude'], 99.9999999999), origin_metadata['longitude']

    status = text.set_location(11.1111111111, 99.9999999999)

    assert status == True, status

    text_new = Text(origin)
    metadata = text_new.get_metadata()

    with open(origin, 'r') as f:
        f.readline()
        contents_new = f.read()

    assert contents == contents_new, contents_new

    shutil.rmtree(folder)

    # Both halves of the coordinate pair must have been written.
    assert helper.isclose(metadata['latitude'], 11.1111111111), metadata['latitude']
    assert helper.isclose(metadata['longitude'], 99.9999999999), metadata['longitude']
def test_set_album_without_header():
    """set_album() prepends a header to a file that had none."""
    _, folder = helper.create_working_folder()

    target = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid-without-header.txt'), target)

    original = Text(target)
    metadata_before = original.get_metadata()

    # Read the file in full before the change; it is compared further down
    # against everything after the first line of the rewritten file.
    with open(target, 'r') as handle:
        body_before = handle.read()

    new_album = 'Test Album'
    assert new_album != metadata_before['album']

    status = original.set_album(new_album)
    assert status == True, status

    metadata_after = Text(target).get_metadata()

    with open(target, 'r') as handle:
        handle.readline()
        body_after = handle.read()

    assert body_before == body_after, body_after

    shutil.rmtree(folder)

    assert new_album == metadata_after['album'], metadata_after
def test_set_date_taken_without_header():
    """set_date_taken() prepends a header to a file that had none."""
    _, folder = helper.create_working_folder()

    target = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid-without-header.txt'), target)

    original = Text(target)
    metadata_before = original.get_metadata()

    # Read the file in full before the change; it is compared further down
    # against everything after the first line of the rewritten file.
    with open(target, 'r') as handle:
        body_before = handle.read()

    stamp = (2013, 9, 30, 7, 6, 5, 0, 273, 0)
    assert helper.time_convert(stamp) != metadata_before['date_taken'], metadata_before['date_taken']

    status = original.set_date_taken(datetime(2013, 9, 30, 7, 6, 5))
    assert status == True, status

    metadata_after = Text(target).get_metadata()

    with open(target, 'r') as handle:
        handle.readline()
        body_after = handle.read()

    assert body_before == body_after, body_after

    shutil.rmtree(folder)

    assert helper.time_convert(stamp) == metadata_after['date_taken'], metadata_after['date_taken']
def test_set_location_without_header():
    """set_location() prepends a header holding both coordinates."""
    temporary_folder, folder = helper.create_working_folder()

    origin = '%s/text.txt' % folder
    shutil.copyfile(helper.get_file('valid-without-header.txt'), origin)

    text = Text(origin)
    origin_metadata = text.get_metadata()

    # Read the file in full before the change; it is compared further down
    # against everything after the first line of the rewritten file.
    with open(origin, 'r') as f:
        contents = f.read()

    # Verify that the original file has different location info than what
    # we will be setting and checking
    assert not helper.isclose(origin_metadata['latitude'], 11.1111111111), origin_metadata['latitude']
    assert not helper.isclose(origin_metadata['longitude'], 99.9999999999), origin_metadata['longitude']

    status = text.set_location(11.1111111111, 99.9999999999)

    assert status == True, status

    text_new = Text(origin)
    metadata = text_new.get_metadata()

    with open(origin, 'r') as f:
        f.readline()
        contents_new = f.read()

    assert contents == contents_new, contents_new

    shutil.rmtree(folder)

    # Both halves of the coordinate pair must have been written.
    assert helper.isclose(metadata['latitude'], 11.1111111111), metadata['latitude']
    assert helper.isclose(metadata['longitude'], 99.9999999999), metadata['longitude']