This repository was archived by the owner on Jul 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAccountActivity.py
109 lines (91 loc) · 4.46 KB
/
AccountActivity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import tweepy
import logging
import logging.handlers
from tweepy.binder import bind_api
from tweepy.error import TweepError
from tweepy.parsers import JSONParser, Parser
# Extensions to Tweepy for the Account Activity API
class ActivityAPI(tweepy.API):
    """``tweepy.API`` extended with Account Activity API (premium) calls.

    Every method builds an ``/account_activity/all/...`` path and dispatches
    it through ``tweepy.binder.bind_api``.  The ``env`` keyword names the
    Account Activity environment; it is consumed here and never forwarded to
    the bound call.
    """

    @staticmethod
    def _activity_path(resource, env=None):
        """Return the endpoint path for *resource*, scoped to *env* if given."""
        if env is not None:
            return '/account_activity/all/' + env + '/' + resource
        return '/account_activity/all/' + resource

    def enable_activity(self, *args, **kwargs):
        """ :reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
            :allowed_param:'url', 'env'

        POST to the ``webhooks.json`` endpoint to register a webhook URL.
        ``env`` is optional; when omitted the environment-less path is used.
        Remaining positional/keyword arguments are forwarded to the bound
        call (only ``url`` is declared as an allowed parameter).

        Side effect: replaces ``self.parser`` with a ``JSONParser``.  The
        replacement is not undone afterwards, so it also applies to later
        calls made through this instance.
        """
        env = kwargs.pop('env', None)
        # Kept as in the original implementation: the parser swap is sticky.
        self.parser = JSONParser()
        return bind_api(
            api=self,
            path=self._activity_path('webhooks.json', env),
            method='POST',
            payload_type='status',
            allowed_param=['url'],
            require_auth=True,
            use_cache=False,
        )(post_data={}, *args, **kwargs)

    def subscribe_activity(self, *args, **kwargs):
        """ :reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
            :allowed_param:'env'

        POST to ``<env>/subscriptions.json``.  ``env`` is mandatory here:
        a missing ``env`` keyword raises ``KeyError`` and ``env=None``
        raises ``TypeError`` while the path is being built.  Remaining
        arguments are forwarded to the bound call, which declares no
        allowed parameters.
        """
        env = kwargs.pop('env')
        # Deliberately not using _activity_path: this method has no
        # environment-less variant, so a None env must keep failing loudly.
        api_path = '/account_activity/all/' + env + '/subscriptions.json'
        return bind_api(
            api=self,
            path=api_path,
            method='POST',
            payload_type='status',
            require_auth=True,
        )(post_data={}, *args, **kwargs)

    def subscription(self, *args, **kwargs):
        """ :reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
            :allowed_param:'env'

        GET the ``subscriptions.json`` endpoint.  ``env`` is optional; any
        other positional or keyword arguments are accepted but ignored.
        """
        env = kwargs.pop('env', None)
        return bind_api(
            api=self,
            path=self._activity_path('subscriptions.json', env),
            method='GET',
            payload_type='status',
            require_auth=True,
            use_cache=False,
        )()

    def getWebID(self, *args, **kwargs):
        """:reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
           :allowed_param:'env'.
           :returns: an array of JSON. The Twitter documentation is out of date the current format is
           [{"id":"9999999999999999","url":"https://api.blah-blah/webhook","valid":true,"created_timestamp":"2019-09-16 16:47:16 +0000"}]

        GET the ``webhooks.json`` endpoint.  ``env`` is optional; any other
        positional or keyword arguments are accepted but ignored.
        """
        env = kwargs.pop('env', None)
        return bind_api(
            api=self,
            path=self._activity_path('webhooks.json', env),
            method='GET',
            payload_type='json',
            require_auth=True,
            use_cache=False,
        )()

    def getSubsUsers(self, *args, **kwargs):
        """:reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
           :allowed_param:'env'.
           :returns: an array of JSON.

        GET the ``subscriptions/list.json`` endpoint.  ``env`` is optional;
        any other positional or keyword arguments are accepted but ignored.
        """
        env = kwargs.pop('env', None)
        return bind_api(
            api=self,
            path=self._activity_path('subscriptions/list.json', env),
            method='GET',
            payload_type='json',
            require_auth=True,
            use_cache=False,
        )()

    def getSubsCount(self, *args, **kwargs):
        """:reference: https://developer.twitter.com/en/docs/accounts-and-users/subscribe-account-activity/api-reference/aaa-premium
           :allowed_param:'env'.
           :returns: an array of JSON.

        GET ``/account_activity/all/subscriptions/count.json``.  ``env`` is
        accepted for parity with the sibling methods but does not affect
        the request path; any other arguments are accepted but ignored.
        """
        # Consume 'env' so the signature stays interchangeable with the
        # other methods; the path below is the same with or without it.
        kwargs.pop('env', None)
        return bind_api(
            api=self,
            path=self._activity_path('subscriptions/count.json'),
            method='GET',
            payload_type='json',
            require_auth=True,
            use_cache=False,
        )()