-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscanner.py
99 lines (79 loc) · 3.15 KB
/
scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import hashlib
from pathlib import Path
from typing import NoReturn
import fs_utlis
import db as DB
import hashers
class SymlinkFound(Exception):
    """Raised by ``Scanner.symlink_handler`` when a symlink is encountered."""
class UnexpectedPathType(Exception):
    """Raised by ``Scanner.switcher`` for an entry type it has no handler for."""
# File suffixes that are treated as images (compared against the case-folded
# suffix of each scanned file). Per the original author these are extensions
# supported by PIL.Image.
img_extensions = (
    '.jpg',
    '.png',
    '.gif',
    '.tiff',
    '.jpeg',
    '.bmp',
)
class Scanner():
def __init__(self, db_path: Path | str) -> None:
self.db = DB.Database(db_path)
self.db.initialize()
self.dir_stack =[]
@property
def current_dir_id(self) -> int:
try:
id, _ = self.dir_stack[-1]
except IndexError:
id = self.db.rootDirID
return id
def file_handler(self, path: Path) -> None:
size = path.stat().st_size
if path.suffix.casefold() in img_extensions:
image_hash = hashers.image_hasher(path)
self.db.insertFile(str(path), size, self.current_dir_id, image_hash, image_hash)
else:
partial_hash = hashers.partial_hasher(path, size)
try:
self.db.insertFile(str(path), size, self.current_dir_id, partial_hash)
# Catch exception if identical partial hash exists
except DB.PartialHashCollisionException as e:
# Add complete hash to collided file if not exists
if not e.has_hash_complete:
e_full_hash = hashers.full_hasher(e.path)
self.db.updateFileCompleteHash(e.id, e_full_hash)
self.dir_hash_update(e.dir_id)
# Resummit insertion request
full_hash = hashers.full_hasher(path)
self.db.insertFile(str(path), size, self.current_dir_id, partial_hash, full_hash)
def dir_handler(self, path: Path) -> None:
if path is None:
id, path = self.dir_stack.pop()
self.dir_hash_update(id)
else:
id = self.db.insertDir(str(path), self.current_dir_id)
self.dir_stack.append((id, path))
def dir_hasher(self, id: int) -> str:
hashes = self.db.getChildrenHashes(id)
hash_str = "\n".join(hashes)
return hashlib.md5(hash_str.encode("ascii")).hexdigest()
def dir_hash_update(self, id: int) -> None:
if id != self.db.rootDirID:
dir_hash = self.dir_hasher(id)
self.db.updateDirHash(id, dir_hash)
parent = self.db.getDirParentID(id)
self.dir_hash_update(parent)
@staticmethod
def symlink_handler(path: Path) -> NoReturn:
raise SymlinkFound(f'Symlink "{path} found in directory, unable to handle it')
def switcher(self, type, *args) -> None:
if type == 'dir':
self.dir_handler(*args)
elif type == 'file':
self.file_handler(*args)
elif type == 'symlink':
Scanner.symlink_handler(*args)
else:
raise UnexpectedPathType
def scan(self, path: Path) -> None:
for type, p in fs_utlis.dir_dfs(path):
self.switcher(type, p)
def dumpResults(self) -> None:
self.db.dumpTable("dirs")
self.db.dumpTable("files")
self.db.dumpTable("duplicates")