"""Python script for creating albums in Immich from folder names in an external library."""
from typing import Tuple
import argparse
import logging
import sys
import fnmatch
import os
import datetime
from collections import defaultdict, OrderedDict
import random
from urllib.error import HTTPError
import regex
import yaml
import urllib3
import requests
# Script Constants
# Constants holding script run modes
# Create albums based on folder names and script arguments
SCRIPT_MODE_CREATE = "CREATE"
# Create album names based on folder names, but delete these albums
SCRIPT_MODE_CLEANUP = "CLEANUP"
# Delete ALL albums
SCRIPT_MODE_DELETE_ALL = "DELETE_ALL"
# Environment variable to check if the script is running inside Docker
ENV_IS_DOCKER = "IS_DOCKER"
# List of allowed share user roles
SHARE_ROLES = ["editor", "viewer"]
# Immich API request timeout
REQUEST_TIMEOUT_DEFAULT = 20
# Constants for album thumbnail setting
ALBUM_THUMBNAIL_RANDOM_ALL = "random-all"
ALBUM_THUMBNAIL_RANDOM_FILTERED = "random-filtered"
ALBUM_THUMBNAIL_SETTINGS = ["first", "last", "random"]
ALBUM_THUMBNAIL_SETTINGS_GLOBAL = ALBUM_THUMBNAIL_SETTINGS + [ALBUM_THUMBNAIL_RANDOM_ALL, ALBUM_THUMBNAIL_RANDOM_FILTERED]
ALBUM_THUMBNAIL_STATIC_INDICES = {
"first": 0,
"last": -1,
}
# File name to use for album properties files
ALBUMPROPS_FILE_NAME = '.albumprops'
class AlbumMergeException(Exception):
"""Error thrown when trying to override an existing property"""
# Disable pylint rule for too many instance attributes
# pylint: disable=R0902
class AlbumModel:
"""Model of an album with all properties necessary for handling albums in the scope of this script"""
# Album Merge Mode indicating only properties should be merged that are
# not already set in the merge target
ALBUM_MERGE_MODE_EXCLUSIVE = 1
# Same as ALBUM_MERGE_MODE_EXCLUSIVE, but also raises an error
# if attempting to overwrite an existing property when merging
ALBUM_MERGE_MODE_EXCLUSIVE_EX = 2
# Override any property in the merge target if already exists
ALBUM_MERGE_MODE_OVERRIDE = 3
# List of class attribute names that are relevant for album properties handling
# This list is used for album model merging and validation
ALBUM_PROPERTIES_VARIABLES = ['override_name', 'description', 'share_with', 'thumbnail_setting', 'sort_order', 'archive', 'comments_and_likes_enabled']
def __init__(self, name : str):
# The album ID, set after it was created
self.id = None
# The album name
self.name = name
# The override album name, takes precedence over name for album creation
self.override_name = None
# The description to set for the album
self.description = None
# A list of dicts with Immich assets
self.assets = []
# A list of dicts with keys 'user' and 'role', listing all users to share the album with and their roles
self.share_with = []
# Either a fully qualified asset path or one of 'first', 'last', 'random'
self.thumbnail_setting = None
# Sorting order for this album, 'asc' or 'desc'
self.sort_order = None
# Boolean indicating whether assets in this album should be archived after adding
self.archive = None
# Boolean indicating whether assets in this album can be commented on and liked
self.comments_and_likes_enabled = None
def get_album_properties_dict(self) -> dict:
"""
Returns this class' attributes relevant for album properties handling
as a dictionary
Returns
---------
A dictionary of all album properties
"""
props = dict(vars(self))
for prop in list(props.keys()):
if prop not in AlbumModel.ALBUM_PROPERTIES_VARIABLES:
del props[prop]
return props
def __str__(self) -> str:
"""
Returns a string representation of this album model's most important properties
Returns
---------
A string for printing this album model's properties
"""
return str(self.get_album_properties_dict())
def get_asset_uuids(self) -> list:
"""
Gathers UUIDs of all assets and returns them
Returns
---------
A list of asset UUIDs
"""
return [asset_to_add['id'] for asset_to_add in self.assets]
def find_incompatible_properties(self, other) -> list[str]:
"""
Checks whether this Album Model and the other album model are compatible in terms of
describing the same album for creation in a way that no album properties are in conflict
with each other.
All properties must either be the same or not present in both objects, except for
- id
- name
- assets
Parameters
----------
other : AlbumModel
The other album model to check against
Returns
---------
A list of string representations for incompatible properties. The list is empty
if there are no incompatible properties
"""
if not isinstance(other, AlbumModel):
return ['Incompatible type: other is not an AlbumModel']
incompatible_props = []
props = self.get_album_properties_dict()
other_props = other.get_album_properties_dict()
for prop in props:
if props[prop] != other_props[prop]:
incompatible_props.append(f'{prop}: {props[prop]} vs {other_props[prop]}')
return incompatible_props
def merge_from(self, other, merge_mode: int):
"""
Merges properties of other in self. The only properties not
considered for merging are
- id
- name
- assets
Parameters
----------
other : AlbumModel
The other album model to merge properties from
merge_mode: int
Defines how the merge should be performed:
- AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE: Only merge properties that are not already set in the merge target
- AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE_EX: Same as above, but also raises an exception if attempting to merge an existing property
- AlbumModel.ALBUM_MERGE_MODE_OVERRIDE: Overrides any existing property in merge target
"""
# Do not try to merge unrelated types
if not isinstance(other, AlbumModel):
logging.warning("Trying to merge AlbumModel with incompatible type!")
return
own_attribs = vars(self)
other_attribs = vars(other)
# Override merge mode
if merge_mode == AlbumModel.ALBUM_MERGE_MODE_OVERRIDE:
for prop_name in AlbumModel.ALBUM_PROPERTIES_VARIABLES:
if other_attribs[prop_name]:
own_attribs[prop_name] = other_attribs[prop_name]
# Exclusive merge modes
elif merge_mode in [AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE, AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE_EX]:
for prop_name in AlbumModel.ALBUM_PROPERTIES_VARIABLES:
if other_attribs[prop_name]:
if own_attribs[prop_name] and merge_mode == AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE_EX:
raise AlbumMergeException(f"Attempting to override {prop_name} in {self.name} with {other_attribs[prop_name]}")
own_attribs[prop_name] = other_attribs[prop_name]
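# A minimal sketch (not part of the original script) of how the merge modes behave,
# using hypothetical models 'base' and 'override_model' where base.description is None
# and override_model.description is set:
#   base.merge_from(override_model, AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE)
#     -> base.description takes the value from override_model (it was not set in the target)
#   base.merge_from(override_model, AlbumModel.ALBUM_MERGE_MODE_EXCLUSIVE_EX)
#     -> would raise AlbumMergeException only if base.description were already set
#   base.merge_from(override_model, AlbumModel.ALBUM_MERGE_MODE_OVERRIDE)
#     -> always takes any property value that is set in override_model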
def get_final_name(self) -> str:
"""
Gets the album model's name to use when talking to Immich, i.e.
returns override_name if set, otherwise name.
Returns
---------
override_name if set, otherwise name
"""
if self.override_name:
return self.override_name
return self.name
@staticmethod
def parse_album_properties_file(album_properties_file_path: str):
"""
Parses the provided album properties file into an AlbumModel
Parameters
----------
album_properties_file_path : str
The fully qualified path to a valid album properties file
Returns
---------
An AlbumModel that represents the album properties
Raises
---------
YAMLError
If the provided album properties file could not be parsed
"""
with open(album_properties_file_path, 'r', encoding="utf-8") as stream:
album_properties = yaml.safe_load(stream)
if album_properties:
album_props_template = AlbumModel(None)
album_props_template_vars = vars(album_props_template)
for album_prop_name in AlbumModel.ALBUM_PROPERTIES_VARIABLES:
if album_prop_name in album_properties:
album_props_template_vars[album_prop_name] = album_properties[album_prop_name]
return album_props_template
return None
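# Illustrative sketch of a .albumprops file as consumed by parse_album_properties_file();
# the keys mirror ALBUM_PROPERTIES_VARIABLES, every key is optional, and all values below
# are made-up examples:
#
#   override_name: "Summer Vacation 2024"
#   description: "All photos from the summer trip"
#   share_with:
#     - user: "partner@example.com"
#       role: "viewer"
#   thumbnail_setting: "random"
#   sort_order: "desc"
#   archive: false
#   comments_and_likes_enabled: true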
def find_albumprops_files(paths: list[str]) -> list[str]:
"""
Recursively finds all album properties files in all passed paths.
Parameters
----------
paths : list[str]
A list of paths to search for album properties files
Returns
---------
A list of paths with all album properties files
"""
albumprops_files = []
for path in paths:
if not os.path.isdir(path):
logging.warning("Album Properties Discovery: Path %s does not exist!", path)
continue
for path_tuple in os.walk(path):
root = path_tuple[0]
filenames = path_tuple[2]
for filename in fnmatch.filter(filenames, ALBUMPROPS_FILE_NAME):
albumprops_files.append(os.path.join(root, filename))
return albumprops_files
def identify_root_path(path: str, root_path_list: list[str]) -> str:
"""
Identifies which root path is the parent of the provided path.
:param path: The path to find the root path for
:type path: str
:param root_path_list: The list of root paths to search for the parent of path
:type root_path_list: list[str]
:return: The root path from root_path_list that is the parent of path
:rtype: str
"""
for root_path in root_path_list:
if root_path in path:
return root_path
return None
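# A small illustration of identify_root_path() with hypothetical paths; note the check is a
# plain substring match, so root paths should be given exactly as they appear in asset paths:
#   identify_root_path('/external_libs/photos/2024/IMG_0001.jpg',
#                      ['/external_libs/photos/', '/external_libs/videos/'])
#     -> '/external_libs/photos/'
#   identify_root_path('/somewhere/else/IMG_0001.jpg', ['/external_libs/photos/'])
#     -> None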
def build_album_properties_templates() -> dict:
"""
Searches all root paths for album properties files,
applies ignore/filtering mechanisms, parses the files,
creates AlbumModel objects from them, performs validations and returns
a dictionary mapping the album name (generated from the path the album properties file was found in)
to the corresponding album model.
If a fatal error occurs during processing of album properties files (e.g. two files targeting the same album with incompatible properties), the
program exits.
Returns
---------
A dictionary mapping the album name (generated from the path the album properties file was found in)
to the corresponding album model
"""
fatal_error_occurred = False
album_properties_file_paths = find_albumprops_files(root_paths)
# Dictionary mapping album name generated from album properties' path to the AlbumModel representing the
# album properties
album_props_templates = {}
album_name_to_album_properties_file_path = {}
for album_properties_file_path in album_properties_file_paths:
# First check global path_filter and ignore options
if is_path_ignored(album_properties_file_path):
continue
# Identify the root path
album_props_root_path = identify_root_path(album_properties_file_path, root_paths)
if not album_props_root_path:
continue
# Chunks of the album properties file's path below root_path
path_chunks = album_properties_file_path.replace(album_props_root_path, '').split('/')
# A single chunk means the file sits directly in the root path with no sub folder, ignore
if len(path_chunks) == 1:
continue
# remove last item from path chunks, which is the file name
del path_chunks[-1]
album_name = create_album_name(path_chunks, album_level_separator, album_name_post_regex)
try:
# Parse the album properties into an album model
album_props_template = AlbumModel.parse_album_properties_file(album_properties_file_path)
if not album_props_template:
logging.warning("Unable to parse album properties file %s", album_properties_file_path)
continue
album_props_template.name = album_name
if not album_name in album_props_templates:
album_props_templates[album_name] = album_props_template
album_name_to_album_properties_file_path[album_name] = album_properties_file_path
# There is already an album properties template with the same album name (maybe from a different root_path)
else:
incompatible_props = album_props_template.find_incompatible_properties(album_props_templates[album_name])
if len(incompatible_props) > 0:
logging.fatal("Album Properties files %s and %s create an album with identical name but have conflicting properties:",
album_name_to_album_properties_file_path[album_name], album_properties_file_path)
for incompatible_prop in incompatible_props:
logging.fatal(incompatible_prop)
fatal_error_occurred = True
except yaml.YAMLError as ex:
logging.error("Could not parse album properties file %s: %s", album_properties_file_path, ex)
if fatal_error_occurred:
logging.fatal("Encountered at least one fatal error during parsing or validating of album properties files, exiting!")
sys.exit(1)
# Now validate that all album properties templates with the same override_name are compatible with each other
validate_album_props_templates(album_props_templates.values(), album_name_to_album_properties_file_path)
return album_props_templates
def validate_album_props_templates(album_props_templates: list[AlbumModel], album_name_to_album_properties_file_path: dict):
"""
Validates the provided list of album properties.
Specifically, checks that if multiple album properties files specify the same override_name, all other specified properties
are the same as well.
If a validation error occurs, the program exits.
Parameters
----------
album_props_templates : list[AlbumModel]
The list of AlbumModel objects to validate
album_name_to_album_properties_file_path : dict
A dictionary where the key is an album name and the value is the path to the album properties file the
album was generated from.
This method expects one entry in this dictionary for every AlbumModel in album_props_templates
"""
fatal_error_occurred = False
# This is a cache to remember checked names - keep time complexity down
checked_override_names = []
# Loop over all album properties templates
for album_props_template in album_props_templates:
# Check if override_name is set and not already checked
if album_props_template.override_name and album_props_template.override_name not in checked_override_names:
# Inner loop through album properties template
for album_props_template_to_check in album_props_templates:
# Do not check against ourselves and only check if the other template has the same override name (we already checked above that override_name is not None)
if (album_props_template is not album_props_template_to_check
and album_props_template.override_name == album_props_template_to_check.override_name):
if check_for_and_log_incompatible_properties(album_props_template, album_props_template_to_check, album_name_to_album_properties_file_path):
fatal_error_occurred = True
checked_override_names.append(album_props_template.override_name)
if fatal_error_occurred:
logging.fatal("Encountered at least one fatal error while validating album properties files, exiting!")
sys.exit(1)
def check_for_and_log_incompatible_properties(model1: AlbumModel, model2: AlbumModel, album_name_to_album_properties_file_path: dict) -> bool:
"""
Checks if model1 and model2 have incompatible properties (same properties set to different values). If so,
logs the incompatible properties and returns True.
Parameters
----------
- model1 : AlbumModel
The first album model to check for incompatibility with the second model
- model2 : AlbumModel
The second album model to check for incompatibility with the first model
- album_name_to_album_properties_file_path : dict
A dictionary where the key is an album name and the value is the path to the album properties file the
album was generated from.
This method expects an entry in this dictionary for both model1 and model2
Returns
---------
False if model1 and model2 are compatible, otherwise True
"""
incompatible_props = model1.find_incompatible_properties(model2)
if len(incompatible_props) > 0:
logging.fatal("Album properties files %s and %s define the same override_name but have incompatible properties:",
album_name_to_album_properties_file_path[model1.name],
album_name_to_album_properties_file_path[model2.name])
for incompatible_prop in incompatible_props:
logging.fatal(incompatible_prop)
return True
return False
def is_integer(string_to_test: str) -> bool:
"""
Tests whether the provided string is an integer,
working around Python's isnumeric() not recognizing
negative numbers.
Parameters
----------
string_to_test : str
The string to test for integer
Returns
---------
True if string_to_test is an integer, otherwise False
"""
try:
int(string_to_test)
return True
except ValueError:
return False
# Translation of GLOB-style patterns to Regex
# Source: https://stackoverflow.com/a/63212852
# FIXME_EVENTUALLY: Replace with glob.translate() introduced with Python 3.13
escaped_glob_tokens_to_re = OrderedDict((
# Order of ``**/`` and ``/**`` in RE tokenization pattern doesn't matter because ``**/`` will be caught first no matter what, making ``/**`` the only option later on.
# W/o leading or trailing ``/`` two consecutive asterisks will be treated as literals.
('/\\*\\*', '(?:/.+?)*'), # Edge-case #1. Catches recursive globs in the middle of path. Requires edge case #2 handled after this case.
('\\*\\*/', '(?:^.+?/)*'), # Edge-case #2. Catches recursive globs at the start of path. Requires edge case #1 handled before this case. ``^`` is used to ensure proper location for ``**/``.
('\\*', '[^/]*'), # ``[^/]*`` is used to ensure that ``*`` won't match subdirs, as with naive ``.*?`` solution.
('\\?', '.'),
('\\[\\*\\]', '\\*'), # Escaped special glob character.
('\\[\\?\\]', '\\?'), # Escaped special glob character.
('\\[!', '[^'), # Requires ordered dict, so that ``\\[!`` precedes ``\\[`` in RE pattern.
# Needed mostly to differentiate between ``!`` used within character class ``[]`` and outside of it, to avoid faulty conversion.
('\\[', '['),
('\\]', ']'),
))
escaped_glob_replacement = regex.compile('(%s)' % '|'.join(escaped_glob_tokens_to_re).replace('\\', '\\\\\\'))
def glob_to_re(pattern: str) -> str:
"""
Converts the provided GLOB pattern to
a regular expression.
Parameters
----------
pattern : str
A GLOB-style pattern to convert to a regular expression
Returns
---------
A regular expression matching the same strings as the provided GLOB pattern
"""
return escaped_glob_replacement.sub(lambda match: escaped_glob_tokens_to_re[match.group(0)], regex.escape(pattern))
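# Example of the translation, computed by hand for a recursive pattern (worth double-checking
# in a REPL if relied upon):
#   glob_to_re('**/*Vacation*/**')
#     -> '(?:^.+?/)*[^/]*Vacation[^/]*(?:/.+?)*'
#   regex.fullmatch(glob_to_re('**/*Vacation*/**'), '2024/Vacation Italy/IMG_0001.jpg') matches,
#   while '2024/Other/IMG_0001.jpg' does not.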
def read_file(file_path: str) -> str:
"""
Reads and returns the contents of the provided file.
Parameters
----------
file_path : str
Path to the file to read
Raises
----------
FileNotFoundError if the file does not exist
Exception on any other error reading the file
Returns
---------
The file's contents
"""
with open(file_path, 'r', encoding="utf-8") as secret_file:
return secret_file.read().strip()
def read_api_key_from_file(file_path: str) -> str:
"""
Reads the API key from the provided file
Parameters
----------
file_path : str
Path to the file to read
Returns
---------
The API key or None on error
"""
try:
return read_file(file_path)
except FileNotFoundError:
logging.error("API Key file not found at %s", args["api_key"])
except OSError as ex:
logging.error("Error reading API Key file: %s", ex)
return None
def determine_api_key(api_key_source: str, key_type: str) -> str:
"""
Determines the API key based on key_type.
For key_type 'literal', api_key_source is returned as is.
For key_type 'file', api_key_source is a path to a file containing the API key,
and the file's contents are returned.
Parameters
----------
api_key_source : str
An API key or path to a file containing an API key
key_type : str
Must be either 'literal' or 'file'
Returns
---------
The API key or None on error
"""
if key_type == 'literal':
return api_key_source
if key_type == 'file':
return read_api_key_from_file(api_key_source)
# At this point key_type is not a valid value
logging.error("Unknown key type (-t, --key-type). Must be either 'literal' or 'file'.")
return None
def expand_to_glob(expr: str) -> str:
"""
Expands the passed expression to a glob-style
expression if it contains neither a slash nor an asterisk.
The resulting glob-style expression matches any path that contains the
original expression anywhere.
Parameters
----------
expr : str
Expression to expand to a GLOB-style expression if not already
one
Returns
---------
The original expression if it contained a slash or an asterisk,
otherwise \\*\\*/\\*\\<expr\\>\\*/\\*\\*
"""
if not '/' in expr and not '*' in expr:
glob_expr = f'**/*{expr}*/**'
logging.debug("expanding %s to %s", expr, glob_expr)
return glob_expr
return expr
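# For illustration with a hypothetical folder name:
#   expand_to_glob('Vacation')   -> '**/*Vacation*/**'  (matches any path with 'Vacation' in a folder name)
#   expand_to_glob('2024/*.jpg') -> '2024/*.jpg'        (contains a slash/asterisk, returned unchanged)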
def divide_chunks(full_list: list, chunk_size: int):
"""Yield successive n-sized chunks from l. """
# looping till length l
for j in range(0, len(full_list), chunk_size):
yield full_list[j:j + chunk_size]
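# For example (hypothetical values), this is how asset ID lists are split into API-sized batches below:
#   list(divide_chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]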
def parse_separated_string(separated_string: str, separator: str) -> Tuple[str, str]:
"""
Parse a key, value pair, separated by the provided separator.
That's the reverse of ShellArgs.
On the command line (argparse) a declaration will typically look like:
foo=hello
or
foo="hello world"
"""
items = separated_string.split(separator)
key = items[0].strip() # we remove blanks around keys, as is logical
value = None
if len(items) > 1:
# rejoin the rest:
value = separator.join(items[1:])
return (key, value)
def parse_separated_strings(items: list[str]) -> dict:
"""
Parse a series of key-value pairs and return a dictionary
"""
parsed_strings_dict = {}
if items:
for item in items:
key, value = parse_separated_string(item, '=')
parsed_strings_dict[key] = value
return parsed_strings_dict
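# For illustration with hypothetical user/role pairs as they might be passed on the command line:
#   parse_separated_strings(['user1@example.com=editor', 'user2@example.com=viewer'])
#     -> {'user1@example.com': 'editor', 'user2@example.com': 'viewer'}
# A value containing the separator stays intact: 'foo=a=b' parses to key 'foo', value 'a=b'.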
# pylint: disable=R0912
def create_album_name(asset_path_chunks: list[str], album_separator: str, album_name_postprocess_regex: list) -> str:
"""
Create album names from provided path_chunks string array.
The method uses global variables album_levels_range_arr or album_levels to
generate album names either by level range or absolute album levels. If multiple
album path chunks are used for album names they are separated by album_separator.
album_name_postprocess_regex is an optional list of (regex, replacement) pairs
"""
album_name_chunks = ()
logging.debug("path chunks = %s", list(asset_path_chunks))
# Check which path to take: album_levels_range or album_levels
if len(album_levels_range_arr) == 2:
if album_levels_range_arr[0] < 0:
album_levels_start_level_capped = min(len(asset_path_chunks), abs(album_levels_range_arr[0]))
album_levels_end_level_capped = album_levels_range_arr[1]+1
album_levels_start_level_capped *= -1
else:
album_levels_start_level_capped = min(len(asset_path_chunks)-1, album_levels_range_arr[0])
# Add 1 to album_levels_end_level_capped to include the end index, which is what the user intended. It's not a problem
# if the end index is out of bounds.
album_levels_end_level_capped = min(len(asset_path_chunks)-1, album_levels_range_arr[1]) + 1
logging.debug("album_levels_start_level_capped = %d", album_levels_start_level_capped)
logging.debug("album_levels_end_level_capped = %d", album_levels_end_level_capped)
# album start level is not equal to album end level, so we want a range of levels
if album_levels_start_level_capped != album_levels_end_level_capped:
# If the (negative) end index is out of bounds, take all chunks from the start level onward
if album_levels_end_level_capped < 0 and abs(album_levels_end_level_capped) >= len(asset_path_chunks):
album_name_chunks = asset_path_chunks[album_levels_start_level_capped:]
else:
album_name_chunks = asset_path_chunks[album_levels_start_level_capped:album_levels_end_level_capped]
# album start and end levels are equal, we want exactly that level
else:
# create an on-the-fly array with a single element taken from the capped start level
album_name_chunks = [asset_path_chunks[album_levels_start_level_capped]]
else:
album_levels_int = int(album_levels)
# either use as many path chunks as we have,
# or the specified album levels
album_name_chunk_size = min(len(asset_path_chunks), abs(album_levels_int))
if album_levels_int < 0:
album_name_chunk_size *= -1
# Copy album name chunks from the path to use as album name
album_name_chunks = asset_path_chunks[:album_name_chunk_size]
if album_name_chunk_size < 0:
album_name_chunks = asset_path_chunks[album_name_chunk_size:]
logging.debug("album_name_chunks = %s", album_name_chunks)
# final album name before regex
album_name = album_separator.join(album_name_chunks)
logging.debug("Album Name %s", album_name)
# apply regex if any
if album_name_postprocess_regex:
for pattern, *repl in album_name_postprocess_regex:
# If no replacement string provided, default to empty string
replace = repl[0] if repl else ''
album_name = regex.sub(pattern, replace, album_name)
logging.debug("Album Post Regex s/%s/%s/g --> %s", pattern, replace, album_name)
return album_name.strip()
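# A hedged walk-through with made-up values, assuming the globals album_levels = 2 and
# no album_levels_range or post-processing regex are configured:
#   create_album_name(['2024', 'Vacation', 'Italy'], ' - ', None)
#     -> '2024 - Vacation'   (first two path levels joined by the separator)
# With album_levels = -1 the same call would yield 'Italy' (the last path level only).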
def fetch_server_version() -> dict:
"""
Fetches the API version from the immich server.
If the API endpoint for getting the server version cannot be reached,
raises HTTPError
Returns
-------
Dictionary with keys
- major
- minor
- patch
"""
api_endpoint = f'{root_url}server/version'
r = requests.get(api_endpoint, **requests_kwargs, timeout=api_timeout)
# The API endpoint changed in Immich v1.118.0, if the new endpoint
# was not found try the legacy one
if r.status_code == 404:
api_endpoint = f'{root_url}server-info/version'
r = requests.get(api_endpoint, **requests_kwargs, timeout=api_timeout)
if r.status_code == 200:
server_version = r.json()
logging.info("Detected Immich server version %s.%s.%s", server_version['major'], server_version['minor'], server_version['patch'])
# Any other errors mean communication error with API
else:
logging.error("Communication with Immich API failed! Make sure the passed API URL is correct!")
check_api_response(r)
return server_version
def fetch_assets(is_not_in_album: bool, find_archived: bool) -> list:
"""
Fetches assets from the Immich API.
Uses the /search/metadata call. Much more efficient than the legacy method
since this call allows filtering for assets that are not in an album.
Parameters
----------
is_not_in_album : bool
Flag indicating whether to fetch only assets that are not part
of an album. If set to False, will find images both in albums
and not in any album
find_archived : bool
Flag indicating whether to only fetch assets that are archived. If set to False,
will find archived and unarchived images
Returns
---------
An array of asset objects
"""
return fetch_assets_with_options({'isNotInAlbum': is_not_in_album, 'withArchived': find_archived})
def fetch_assets_with_options(search_options: dict) -> list:
"""
Fetches assets from the Immich API using specific search options.
The search options directly correspond to the body used for the search API request.
Parameters
----------
search_options: dict
Dictionary containing options to pass to the search/metadata API endpoint
Returns
---------
An array of asset objects
"""
body = search_options
assets_found = []
# prepare request body
# This API call allows a maximum page size of 1000
number_of_assets_to_fetch_per_request_search = min(1000, number_of_assets_to_fetch_per_request)
body['size'] = number_of_assets_to_fetch_per_request_search
# Initial API call, let's fetch our first chunk
page = 1
body['page'] = str(page)
r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs, timeout=api_timeout)
r.raise_for_status()
response_json = r.json()
assets_received = response_json['assets']['items']
logging.debug("Received %s assets with chunk %s", len(assets_received), page)
assets_found = assets_found + assets_received
# If we got a full chunk size back, let's perform subsequent calls until we get less than a full chunk size
while len(assets_received) == number_of_assets_to_fetch_per_request_search:
page += 1
body['page'] = page
r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
response_json = r.json()
assets_received = response_json['assets']['items']
logging.debug("Received %s assets with chunk %s", len(assets_received), page)
assets_found = assets_found + assets_received
return assets_found
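# For illustration, the wrapper fetch_assets(True, True) above boils down to:
#   assets = fetch_assets_with_options({'isNotInAlbum': True, 'withArchived': True})
# The 'size' and 'page' keys are filled in by this function itself while paging through results.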
def fetch_albums():
"""Fetches albums from the Immich API"""
api_endpoint = 'albums'
r = requests.get(root_url+api_endpoint, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
return r.json()
def fetch_album_info(album_id_for_info: str):
"""
Fetches information about a specific album
Parameters
----------
album_id_for_info : str
The ID of the album to fetch information for
"""
api_endpoint = f'albums/{album_id_for_info}'
r = requests.get(root_url+api_endpoint, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
return r.json()
def delete_album(album_delete: dict):
"""
Deletes an album identified by album_delete['id']
If the album could not be deleted, logs an error.
Parameters
----------
album_delete : dict
Dictionary with the following keys:
- id
- albumName
Returns
---------
True if the album was deleted, otherwise False
"""
api_endpoint = 'albums'
logging.debug("Deleting Album: Album ID = %s, Album Name = %s", album_delete['id'], album_delete['albumName'])
r = requests.delete(root_url+api_endpoint+'/'+album_delete['id'], **requests_kwargs, timeout=api_timeout)
try:
check_api_response(r)
return True
except HTTPError:
logging.error("Error deleting album %s: %s", album_delete['albumName'], r.reason)
return False
def create_album(album_name_to_create: str) -> str:
"""
Creates an album with the provided name and returns the ID of the created album
Parameters
----------
album_name_to_create : str
Name of the album to create
Returns
---------
The ID of the created album
Raises
----------
Exception if the API call failed
"""
api_endpoint = 'albums'
data = {
'albumName': album_name_to_create
}
r = requests.post(root_url+api_endpoint, json=data, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
return r.json()['id']
def is_path_ignored(path_to_check: str) -> bool:
"""
Determines if the provided path should be ignored for the purpose of this script
based on the global ignore and path_filter options.
Parameters
----------
path_to_check : str
The path (typically an asset's originalPath) to check against the ignore and path filter options.
Returns
----------
True if the path must be ignored, otherwise False
"""
is_path_ignored_result = False
asset_root_path = None
for root_path_to_check in root_paths:
if root_path_to_check in path_to_check:
asset_root_path = root_path_to_check
break
logging.debug("Identified root_path for asset %s = %s", path_to_check, asset_root_path)
if asset_root_path:
# First apply filter, if any
if len(path_filter_regex) > 0:
any_match = False
for path_filter_regex_entry in path_filter_regex:
if regex.fullmatch(path_filter_regex_entry, path_to_check.replace(asset_root_path, '')):
any_match = True
if not any_match:
logging.debug("Ignoring path %s due to path_filter setting!", path_to_check)
is_path_ignored_result = True
# If the asset "survived" the path filter, check if it is in the ignore_albums argument
if not is_path_ignored_result and len(ignore_albums_regex) > 0:
for ignore_albums_regex_entry in ignore_albums_regex:
if regex.fullmatch(ignore_albums_regex_entry, path_to_check.replace(asset_root_path, '')):
is_path_ignored_result = True
logging.debug("Ignoring path %s due to ignore_albums setting!", path_to_check)
break
return is_path_ignored_result
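# Putting the filter pieces together (hypothetical values): with root_paths = ['/external_libs/photos/'],
# a path_filter_regex entry built via glob_to_re(expand_to_glob('Vacation')), and ignore_albums_regex empty,
#   is_path_ignored('/external_libs/photos/2024/Vacation/IMG_0001.jpg')
# returns False (the relative path matches the filter), whereas a path without a 'Vacation'
# folder would be ignored and return True.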
def add_assets_to_album(assets_add_album_id: str, asset_list: list[str]) -> list[str]:
"""
Adds the asset IDs provided in asset_list to the album identified by assets_add_album_id.
If asset_list is larger than number_of_images_per_request, the list is chunked
and one API call is performed per chunk.
Only logs errors and successes.
Parameters
----------
assets_add_album_id : str
The ID of the album to add assets to
asset_list: list[str]
A list of asset IDs to add to the album
Returns
---------
The asset UUIDs that were actually added to the album (excluding assets that were already part of the album)
"""
api_endpoint = 'albums'
# Divide our assets into chunks of number_of_images_per_request,
# So the API can cope
assets_chunked = list(divide_chunks(asset_list, number_of_images_per_request))
asset_list_added = []
for assets_chunk in assets_chunked:
data = {'ids':assets_chunk}
r = requests.put(root_url+api_endpoint+f'/{assets_add_album_id}/assets', json=data, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
response = r.json()
for res in response:
if not res['success']:
if res['error'] != 'duplicate':
logging.warning("Error adding an asset to an album: %s", res['error'])
else:
asset_list_added.append(res['id'])
return asset_list_added
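# Hedged usage sketch with hypothetical names: given an album ID and an AlbumModel 'album_model',
#   added_ids = add_assets_to_album(album_id, album_model.get_asset_uuids())
# returns only the UUIDs that were newly added; assets already in the album are skipped without a warning.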
def fetch_users():
"""Queries and returns all users"""
api_endpoint = 'users'
r = requests.get(root_url+api_endpoint, **requests_kwargs, timeout=api_timeout)
check_api_response(r)
return r.json()
# Disable pylint for too many branches
# pylint: disable=R0912
def update_album_shared_state(album_to_share: AlbumModel, unshare_users: bool) -> None:
"""
Makes sure the album is shared with the users set in the model with the correct roles.
This involves fetching album info from Immich to check if/who the album is shared with and the share roles,
then either updating the share role, removing the user, or adding the users
Parameters
----------
album_to_share : AlbumModel
The album to share, with the expected share_with setting
unshare_users: bool
Flag indicating whether to actively unshare albums if shared with a user that is not in the current
share settings
Raises
----------
HTTPError if the API call fails
"""
# Parse and prepare expected share roles
# List all share users by share role
share_users_to_roles_expected = {}
for share_user in album_to_share.share_with:
# Find the user by configured name or email
share_user_in_immich = find_user_by_name_or_email(share_user['user'], users)
if not share_user_in_immich:
logging.warning("User %s to share album %s with does not exist!", share_user['user'], album.get_final_name())
continue
share_users_to_roles_expected[share_user_in_immich['id']] = share_user['role']
# No users to share with and unsharing is disabled?
if len(share_users_to_roles_expected) == 0 and not unshare_users:
return
# Now fetch reality
album_to_share_info = fetch_album_info(album_to_share.id)
# Dict mapping a user ID to share role
album_share_info = {}
for share_user_actual in album_to_share_info['albumUsers']:
album_share_info[share_user_actual['user']['id']] = share_user_actual['role']
# Group share users by share role
share_roles_to_users_expected = {}
# Now compare expectation with reality and update
for user_to_share_with, share_role_expected in share_users_to_roles_expected.items():
# Case: Album is not shared with user
if user_to_share_with not in album_share_info:
# Gather all users to share the album with for this role
if not share_role_expected in share_roles_to_users_expected:
share_roles_to_users_expected[share_role_expected] = []
share_roles_to_users_expected[share_role_expected].append(user_to_share_with)
# Case: Album is shared, but with wrong role
elif album_share_info[user_to_share_with] != share_role_expected:
try:
update_album_share_user_role(album_to_share.id, user_to_share_with, share_role_expected)