-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathautograbber.py
executable file
·139 lines (111 loc) · 4.15 KB
/
autograbber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
from common import load_root_node
import fnmatch
import logging
import os
import shutil
import sys
# Log of titles that were already downloaded. It is a relative name, so it is
# resolved against the current working directory (process_one_dir changes into
# the download directory first). Read at start-up and appended to whenever a
# node is marked as seen.
HISTORY_FILENAME = ".history.txt"
# File that must exist in every download directory. Each line is split on "/"
# into a sequence of fnmatch patterns that is matched level by level against
# the node tree.
PATTERN_FILENAME = ".patterns.txt"
# Optional file of fnmatch patterns, one per line. Nodes whose title matches
# any of them are never downloaded.
EXCLUDE_FILENAME = ".excludes.txt"
class DownloadList(object):
    """Decides which nodes still need downloading and records finished ones.

    On construction the exclude patterns and the download history are read
    from files in the current working directory, and the history file is
    opened for appending. Call :meth:`close` (or use the instance as a
    context manager) to release the history file handle when done.

    Attributes:
        exclude_list: set of fnmatch patterns; matching titles are refused.
        seen_list: set of stripped titles that were already downloaded.
        f: history file opened in append mode.
    """

    def __init__(self):
        self.exclude_list = set()
        self._load_exclude_list()
        self.seen_list = set()
        self._load_history_file()
        self.f = open(HISTORY_FILENAME, "a")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the history file; safe to call more than once."""
        handle = getattr(self, "f", None)
        if handle is not None and not handle.closed:
            handle.close()

    def _load_exclude_list(self):
        """Read the optional exclude patterns, one per line."""
        try:
            with open(EXCLUDE_FILENAME, "r") as f:
                for line in f:
                    self.exclude_list.add(line.strip())
        except FileNotFoundError:
            # The exclude file is optional; its absence is not an error.
            pass
        except (OSError, UnicodeError) as e:
            # Still best-effort, but an unreadable file should not go unnoticed.
            logging.warning("Could not read exclude file: %s -- %s", EXCLUDE_FILENAME, e)

    def _load_history_file(self):
        """Read previously downloaded titles, migrating legacy file names first."""
        self._move_old_file("downloaded_auto.txt")
        self._move_old_file(".downloaded_auto.txt")
        try:
            with open(HISTORY_FILENAME, "r") as f:
                for line in f:
                    self.seen_list.add(line.strip())
        except FileNotFoundError:
            # Expected on the first run: __init__ creates the file right after.
            logging.debug("No history file yet: %s", HISTORY_FILENAME)
        except (OSError, UnicodeError) as e:
            logging.error("Could not open history file: %s -- %s", HISTORY_FILENAME, e)

    def _move_old_file(self, old_filename):
        """Rename a legacy history file unless a current one already exists."""
        if os.path.isfile(old_filename) and not os.path.isfile(HISTORY_FILENAME):
            logging.info("Migrating download history from %s to %s", old_filename, HISTORY_FILENAME)
            shutil.move(old_filename, HISTORY_FILENAME)

    def wants(self, node):
        """Return True if the node's title is neither seen nor excluded."""
        title = node.title.strip()
        if title in self.seen_list:
            return False
        return not any(fnmatch.fnmatch(title, exclude) for exclude in self.exclude_list)

    def mark_seen(self, node):
        """Remember the node's title in memory and append it to the history file."""
        title = node.title.strip()
        self.seen_list.add(title)
        self.f.write(title + "\n")
        # Flush immediately so the entry survives an interrupted run.
        self.f.flush()
