Skip to content

Commit 5d99fb8

Browse files
authored
feat: support headers for message (numaproj#138)
Signed-off-by: Yashash H L <[email protected]>
1 parent 04914fa commit 5d99fb8

40 files changed: +295 -108 lines changed

examples/sink/log/example.py

+3-1
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,9 @@ def handler(self, datums: Iterator[Datum]) -> Responses:
2020
def udsink_handler(datums: Iterator[Datum]) -> Responses:
2121
responses = Responses()
2222
for msg in datums:
23-
_LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8"))
23+
_LOGGER.info(
24+
"User Defined Sink: Payload %s , Headers %s", msg.value.decode("utf-8"), msg.headers
25+
)
2426
responses.append(Response.as_success(msg.id))
2527
return responses
2628

examples/source/async-source/example.py

+4
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
import uuid
2+
13
from collections.abc import AsyncIterable
24
from datetime import datetime
35

@@ -37,10 +39,12 @@ async def read_handler(self, datum: ReadRequest) -> AsyncIterable[Message]:
3739
return
3840

3941
for x in range(datum.num_records):
42+
headers = {"x-txn-id": str(uuid.uuid4())}
4043
yield Message(
4144
payload=str(self.read_idx).encode(),
4245
offset=Offset.offset_with_default_partition_id(str(self.read_idx).encode()),
4346
event_time=datetime.now(),
47+
headers=headers,
4448
)
4549
self.to_ack_set.add(str(self.read_idx))
4650
self.read_idx += 1

examples/source/simple-source/example.py

+5
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
import uuid
2+
13
from collections.abc import Iterable
24
from datetime import datetime
35

@@ -36,11 +38,14 @@ def read_handler(self, datum: ReadRequest) -> Iterable[Message]:
3638
if self.to_ack_set:
3739
return
3840

41+
headers = {"x-txn-id": str(uuid.uuid4())}
42+
3943
for x in range(datum.num_records):
4044
yield Message(
4145
payload=str(self.read_idx).encode(),
4246
offset=Offset.offset_with_default_partition_id(str(self.read_idx).encode()),
4347
event_time=datetime.now(),
48+
headers=headers,
4449
)
4550
self.to_ack_set.add(str(self.read_idx))
4651
self.read_idx += 1

pynumaflow/mapper/_dtypes.py

+15-2
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
from collections.abc import Iterator, Sequence, Awaitable
33
from dataclasses import dataclass
44
from datetime import datetime
5-
from typing import TypeVar, Callable, Union
5+
from typing import TypeVar, Callable, Union, Optional
66
from warnings import warn
77

88
from pynumaflow._constants import DROP
@@ -106,33 +106,39 @@ class Datum:
106106
value: the payload of the event.
107107
event_time: the event time of the event.
108108
watermark: the watermark of the event.
109+
headers: the headers of the event.
110+
109111
>>> # Example usage
110112
>>> from pynumaflow.mapper import Datum
111113
>>> from datetime import datetime, timezone
112114
>>> payload = bytes("test_mock_message", encoding="utf-8")
113115
>>> t1 = datetime.fromtimestamp(1662998400, timezone.utc)
114116
>>> t2 = datetime.fromtimestamp(1662998460, timezone.utc)
117+
>>> msg_headers = {"key1": "value1", "key2": "value2"}
115118
>>> d = Datum(
116119
... keys=["test_key"],
117120
... value=payload,
118121
... event_time=t1,
119122
... watermark=t2,
123+
... headers=msg_headers,
120124
... )
121125
"""
122126

123-
__slots__ = ("_keys", "_value", "_event_time", "_watermark")
127+
__slots__ = ("_keys", "_value", "_event_time", "_watermark", "_headers")
124128

125129
_keys: list[str]
126130
_value: bytes
127131
_event_time: datetime
128132
_watermark: datetime
133+
_headers: dict[str, str]
129134

130135
def __init__(
131136
self,
132137
keys: list[str],
133138
value: bytes,
134139
event_time: datetime,
135140
watermark: datetime,
141+
headers: Optional[dict[str, str]] = None,
136142
):
137143
self._keys = keys or list()
138144
self._value = value or b""
@@ -142,7 +148,9 @@ def __init__(
142148
if not isinstance(watermark, datetime):
143149
raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
144150
self._watermark = watermark
151+
self._headers = headers or {}
145152

153+
@property
146154
def keys(self) -> list[str]:
147155
"""Returns the keys of the event"""
148156
return self._keys
@@ -162,6 +170,11 @@ def watermark(self) -> datetime:
162170
"""Returns the watermark of the event."""
163171
return self._watermark
164172

173+
@property
174+
def headers(self) -> dict[str, str]:
175+
"""Returns the headers of the event."""
176+
return self._headers.copy()
177+
165178

166179
class Mapper(metaclass=ABCMeta):
167180
"""

pynumaflow/mapper/servicer/async_servicer.py

+1
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ async def MapFn(
3838
value=request.value,
3939
event_time=request.event_time.ToDatetime(),
4040
watermark=request.watermark.ToDatetime(),
41+
headers=request.headers,
4142
),
4243
)
4344
except Exception as e:

