-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathtweepy_wrapper.py
296 lines (207 loc) · 7.53 KB
/
tweepy_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#tweepy wrapper
import tweepy
from helper_functions import *
import os
import time
# Application-only authentication. Both credentials are read from the
# environment, so a missing variable raises KeyError as soon as this module
# is imported.
auth = tweepy.AppAuthHandler(os.environ["TWITTER_CONSUMER_KEY"], os.environ["TWITTER_CONSUMER_SECRET"]) #higher limits
# Module-level client shared by the API-calling functions below. The two
# wait_on_rate_limit* flags ask tweepy to wait (and announce it) when a rate
# limit is reached instead of failing the call.
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
from tweepy.models import Status, ResultSet
import re
from datetime import datetime
import random
def view_rate_limits():
    """
    Print the remaining call budget and reset time for the key Twitter REST
    API endpoints (friends list, search, rate-limit status, user timeline)
    under application authentication, followed by the current local time.
    Returns nothing; the report is written to stdout.
    """
    resources = api.rate_limit_status()['resources']  # one API call covers every endpoint

    def budget(family, endpoint):
        # Return (remaining calls, converted reset time) for one endpoint.
        entry = resources[family][endpoint]
        return entry['remaining'], convert_UNIX_time(entry['reset'])

    follow_left, follow_reset = budget("friends", '/friends/list')
    search_left, search_reset = budget('search', '/search/tweets')
    app_left, app_reset = budget('application', '/application/rate_limit_status')
    user_left, user_reset = budget('statuses', '/statuses/user_timeline')

    print("""
Search Remaining: {}, Search Reset: {}
Follow Remaining: {}, Follow Reset: {}
User Remaining: {}, User Reset: {}
Application Remaining: {}, Application Reset: {}
The Current time is: {}
""".format(
        search_left, search_reset,
        follow_left, follow_reset,
        user_left, user_reset,
        app_left, app_reset,
        datetime.now().time(),
    ))
def search_twitter(query, number):
    """
    Argument Order: query, number
    Search Twitter for English-language tweets matching the query.
    query  - the search string (must be a str)
    number - how many tweets to fetch at most (must be an int)
    Tweets are requested in extended mode.
    Returns a list of tweets.
    """
    assert type(query) is str, "Please enter a query in the form of a string"
    assert type(number) is int, "Please enter the number of as an integer"
    cursor = tweepy.Cursor(api.search, q=query, lang='en', tweet_mode='extended')
    tweets = cursor.items(number)
    return list(tweets)
def extract_handle(tweet):
    """
    Argument Order: tweet
    Look up the author of a tweet and return their screen name.
    The leading @ symbol is not part of the returned handle.
    Returns the handle - string type
    """
    assert type(tweet) is Status, "Please enter in a tweet of type Status"
    author = vars(tweet)['user']
    return author.screen_name
def extract_text(tweet):
    """
    Argument Order: tweet
    Extracts the clean text of a tweet: links and emoji are removed and
    surrounding whitespace is stripped.
    Works on tweets that carry either a `text` attribute or (extended mode)
    a `full_text` attribute; both are cleaned the same way.
    Can be mapped over a list of tweets (Status type).
    Returns the clean text of the tweet
    """
    assert type(tweet) == Status, "Please enter in a tweet of type Status"
    link_regex = r"http\S+"
    emoji_pattern = re.compile(
        "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "]+",
        flags=re.UNICODE,
    )
    # Extended-mode tweets expose `full_text` instead of `text`.
    raw = tweet.text if hasattr(tweet, "text") else tweet.full_text
    without_links = re.sub(link_regex, "", raw.strip())
    # Previously only the `text` branch stripped emoji; apply it to both.
    return emoji_pattern.sub("", without_links).strip()
# def extract_hashtags(tweet):
# """
# Argument Order: tweet
# Returns hashtags present in a given tweet
# list(map(extract_hashtags, no_rt_gabr))
# """
# assert type(tweet) == Status, "Please enter in a tweet of type Status"
# if hasattr(tweet, "text"):
# return [i for i in tweet.text.split() if i.startswith("#")]
# else:
# return [i for i in tweet.full_text.split() if i.startswith("#")]
def extract_hashtags(tweet):
    """
    Argument Order: tweet
    Return a list with the text of every hashtag entity on the tweet.
    An empty list is returned when the tweet has no hashtags; when the
    tweet has no `entities` attribute a notice is printed and an empty
    list is returned.
    """
    assert type(tweet) is Status, "Please enter in a tweet of type Status"
    if not hasattr(tweet, "entities"):
        print("No entity method!")
        return []
    return [tag['text'] for tag in tweet.entities['hashtags']]
