Skip to content

Commit 6ad59bd

Browse files
authored
chore: update SideInput UDF example (numaproj#195)
Signed-off-by: Sidhant Kohli <[email protected]>
1 parent ec00728 commit 6ad59bd

File tree

1 file changed

+56
-15
lines changed

1 file changed

+56
-15
lines changed

examples/sideinput/simple_sideinput/udf/example.py

+56-15
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,71 @@
1+
import os
2+
import threading
13
from threading import Thread
24
import pynumaflow.sideinput as sideinputsdk
3-
from pynumaflow.mapper import Messages, MapServer, Message, Datum
5+
from pynumaflow.mapper import Messages, MapServer, Message, Datum, Mapper
46
from watchfiles import watch
57

68

7-
def my_handler(keys: list[str], datum: Datum) -> Messages:
8-
messages = Messages()
9-
messages.append(Message(str.encode("Some Value")))
10-
return messages
11-
12-
13-
def watcher():
9+
class SideInputHandler(Mapper):
1410
"""
15-
This function is used to watch the side input directory for changes.
11+
This is a class that inherits from the Mapper class.
12+
It implements the handler method that is called for each datum.
1613
"""
17-
path = sideinputsdk.SIDE_INPUT_DIR_PATH
18-
for changes in watch(path):
19-
print(changes)
14+
15+
# variable and lock for thread safety
16+
data_value = "no_value"
17+
data_value_lock = threading.Lock()
18+
19+
# Side input file that we are watching
20+
watched_file = "myticker"
21+
22+
def handler(self, keys: list[str], datum: Datum) -> Messages:
23+
with self.data_value_lock:
24+
current_value = self.data_value
25+
26+
messages = Messages()
27+
messages.append(Message(str.encode(current_value)))
28+
return messages
29+
30+
def file_watcher(self):
31+
"""
32+
This function is used to watch the side input directory for changes.
33+
"""
34+
path = sideinputsdk.SIDE_INPUT_DIR_PATH
35+
for changes in watch(path):
36+
for change in changes:
37+
change_type, file_path = change
38+
if file_path.endswith(self.watched_file):
39+
with self.data_value_lock:
40+
self.update_data_from_file(file_path)
41+
42+
def init_data_value(self):
43+
# Read the SIDE INPUT FILE for initial value before starting the server
44+
path = os.path.join(sideinputsdk.SIDE_INPUT_DIR_PATH, self.watched_file)
45+
print(path)
46+
self.update_data_from_file(path)
47+
48+
def update_data_from_file(self, path):
49+
try:
50+
with open(path) as file:
51+
value = file.read().strip()
52+
self.data_value = value
53+
print(f"Data value variable set to: {self.data_value}")
54+
except Exception as e:
55+
print(f"Error reading file: {e}")
2056

2157

2258
if __name__ == "__main__":
2359
"""
24-
This function is used to start the GRPC server and the watcher thread.
60+
This function is used to start the GRPC server and the file_watcher thread.
2561
"""
26-
daemon = Thread(target=watcher, daemon=True, name="Monitor")
27-
grpc_server = MapServer(my_handler)
62+
handler_instance = SideInputHandler()
63+
64+
# initialize data with value from side input
65+
handler_instance.init_data_value()
66+
67+
daemon = Thread(target=handler_instance.file_watcher, daemon=True, name="Monitor")
68+
grpc_server = MapServer(handler_instance)
2869
thread_server = Thread(target=grpc_server.start, daemon=True, name="GRPC Server")
2970
daemon.start()
3071
thread_server.start()

0 commit comments

Comments
 (0)