-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlistener.py
More file actions
309 lines (254 loc) · 9.56 KB
/
listener.py
File metadata and controls
309 lines (254 loc) · 9.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
#!/usr/bin/python
#
"""Darling Harbour Tree-of-Light Twitter Listener Module

This module listens to Twitter and passes commands from users who pass the
UserVerify checks (whitelist / blacklist / rate limit) along to the parser.
It has one queue, outbound to the parser.
Homepage and documentation: http://dev.moorescloud.com/
Copyright (c) 2013, Mark Pesce.
License: MIT (see LICENSE for details)
"""
__author__ = 'Mark Pesce'
__version__ = '1.0a1'
__license__ = 'MIT'
import time, os, stat, sys, json, threading, logging
from multiprocessing import Queue
# Very sorry -- the OAuth code was written using one module package,
# While the streaming is done using another. Inefficient, but it works.
from twitter.oauth_dance import oauth_dance
import twitter
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy import API
# Location of the cached OAuth access-token file: ".tol-oauth" in the user's
# home directory. Resolving "~" this way is known to work on *NIX; Windows
# has not been checked.
fn = os.path.join(os.path.expanduser('~'), '.tol-oauth')
# Consumer credentials for the Tree-of-Light Twitter application. Both the
# long and the short names are kept because different code paths use each.
# NOTE(review): this is a consumer *secret* committed to source control;
# consider loading it from the environment/config and rotating it.
con_secret = consumer_secret = "5BOTziGnWOuGRNIWyBknuKck7Rn4gUPgO9EusgKsJhI"
con_key = consumer_key = "Hzj8ndSL6cGEjXOwMltRBQ"
# Do we have the correct OAuth credentials?
# If credentials exist, test them.
# If they fail, delete them.
# If they do not exist or fail, create them.
#
def _verified_twitter_api(tokens, which):
    """Build a python-twitter API object from *tokens* and verify it.

    Args:
        tokens: (token, token_secret) pair.
        which: word used in the failure log message ('existing' or 'new').

    Returns:
        The ``twitter.Twitter`` object if ``verify_credentials`` succeeds,
        otherwise False. On a ``twitter.TwitterError`` the token file ``fn``
        is deleted so the next attempt starts from scratch.
    """
    twapi = twitter.Twitter(auth=twitter.OAuth(token=tokens[0],
                                               token_secret=tokens[1],
                                               consumer_secret=con_secret,
                                               consumer_key=con_key))
    try:
        result = twapi.account.verify_credentials()
    except twitter.TwitterError as e:  # Bad credentials: discard them.
        logging.debug("Call failed, we don't seem to be authorized with %s credentials. Deleting...", which)
        print(e)
        os.remove(fn)
        return False
    logging.debug("Good, we seem to be authorized for username %s with id %d",
                  result['screen_name'], int(result['id']))
    return twapi


def check_twitter_auth():
    """Return a verified python-twitter API object, or False on failure.

    First tries the tokens cached in ``fn``. If they are missing or rejected,
    runs ``oauth_dance`` to obtain new tokens (written to ``fn`` and chmod'ed
    to user read/write only) and verifies those instead.
    """
    authorized = False
    if os.path.isfile(fn):  # Does the token file exist?
        tokens = twitter.oauth.read_token_file(fn)
        authorized = _verified_twitter_api(tokens, 'existing')
    if authorized is False:  # Not authorized yet: do the OAuth dance.
        logging.debug("Authorizing the app...")
        tokens = oauth_dance(app_name='TreeOfLight', consumer_key=con_key,
                             consumer_secret=con_secret, token_filename=fn)
        os.chmod(fn, stat.S_IRUSR | stat.S_IWUSR)  # Read/write, user-only
        authorized = _verified_twitter_api(tokens, 'new')
    return authorized
