460 lines
16 KiB
Python
460 lines
16 KiB
Python
""" Yet another simple exiftool wrapper from:
|
|
https://github.com/RhetTbull/osxphotos/blob/master/osxphotos/exiftool.py
|
|
"""
|
|
|
|
import atexit
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from abc import ABC, abstractmethod
|
|
from functools import lru_cache # pylint: disable=syntax-error
|
|
|
|
# exiftool -stay_open commands outputs this EOF marker after command is run
|
|
EXIFTOOL_STAYOPEN_EOF = "{ready}"
|
|
EXIFTOOL_STAYOPEN_EOF_LEN = len(EXIFTOOL_STAYOPEN_EOF)
|
|
|
|
# list of exiftool processes to cleanup when exiting or when terminate is called
|
|
EXIFTOOL_PROCESSES = []
|
|
|
|
|
|
def exiftool_is_running():
|
|
ps = subprocess.run(["ps"], capture_output=True)
|
|
stdout = ps.stdout.decode("utf-8")
|
|
return "exiftool" in stdout
|
|
|
|
|
|
@atexit.register
|
|
def terminate_exiftool():
|
|
"""Terminate any running ExifTool subprocesses; call this to cleanup when done using ExifTool """
|
|
for proc in EXIFTOOL_PROCESSES:
|
|
proc._stop_proc()
|
|
|
|
|
|
@lru_cache(maxsize=1)
|
|
def get_exiftool_path():
|
|
""" return path of exiftool, cache result """
|
|
exiftool_path = shutil.which("exiftool")
|
|
if exiftool_path:
|
|
return exiftool_path.rstrip()
|
|
else:
|
|
raise FileNotFoundError(
|
|
"Could not find exiftool. Please download and install from "
|
|
"https://exiftool.org/"
|
|
)
|
|
|
|
|
|
class _ExifToolProc:
|
|
"""Runs exiftool in a subprocess via Popen
|
|
Creates a singleton object"""
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
""" create new object or return instance of already created singleton """
|
|
if not hasattr(cls, "instance") or not cls.instance:
|
|
cls.instance = super().__new__(cls)
|
|
|
|
return cls.instance
|
|
|
|
def __init__(self, exiftool=None, logger=logging.getLogger()):
|
|
"""construct _ExifToolProc singleton object or return instance of already created object
|
|
exiftool: optional path to exiftool binary (if not provided, will search path to find it)"""
|
|
|
|
self.logger = logger
|
|
if hasattr(self, "_process_running") and self._process_running:
|
|
# already running
|
|
if exiftool is not None and exiftool != self._exiftool:
|
|
self.logger.warning(
|
|
f"exiftool subprocess already running, "
|
|
f"ignoring exiftool={exiftool}"
|
|
)
|
|
return
|
|
|
|
self._process_running = False
|
|
self._exiftool = exiftool or get_exiftool_path()
|
|
self._start_proc()
|
|
|
|
@property
|
|
def process(self):
|
|
""" return the exiftool subprocess """
|
|
if self._process_running:
|
|
return self._process
|
|
else:
|
|
self._start_proc()
|
|
return self._process
|
|
|
|
@property
|
|
def pid(self):
|
|
""" return process id (PID) of the exiftool process """
|
|
return self._process.pid
|
|
|
|
@property
|
|
def exiftool(self):
|
|
""" return path to exiftool process """
|
|
return self._exiftool
|
|
|
|
def _start_proc(self):
|
|
""" start exiftool in batch mode """
|
|
|
|
if self._process_running:
|
|
self.logger.warning("exiftool already running: {self._process}")
|
|
return
|
|
|
|
# open exiftool process
|
|
self._process = subprocess.Popen(
|
|
[
|
|
self._exiftool,
|
|
"-stay_open", # keep process open in batch mode
|
|
"True", # -stay_open=True, keep process open in batch mode
|
|
"-@", # read command-line arguments from file
|
|
"-", # read from stdin
|
|
"-common_args", # specifies args common to all commands subsequently run
|
|
"-n", # no print conversion (e.g. print tag values in machine readable format)
|
|
"-P", # Preserve file modification date/time
|
|
"-G", # print group name for each tag
|
|
],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
self._process_running = True
|
|
|
|
EXIFTOOL_PROCESSES.append(self)
|
|
|
|
def _stop_proc(self):
|
|
""" stop the exiftool process if it's running, otherwise, do nothing """
|
|
|
|
if not self._process_running:
|
|
return
|
|
|
|
try:
|
|
self._process.stdin.write(b"-stay_open\n")
|
|
self._process.stdin.write(b"False\n")
|
|
self._process.stdin.flush()
|
|
except Exception as e:
|
|
pass
|
|
|
|
try:
|
|
self._process.communicate(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
self._process.kill()
|
|
self._process.communicate()
|
|
|
|
del self._process
|
|
self._process_running = False
|
|
|
|
|
|
class ExifTool:
|
|
""" Basic exiftool interface for reading and writing EXIF tags """
|
|
|
|
def __init__(self, filepath, exiftool=None, overwrite=True, flags=None, logger=logging.getLogger()):
|
|
"""Create ExifTool object
|
|
|
|
Args:
|
|
file: path to image file
|
|
exiftool: path to exiftool, if not specified will look in path
|
|
overwrite: if True, will overwrite image file without creating backup, default=False
|
|
flags: optional list of exiftool flags to prepend to exiftool command when writing metadata (e.g. -m or -F)
|
|
|
|
Returns:
|
|
ExifTool instance
|
|
"""
|
|
self.file = filepath
|
|
self.overwrite = overwrite
|
|
self.flags = flags or []
|
|
self.data = {}
|
|
self.warning = None
|
|
self.error = None
|
|
# if running as a context manager, self._context_mgr will be True
|
|
self._context_mgr = False
|
|
self._exiftoolproc = _ExifToolProc(exiftool=exiftool, logger=logger)
|
|
self._read_exif()
|
|
|
|
@property
|
|
def _process(self):
|
|
return self._exiftoolproc.process
|
|
|
|
def setvalue(self, tag, value):
|
|
"""Set tag to value(s); if value is None, will delete tag
|
|
|
|
Args:
|
|
tag: str; name of tag to set
|
|
value: str; value to set tag to
|
|
|
|
Returns:
|
|
True if success otherwise False
|
|
|
|
If error generated by exiftool, returns False and sets self.error to error string
|
|
If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
|
|
If called in context manager, returns True (execution is delayed until exiting context manager)
|
|
"""
|
|
|
|
if value is None:
|
|
value = ""
|
|
command = [f"-{tag}={value}"]
|
|
if self.overwrite and not self._context_mgr:
|
|
command.append("-overwrite_original")
|
|
|
|
# avoid "Warning: Some character(s) could not be encoded in Latin" warning
|
|
command.append("-iptc:codedcharacterset=utf8")
|
|
|
|
if self._context_mgr:
|
|
self._commands.extend(command)
|
|
return True
|
|
else:
|
|
_, _, error = self.run_commands(*command)
|
|
return error == ""
|
|
|
|
def addvalues(self, tag, *values):
|
|
"""Add one or more value(s) to tag
|
|
If more than one value is passed, each value will be added to the tag
|
|
|
|
Args:
|
|
tag: str; tag to set
|
|
*values: str; one or more values to set
|
|
|
|
Returns:
|
|
True if success otherwise False
|
|
|
|
If error generated by exiftool, returns False and sets self.error to error string
|
|
If warning generated by exiftool, returns True (unless there was also an error) and sets self.warning to warning string
|
|
If called in context manager, returns True (execution is delayed until exiting context manager)
|
|
|
|
Notes: exiftool may add duplicate values for some tags so the caller must ensure
|
|
the values being added are not already in the EXIF data
|
|
For some tags, such as IPTC:Keywords, this will add a new value to the list of keywords,
|
|
but for others, such as EXIF:ISO, this will literally add a value to the existing value.
|
|
It's up to the caller to know what exiftool will do for each tag
|
|
If setvalue called before addvalues, exiftool does not appear to add duplicates,
|
|
but if addvalues called without first calling setvalue, exiftool will add duplicate values
|
|
"""
|
|
if not values:
|
|
raise ValueError("Must pass at least one value")
|
|
|
|
command = []
|
|
for value in values:
|
|
if value is None:
|
|
raise ValueError("Can't add None value to tag")
|
|
command.append(f"-{tag}+={value}")
|
|
|
|
if self.overwrite and not self._context_mgr:
|
|
command.append("-overwrite_original")
|
|
|
|
if self._context_mgr:
|
|
self._commands.extend(command)
|
|
return True
|
|
else:
|
|
_, _, error = self.run_commands(*command)
|
|
return error == ""
|
|
|
|
def run_commands(self, *commands, no_file=False):
|
|
"""Run commands in the exiftool process and return result.
|
|
|
|
Args:
|
|
*commands: exiftool commands to run
|
|
no_file: (bool) do not pass the filename to exiftool (default=False)
|
|
by default, all commands will be run against self.file
|
|
use no_file=True to run a command without passing the filename
|
|
Returns:
|
|
(output, warning, errror)
|
|
output: bytes is containing output of exiftool commands
|
|
warning: if exiftool generated warnings, string containing warning otherwise empty string
|
|
error: if exiftool generated errors, string containing otherwise empty string
|
|
|
|
Note: Also sets self.warning and self.error if warning or error generated.
|
|
"""
|
|
if not (hasattr(self, "_process") and self._process):
|
|
raise ValueError("exiftool process is not running")
|
|
|
|
if not commands:
|
|
raise TypeError("must provide one or more command to run")
|
|
|
|
if self._context_mgr and self.overwrite:
|
|
commands = list(commands)
|
|
commands.append("-overwrite_original")
|
|
|
|
filename = os.fsencode(self.file) if not no_file else b""
|
|
|
|
if self.flags:
|
|
# need to split flags, e.g. so "--ext AVI" becomes ["--ext", "AVI"]
|
|
flags = []
|
|
for f in self.flags:
|
|
flags.extend(f.split())
|
|
command_str = b"\n".join([f.encode("utf-8") for f in flags])
|
|
command_str += b"\n"
|
|
else:
|
|
command_str = b""
|
|
|
|
command_str += (
|
|
b"\n".join([c.encode("utf-8") for c in commands])
|
|
+ b"\n"
|
|
+ filename
|
|
+ b"\n"
|
|
+ b"-execute\n"
|
|
)
|
|
|
|
# send the command
|
|
self._process.stdin.write(command_str)
|
|
self._process.stdin.flush()
|
|
|
|
# read the output
|
|
output = b""
|
|
warning = b""
|
|
error = b""
|
|
while EXIFTOOL_STAYOPEN_EOF not in str(output):
|
|
line = self._process.stdout.readline()
|
|
if line.startswith(b"Warning"):
|
|
warning += line.strip()
|
|
elif line.startswith(b"Error"):
|
|
error += line.strip()
|
|
else:
|
|
output += line.strip()
|
|
warning = "" if warning == b"" else warning.decode("utf-8")
|
|
error = "" if error == b"" else error.decode("utf-8")
|
|
self.warning = warning
|
|
self.error = error
|
|
return output[:-EXIFTOOL_STAYOPEN_EOF_LEN], warning, error
|
|
|
|
@property
|
|
def pid(self):
|
|
""" return process id (PID) of the exiftool process """
|
|
return self._process.pid
|
|
|
|
@property
|
|
def version(self):
|
|
""" returns exiftool version """
|
|
ver, _, _ = self.run_commands("-ver", no_file=True)
|
|
return ver.decode("utf-8")
|
|
|
|
def asdict(self, tag_groups=True, normalized=False):
|
|
"""return dictionary of all EXIF tags and values from exiftool
|
|
returns empty dict if no tags
|
|
|
|
Args:
|
|
tag_groups: if True (default), dict keys have tag groups, e.g. "IPTC:Keywords"; if False, drops groups from keys, e.g. "Keywords"
|
|
normalized: if True, dict keys are all normalized to lower case (default is False)
|
|
"""
|
|
json_str, _, _ = self.run_commands("-json")
|
|
if not json_str:
|
|
return dict()
|
|
|
|
try:
|
|
exifdict = json.loads(json_str)
|
|
except Exception as e:
|
|
# will fail with some commands, e.g --ext AVI which produces
|
|
# 'No file with specified extension' instead of json
|
|
return dict()
|
|
|
|
exifdict = exifdict[0]
|
|
if not tag_groups:
|
|
# strip tag groups
|
|
exif_new = {}
|
|
for k, v in exifdict.items():
|
|
k = re.sub(r".*:", "", k)
|
|
exif_new[k] = v
|
|
exifdict = exif_new
|
|
|
|
if normalized:
|
|
exifdict = {k.lower(): v for (k, v) in exifdict.items()}
|
|
|
|
return exifdict
|
|
|
|
def json(self):
|
|
""" returns JSON string containing all EXIF tags and values from exiftool """
|
|
json, _, _ = self.run_commands("-json")
|
|
return json
|
|
|
|
def _read_exif(self):
|
|
""" read exif data from file """
|
|
data = self.asdict()
|
|
self.data = {k: v for k, v in data.items()}
|
|
|
|
def __str__(self):
|
|
return f"file: {self.file}\nexiftool: {self._exiftoolproc._exiftool}"
|
|
|
|
def __enter__(self):
|
|
self._context_mgr = True
|
|
self._commands = []
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
if exc_type:
|
|
return False
|
|
elif self._commands:
|
|
# run_commands sets self.warning and self.error as needed
|
|
self.run_commands(*self._commands)
|
|
|
|
|
|
class ExifToolCaching(ExifTool):
|
|
""" Basic exiftool interface for reading and writing EXIF tags, with caching.
|
|
Use this only when you know the file's EXIF data will not be changed by any external process.
|
|
|
|
Creates a singleton cached ExifTool instance """
|
|
|
|
_singletons = {}
|
|
|
|
def __new__(cls, filepath, exiftool=None, logger=logging.getLogger()):
|
|
""" create new object or return instance of already created singleton """
|
|
if filepath not in cls._singletons:
|
|
cls._singletons[filepath] = _ExifToolCaching(filepath,
|
|
exiftool=exiftool, logger=logger)
|
|
return cls._singletons[filepath]
|
|
|
|
|
|
class _ExifToolCaching(ExifTool):
|
|
def __init__(self, filepath, exiftool=None, logger=logging.getLogger()):
|
|
"""Create read-only ExifTool object that caches values
|
|
|
|
Args:
|
|
file: path to image file
|
|
exiftool: path to exiftool, if not specified will look in path
|
|
|
|
Returns:
|
|
ExifTool instance
|
|
"""
|
|
self._json_cache = None
|
|
self._asdict_cache = {}
|
|
super().__init__(filepath, exiftool=exiftool, overwrite=False,
|
|
flags=None, logger=logger)
|
|
|
|
def run_commands(self, *commands, no_file=False):
|
|
if commands[0] not in ["-json", "-ver"]:
|
|
raise NotImplementedError(f"{self.__class__} is read-only")
|
|
return super().run_commands(*commands, no_file=no_file)
|
|
|
|
def setvalue(self, tag, value):
|
|
raise NotImplementedError(f"{self.__class__} is read-only")
|
|
|
|
def addvalues(self, tag, *values):
|
|
raise NotImplementedError(f"{self.__class__} is read-only")
|
|
|
|
def json(self):
|
|
if not self._json_cache:
|
|
self._json_cache = super().json()
|
|
return self._json_cache
|
|
|
|
def asdict(self, tag_groups=True, normalized=False):
|
|
"""return dictionary of all EXIF tags and values from exiftool
|
|
returns empty dict if no tags
|
|
|
|
Args:
|
|
tag_groups: if True (default), dict keys have tag groups, e.g. "IPTC:Keywords"; if False, drops groups from keys, e.g. "Keywords"
|
|
normalized: if True, dict keys are all normalized to lower case (default is False)
|
|
"""
|
|
try:
|
|
return self._asdict_cache[tag_groups][normalized]
|
|
except KeyError:
|
|
if tag_groups not in self._asdict_cache:
|
|
self._asdict_cache[tag_groups] = {}
|
|
self._asdict_cache[tag_groups][normalized] = super().asdict(
|
|
tag_groups=tag_groups, normalized=normalized
|
|
)
|
|
return self._asdict_cache[tag_groups][normalized]
|
|
|
|
def flush_cache(self):
|
|
""" Clear cached data so that calls to json or asdict return fresh data """
|
|
self._json_cache = None
|
|
self._asdict_cache = {}
|
|
|