Skip to content

Commit df6afe6

Browse files
authored
feat: add support for new reduce proto (numaproj#133)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent 882302b commit df6afe6

10 files changed

+504
-198
lines changed

pynumaflow/exceptions.py

+4
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ class NoPublicConstructorError(TypeError):
44

55
class SocketError(Exception):
66
"""To raise an error while creating socket or setting its property"""
7+
8+
9+
class UDFError(Exception):
10+
"""To Raise an error while executing a UDF call"""

pynumaflow/proto/reducer/reduce.proto

+40-5
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,57 @@ service Reduce {
1818
* ReduceRequest represents a request element.
1919
*/
2020
message ReduceRequest {
21-
repeated string keys = 1;
22-
bytes value = 2;
23-
google.protobuf.Timestamp event_time = 3;
24-
google.protobuf.Timestamp watermark = 4;
21+
// WindowOperation represents a window operation.
22+
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
23+
message WindowOperation {
24+
enum Event {
25+
OPEN = 0;
26+
CLOSE = 1;
27+
APPEND = 4;
28+
}
29+
30+
Event event = 1;
31+
repeated Window windows = 2;
32+
}
33+
34+
// Payload represents a payload element.
35+
message Payload {
36+
repeated string keys = 1;
37+
bytes value = 2;
38+
google.protobuf.Timestamp event_time = 3;
39+
google.protobuf.Timestamp watermark = 4;
40+
}
41+
42+
Payload payload = 1;
43+
WindowOperation operation = 2;
44+
}
45+
46+
// Window represents a window.
47+
// Since the client doesn't track keys, window doesn't have a keys field.
48+
message Window {
49+
google.protobuf.Timestamp start = 1;
50+
google.protobuf.Timestamp end = 2;
51+
string slot = 3;
2552
}
2653

2754
/**
2855
* ReduceResponse represents a response element.
2956
*/
3057
message ReduceResponse {
58+
// Result represents a result element. It contains the result of the reduce function.
3159
message Result {
3260
repeated string keys = 1;
3361
bytes value = 2;
3462
repeated string tags = 3;
3563
}
36-
repeated Result results = 1;
64+
65+
Result result = 1;
66+
67+
// window represents a window to which the result belongs.
68+
Window window = 2;
69+
70+
// EOF represents the end of the response for a window.
71+
bool EOF = 3;
3772
}
3873

3974
/**

pynumaflow/proto/reducer/reduce_pb2.py

+18-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pynumaflow/reducer/_dtypes.py

+108-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections.abc import Iterator, Sequence, Awaitable
44
from dataclasses import dataclass
55
from datetime import datetime
6+
from enum import IntEnum
67
from typing import TypeVar, Callable, Union
78
from collections.abc import AsyncIterable
89
from warnings import warn
@@ -14,6 +15,20 @@
1415
Ms = TypeVar("Ms", bound="Messages")
1516

1617

18+
class WindowOperation(IntEnum):
19+
"""
20+
Enumerate the type of Window operation received.
21+
The operation can be one of the following:
22+
- OPEN: A new window is opened.
23+
- CLOSE: The window is closed.
24+
- APPEND: The window is appended with new data.
25+
"""
26+
27+
OPEN = (0,)
28+
CLOSE = (1,)
29+
APPEND = (4,)
30+
31+
1732
@dataclass(init=False)
1833
class Message:
1934
"""
@@ -190,6 +205,43 @@ def end(self):
190205
return self._end
191206

192207

208+
@dataclass(init=False)
209+
class ReduceWindow:
210+
"""
211+
Defines the window for a reduce operation which includes the
212+
interval window along with the slot.
213+
"""
214+
215+
__slots__ = ("_window", "_slot")
216+
217+
_window: IntervalWindow
218+
_slot: str
219+
220+
def __init__(self, start: datetime, end: datetime, slot: str = ""):
221+
self._window = IntervalWindow(start=start, end=end)
222+
self._slot = slot
223+
224+
@property
225+
def start(self):
226+
"""Returns the start timestamp of the interval window."""
227+
return self._window.start
228+
229+
@property
230+
def end(self):
231+
"""Returns the end timestamp of the interval window."""
232+
return self._window.end
233+
234+
@property
235+
def slot(self):
236+
"""Returns the slot from the window"""
237+
return self._slot
238+
239+
@property
240+
def window(self):
241+
"""Return the interval window"""
242+
return self._window
243+
244+
193245
@dataclass(init=False)
194246
class Metadata:
195247
"""Defines the metadata for the event."""
@@ -209,13 +261,21 @@ def interval_window(self):
209261

210262
@dataclass
211263
class ReduceResult:
212-
"""Defines the object to hold the result of reduce computation."""
264+
"""
265+
Defines the object to hold the result of reduce computation.
266+
It contains the following
267+
1. Future: The async awaitable result of computation
268+
2. Iterator: The handle to the input queue
269+
3. Key: The keys of the partition
270+
4. Window: The window of the reduce operation
271+
"""
213272

214-
__slots__ = ("_future", "_iterator", "_key")
273+
__slots__ = ("_future", "_iterator", "_key", "_window")
215274

216275
_future: Task
217276
_iterator: NonBlockingIterator
218277
_key: list[str]
278+
_window: ReduceWindow
219279

220280
@property
221281
def future(self):
@@ -232,6 +292,52 @@ def keys(self) -> list[str]:
232292
"""Returns the keys of the partition."""
233293
return self._key
234294

295+
@property
296+
def window(self) -> ReduceWindow:
297+
""""""
298+
return self._window
299+
300+
301+
@dataclass
302+
class ReduceRequest:
303+
"""Defines the object to hold a request for the reduce operation."""
304+
305+
__slots__ = ("_operation", "_windows", "_payload")
306+
307+
_operation: WindowOperation
308+
_windows: list[ReduceWindow]
309+
_payload: Datum
310+
311+
def __init__(self, operation: WindowOperation, windows: list[ReduceWindow], payload: Datum):
312+
self._operation = operation
313+
self._windows = windows
314+
self._payload = payload
315+
316+
@property
317+
def operation(self) -> WindowOperation:
318+
"""
319+
Returns the operation of the reduce request.
320+
The operation can be one of the following:
321+
- OPEN: A new window is opened.
322+
- CLOSE: The window is closed.
323+
- APPEND: The window is appended with new data.
324+
"""
325+
return self._operation
326+
327+
@property
328+
def windows(self) -> list[ReduceWindow]:
329+
"""
330+
Returns the windows of the reduce request.
331+
"""
332+
return self._windows
333+
334+
@property
335+
def payload(self) -> Datum:
336+
"""
337+
Returns the payload of the reduce request.
338+
"""
339+
return self._payload
340+
235341

236342
ReduceAsyncCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]]
237343

0 commit comments

Comments
 (0)