def extract_datetime(tweet):
    """
    Argument Order: tweet
    Returns the tweet's `created_at` value (a datetime object).
    """
    assert type(tweet) is Status, "Please enter in a tweet of type Status"
    created = tweet.created_at
    return created
def extract_users_tweets(handle, number):
    """
    Argument Order: handle, number of tweets to extract
    Extracts a user's tweets (retweets included) from their timeline.
    Returns a ResultSet holding up to `number` tweets.
    If the timeline cannot be read (the message assumes a protected
    account, but any API error ends up here) a notice naming the handle is
    printed and an empty list is returned.
    """
    final = ResultSet()
    try:
        timeline = tweepy.Cursor(api.user_timeline, screen_name=handle, count=200, include_rts=True)
        final.extend(timeline.items(number))
    except Exception:
        # `except Exception` (not a bare except) lets Ctrl-C / SystemExit
        # propagate, which matters while tweepy sleeps on a rate limit.
        print("{} is a protected user!".format(handle))
        return []
    return final
# def remove_retweets(lst):
# """
# Given a ResultSet of tweets, removes those that are RT's
# Returns a ResultSet.
# """
# assert type(lst) == ResultSet, "Please enter a ResultSet of user's tweets to be filtered."
# final = ResultSet()
# if hasattr(lst[0], "text"):
# aux = (x for x in lst if "RT @" not in x.text) #dont need it to be a list
# for i in aux:
# final.append(i)
# else:
# aux = (x for x in lst if "RT @" not in x.full_text) #dont need it to be a list
# for i in aux:
# final.append(i)
# return final
def average_retweets(lst, handle):
    """
    Argument Order: lst, handle
    Given a ResultSet of tweets, calculate the average retweet count for all
    tweets in the ResultSet.
    `handle` is accepted for call compatibility but is not used.
    Be sure to only apply this on a ResultSet that excludes retweets.
    Returns the mean retweet count, or 0.0 for an empty ResultSet (instead
    of raising ZeroDivisionError).
    This function would be amazing with firehose API
    """
    assert isinstance(lst, ResultSet), "Please enter a ResultSet of user's tweets."
    if not lst:
        return 0.0
    return sum(tweet.retweet_count for tweet in lst) / len(lst)
def get_all_following(handle):
    """
    Argument Order: handle
    Returns the screen names of every account the given handle follows
    (its "friends"), as a list.
    Warning: This burns through rate limit
    """
    friends = tweepy.Cursor(api.friends, screen_name=handle, count=200).items()
    return [friend.screen_name for friend in friends]
def get_100_following(handle):
    """
    Argument Order: handle
    Returns up to 100 handles that the specified user follows (the first
    100 returned by the API).
    This function has been optimised for rate limiting.
    If the friends list cannot be read (the message assumes protected
    tweets, but any API error ends up here) a skip notice is printed and
    an empty list is returned.
    NOTE: If given access to firehose API - this function could be altered slightly to obtain all friends.
    """
    final = []
    try:
        for friend in tweepy.Cursor(api.friends, screen_name=handle, count=100).items(100):
            final.append(friend.screen_name)
    except Exception:
        # `except Exception` (not a bare except) so that Ctrl-C / SystemExit
        # still stop a long crawl instead of being reported as a skip.
        print("Skipping - {} has protected tweets!".format(handle))
        return []
    return final
def second_layer_following(lst):
    """
    Argument Order: lst
    For a given list of twitter handles, extract who they follow.
    Only the first 100 accounts followed by each handle are extracted - this
    is due to rate limiting.
    Progress is printed per handle, with a summary after every 10 handles.
    Returns a 'flat' list of all handles collected (duplicates are kept).
    NOTE: If given access to firehose API - this function could be altered slightly to obtain the entire secondary layer
    """
    flat_second_layer = []
    for cnt, handle in enumerate(lst, start=1):
        print("processing {}".format(handle))
        # Extend in place: sum(list_of_lists, []) re-copies the accumulated
        # list on every addition and is quadratic in the total size.
        flat_second_layer.extend(get_100_following(handle))
        print()
        if cnt % 10 == 0:
            print()
            print("processed {} handles from a total of {}".format(cnt, len(lst)))
            print()
    return flat_second_layer
def random_sample_lst(lst):
    """
    Argument Order: lst
    Draws a random sample, without replacement, containing a quarter of the
    given list (the size is rounded down).
    """
    quarter = len(lst) // 4
    return random.sample(lst, quarter)
def distinct(lst1, lst2):
    """
    Argument order: source following list, accumulated source's following list
    Returns the entries of lst2 that do not appear in lst1, in their
    original order. Repeated entries in lst2 are kept.
    """
    already_followed = set(lst1)  # set gives constant-time membership tests
    final = []
    for candidate in lst2:
        if candidate in already_followed:
            continue
        final.append(candidate)
    return final