Add RhetTbull python exiftool wrapper

This commit is contained in:
Cédric Leporcq 2021-08-08 11:35:47 +02:00
parent 144e8f428c
commit 71f0ecaba1
2 changed files with 647 additions and 0 deletions

454
dozo/exiftool.py Normal file
View File

@ -0,0 +1,454 @@
""" Yet another simple exiftool wrapper
I rolled my own for the following reasons:
1. I wanted something under the MIT license (the best alternative was licensed under GPL/BSD)
2. I wanted singleton behavior so that only a single exiftool process is ever running
3. When used as a context manager, I wanted the operations to be batched until exiting the context (improved performance)
If these aren't important to you, I highly recommend you use Sven Marnach's excellent
pyexiftool: https://github.com/smarnach/pyexiftool which provides more functionality """
import atexit
import json
import logging
import os
import re
import shutil
import subprocess
from abc import ABC, abstractmethod
from functools import lru_cache # pylint: disable=syntax-error
# exiftool -stay_open commands outputs this EOF marker after command is run;
# ExifTool.run_commands reads the subprocess output until it sees this marker
EXIFTOOL_STAYOPEN_EOF = "{ready}"
# length of the marker, used to slice it off the end of the collected output
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
# list of exiftool processes to cleanup when exiting or when terminate is called;
# _ExifToolProc._start_proc appends to it and terminate_exiftool iterates over it
EXIFTOOL_PROCESSES = []
@atexit.register
def terminate_exiftool():
    """Stop every registered exiftool subprocess.

    Registered with atexit so it also runs at interpreter exit; it can be
    called directly to clean up once the caller is done using ExifTool.
    """
    for exiftool_proc in EXIFTOOL_PROCESSES:
        # _stop_proc() does nothing when the process is not running
        exiftool_proc._stop_proc()
@lru_cache(maxsize=1)
def get_exiftool_path():
    """Locate the exiftool executable on the search path; the result is cached.

    Returns:
        path to exiftool as reported by shutil.which, with trailing whitespace removed

    Raises:
        FileNotFoundError if exiftool cannot be found
    """
    located = shutil.which("exiftool")
    if not located:
        raise FileNotFoundError(
            "Could not find exiftool. Please download and install from "
            "https://exiftool.org/"
        )
    return located.rstrip()
