# tweet_trender.py
# COS 521 Final Project
# Algorithm Implementation
import fib_heap as fh
from collections import defaultdict
from collections import deque
import history as h
import key_functions
from dateutil import parser
from datetime import timedelta
from copy import deepcopy
# when parsing tweets, the tweet-sending model (read from a file)
# has to take the timestamps into account
# (so we basically fake the sending times)
# (if we wanted to actually run this in real time,
# we would need a time counter running while receiving tweets)
# this is being run on FILES of previous Twitter data
# in (hashtag, timestamp) format;
# that's why we use the timestamps to compute time,
# instead of the actual wall-clock time as we would if it were real-time
# HOW TO USE:
# SmartTrendPredictor takes as input a file of tweets,
# in the format (id, timestamp, hashtag);
# the assumption is that the tweets are in chronological order.
# the trend predictor builds a data structure that evaluates
# the top k hashtags for that specific file.
# maybe we want to implement time snapshots INSIDE a given file?
# as in, top k for a certain time range? maybe not necessary,
# because here, we're just simulating "up to a certain point in time"
# with the file
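# a minimal sketch of the expected input and usage
# (the example lines and numbers below are made up for illustration):
#
#   12345,2013-11-20 18:04:55,#princeton
#   12346,2013-11-20 18:05:01,#finalproject
#
#   stp = SmartTrendPredictor('tweet_data')
#   top10 = stp.get_topk_hashtags(10)  # list of (priority, hashtag, heap index)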
class SmartTrendPredictor(object):
# NOTE: the history parameters, thresholds, and other settings
# may be changed inside here.
# they could be exposed as constructor parameters, but are not; feel free to do so if you want.
def __init__(self, data_file):
# maintains the list of hashtags
# that have occurred in the past time unit
# (say, 1 day; we can set this however we want)
# it's reset to an empty list after it gets added to the history
self.data_block = []
# a day is one block (this can be modified)
# this is the aggregation unit for the history data structure
# NOTE: this must be a timedelta, since it's compared
# against datetime differences
self.block_threshold = timedelta(days=1, hours=0, minutes=0, seconds=0)
# datetime of the time the last block ended at - we take times with reference to that
self.end_of_last_block = None
# 3 hours for now
# the resolution of the heap/ frequency hash table
# THIS NEEDS TO BE A TIME DELTA SINCE WE DO ADDITION
self.y_res = timedelta(hours=3, minutes=0, seconds=0)
self.tweets = defaultdict(list)
self.inverted_tweets_idx = defaultdict(list)
# current tweet storage
# queue for holding hashtags from the stream
# (hashtag, timestamp pairs)
# the right is the most recent
# the left is the oldest
self.input_queue = deque()
# map from hashtag to (frequency, ptr to heap) pairs
# holds hashtags from the last y minutes
self.hashtag_freq = dict()
# max Fib heap
# decrease_key is amortized constant
# To benefit from Fibonacci heaps in practice,
# you have to use them in an application where decrease_keys are incredibly frequent.
# we want to do k delete-max ops in the heap
# http://stromberg.dnsalias.org/~strombrg/fibonacci-heap-mod/
# this is a min heap, so enter all keys as negative to get the max heap.
self.heap = fh.Fibonacci_heap()
# current heap ptr value
self.curr_index = 0
# dict from integer index values to the corresponding heap entry
self.heap_ptrs = defaultdict(lambda:None)
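# (lookups therefore go hashtag_freq[tag] -> [freq, idx] and then
#  heap_ptrs[idx] -> heap entry, so a heap entry can be replaced
#  without having to touch hashtag_freq)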
# similar to Hokusai data structure
# pick some parameters here
# n = 6, m = 3500 (size of hash tables), d = 20 (# of tables)
# n: we do it for a month, so ~ 2^(6-1) days is definitely enough
# m = ceiling(e/epsilon), d = ceiling(ln(1/delta))
# the error in a query is within a factor of epsilon with probability 1 - delta
# for d = 20, with high probability (1 - 1/e^20)
# the error is within a factor of e/3500 of the total number of hashtags
# the total number of hashtags is ~15 million
# thus the error bound on a given count is roughly ~12000
# this error provides a 'good' amount of smoothing
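# (rough check of the ~12000 figure: (e / 3500) * 15,000,000
#  is about 2.718 / 3500 * 1.5e7, i.e. roughly 11,650)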
self.hist = h.History(6, 3500, 20)
# boolean flag used to check whether we're on the first line of the file
# when we parse it
isFirstLine = True
# not finished
with open(data_file) as f:
for line in f:
try:
splitLine = line.strip().split(',')
timestamp = splitLine[1]
hashtag = ','.join(splitLine[2:])
# normalize hashtags
hashtag = hashtag.lower()
tweet_dt = parser.parse(timestamp)
if isFirstLine:
# 'last blocks' start at the beginning of the file
self.end_of_last_block = tweet_dt
isFirstLine = False
self.update_datastructs((hashtag, tweet_dt))
except Exception, e:
print e
continue
self.tweets[hashtag.lower()].append(tweet_dt)
self.inverted_tweets_idx[tweet_dt].append(hashtag.lower())
# update the hashtag frequency dict and heap data structures and queue
# new_data is in the format (hashtag, timestamp)
def update_datastructs(self, new_data):
hashtag, curr_dt = new_data
# update the history
# check timestamp and stuff to build current datablocks
# diff is a timedelta between the current tweet's time and the end of the last block
diff = curr_dt - self.end_of_last_block
# aggregate only for every block timeunit
if diff > self.block_threshold:
self.data_block.append(hashtag)
self.hist.aggregate_unit(self.data_block)
self.data_block = []
self.end_of_last_block = curr_dt
else:
self.data_block.append(hashtag)
# update the present structure in hist
# don't worry, this will get cleared when
# we add the whole block, and get re-added from the start
self.hist.update_present_only(hashtag)
# update all other structs
if hashtag in self.hashtag_freq:
# update the frequency
curr_freq = self.hashtag_freq[hashtag][0]
curr_freq += 1
self.hashtag_freq[hashtag][0] = curr_freq
# get the ptr and update the heap
ptr = self.hashtag_freq[hashtag][1]
entry = self.heap_ptrs[ptr]
val = entry.get_value()
old_priority = entry.get_priority()
# negative since we're using a min heap
# query value should never be 0
new_priority = (-1)*(curr_freq + 0.0)/ self.hist.query(hashtag)
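# e.g. (illustrative numbers) curr_freq = 30 with a smoothed historical
# count of 10 gives priority -3.0, while 300 recent over 200 historical
# gives -1.5, so the first hashtag sits closer to the top of the heap
# (it is hotter relative to its own history)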
if new_priority >= old_priority:
# delete and re-enqueue to avoid a decrease_key operation
# when we're not actually decreasing the key
self.heap.delete(entry)
ptr_val = self.heap.enqueue(val, new_priority)
# we simply update the entry stored in heap_ptrs;
# the index stored in hashtag_freq does not change, since
# it's the same ptr value
self.heap_ptrs[ptr] = ptr_val
else:
# we can decrease the key since new_priority < old_priority
self.heap.decrease_key(entry, new_priority)
else: # hashtag not yet seen
# build the ptr to the heap
# negative since we have a min heap
# query value should never be 0
priority = (-1)*1./self.hist.query(hashtag)
ptr_val = self.heap.enqueue(hashtag, priority)
self.heap_ptrs[self.curr_index] = ptr_val
# build hashtag_freq entry
# current frequency is 1
self.hashtag_freq[hashtag] = [1, self.curr_index]
# new pointer
self.curr_index += 1
# update the queue
# (appending to the right, most recent)
self.input_queue.append(new_data)
# check the first (leftmost, oldest) element of the queue:
# while that element's timestamp + y is earlier than the current time
# (which means it's too old),
# remove it from the "current" data structures
# (the heap, the freq hashtable, the queue)
# (since we pop from the left, the index to look at is always 0)
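# e.g. (illustrative times) with y_res = 3 hours, a hashtag queued
# at 09:00 gets evicted as soon as a tweet stamped after 12:00 arrives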
while self.input_queue[0][1] + self.y_res < curr_dt:
hashtag_old, old_dt = self.input_queue.popleft()
# decrease the frequency of that hashtag by 1
new_freq = self.hashtag_freq[hashtag_old][0] - 1
if new_freq > 0:
# update hashtag frequency table
self.hashtag_freq[hashtag_old][0] = new_freq
# update heap
ptr = self.hashtag_freq[hashtag_old][1]
entry = self.heap_ptrs[ptr]
val = entry.get_value()
old_priority = entry.get_priority()
new_priority = (-1) * (new_freq + 0.0) / self.hist.query(hashtag_old)
if new_priority >= old_priority:
# delete and re-enqueue to avoid a decrease_key operation
# when we're not actually decreasing the key
self.heap.delete(entry)
ptr_val = self.heap.enqueue(val, new_priority)
# we simply update the entry stored in heap_ptrs;
# the index stored in hashtag_freq does not change, since
# it's the same ptr value
self.heap_ptrs[ptr] = ptr_val
else:
self.heap.decrease_key(entry, new_priority)
else: # frequency is 0, we delete this hashtag
ptr = self.hashtag_freq[hashtag_old][1]
# delete the pointer to this entry from the heap, since its frequency is 0
self.heap.delete(self.heap_ptrs[ptr])
# delete ptr from heap_ptrs
del self.heap_ptrs[ptr]
# delete the entry from the hashtable
del self.hashtag_freq[hashtag_old]
# get the top k hashtags in the min-heap
# (the entries with the most negative priorities)
# TODO: forgot to check the pointers here;
# need to write code that removes entries properly
def get_topk_hashtags(self, k):
# what we will return
# list of entries (have priority and value)
best_nodes = []
for i in range(k):
top_node = self.heap.dequeue_min()
val = top_node.get_value()
priority = top_node.get_priority()
ptr = self.hashtag_freq[val][1]
# self.heap_ptrs[ptr] should be the same as top_node
#print 'Priority: ' + str(-1*priority)
#print 'Value: ' + str(val)
best_nodes.append((priority, val, ptr))
for node in best_nodes:
new_node = self.heap.enqueue(node[1], node[0])
ptr = node[2]
self.heap_ptrs[ptr] = new_node
return sorted(best_nodes, reverse=True)
def print_all_keys(self):
iter_heap = deepcopy(self.heap)
key_counts = defaultdict(int)
print 'Printing all Key Priorities: '
while iter_heap.m_size > 0:
node = iter_heap.dequeue_min()
key = -1*node.get_priority()
key_counts[key] += 1
for key in key_counts:
print 'count of ' + str(key) + ': ' + str(key_counts[key])
# build a smart trend predictor,
# see what the key counts are (what priorities exist),
# see what the top ones are for the test file 'tweet_data',
# a file that only spans a few hours (so the history code doesn't really get exercised...)
def test():
stp = SmartTrendPredictor('tweet_data')
stp.print_all_keys()
print '============================'
top5 = stp.get_topk_hashtags(5)
top10 = stp.get_topk_hashtags(10)
top15 = stp.get_topk_hashtags(15)
top4000 = stp.get_topk_hashtags(4000)
print '----------------------------'
print 'Top 5 in order:'
for tag_pair in top5:
print str(tag_pair[1]) + ", " + str(tag_pair[0])
print '----------------------------'
print 'Top 10 in order:'
for tag_pair in top10:
print str(tag_pair[1]) + ", " + str(tag_pair[0])
print '----------------------------'
print 'Top 15 in order:'
for tag_pair in top15:
print str(tag_pair[1]) + ", " + str(tag_pair[0])
print '----------------------------'
print 'Top 4000 in order:'
for tag_pair in top4000:
print str(tag_pair[1]) + ", " + str(tag_pair[0])
print '----------------------------'
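# simple entry point: run the demo when this module is executed directly
# (this assumes a file named 'tweet_data' exists in the working
# directory, as in test() above)
if __name__ == '__main__':
    test()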