Use pathlib in cli and fix constants upercase style
This commit is contained in:
parent
202366a8f9
commit
648930f139
|
@ -130,23 +130,17 @@ def _get_exclude(opt, exclude):
|
|||
|
||||
|
||||
def get_collection_config(root):
|
||||
return Config(os.path.join(root, '.ordigi', 'ordigi.conf'))
|
||||
return Config(root.joinpath('.ordigi', 'ordigi.conf'))
|
||||
|
||||
|
||||
def _get_paths(paths, root):
|
||||
root = Path(root).absolute()
|
||||
if not paths:
|
||||
paths = [root]
|
||||
paths = set(paths)
|
||||
|
||||
return paths, root
|
||||
|
||||
def _get_subpaths(relpaths, root):
|
||||
if not relpaths:
|
||||
paths = {root}
|
||||
else:
|
||||
paths = set()
|
||||
for relpath in relpaths:
|
||||
paths.add(os.path.join(root, relpath))
|
||||
for path in paths:
|
||||
paths.add(Path(path).absolute())
|
||||
|
||||
return paths, root
|
||||
|
||||
|
@ -248,7 +242,7 @@ def _sort(**kwargs):
|
|||
|
||||
subdirs = kwargs['subdirs']
|
||||
root = kwargs['dest']
|
||||
paths, root = _get_subpaths(subdirs, root)
|
||||
paths, root = _get_paths(subdirs, root)
|
||||
|
||||
cache = True
|
||||
if kwargs['reset_cache']:
|
||||
|
@ -335,7 +329,7 @@ def _clean(**kwargs):
|
|||
|
||||
subdirs = kwargs['subdirs']
|
||||
root = kwargs['collection']
|
||||
paths, root = _get_subpaths(subdirs, root)
|
||||
paths, root = _get_paths(subdirs, root)
|
||||
|
||||
clean_all = False
|
||||
if not folders:
|
||||
|
@ -386,7 +380,7 @@ def _init(**kwargs):
|
|||
"""
|
||||
Init media collection database.
|
||||
"""
|
||||
root = kwargs['path']
|
||||
root = Path(kwargs['path']).absolute()
|
||||
config = get_collection_config(root)
|
||||
opt = config.get_options()
|
||||
log_level = log.level(kwargs['verbose'], kwargs['debug'])
|
||||
|
@ -407,7 +401,7 @@ def _update(**kwargs):
|
|||
"""
|
||||
Update media collection database.
|
||||
"""
|
||||
root = kwargs['path']
|
||||
root = Path(kwargs['path']).absolute()
|
||||
config = get_collection_config(root)
|
||||
opt = config.get_options()
|
||||
log_level = log.level(kwargs['verbose'], kwargs['debug'])
|
||||
|
@ -430,7 +424,7 @@ def _check(**kwargs):
|
|||
"""
|
||||
log_level = log.level(kwargs['verbose'], kwargs['debug'])
|
||||
logger = log.get_logger(level=log_level)
|
||||
root = kwargs['path']
|
||||
root = Path(kwargs['path']).absolute()
|
||||
config = get_collection_config(root)
|
||||
opt = config.get_options()
|
||||
collection = Collection(root, exclude=opt['exclude'], logger=logger)
|
||||
|
@ -477,7 +471,7 @@ def _compare(**kwargs):
|
|||
|
||||
subdirs = kwargs['subdirs']
|
||||
root = kwargs['collection']
|
||||
paths, root = _get_subpaths(subdirs, root)
|
||||
paths, root = _get_paths(subdirs, root)
|
||||
|
||||
config = get_collection_config(root)
|
||||
opt = config.get_options()
|
||||
|
|
|
@ -140,8 +140,9 @@ class FPath:
|
|||
if date is not None:
|
||||
part = self.get_early_morning_photos_date(date, mask)
|
||||
elif item == 'folder':
|
||||
part = os.path.basename(metadata['subdirs'])
|
||||
|
||||
folder = os.path.basename(metadata['subdirs'])
|
||||
if folder != metadata['src_dir']:
|
||||
part = folder
|
||||
elif item == 'folders':
|
||||
folders = Path(metadata['subdirs']).parts
|
||||
folders = self._get_folders(folders, mask)
|
||||
|
@ -270,7 +271,7 @@ class Collection:
|
|||
):
|
||||
|
||||
# Attributes
|
||||
self.root = Path(root).expanduser().absolute()
|
||||
self.root = root.expanduser().absolute()
|
||||
if not self.root.exists():
|
||||
logger.error(f'Directory {self.root} does not exist')
|
||||
sys.exit(1)
|
||||
|
@ -742,7 +743,6 @@ class Collection:
|
|||
self, src_dirs, import_mode=None, ignore_tags=set(), loc=None
|
||||
):
|
||||
"""Get medias data"""
|
||||
src_dir_in_collection = False
|
||||
for src_dir in src_dirs:
|
||||
self.dest_list = []
|
||||
src_dir = self._check_path(src_dir)
|
||||
|
@ -750,9 +750,7 @@ class Collection:
|
|||
|
||||
# Get medias and src_dirs
|
||||
for src_path in self.src_list:
|
||||
if self.root in src_path.parents:
|
||||
src_dir_in_collection = True
|
||||
else:
|
||||
if self.root not in src_path.parents:
|
||||
if not import_mode:
|
||||
self.logger.error(f"""{src_path} not in {self.root}
|
||||
collection, use `ordigi import`""")
|
||||
|
@ -771,7 +769,7 @@ class Collection:
|
|||
)
|
||||
media.get_metadata(self.root, loc, self.db, self.cache)
|
||||
|
||||
yield media, src_dir_in_collection
|
||||
yield media
|
||||
|
||||
def _init_check_db(self, loc=None, ignore_tags=set()):
|
||||
if self.db.is_empty('metadata'):
|
||||
|
@ -875,11 +873,17 @@ class Collection:
|
|||
|
||||
return self.summary
|
||||
|
||||
def remove_empty_subdirs(self, directories):
|
||||
def _remove_empty_subdirs(self, directories, src_dirs):
|
||||
"""Remove empty subdir after moving files"""
|
||||
parents = set()
|
||||
for directory in directories:
|
||||
if not directory.is_dir():
|
||||
continue
|
||||
|
||||
if str(directory) in src_dirs:
|
||||
continue
|
||||
|
||||
# if folder empty, delete it
|
||||
if directory.is_dir():
|
||||
files = os.listdir(directory)
|
||||
if len(files) == 0:
|
||||
if not self.dry_run:
|
||||
|
@ -889,7 +893,31 @@ class Collection:
|
|||
parents.add(directory.parent)
|
||||
|
||||
if parents != set():
|
||||
self.remove_empty_subdirs(parents)
|
||||
self._remove_empty_subdirs(parents, src_dirs)
|
||||
|
||||
def remove_empty_folders(self, directory, remove_root=True):
|
||||
"""Remove empty sub-folders in collection"""
|
||||
if not os.path.isdir(directory):
|
||||
self.summary.append((directory, False))
|
||||
return self.summary
|
||||
|
||||
# remove empty subfolders
|
||||
files = os.listdir(directory)
|
||||
if len(files):
|
||||
for f in files:
|
||||
fullpath = os.path.join(directory, f)
|
||||
if os.path.isdir(fullpath):
|
||||
self.remove_empty_folders(fullpath)
|
||||
|
||||
# if folder empty, delete it
|
||||
files = os.listdir(directory)
|
||||
if len(files) == 0 and remove_root:
|
||||
self.logger.info(f"Removing empty folder: {directory}")
|
||||
if not self.dry_run:
|
||||
os.rmdir(directory)
|
||||
self.summary.append((directory, 'remove_empty_folders'))
|
||||
|
||||
return self.summary
|
||||
|
||||
def sort_file(self, src_path, dest_path, media, import_mode=False):
|
||||
if import_mode == 'copy':
|
||||
|
@ -926,23 +954,23 @@ class Collection:
|
|||
|
||||
# Get medias data
|
||||
files_data = []
|
||||
src_dirs_in_collection = set()
|
||||
for media, src_dir_in_collection in self._get_medias_data(
|
||||
subdirs = set()
|
||||
for media in self._get_medias_data(
|
||||
src_dirs,
|
||||
import_mode=import_mode, ignore_tags=ignore_tags, loc=loc,
|
||||
):
|
||||
# Get the destination path according to metadata
|
||||
fpath = FPath(path_format, self.day_begins, self.logger)
|
||||
relpath = Path(fpath.get_path(media.metadata))
|
||||
if src_dir_in_collection:
|
||||
src_dirs_in_collection.add(media.file_path.parent)
|
||||
subdirs.add(media.file_path.parent)
|
||||
|
||||
files_data.append((copy(media), relpath))
|
||||
|
||||
# Sort files and solve conflicts
|
||||
self._sort_medias(files_data, import_mode, remove_duplicates)
|
||||
|
||||
self.remove_empty_subdirs(src_dirs_in_collection)
|
||||
if import_mode != 'copy':
|
||||
self._remove_empty_subdirs(subdirs, src_dirs)
|
||||
|
||||
if not self._check_processed():
|
||||
self.summary.append((None, False))
|
||||
|
@ -972,7 +1000,7 @@ class Collection:
|
|||
|
||||
# Get medias data
|
||||
files_data = []
|
||||
for media, _ in self._get_medias_data(paths):
|
||||
for media in self._get_medias_data(paths):
|
||||
# Deduplicate the path
|
||||
src_path = media.file_path
|
||||
path_parts = src_path.relative_to(self.root).parts
|
||||
|
@ -999,30 +1027,6 @@ class Collection:
|
|||
|
||||
return self.summary
|
||||
|
||||
def remove_empty_folders(self, directory, remove_root=True):
|
||||
'Function to remove empty folders'
|
||||
if not os.path.isdir(directory):
|
||||
self.summary.append((directory, False))
|
||||
return self.summary
|
||||
|
||||
# remove empty subfolders
|
||||
files = os.listdir(directory)
|
||||
if len(files):
|
||||
for f in files:
|
||||
fullpath = os.path.join(directory, f)
|
||||
if os.path.isdir(fullpath):
|
||||
self.remove_empty_folders(fullpath)
|
||||
|
||||
# if folder empty, delete it
|
||||
files = os.listdir(directory)
|
||||
if len(files) == 0 and remove_root:
|
||||
self.logger.info(f"Removing empty folder: {directory}")
|
||||
if not self.dry_run:
|
||||
os.rmdir(directory)
|
||||
self.summary.append((directory, 'remove_empty_folders'))
|
||||
|
||||
return self.summary
|
||||
|
||||
def _get_images(self, path):
|
||||
"""
|
||||
:returns: iter
|
||||
|
|
|
@ -26,7 +26,7 @@ class Config:
|
|||
return False
|
||||
|
||||
def load_config(self):
|
||||
if not path.exists(self.conf_path):
|
||||
if not self.conf_path.exists():
|
||||
return {}
|
||||
|
||||
conf = RawConfigParser()
|
||||
|
@ -55,7 +55,7 @@ class Config:
|
|||
elif 'dirs_path' and 'name' in self.conf['Path']:
|
||||
return self.conf['Path']['dirs_path'] + '/' + self.conf['Path']['name']
|
||||
|
||||
return constants.default_path + '/' + constants.default_name
|
||||
return constants.DEFAULT_PATH + '/' + constants.DEFAULT_NAME
|
||||
|
||||
def get_options(self):
|
||||
"""Get config options
|
||||
|
@ -67,7 +67,7 @@ class Config:
|
|||
if geocoder and geocoder in ('Nominatim',):
|
||||
options['geocoder'] = geocoder
|
||||
else:
|
||||
options['geocoder'] = constants.default_geocoder
|
||||
options['geocoder'] = constants.DEFAULT_GEOCODER
|
||||
|
||||
prefer_english_names = self.get_option('prefer_english_names', 'Geolocation')
|
||||
if prefer_english_names:
|
||||
|
|
|
@ -2,22 +2,27 @@
|
|||
Settings.
|
||||
"""
|
||||
|
||||
from os import environ, path
|
||||
from os import environ
|
||||
from pathlib import Path
|
||||
|
||||
#: If True, debug messages will be printed.
|
||||
debug = False
|
||||
|
||||
# Ordigi settings directory.
|
||||
if 'XDG_CONFIG_HOME' in environ:
|
||||
confighome = environ['XDG_CONFIG_HOME']
|
||||
elif 'APPDATA' in environ:
|
||||
confighome = environ['APPDATA']
|
||||
else:
|
||||
confighome = path.join(environ['HOME'], '.config')
|
||||
application_directory = path.join(confighome, 'ordigi')
|
||||
def get_config_dir(name):
|
||||
if 'XDG_CONFIG_HOME' in environ:
|
||||
confighome = Path(environ['XDG_CONFIG_HOME'])
|
||||
elif 'APPDATA' in environ:
|
||||
confighome = Path(environ['APPDATA'])
|
||||
else:
|
||||
confighome = Path(environ['HOME'], '.config')
|
||||
|
||||
default_path = '{%Y-%m-%b}/{album}|{city}'
|
||||
default_name = '{%Y-%m-%d_%H-%M-%S}-{name}-{title}.%l{ext}'
|
||||
default_geocoder = 'Nominatim'
|
||||
return confighome / name
|
||||
|
||||
CONFIG_FILE = path.join(application_directory, 'ordigi.conf')
|
||||
APPLICATION_DIRECTORY = get_config_dir('ordigi')
|
||||
|
||||
DEFAULT_PATH = '{%Y-%m-%b}/{album}|{city}'
|
||||
DEFAULT_NAME = '{%Y-%m-%d_%H-%M-%S}-{name}-{title}.%l{ext}'
|
||||
DEFAULT_GEOCODER = 'Nominatim'
|
||||
|
||||
CONFIG_FILE = APPLICATION_DIRECTORY / 'ordigi.conf'
|
||||
|
|
|
@ -23,7 +23,7 @@ class TestFPath:
|
|||
@pytest.fixture(autouse=True)
|
||||
def setup_class(cls, sample_files_paths):
|
||||
cls.src_path, cls.file_paths = sample_files_paths
|
||||
cls.path_format = constants.default_path + '/' + constants.default_name
|
||||
cls.path_format = constants.DEFAULT_PATH + '/' + constants.DEFAULT_NAME
|
||||
cls.logger = log.get_logger(level=10)
|
||||
|
||||
def test_get_part(self, tmp_path):
|
||||
|
@ -121,7 +121,7 @@ class TestCollection:
|
|||
@pytest.fixture(autouse=True)
|
||||
def setup_class(cls, sample_files_paths):
|
||||
cls.src_path, cls.file_paths = sample_files_paths
|
||||
cls.path_format = constants.default_path + '/' + constants.default_name
|
||||
cls.path_format = constants.DEFAULT_PATH + '/' + constants.DEFAULT_NAME
|
||||
cls.logger = log.get_logger(level=10)
|
||||
|
||||
def teardown_class(self):
|
||||
|
|
|
@ -40,7 +40,7 @@ class TestConfig:
|
|||
def test_load_config_no_exist(self):
|
||||
# test file not exist
|
||||
config = Config()
|
||||
config.conf_path = 'filename'
|
||||
config.conf_path = Path('filename')
|
||||
assert config.load_config() == {}
|
||||
|
||||
def test_load_config_invalid(self, conf_path):
|
||||
|
|
Loading…
Reference in New Issue