11import base64
2- from asyncio import get_running_loop
2+ import encodings .utf_8
3+ from asyncio import get_running_loop , AbstractEventLoop
34
45from aiohttp import ClientSession
5- from mixpanel import MixpanelException
6+ from mixpanel import MixpanelException , Consumer
67
8+ _encoding = encodings .utf_8 .getregentry ().name
79
8- class AIOConsumer (object ):
9- def __init__ (self , client : ClientSession , events_url = None , people_url = None , import_url = None ):
10- self ._endpoints = {
11- 'events' : events_url or 'https://api.mixpanel.com/track' ,
12- 'people' : people_url or 'https://api.mixpanel.com/engage' ,
13- 'imports' : import_url or 'https://api.mixpanel.com/import' ,
14- }
10+
11+ class AIOConsumer (Consumer ):
12+ """
13+ A aiohttp-based consumer that sends an HTTP request directly to the Mixpanel service, one
14+ per call to :meth:`~.send`.
15+
16+ :param ClientSession client: a client session to use throughout the Consumer lifetime
17+ :param AbstractEventLoop loop: override current loop the relevant loop to create tasks with
18+ :param str events_url: override the default events API endpoint
19+ :param str people_url: override the default people API endpoint
20+ :param str import_url: override the default import API endpoint
21+ """
22+
23+ def __init__ (self , client : ClientSession , loop : AbstractEventLoop = None ,
24+ events_url = None , people_url = None , import_url = None ):
25+ super ().__init__ (events_url , people_url , import_url )
26+ self ._loop = loop or get_running_loop ()
1527 self ._client = client
1628
1729 def send (self , endpoint : str , json_message : str , api_key : str = None ):
18- """Immediately record an event or a profile update.
30+ """
31+ Record an event or a profile update.
32+
1933 :param endpoint: the Mixpanel API endpoint appropriate for the message
2034 :type endpoint: "events" | "people" | "imports"
2135 :param str json_message: a JSON message formatted for the endpoint
@@ -24,15 +38,16 @@ def send(self, endpoint: str, json_message: str, api_key: str = None):
2438 unreachable, or the message cannot be processed
2539 """
2640 if endpoint in self ._endpoints :
27- loop = get_running_loop ()
28- loop .create_task (self ._write_request (self ._endpoints [endpoint ], json_message , api_key ))
41+ self ._loop .create_task (self ._write_request (self ._endpoints [endpoint ], json_message , api_key ))
2942 else :
3043 raise MixpanelException ('No such endpoint "{0}". Valid endpoints are one of {1}'
3144 .format (endpoint , self ._endpoints .keys ()))
3245
33- async def _write_request (self , url : str , json_message : str , api_key : str ):
46+ async def _write_request (self , request_url : str , json_message : str , api_key : str = None ):
47+ b64message = base64 .b64encode (json_message .encode (_encoding )).decode (_encoding )
48+
3449 data = {
35- 'data' : base64 . b64encode ( json_message . encode ( 'utf8' )). decode ( "utf8" ) ,
50+ 'data' : b64message ,
3651 'verbose' : 1 ,
3752 'ip' : 0 ,
3853 }
@@ -41,12 +56,12 @@ async def _write_request(self, url: str, json_message: str, api_key: str):
4156 data .update (dict (api_key = api_key ))
4257
4358 try :
44- response = await self ._client .get (url , params = data , raise_for_status = True )
59+ response = await self ._client .get (request_url , params = data , raise_for_status = True )
4560 json = await response .json ()
4661 except Exception as e :
4762 raise MixpanelException (e )
4863
49- if json [ 'status' ] != 1 :
50- raise MixpanelException ('Mixpanel error: {0}' .format (json [ 'error' ] ))
64+ if json . get ( 'status' ) != 1 :
65+ raise MixpanelException ('Mixpanel error: {0}' .format (json . get ( 'error' ) ))
5166
5267 return True
0 commit comments