-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_monitor.py
164 lines (139 loc) Β· 5.34 KB
/
file_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import subprocess
import time
import os
import psycopg2
import httpx
from dotenv import load_dotenv
# πΉ Load environment variables
load_dotenv()
# πΉ Configuration
WATCHED_FOLDER = os.getenv("WATCHED_FOLDER", "/home/ubuntu/test_directory")
TELEX_WEBHOOK_URL = os.getenv("TELEX_WEBHOOK_URL", "http://127.0.0.1:5000/telex-webhook")
# πΉ PostgreSQL Configuration
DB_CONFIG = {
"dbname": os.getenv("DB_NAME", "file_monitor"),
"user": os.getenv("DB_USER", "postgres"),
"password": os.getenv("DB_PASSWORD", "yourpassword"),
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
}
# πΉ Store last processed event ID (prevents duplicate alerts)
last_event_id = None
def setup_database():
"""Creates the necessary table in PostgreSQL if it doesn't exist."""
try:
conn = psycopg2.connect(**DB_CONFIG)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS file_deletions (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
file_path TEXT NOT NULL,
deleted_by TEXT NOT NULL
)
""")
conn.commit()
cursor.close()
conn.close()
print("β
Database setup complete.")
except Exception as e:
print(f"β Database setup error: {e}")
def setup_auditd_rule():
"""Adds an auditd rule to monitor only file deletions."""
try:
rule_check = subprocess.run(["auditctl", "-l"], capture_output=True, text=True)
if WATCHED_FOLDER in rule_check.stdout:
print(f"β
Audit rule already exists for {WATCHED_FOLDER}")
else:
subprocess.run(["auditctl", "-w", WATCHED_FOLDER, "-p", "wa", "-k", "file_delete"], check=True)
print(f"π Added auditd rule to monitor {WATCHED_FOLDER}")
except subprocess.CalledProcessError as e:
print(f"β Failed to set up auditd rule: {e}")
def get_latest_deletion_log():
"""Fetches the most recent file deletion log from auditd."""
try:
result = subprocess.run(["ausearch", "-k", "file_delete", "--start", "recent"], capture_output=True, text=True)
logs = result.stdout.strip()
if "no matches" in logs.lower():
return None
return logs
except Exception as e:
print(f"β Error fetching deletion logs: {e}")
return None
def extract_deletion_info(logs):
"""Extracts details of the last deletion event and ensures it's new."""
global last_event_id
if not logs:
return None
lines = logs.split("\n")
event_id = None
filepath = None
user_id = None
for line in reversed(lines): # Start from the latest entry
if "type=DELETE" in line:
try:
event_id = line.split("msg=audit(")[-1].split(")")[0]
# πΉ Avoid duplicate alerts
if event_id == last_event_id:
return None
last_event_id = event_id
# πΉ Extract file path
if "name=" in line:
filepath = line.split("name=")[-1].split(" ")[0].strip('"')
# πΉ Extract user ID and convert to username
if "uid=" in line:
user_id = line.split("uid=")[1].split(" ")[0].strip()
user = subprocess.run(["id", "-un", user_id], capture_output=True, text=True).stdout.strip()
else:
user = "Unknown"
if filepath:
log_to_db(filepath, user) # πΉ Save to database
send_to_telex(filepath, user) # πΉ Send Alert
print(f"β
Logged to database: {filepath} by {user}")
return
except Exception as e:
print(f"β Error extracting deletion info: {e}")
return None
return None
def log_to_db(file_path, deleted_by):
"""Logs deletion events to PostgreSQL."""
try:
conn = psycopg2.connect(**DB_CONFIG)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO file_deletions (file_path, deleted_by) VALUES (%s, %s)",
(file_path, deleted_by)
)
conn.commit()
cursor.close()
conn.close()
except Exception as e:
print(f"β Database logging error: {e}")
def send_to_telex(file_path, user):
"""Sends a deletion alert to Telex."""
try:
payload = {
"message": f"π¨ File Deleted! \nπ Path: {file_path} \nπ€ User: {user}",
"event_name": "β DELETE ALERT",
"status": "success",
"username": "I-notify"
}
response = httpx.post(TELEX_WEBHOOK_URL, json=payload)
response.raise_for_status()
print("β
Telex alert sent!")
except httpx.HTTPError as e:
print(f"β Failed to send to Telex: {e}")
def monitor_deletions():
"""Monitors file deletions and logs them to the database."""
print("π Monitoring file deletions started...")
setup_database()
setup_auditd_rule()
while True:
logs = get_latest_deletion_log()
extract_deletion_info(logs)
time.sleep(5)
# π Start monitoring in a separate process
#import threading
#threading.Thread(target=monitor_deletions, daemon=True).start()
if __name__== "__main__":
monitor_deletions()