Skip to content

Commit 90e981f

Browse files
author
Jack Kordas
committed
async log event
1 parent 8b903b2 commit 90e981f

File tree

1 file changed

+61
-0
lines changed

1 file changed

+61
-0
lines changed

client/globus_cw_client/client.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Python client API for cwlogs daemon
33
"""
44

5+
import asyncio
56
import json
67
import socket
78
import time
@@ -41,6 +42,25 @@ def log_event(message, retries=10, wait=0.1):
4142
req["timestamp"] = int(time.time() * 1000)
4243
return _request(req, retries, wait)
4344

45+
async def log_event_async(message, retries=10, wait=0.1):
46+
if isinstance(message, bytes):
47+
message = message.decode("utf-8")
48+
_checktype(message, str, "message type must be bytes or unicode")
49+
50+
_checktype(retries, int, "retries must be an int")
51+
if retries < 0:
52+
raise ValueError("retries must be non-negative")
53+
54+
_checktype(wait, (int, float), "wait must be an int or float")
55+
if wait < 0:
56+
raise ValueError("wait must be non-negative")
57+
58+
req = {}
59+
req["message"] = message
60+
req["timestamp"] = int(time.time() * 1000)
61+
return await _request_async(req, retries, wait)
62+
63+
4464

4565
def _connect(retries, wait):
4666
"""
@@ -63,6 +83,21 @@ def _connect(retries, wait):
6383
raise CWLoggerConnectionError("couldn't connect to cw", error)
6484

6585

86+
async def _connect_async(retries, wait):
87+
addr = "\0org.globus.cwlogs"
88+
for _ in range(retries + 1):
89+
try:
90+
reader, writer = asyncio.open_unix_connection(path=addr)
91+
except Exception as err:
92+
if writer:
93+
writer.close()
94+
error = err
95+
else:
96+
return reader, writer
97+
time.sleep(wait):
98+
99+
100+
66101
def _request(req, retries, wait):
67102
buf = json.dumps(req, indent=None) + "\n"
68103
# dumps returns unicode with python3, but sock requires bytes
@@ -92,6 +127,32 @@ def _request(req, retries, wait):
92127
raise CWLoggerDaemonError("unknown response type", d)
93128

94129

130+
async def _request_async(req, retries, wait):
131+
buf = json.dumps(req, indent=None) + "\n"
132+
# dumps returns unicode with python3, but sock requires bytes
133+
if isinstance(buf, str):
134+
buf = buf.encode("utf-8")
135+
136+
reader, writer = await _connect_async(retries, wait)
137+
writer.write(buf)
138+
await writer.drain()
139+
140+
resp = await reader.readline()
141+
if not resp.endswith(b"\n"):
142+
raise Exception("no data")
143+
resp = resp.decode("utf-8")
144+
145+
d = json.loads(resp[:-1])
146+
if isinstance(d, dict):
147+
status = d["status"]
148+
if status == "ok":
149+
return d
150+
else:
151+
raise CWLoggerDaemonError("forwarded error", d["message"])
152+
else:
153+
raise CWLoggerDaemonError("unknown response type", d)
154+
155+
95156
"""
96157
Ignore (swallow) these exceptions at your own risk.
97158
CWLoggerDaemonError can be caused by many things, including but not limited to:

0 commit comments

Comments
 (0)