pynumaflow/mapper/servicer/utils.py

+1
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ def _map_fn_util(
2020
value=request.value,
2121
event_time=request.event_time.ToDatetime(),
2222
watermark=request.watermark.ToDatetime(),
23+
headers=dict(request.headers),
2324
),
2425
)
2526
except Exception as err:

pynumaflow/mapstreamer/_dtypes.py

+14-2
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
from collections.abc import Iterator, Sequence
33
from dataclasses import dataclass
44
from datetime import datetime
5-
from typing import TypeVar, Callable, Union
5+
from typing import TypeVar, Callable, Union, Optional
66
from collections.abc import AsyncIterable
77
from warnings import warn
88

@@ -107,33 +107,39 @@ class Datum:
107107
value: the payload of the event.
108108
event_time: the event time of the event.
109109
watermark: the watermark of the event.
110+
headers: the headers of the event.
111+
110112
>>> # Example usage
111113
>>> from pynumaflow.mapstreamer import Datum
112114
>>> from datetime import datetime, timezone
113115
>>> payload = bytes("test_mock_message", encoding="utf-8")
114116
>>> t1 = datetime.fromtimestamp(1662998400, timezone.utc)
115117
>>> t2 = datetime.fromtimestamp(1662998460, timezone.utc)
118+
>>> msg_headers = {"key1": "value1", "key2": "value2"}
116119
>>> d = Datum(
117120
... keys=["test_key"],
118121
... value=payload,
119122
... event_time=t1,
120123
... watermark=t2,
124+
... headers=msg_headers,
121125
... )
122126
"""
123127

124-
__slots__ = ("_keys", "_value", "_event_time", "_watermark")
128+
__slots__ = ("_keys", "_value", "_event_time", "_watermark", "_headers")
125129

126130
_keys: list[str]
127131
_value: bytes
128132
_event_time: datetime
129133
_watermark: datetime
134+
_headers: dict[str, str]
130135

131136
def __init__(
132137
self,
133138
keys: list[str],
134139
value: bytes,
135140
event_time: datetime,
136141
watermark: datetime,
142+
headers: Optional[dict[str, str]] = None,
137143
):
138144
self._keys = keys or list()
139145
self._value = value or b""
@@ -143,6 +149,7 @@ def __init__(
143149
if not isinstance(watermark, datetime):
144150
raise TypeError(f"Wrong data type: {type(watermark)} for Datum.watermark")
145151
self._watermark = watermark
152+
self._headers = headers or {}
146153

147154
def keys(self) -> list[str]:
148155
"""Returns the keys of the event"""
@@ -163,6 +170,11 @@ def watermark(self) -> datetime:
163170
"""Returns the watermark of the event."""
164171
return self._watermark
165172

173+
@property
174+
def headers(self) -> dict[str, str]:
175+
"""Returns the headers of the event."""
176+
return self._headers.copy()
177+
166178

167179
class MapStreamer(metaclass=ABCMeta):
168180
"""

pynumaflow/mapstreamer/servicer/async_servicer.py

+1
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ async def MapStreamFn(
4040
value=request.value,
4141
event_time=request.event_time.ToDatetime(),
4242
watermark=request.watermark.ToDatetime(),
43+
headers=request.headers,
4344
),
4445
):
4546
yield mapstream_pb2.MapStreamResponse(result=res)

pynumaflow/proto/mapper/map.proto

+1
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ message MapRequest {
2121
bytes value = 2;
2222
google.protobuf.Timestamp event_time = 3;
2323
google.protobuf.Timestamp watermark = 4;
24+
map<string, string> headers = 5;
2425
}
2526

2627
/**

pynumaflow/proto/mapper/map_pb2.py

+14-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pynumaflow/proto/mapstreamer/mapstream.proto

+1
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ message MapStreamRequest {
2222
bytes value = 2;
2323
google.protobuf.Timestamp event_time = 3;
2424
google.protobuf.Timestamp watermark = 4;
25+
map<string, string> headers = 5;
2526
}
2627

2728
/**

pynumaflow/proto/mapstreamer/mapstream_pb2.py

+14-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pynumaflow/proto/reducer/reduce.proto

+1
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ message ReduceRequest {
3737
bytes value = 2;
3838
google.protobuf.Timestamp event_time = 3;
3939
google.protobuf.Timestamp watermark = 4;
40+
map<string, string> headers = 5;
4041
}
4142

4243
Payload payload = 1;

0 commit comments

Comments
 (0)