Skip to content

Commit d3f5292

Browse files
kohlisidSidhant Kohli
and
Sidhant Kohli
authored
chore: Update Numaflow version rollouts (#138)
Signed-off-by: Sidhant Kohli <[email protected]> Co-authored-by: Sidhant Kohli <[email protected]>
1 parent c3ed347 commit d3f5292

13 files changed

+255
-305
lines changed

numaprom/tools.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def inner_function(*args, **kwargs):
4141
msgs = Messages()
4242
for json_data in json_list:
4343
if json_data:
44-
msgs.append(Message.to_all(json_data))
44+
msgs.append(Message(json_data))
4545
else:
4646
msgs.append(Message.to_drop())
4747
return msgs
@@ -55,7 +55,7 @@ def inner_function(*args, **kwargs):
5555
json_data = handler_func(*args, **kwargs)
5656
msgs = Messages()
5757
if json_data:
58-
msgs.append(Message.to_all(value=json_data))
58+
msgs.append(Message(value=json_data))
5959
else:
6060
msgs.append(Message.to_drop())
6161
return msgs
@@ -70,7 +70,7 @@ def inner_function(*args, **kwargs) -> Messages:
7070
msgs = Messages()
7171
for vertex, json_data in data:
7272
if json_data and vertex:
73-
msgs.append(Message.to_vtx(key=vertex.encode(), value=json_data))
73+
msgs.append(Message(value=json_data, tags=[vertex.encode()]))
7474
else:
7575
msgs.append(Message.to_drop())
7676
return msgs

numaprom/udf/filter.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
@catch_exception
1414
@msg_forward
15-
def metric_filter(_: str, datum: Datum) -> Optional[Messages]:
15+
def metric_filter(_: list[str], datum: Datum) -> Optional[Messages]:
1616
"""
1717
UDF to filter metrics by labels
1818
"""

numaprom/udf/inference.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import time
2-
32
from numalogic.config import NumalogicConf
43
from numalogic.models.autoencoder import AutoencoderTrainer
54
from numalogic.registry import ArtifactData, RedisRegistry
@@ -43,7 +42,7 @@ def _run_inference(
4342

4443

4544
@msg_forward
46-
def inference(_: str, datum: Datum) -> bytes:
45+
def inference(_: list[str], datum: Datum) -> bytes:
4746
_start_time = time.perf_counter()
4847

4948
_in_msg = datum.value.decode("utf-8")

numaprom/udf/postprocess.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def _publish(final_score: float, payload: StreamPayload) -> List[bytes]:
156156

157157

158158
@msgs_forward
159-
def postprocess(_: str, datum: Datum) -> List[bytes]:
159+
def postprocess(_: List[str], datum: Datum) -> List[bytes]:
160160
"""
161161
UDF for performing the following steps:
162162

numaprom/udf/preprocess.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import time
3+
from typing import List
34

45
import orjson
56
from numalogic.registry import RedisRegistry
@@ -26,7 +27,7 @@
2627

2728

2829
@msg_forward
29-
def preprocess(_: str, datum: Datum) -> bytes:
30+
def preprocess(_: List[str], datum: Datum) -> bytes:
3031
_start_time = time.perf_counter()
3132
_in_msg = datum.value.decode("utf-8")
3233

numaprom/udf/threshold.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import time
22
from collections import OrderedDict
3+
from typing import List
34

45
from numalogic.registry import RedisRegistry
56
from numalogic.tools.exceptions import RedisRegistryError
@@ -32,7 +33,7 @@ def _get_static_thresh_payload(payload, metric_config) -> bytes:
3233

3334

3435
@conditional_forward
35-
def threshold(_: str, datum: Datum) -> list[tuple[str, bytes]]:
36+
def threshold(_: List[str], datum: Datum) -> list[tuple[str, bytes]]:
3637
_start_time = time.perf_counter()
3738
_in_msg = datum.value.decode("utf-8")
3839

numaprom/udf/window.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import time
33
import uuid
4-
from typing import Optional
4+
from typing import Optional, List
55

66
import numpy as np
77
import numpy.typing as npt
@@ -56,7 +56,7 @@ def __aggregate_window(
5656

5757

5858
@msg_forward
59-
def window(_: str, datum: Datum) -> Optional[bytes]:
59+
def window(_: List[str], datum: Datum) -> Optional[bytes]:
6060
"""
6161
UDF to construct windowing of the streaming input data, required by ML models.
6262
"""

0 commit comments

Comments
 (0)