class UserVerify(object):
    """Keep track of users tweeting the Tree-of-Light so they can't flood it.

    A user may send up to MAX_HITS tweets within a MAX_SECONDS window
    (5 in 10 minutes). The tweet that exceeds the limit puts the user on the
    naughtylist ("sin bin") for SIN_BIN_SECS (15 minutes); each further
    attempt while sin-binned resets that timer. Whitelisted users are always
    allowed, blacklisted users never are.
    """

    def clean_naughtylist(self):
        """Thread body: every 15 seconds, release users whose sin-bin time is up."""
        while True:
            time.sleep(15)  # Sleep for 15 seconds
            self._purge_naughtylist(time.time())

    def _purge_naughtylist(self, current_timestamp):
        """Remove naughtylist entries older than SIN_BIN_SECS; return the count removed."""
        with self._lock:
            # Collect first: deleting from a dict while iterating it raises RuntimeError.
            expired = [name for name, stamp in self.naughtylist.items()
                       if stamp + self.SIN_BIN_SECS < current_timestamp]
            for name in expired:
                del self.naughtylist[name]
        return len(expired)

    @staticmethod
    def _load_list(path, label):
        """Load a JSON list of user names from *path*; return [] if unreadable."""
        try:
            with open(path) as f:
                names = json.load(f)
        except (IOError, OSError, ValueError):  # missing file or malformed JSON
            logging.debug("Could not read %s" % label)
            return []
        logging.debug(names)
        return names

    def __init__(self):
        """Set up the data structures and start the naughtylist cleaner thread."""
        self.MAX_HITS = 5  # 5 hits in 10 minutes, max
        self.MAX_SECONDS = 600
        self.SIN_BIN_SECS = 900  # 15 minutes in sin bin
        # Both files are looked up relative to the current working directory.
        self.whitelist = self._load_list('tol-whitelist.json', 'whitelist')
        self.blacklist = self._load_list('tol-blacklist.json', 'blacklist')
        # username -> (hit_count, last_timestamp, first_timestamp of the window)
        self.userlist = {}
        # username -> timestamp the sin-bin period started (or was last reset)
        self.naughtylist = {}
        # The cleaner thread and the stream thread both touch naughtylist.
        self._lock = threading.Lock()
        self.cleaner = threading.Thread(target=self.clean_naughtylist)
        self.cleaner.daemon = True  # Don't keep the process alive just for the cleaner.
        self.cleaner.start()

    def test(self, username):
        """Return True if *username* may send a Tweet to the Tree, otherwise False."""
        # Always whitelist (moorescloud, Roger, SHFA, etc)
        if self.on_whitelist(username):
            return True
        if self.on_blacklist(username):
            return False
        if self.on_naughtylist(username):
            return False
        current_timestamp = time.time()
        entry = self.userlist.get(username)
        if entry is None or current_timestamp - entry[2] > self.MAX_SECONDS:
            # New user, or their previous window has expired: start a fresh window.
            self.userlist[username] = (1, current_timestamp, current_timestamp)
            return True
        hit_count, _last_timestamp, first_timestamp = entry
        hit_count += 1
        if hit_count > self.MAX_HITS:
            # Too chatty within the window: sin-bin them and forget their count.
            self.add_naughtylist(username, current_timestamp)
            del self.userlist[username]
            return False
        self.userlist[username] = (hit_count, current_timestamp, first_timestamp)
        return True

    def add_naughtylist(self, username, current_timestamp):
        """Add a user to the sin bin for trying to be too chatty."""
        with self._lock:
            self.naughtylist[username] = current_timestamp

    def on_whitelist(self, username):
        """Return True if on the whitelist."""
        return username in self.whitelist

    def on_blacklist(self, username):
        """Return True if on the blacklist."""
        return username in self.blacklist

    def on_naughtylist(self, username):
        """Return True if sin-binned; trying again while sin-binned resets the timer."""
        with self._lock:
            if username in self.naughtylist:
                self.naughtylist[username] = time.time()
                return True
        return False
# Module-wide rate limiter, read through the global name ``uv`` by
# StdOutListener.on_data. Creating it here, at import time, reads the
# whitelist/blacklist JSON files (relative to the current working directory)
# and starts the UserVerify cleaner thread.
uv = UserVerify()
class StdOutListener(StreamListener):
    """Stream listener that screens incoming tweets and forwards the good ones.

    Each raw stream message is decoded as JSON. If it names a sender that
    passes ``uv.test``, the pair ``[user, text]`` is put on the global
    ``cmd_parser_queue`` for the command parser.
    """

    def on_data(self, data):
        """Handle one raw stream message; always return True to keep streaming."""
        try:
            tweet = json.loads(data)
        except ValueError:
            printme("No JSON-parseable data, ignoring...")
            return True
        try:
            sender = tweet['user']['screen_name']
            printme(sender)
            if not uv.test(sender):
                return True
            # Sender is allowed: hand the text on to the Command Parser.
            text = tweet['text']
            printme("Enqueuing %s from %s" % (text, sender))
            cmd_parser_queue.put([sender, text])
        except KeyError:
            # Messages without a user/text (non-tweet events) end up here.
            printme("KeyError, skipping...")
        return True

    def on_error(self, status):
        """Log the error status reported by the stream."""
        logging.error(status)
# Queue to the command parser; assigned by run() and read by StdOutListener.on_data.
cmd_parser_queue = None


def run(parser_queue):
    """Listen to the Twitter user stream and feed accepted tweets to *parser_queue*.

    Written so this can be loaded as a module and run via multiprocessing.
    Blocks for as long as the stream is alive. When the stream ends (normally
    or with an exception), runs the restart script.
    """
    import subprocess
    global cmd_parser_queue
    cmd_parser_queue = parser_queue
    # NOTE: no authorization check is done here; the token file ``fn`` must
    # already exist (e.g. created by check_twitter_auth()). read_token_file
    # presumably raises if it does not.
    tokens = twitter.oauth.read_token_file(fn)
    printme("We have authorization tokens")
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(tokens[0], tokens[1])
    stream = Stream(auth, listener)
    # Suspect we need to make a different call here that just gets all mentions
    # (e.g. stream.filter(track=[...])).
    try:
        stream.userstream()  # Blocking call; returns only when the stream ends.
    except Exception:
        # Top-level boundary: without this, an error in the stream would kill
        # the process *without* reaching the restart below.
        logging.exception("Stream raised an exception")
    # If we've gotten here, the stream has died. It does die periodically;
    # all we can really do is restart. And hope.
    logging.critical("Stream died, restarting...")
    subprocess.call(["/home/mpesce/scripts/restart.sh"])  # This should kill us dead, and restart us.
def printme(str):
    """Send *str* to the debug log.

    Kept as a single choke point so output can be switched between the
    logging module and stdout in one place.
    """
    emit = logging.debug
    emit(str)
if __name__ == '__main__':
    # Command-line entry point: configure debug logging, then run the listener
    # against a locally created queue (nothing in this script reads from it).
    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
    for banner in ('Logging initialized',
                   "Running listener module from the command line."):
        printme(banner)
    cmd_parser_queue = None
    parser_queue = Queue()
    for marker in ("one", "two", "three"):
        printme(marker)
    run(parser_queue)