class _ExifToolProc:
    """Runs exiftool in a subprocess via Popen

    Creates a singleton object: every instantiation returns the same instance,
    so this class manages at most one exiftool subprocess at a time."""

    def __new__(cls, *args, **kwargs):
        """ create new object or return instance of already created singleton """
        if not getattr(cls, "instance", None):
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self, exiftool=None):
        """construct _ExifToolProc singleton object or return instance of already created object

        Args:
            exiftool: optional path to exiftool binary (if not provided, will search path to find it)

        Note: __init__ runs on every instantiation of the singleton; if the subprocess is
        already running, a different exiftool path is ignored (with a warning) and the
        running process is kept.
        """
        if getattr(self, "_process_running", False):
            # already running
            if exiftool is not None and exiftool != self._exiftool:
                logging.warning(
                    f"exiftool subprocess already running, "
                    f"ignoring exiftool={exiftool}"
                )
            return
        self._process_running = False
        self._exiftool = exiftool or get_exiftool_path()
        self._start_proc()

    @property
    def process(self):
        """ return the exiftool subprocess, starting it first if it is not running """
        if not self._process_running:
            self._start_proc()
        return self._process

    @property
    def pid(self):
        """ return process id (PID) of the exiftool process """
        return self._process.pid

    @property
    def exiftool(self):
        """ return path to exiftool process """
        return self._exiftool

    def _start_proc(self):
        """ start exiftool in batch mode; logs a warning and does nothing if already running """
        if self._process_running:
            # f-string so the message shows the process instead of the literal placeholder
            logging.warning(f"exiftool already running: {self._process}")
            return

        # open exiftool process
        self._process = subprocess.Popen(
            [
                self._exiftool,
                "-stay_open",  # keep process open in batch mode
                "True",  # -stay_open=True, keep process open in batch mode
                "-@",  # read command-line arguments from file
                "-",  # read from stdin
                "-common_args",  # specifies args common to all commands subsequently run
                "-n",  # no print conversion (e.g. print tag values in machine readable format)
                "-P",  # Preserve file modification date/time
                "-G",  # print group name for each tag
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        self._process_running = True

        # the singleton can be stopped and restarted many times; register it only
        # once so the cleanup list does not grow with every restart
        if self not in EXIFTOOL_PROCESSES:
            EXIFTOOL_PROCESSES.append(self)

    def _stop_proc(self):
        """ stop the exiftool process if it's running, otherwise, do nothing """
        if not self._process_running:
            return

        try:
            # ask exiftool to leave -stay_open mode so it exits on its own
            self._process.stdin.write(b"-stay_open\n")
            self._process.stdin.write(b"False\n")
            self._process.stdin.flush()
        except Exception:
            # best effort only: the pipe may already be closed or the process gone;
            # the communicate()/kill() sequence below still cleans up
            pass

        try:
            self._process.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            # exiftool did not exit in time: kill it and reap it
            self._process.kill()
            self._process.communicate()

        del self._process
        self._process_running = False
class ExifTool:
    """ Basic exiftool interface for reading and writing EXIF tags """

    def __init__(self, filepath, exiftool=None, overwrite=True, flags=None):
        """Create ExifTool object and read the file's metadata into self.data

        Args:
            filepath: path to image file
            exiftool: path to exiftool, if not specified will look in path
            overwrite: if True (default), will overwrite image file without creating backup
                (passes -overwrite_original to exiftool when writing)
            flags: optional list of exiftool flags to prepend to every exiftool command
                run by this object (e.g. -m or -F)

        Returns:
            ExifTool instance
        """
        self.file = filepath
        self.overwrite = overwrite
        self.flags = flags or []
        self.data = {}
        self.warning = None
        self.error = None
        # if running as a context manager, self._context_mgr will be True
        self._context_mgr = False
        self._exiftoolproc = _ExifToolProc(exiftool=exiftool)
        self._read_exif()

    @property
    def _process(self):
        """ return the shared exiftool subprocess (started on demand by _ExifToolProc) """
        return self._exiftoolproc.process

    def _queue_or_run(self, command):
        """ queue command if inside a context manager, otherwise run it now; returns True on success """
        if self._context_mgr:
            self._commands.extend(command)
            return True
        _, _, error = self.run_commands(*command)
        return error == ""

    def setvalue(self, tag, value):
        """Set tag to value(s); if value is None, will delete tag

        Args:
            tag: str; name of tag to set
            value: str; value to set tag to

        Returns:
            True if success otherwise False

        If error generated by exiftool, returns False and sets self.error to error string
        If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
        If called in context manager, returns True (execution is delayed until exiting context manager)
        """
        if value is None:
            value = ""
        command = [f"-{tag}={value}"]
        # inside a context manager run_commands adds -overwrite_original once for the whole batch
        if self.overwrite and not self._context_mgr:
            command.append("-overwrite_original")

        # avoid "Warning: Some character(s) could not be encoded in Latin" warning
        command.append("-iptc:codedcharacterset=utf8")
        return self._queue_or_run(command)

    def addvalues(self, tag, *values):
        """Add one or more value(s) to tag
        If more than one value is passed, each value will be added to the tag

        Args:
            tag: str; tag to set
            *values: str; one or more values to set

        Returns:
            True if success otherwise False

        Raises:
            ValueError if no values are passed or if any value is None

        If error generated by exiftool, returns False and sets self.error to error string
        If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
        If called in context manager, returns True (execution is delayed until exiting context manager)

        Notes: exiftool may add duplicate values for some tags so the caller must ensure
              the values being added are not already in the EXIF data
              For some tags, such as IPTC:Keywords, this will add a new value to the list of keywords,
              but for others, such as EXIF:ISO, this will literally add a value to the existing value.
              It's up to the caller to know what exiftool will do for each tag
              If setvalue called before addvalues, exiftool does not appear to add duplicates,
              but if addvalues called without first calling setvalue, exiftool will add duplicate values
        """
        if not values:
            raise ValueError("Must pass at least one value")

        command = []
        for value in values:
            if value is None:
                raise ValueError("Can't add None value to tag")
            command.append(f"-{tag}+={value}")

        # inside a context manager run_commands adds -overwrite_original once for the whole batch
        if self.overwrite and not self._context_mgr:
            command.append("-overwrite_original")
        return self._queue_or_run(command)

    def run_commands(self, *commands, no_file=False):
        """Run commands in the exiftool process and return result.

        Args:
            *commands: exiftool commands to run
            no_file: (bool) do not pass the filename to exiftool (default=False)
                by default, all commands will be run against self.file
                use no_file=True to run a command without passing the filename

        Returns:
            (output, warning, error)
            output: bytes containing output of exiftool commands
            warning: if exiftool generated warnings, string containing warning otherwise empty string
            error: if exiftool generated errors, string containing error otherwise empty string

        Raises:
            ValueError if the exiftool process is not available
            TypeError if no commands are passed
            RuntimeError if exiftool closes its output before signalling completion

        Note: Also sets self.warning and self.error if warning or error generated.
        """
        process = getattr(self, "_process", None)
        if not process:
            raise ValueError("exiftool process is not running")

        if not commands:
            raise TypeError("must provide one or more command to run")

        commands = list(commands)
        if self._context_mgr and self.overwrite:
            commands.append("-overwrite_original")

        filename = b"" if no_file else os.fsencode(self.file)

        # need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"]
        args = [part for flag in self.flags for part in flag.split()]
        args.extend(commands)

        # one argument per line, then the file name, then -execute to run the batch
        command_str = (
            b"\n".join(arg.encode("utf-8") for arg in args)
            + b"\n"
            + filename
            + b"\n"
            + b"-execute\n"
        )

        # send the command
        process.stdin.write(command_str)
        process.stdin.flush()

        # read the output line by line until exiftool prints its end-of-command marker
        eof_marker = EXIFTOOL_STAYOPEN_EOF.encode("utf-8")
        output_parts = []
        warning_parts = []
        error_parts = []
        while True:
            line = process.stdout.readline()
            if not line:
                # readline() returns b"" only at end of stream, i.e. exiftool is gone;
                # without this check the loop would never terminate
                raise RuntimeError(
                    "exiftool process closed its output before signalling completion"
                )
            stripped = line.strip()
            if line.startswith(b"Warning"):
                warning_parts.append(stripped)
            elif line.startswith(b"Error"):
                error_parts.append(stripped)
            else:
                output_parts.append(stripped)
                if stripped.endswith(eof_marker):
                    break

        output = b"".join(output_parts)
        warning = b"".join(warning_parts).decode("utf-8")
        error = b"".join(error_parts).decode("utf-8")
        self.warning = warning
        self.error = error

        # drop the trailing end-of-command marker from the returned output
        return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error

    @property
    def pid(self):
        """ return process id (PID) of the exiftool process """
        return self._process.pid

    @property
    def version(self):
        """ returns exiftool version """
        ver, _, _ = self.run_commands("-ver", no_file=True)
        return ver.decode("utf-8")

    def asdict(self, tag_groups=True, normalized=False):
        """return dictionary of all EXIF tags and values from exiftool
        returns empty dict if no tags

        Args:
            tag_groups: if True (default), dict keys have tag groups, e.g. "IPTC:Keywords"; if False, drops groups from keys, e.g. "Keywords"
            normalized: if True, dict keys are all normalized to lower case (default is False)
        """
        json_str, _, _ = self.run_commands("-json")
        if not json_str:
            return dict()

        try:
            exifdict = json.loads(json_str)
        except ValueError:
            # will fail with some commands, e.g --ext AVI which produces
            # 'No file with specified extension' instead of json
            # (json.JSONDecodeError and UnicodeDecodeError are both ValueError)
            return dict()

        # exiftool output is used as a list with one entry for the file
        exifdict = exifdict[0]

        if not tag_groups:
            # strip tag groups: keep only the text after the last ":"
            exifdict = {k.rpartition(":")[2]: v for k, v in exifdict.items()}

        if normalized:
            exifdict = {k.lower(): v for (k, v) in exifdict.items()}

        return exifdict

    def json(self):
        """ returns JSON string containing all EXIF tags and values from exiftool """
        json_output, _, _ = self.run_commands("-json")
        return json_output

    def _read_exif(self):
        """ read exif data from file """
        self.data = dict(self.asdict())

    def __str__(self):
        return f"file: {self.file}\nexiftool: {self._exiftoolproc._exiftool}"

    def __enter__(self):
        """ start batching: setvalue/addvalues queue their commands until the context exits """
        self._context_mgr = True
        self._commands = []
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """ run the queued commands (unless an exception occurred) and leave batch mode """
        try:
            if exc_type:
                return False
            if self._commands:
                # run_commands sets self.warning and self.error as needed
                self.run_commands(*self._commands)
        finally:
            # leave batch mode so later setvalue/addvalues calls run immediately
            # instead of being queued and never executed
            self._context_mgr = False
class ExifToolCaching(ExifTool):
    """Caching variant of the basic exiftool interface.

    Use this only when you know the file's EXIF data will not be changed by any external process.
    Instantiating it returns one shared cached object per filepath."""

    # registry of cached objects, keyed by the filepath passed to the constructor
    _singletons = {}

    def __new__(cls, filepath, exiftool=None):
        """ return the cached object for filepath, creating it on first request """
        registry = cls._singletons
        if filepath in registry:
            return registry[filepath]
        registry[filepath] = _ExifToolCaching(filepath, exiftool=exiftool)
        return registry[filepath]
class _ExifToolCaching(ExifTool):
    """ Read-only ExifTool that caches the results of json() and asdict() """

    # the only commands this read-only object is allowed to send as first command
    _READ_ONLY_COMMANDS = ("-json", "-ver")

    def __init__(self, filepath, exiftool=None):
        """Create read-only ExifTool object that caches values

        Args:
            filepath: path to image file
            exiftool: path to exiftool, if not specified will look in path

        Returns:
            ExifTool instance
        """
        # caches must exist before super().__init__, which calls asdict()
        self._json_cache = None
        self._asdict_cache = {}
        super().__init__(filepath, exiftool=exiftool, overwrite=False, flags=None)

    def run_commands(self, *commands, no_file=False):
        """Run read-only commands; see ExifTool.run_commands for arguments and return value

        Raises:
            NotImplementedError if the first command is not "-json" or "-ver"
            TypeError (from ExifTool.run_commands) if no commands are passed
        """
        # with no commands, fall through so the base class raises its usual
        # TypeError instead of failing here with an IndexError
        # NOTE(review): only the first command is checked; confirm that is intended
        if commands and commands[0] not in self._READ_ONLY_COMMANDS:
            raise NotImplementedError(f"{self.__class__} is read-only")
        return super().run_commands(*commands, no_file=no_file)

    def setvalue(self, tag, value):
        """ not supported: this object is read-only """
        raise NotImplementedError(f"{self.__class__} is read-only")

    def addvalues(self, tag, *values):
        """ not supported: this object is read-only """
        raise NotImplementedError(f"{self.__class__} is read-only")

    def json(self):
        """ returns JSON string containing all EXIF tags and values, cached after the first non-empty result """
        if not self._json_cache:
            self._json_cache = super().json()
        return self._json_cache

    def asdict(self, tag_groups=True, normalized=False):
        """return dictionary of all EXIF tags and values from exiftool
        returns empty dict if no tags
        results are cached per (tag_groups, normalized) combination

        Args:
            tag_groups: if True (default), dict keys have tag groups, e.g. "IPTC:Keywords"; if False, drops groups from keys, e.g. "Keywords"
            normalized: if True, dict keys are all normalized to lower case (default is False)
        """
        per_group = self._asdict_cache.setdefault(tag_groups, {})
        if normalized not in per_group:
            per_group[normalized] = super().asdict(
                tag_groups=tag_groups, normalized=normalized
            )
        return per_group[normalized]

    def flush_cache(self):
        """ Clear cached data so that calls to json or asdict return fresh data """
        self._json_cache = None
        self._asdict_cache = {}

193
tests/test_exiftool.py Normal file
View File

@ -0,0 +1,193 @@
import json
import pytest
import subprocess
import dozo.exiftool
from dozo.exiftool import get_exiftool_path
# sample files used by the tests, given as relative paths
TEST_FILE_ONE_KEYWORD = "samples/images/wedding.jpg"
TEST_FILE_BAD_IMAGE = "samples/images/badimage.jpeg"
TEST_FILE_WARNING = "samples/images/exiftool_warning.heic"
TEST_FILE_MULTI_KEYWORD = "samples/images/Tulips.jpg"
# keyword list associated by name with TEST_FILE_MULTI_KEYWORD
# NOTE(review): not referenced by the tests visible in this section - confirm it is still needed
TEST_MULTI_KEYWORDS = [
"Top Shot",
"flowers",
"flower",
"design",
"Stock Photography",
"vibrant",
"plastic",
"Digital Nomad",
"close up",
"stock photo",
"outdoor",
"wedding",
"Reiseblogger",
"fake",
"colorful",
"Indoor",
"display",
"photography",
]
# NOTE(review): PHOTOS_DB and the EXIF_UUID* fixtures below are not referenced by the
# tests visible in this section; presumably carried over from another test-suite - verify before removing
PHOTOS_DB = "tests/Test-10.15.4.photoslibrary"
# expected tag values keyed by UUID, with tag group prefixes (e.g. "EXIF:", "IPTC:")
EXIF_UUID = {
"6191423D-8DB8-4D4C-92BE-9BBBA308AAC4": {
"EXIF:DateTimeOriginal": "2019:07:04 16:24:01",
"EXIF:LensModel": "XF18-55mmF2.8-4 R LM OIS",
"IPTC:Keywords": [
"Digital Nomad",
"Indoor",
"Reiseblogger",
"Stock Photography",
"Top Shot",
"close up",
"colorful",
"design",
"display",
"fake",
"flower",
"outdoor",
"photography",
"plastic",
"stock photo",
"vibrant",
],
"IPTC:DocumentNotes": "https://flickr.com/e/l7FkSm4f2lQkSV3CG6xlv8Sde5uF3gVu4Hf0Qk11AnU%3D",
},
"E9BC5C36-7CD1-40A1-A72B-8B8FAC227D51": {
"EXIF:Make": "NIKON CORPORATION",
"EXIF:Model": "NIKON D810",
"IPTC:DateCreated": "2019:04:15",
},
}
# same expected values as EXIF_UUID, but with the tag group prefixes dropped from the keys
EXIF_UUID_NO_GROUPS = {
"6191423D-8DB8-4D4C-92BE-9BBBA308AAC4": {
"DateTimeOriginal": "2019:07:04 16:24:01",
"LensModel": "XF18-55mmF2.8-4 R LM OIS",
"Keywords": [
"Digital Nomad",
"Indoor",
"Reiseblogger",
"Stock Photography",
"Top Shot",
"close up",
"colorful",
"design",
"display",
"fake",
"flower",
"outdoor",
"photography",
"plastic",
"stock photo",
"vibrant",
],
"DocumentNotes": "https://flickr.com/e/l7FkSm4f2lQkSV3CG6xlv8Sde5uF3gVu4Hf0Qk11AnU%3D",
},
"E9BC5C36-7CD1-40A1-A72B-8B8FAC227D51": {
"Make": "NIKON CORPORATION",
"Model": "NIKON D810",
"DateCreated": "2019:04:15",
},
}
# presumably UUIDs of items without EXIF data, going by the name - TODO confirm
EXIF_UUID_NONE = ["A1DD1F98-2ECD-431F-9AC9-5AFEFE2D3A5C"]
# Skip the whole module when exiftool is not installed: every test below needs it.
try:
    exiftool = get_exiftool_path()
except FileNotFoundError:
    # get_exiftool_path raises FileNotFoundError when exiftool is not on the path;
    # catching only that keeps KeyboardInterrupt/SystemExit and real bugs visible
    exiftool = None

if exiftool is None:
    pytest.skip("could not find exiftool in path", allow_module_level=True)
def test_get_exiftool_path():
    """ get_exiftool_path() returns a value that is not None """
    located = dozo.exiftool.get_exiftool_path()
    assert located is not None
def test_version():
    """ ExifTool.version is available and is a str """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert tool.version is not None
    assert isinstance(tool.version, str)
def test_read():
    """ the data dict filled in by the constructor holds the expected tags of the sample file """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    metadata = tool.data
    assert metadata["File:MIMEType"] == "image/jpeg"
    assert metadata["EXIF:ISO"] == 160
    assert metadata["IPTC:Keywords"] == "wedding"
def test_singleton():
    """ two ExifTool objects for different files share one exiftool process (same PID) """
    first = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    second = dozo.exiftool.ExifTool(TEST_FILE_MULTI_KEYWORD)
    assert first._process.pid == second._process.pid
def test_pid():
    """ ExifTool.pid matches the PID of the underlying process object """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert tool.pid == tool._process.pid
def test_exiftoolproc_process():
    """ the _ExifToolProc behind an ExifTool object exposes a process """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert tool._exiftoolproc.process is not None
def test_exiftoolproc_exiftool():
    """ the _ExifToolProc reports the same exiftool path as get_exiftool_path() """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert tool._exiftoolproc.exiftool == dozo.exiftool.get_exiftool_path()
def test_as_dict():
    """ asdict() with default arguments uses group-prefixed keys """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    tags = tool.asdict()
    assert tags["XMP:TagsList"] == "wedding"
def test_as_dict_normalized():
    """ asdict(normalized=True) lower-cases the keys and drops the mixed-case form """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    tags = tool.asdict(normalized=True)
    assert tags["xmp:tagslist"] == "wedding"
    assert "XMP:TagsList" not in tags
def test_as_dict_no_tag_groups():
    """ asdict(tag_groups=False) removes the group prefix from the keys """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    tags = tool.asdict(tag_groups=False)
    assert tags["TagsList"] == "wedding"
def test_json():
    """ json() returns parseable JSON whose first entry holds the file's tags """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    parsed = json.loads(tool.json())
    assert parsed[0]["XMP:TagsList"] == "wedding"
def test_str():
    """ str() of an ExifTool object mentions both the file and the exiftool path """
    tool = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert "file: " in str(tool)
    assert "exiftool: " in str(tool)
def test_exiftool_terminate():
    """ Test that exiftool process is terminated when terminate_exiftool() is called """
    exif1 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)

    # Check the actual subprocess handle instead of searching `ps` output for the
    # substring "exiftool", which can also match unrelated processes.
    # Keep our own reference: _stop_proc() deletes the wrapper's reference.
    proc = exif1._process
    assert proc.poll() is None

    dozo.exiftool.terminate_exiftool()

    # _stop_proc() waits for the process, so its exit status is known by now
    assert proc.poll() is not None

    # verify we can create a new instance after termination
    exif2 = dozo.exiftool.ExifTool(TEST_FILE_ONE_KEYWORD)
    assert exif2.asdict()["IPTC:Keywords"] == "wedding"