Skip to content

Commit 23d5cbf

Browse files
authored
[Ready for review] add an option to make sidecar send msg func threadsafe (#1757)
* add an option to make sidecar send msg func threadsafe * fix send_mustsend threadsafe issue * fix typo
1 parent d23a526 commit 23d5cbf

File tree

2 files changed

+45
-19
lines changed

2 files changed

+45
-19
lines changed

metaflow/sidecar/sidecar.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,24 @@ def __init__(self, sidecar_type):
1313
if t is not None and t.get_worker() is not None:
1414
self._has_valid_worker = True
1515
self.sidecar_process = None
16+
# Whether to send msg in a thread-safe fashion.
17+
self._threadsafe_send_enabled = False
1618

1719
def start(self):
1820
if not self.is_active and self._has_valid_worker:
1921
self.sidecar_process = SidecarSubProcess(self._sidecar_type)
2022

23+
def enable_threadsafe_send(self):
24+
self._threadsafe_send_enabled = True
25+
26+
def disable_threadsafe_send(self):
27+
self._threadsafe_send_enabled = False
28+
2129
def send(self, msg):
2230
if self.is_active:
23-
self.sidecar_process.send(msg)
31+
self.sidecar_process.send(
32+
msg, thread_safe_send=self._threadsafe_send_enabled
33+
)
2434

2535
def terminate(self):
2636
if self.is_active:

metaflow/sidecar/sidecar_subprocess.py

+34-18
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
except:
2626
blockingError = OSError
2727

28+
import threading
29+
30+
lock = threading.Lock()
31+
2832

2933
class PipeUnavailableError(Exception):
3034
"""raised when unable to write to pipe given allotted time"""
@@ -113,16 +117,16 @@ def kill(self):
113117
except:
114118
pass
115119

116-
def send(self, msg, retries=3):
120+
def send(self, msg, retries=3, thread_safe_send=False):
117121
if msg.msg_type == MessageTypes.MUST_SEND:
118122
# If this is a must-send message, we treat it a bit differently. A must-send
119123
# message has to be properly sent before any of the other best effort messages.
120124
self._cached_mustsend = msg.payload
121125
self._send_mustsend_remaining_tries = MUST_SEND_RETRY_TIMES
122-
self._send_mustsend(retries)
126+
self._send_mustsend(retries, thread_safe_send)
123127
else:
124128
# Ignore return code for send.
125-
self._send_internal(msg, retries=retries)
129+
self._send_internal(msg, retries=retries, thread_safe_send=thread_safe_send)
126130

127131
def _start_subprocess(self, cmdline):
128132
for _ in range(3):
@@ -145,7 +149,7 @@ def _start_subprocess(self, cmdline):
145149
self._logger("Unknown popen error: %s" % repr(e))
146150
break
147151

148-
def _send_internal(self, msg, retries=3):
152+
def _send_internal(self, msg, retries=3, thread_safe_send=False):
149153
if self._process is None:
150154
return False
151155
try:
@@ -157,13 +161,13 @@ def _send_internal(self, msg, retries=3):
157161
# restart sidecar so use the PipeUnavailableError caught below
158162
raise PipeUnavailableError()
159163
elif self._send_mustsend_remaining_tries > 0:
160-
self._send_mustsend()
164+
self._send_mustsend(thread_safe_send=thread_safe_send)
161165
if self._send_mustsend_remaining_tries == 0:
162-
self._emit_msg(msg)
166+
self._emit_msg(msg, thread_safe_send)
163167
self._prev_message_error = False
164168
return True
165169
else:
166-
self._emit_msg(msg)
170+
self._emit_msg(msg, thread_safe_send)
167171
self._prev_message_error = False
168172
return True
169173
return False
@@ -184,22 +188,24 @@ def _send_internal(self, msg, retries=3):
184188
self._prev_message_error = True
185189
if retries > 0:
186190
self._logger("Retrying msg send to sidecar (due to %s)" % repr(ex))
187-
return self._send_internal(msg, retries - 1)
191+
return self._send_internal(msg, retries - 1, thread_safe_send)
188192
else:
189193
self._logger(
190194
"Error sending log message (exhausted retries): %s" % repr(ex)
191195
)
192196
return False
193197

194-
def _send_mustsend(self, retries=3):
198+
def _send_mustsend(self, retries=3, thread_safe_send=False):
195199
if (
196200
self._cached_mustsend is not None
197201
and self._send_mustsend_remaining_tries > 0
198202
):
199203
# If we don't succeed in sending the must-send, we will try again
200204
# next time.
201205
if self._send_internal(
202-
Message(MessageTypes.MUST_SEND, self._cached_mustsend), retries
206+
Message(MessageTypes.MUST_SEND, self._cached_mustsend),
207+
retries,
208+
thread_safe_send,
203209
):
204210
self._cached_mustsend = None
205211
self._send_mustsend_remaining_tries = 0
@@ -211,14 +217,7 @@ def _send_mustsend(self, retries=3):
211217
self._send_mustsend_remaining_tries = -1
212218
return False
213219

214-
def _emit_msg(self, msg):
215-
# If the previous message had an error, we want to prepend a "\n" to this message
216-
# to maximize the chance of this message being valid (for example, if the
217-
# previous message only partially sent for whatever reason, we want to "clear" it)
218-
msg = msg.serialize()
219-
if self._prev_message_error:
220-
msg = "\n" + msg
221-
msg_ser = msg.encode("utf-8")
220+
def _write_bytes(self, msg_ser):
222221
written_bytes = 0
223222
while written_bytes < len(msg_ser):
224223
# self._logger("Sent %d out of %d bytes" % (written_bytes, len(msg_ser)))
@@ -235,6 +234,23 @@ def _emit_msg(self, msg):
235234
# sidecar is disabled, ignore all messages
236235
break
237236

237+
def _emit_msg(self, msg, thread_safe_send=False):
238+
# If the previous message had an error, we want to prepend a "\n" to this message
239+
# to maximize the chance of this message being valid (for example, if the
240+
# previous message only partially sent for whatever reason, we want to "clear" it)
241+
msg = msg.serialize()
242+
if self._prev_message_error:
243+
msg = "\n" + msg
244+
msg_ser = msg.encode("utf-8")
245+
246+
# If threadsafe send is enabled, we will use a lock to ensure that only one thread
247+
# can send a message at a time. This is to avoid interleaving of messages.
248+
if thread_safe_send:
249+
with lock:
250+
self._write_bytes(msg_ser)
251+
else:
252+
self._write_bytes(msg_ser)
253+
238254
def _logger(self, msg):
239255
if debug.sidecar:
240256
print("[sidecar:%s] %s" % (self._worker_type, msg), file=sys.stderr)

0 commit comments

Comments
 (0)