Skip to content

Commit b02e92d

Browse files
committed
[dagster-pipes] add ThreadedPipesMessageReader with extra runtime params
1 parent 784252e commit b02e92d

File tree

4 files changed

+229
-66
lines changed

4 files changed

+229
-66
lines changed

python_modules/dagster/dagster/_core/pipes/client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from abc import ABC, abstractmethod
22
from contextlib import contextmanager
3-
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Sequence
3+
from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, Optional, Sequence, TypedDict
44

55
from dagster_pipes import (
66
DagsterPipesError,
@@ -143,7 +143,18 @@ def no_messages_debug_text(self) -> str:
143143
"""
144144

145145

146+
class PipesLaunchedData(TypedDict):
147+
"""Payload generated on the Client side after the external process startup
148+
containing arbitrary information about the external process.
149+
"""
150+
151+
extras: Mapping[str, Any]
152+
153+
146154
class PipesMessageReader(ABC):
155+
launched_payload: Optional[PipesLaunchedData]
156+
opened_payload: Optional[PipesOpenedData]
157+
147158
@abstractmethod
148159
@contextmanager
149160
def read_messages(self, handler: "PipesMessageHandler") -> Iterator[PipesParams]:
@@ -170,6 +181,12 @@ def on_opened(self, opened_payload: PipesOpenedData) -> None:
170181
that can only be obtained from the external process.
171182
"""
172183

184+
def on_launched(self, params: Mapping[str, Any]) -> None:
185+
"""Can be called manually to submit extra information about the launched process
186+
to the message reader. Populates the `launched_payload` attribute.
187+
"""
188+
self.opened_payload = {"extras": params}
189+
173190
@abstractmethod
174191
def no_messages_debug_text(self) -> str:
175192
"""A message to be displayed when no messages are received from the external process to aid with

python_modules/dagster/dagster/_core/pipes/context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ def _resolve_metadata_value(
141141
# Type ignores because we currently validate in individual handlers
142142
def handle_message(self, message: PipesMessage) -> None:
143143
if self._received_closed_msg:
144-
self._context.log.warn(f"[pipes] unexpected message received after closed: `{message}`")
144+
self._context.log.warning(
145+
f"[pipes] unexpected message received after closed: `{message}`"
146+
)
145147

146148
method = cast(Method, message["method"])
147149
if method == "opened":

0 commit comments

Comments
 (0)