lab2_apache_log_student.py
# 1. Parsing Each Log Line
import re
import datetime

from pyspark.sql import Row

month_map = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
             'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}


def parse_apache_time(s):
    """ Convert Apache time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))


def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a Row containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    # The content size field is '-' when no body was returned (e.g. a redirect).
    size_field = match.group(9)
    if size_field == '-':
        size = long(0)
    else:
        size = long(size_field)
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = parse_apache_time(match.group(4)),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)

# A regular expression pattern to extract fields from the log line
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
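
# Optional sanity check (not part of the lab spec): run the parser on one hypothetical
# Common Log Format line and confirm it comes back as a Row with success flag 1.
_sample_line = 'in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839'
_sample_row, _sample_flag = parseApacheLogLine(_sample_line)
print 'Sanity check flag (expect 1): %d' % _sample_flag
if _sample_flag == 1:
    print 'Sanity check host: %s' % _sample_row.host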
# 2. Configuration and Initial RDD Creation
import sys
import os

from test_helper import Test

baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab2', 'apache.access.log.PROJECT')
logFile = os.path.join(baseDir, inputPath)


def parseLogs():
    """ Read and parse the log file """
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))

    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid loglines: %d' % failed_logs_count
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs

parsed_logs, access_logs, failed_logs = parseLogs()
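
# Optional inspection (an addition, not required by the lab): look at one parsed Row to
# confirm its fields line up with the raw log format before moving on.
print access_logs.take(1)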
# 3. Data Cleaning
# The extra '.*' before the closing quote tolerates request fields that do not follow the
# strict '"method endpoint protocol"' form, so previously unparsed lines are now captured.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*).*" (\d{3}) (\S+)'
parsed_logs, access_logs, failed_logs = parseLogs()
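
# Post-cleaning check (a sketch; the revised pattern is expected to leave nothing
# unparsed). A non-zero count here means the regex still misses some line formats.
print 'Invalid loglines after cleaning: %d' % failed_logs.count()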
# 4. Exercise: Number of Unique Hosts
hosts = access_logs.map(lambda log: (log.host, 1))
uniqueHosts = hosts.reduceByKey(lambda a, b: a + b)
uniqueHostCount = uniqueHosts.count()
print 'Unique hosts: %d' % uniqueHostCount
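
# Cross-check via a different route (a sketch, not the lab's required approach): counting
# distinct host values directly should yield the same number as the reduceByKey version.
uniqueHostCountCheck = access_logs.map(lambda log: log.host).distinct().count()
print 'Unique hosts (via distinct): %d' % uniqueHostCountCheck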
# 5. Number of Unique Daily Hosts
dayToHostPairTuple = access_logs.map(lambda log: (log.date_time.day, log.host))
dayGroupedHosts = dayToHostPairTuple.groupBy(lambda (day, host): day)
# Each group holds all (day, host) pairs for one day; set() drops duplicate hosts.
dayHostCount = dayGroupedHosts.map(lambda (day, pairs): (day, len(set(pairs))))
dailyHosts = (dayHostCount
              .sortByKey()
              .cache())
dailyHostsList = dailyHosts.take(30)
print 'Unique hosts per day: %s' % dailyHostsList
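
# Alternative formulation (a sketch; results should match dailyHosts): de-duplicate
# (day, host) pairs first, then count per day with reduceByKey. Unlike groupBy, this
# avoids collecting every pair for a day in one place, which matters on large logs.
dailyHostsCheck = (dayToHostPairTuple
                   .distinct()
                   .map(lambda (day, host): (day, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .sortByKey())
print 'Unique hosts per day (check): %s' % dailyHostsCheck.take(30)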
# Visualizing the Number of Unique Daily Hosts
import matplotlib.pyplot as plt

daysWithHosts = dailyHosts.map(lambda (day, count): day).collect()
hosts = dailyHosts.map(lambda (day, count): count).collect()

fig = plt.figure(figsize=(8, 4.5), facecolor='white', edgecolor='white')
plt.axis([min(daysWithHosts), max(daysWithHosts), 0, max(hosts) + 500])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Day')
plt.ylabel('Hosts')
plt.plot(daysWithHosts, hosts)
pass