def match(download_list, node, pattern, count=0):
    """Walk the node tree along ``pattern`` and download every wanted leaf.

    Args:
        download_list: object providing ``seen_list``, ``wants(node)`` and
            ``mark_seen(node)``.
        node: current tree node; must expose ``title`` and ``can_download``,
            plus ``download()`` or ``get_children()`` depending on its kind.
        pattern: list of fnmatch patterns, one per tree level.
        count: index into ``pattern`` for the level below ``node``.

    A downloadable node ends the descent regardless of how much of the
    pattern is left. Running out of pattern segments on a node that is not
    downloadable is reported as an error.
    """
    logging.debug("match(%s, %s, %s, %s)", download_list.seen_list, node.title, pattern, count)

    if node.can_download:
        if download_list.wants(node):
            logging.debug("Attempting download of %s", node.title)
            if node.download():
                download_list.mark_seen(node)
            else:
                logging.error("Failed to download! %s", node.title)
        return

    if count >= len(pattern):
        # The placeholder is required: without it the logging module fails to
        # format the record and the offending pattern is never shown.
        logging.error("No match found for pattern: %s", "/".join(pattern))
        return

    segment = pattern[count]
    for child in node.get_children():
        logging.debug("Matching '%s' against '%s'", child.title, segment)
        if fnmatch.fnmatch(child.title, segment):
            logging.debug("Matched!")
            match(download_list, child, pattern, count + 1)
def process_one_dir(destdir, patternfile):
    """Download everything in ``destdir`` that matches ``patternfile``.

    Args:
        destdir: directory to change into; downloads and the history and
            exclude files are resolved relative to it.
        patternfile: path of a text file with one "/"-separated pattern
            sequence per line. Pass an absolute path, since the working
            directory changes before the file is opened.
    """
    os.chdir(destdir)
    node = load_root_node()
    download_list = DownloadList()
    try:
        with open(patternfile) as pattern_lines:
            for line in pattern_lines:
                entry = line.strip()
                if not entry:
                    # A blank line cannot select anything useful; skip it
                    # rather than walking the tree for nothing.
                    continue
                match(download_list, node, entry.split("/"))
    finally:
        # The history file is opened by DownloadList and nothing else holds
        # on to it, so release the handle before the next directory.
        download_list.f.close()
def check_directories(download_dirs):
    """Validate the download directories and locate their pattern files.

    Every entry is made absolute and checked for being a directory that
    contains the pattern file. All problems are printed before the process
    exits with status 1; nothing is returned in that case.

    Returns:
        A list of ``(absolute_directory, pattern_file_path)`` tuples in the
        same order as ``download_dirs``.
    """
    problems = 0
    pairs = []
    for raw_dir in download_dirs:
        directory = os.path.abspath(raw_dir)
        if not os.path.isdir(directory):
            print("Not a directory!", directory)
            problems += 1

        patterns_path = os.path.join(directory, PATTERN_FILENAME)
        if not os.path.isfile(patterns_path):
            print("Missing file!", patterns_path)
            problems += 1

        pairs.append((directory, patterns_path))

    if problems:
        print("Exiting!")
        sys.exit(1)
    return pairs
def process_dirs(download_dirs):
    """Validate all download directories, then process them one after another."""
    validated = check_directories(download_dirs)
    for entry in validated:
        target_dir, patterns_path = entry
        logging.info("Processing directory: %s", target_dir)
        process_one_dir(target_dir, patterns_path)
if __name__ == "__main__":
    # Command-line entry point: one or more download directories, or the
    # legacy "<destdir> <patternfile>" pair.
    if not sys.argv[1:]:
        print("Usage: %s download_dir [download_dir ...]" % sys.argv[0])
        sys.exit(1)

    # Backwards compatibility with old argument format: exactly two
    # arguments where the second one is an existing file.
    legacy_call = len(sys.argv) == 3 and os.path.isfile(sys.argv[2])
    if legacy_call:
        legacy_dir = os.path.abspath(sys.argv[1])
        legacy_patterns = os.path.abspath(sys.argv[2])

    try:
        if legacy_call:
            process_one_dir(legacy_dir, legacy_patterns)
        else:
            process_dirs(sys.argv[1:])
    except (KeyboardInterrupt, EOFError):
        print("\nExiting...")