forked from whole-tale/deploy-dev
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_stream.py
More file actions
executable file
·28 lines (25 loc) · 848 Bytes
/
Copy pathtest_stream.py
File metadata and controls
executable file
·28 lines (25 loc) · 848 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
import json
import pprint
import sseclient
import sys
import time
token = sys.argv[1]
def with_urllib3(url):
"""Get a streaming response for the given event feed using urllib3."""
import urllib3
http = urllib3.PoolManager()
return http.request('GET', url, preload_content=False)
def with_requests(url):
"""Get a streaming response for the given event feed using requests."""
import requests
headers = {'Girder-Token': token}
return requests.get(url, stream=True, headers=headers)
t = int(time.time())
url = 'https://girder.local.wholetale.org/api/v1/notification/stream?since={}'.format(t)
response = with_requests(url)
client = sseclient.SSEClient(response)
for event in client.events():
data = json.loads(event.data)
if (data['type'] == 'wt_progress'):
pprint.pprint(data)