Skip to content

Commit c1ee39b

Browse files
committed
fix
1 parent 02b9a79 commit c1ee39b

File tree

4 files changed

+45
-20
lines changed

4 files changed

+45
-20
lines changed

sdks/fs-python/function_stream/function.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def get_metadata(key: str) -> Any:
429429
if key == "message_id":
430430
return message.message_id()
431431
raise KeyError(key)
432-
432+
433433
context.get_metadata = get_metadata
434434

435435
# Call the function with context as first argument and handle both sync and async results
@@ -449,7 +449,7 @@ def get_metadata(key: str) -> Any:
449449
logger.error(f"Error invoking process function: {str(e)}")
450450
raise Exception(f"Error invoking process function: {str(e)}") from e
451451
if response_data:
452-
resp_msgs.append(MsgWrapper(data=response_data, event_time=datetime.utcnow()))
452+
resp_msgs.append(MsgWrapper(data=response_data, event_time=datetime.now(timezone.utc)))
453453

454454
if not response_topic:
455455
logger.warning("No response_topic provided and no sink topic available. Skip messages")
@@ -481,7 +481,7 @@ def get_metadata(key: str) -> Any:
481481
await self._send_response(
482482
response_topic,
483483
request_id,
484-
[MsgWrapper(data={'error': str(e)}, event_time=datetime.utcnow())]
484+
[MsgWrapper(data={'error': str(e)}, event_time=datetime.now(timezone.utc))]
485485
)
486486
self.metrics.record_request_end(False, time.time() - start_time)
487487
self.metrics.record_event(False)
@@ -642,7 +642,7 @@ async def close(self):
642642
def __del__(self):
643643
"""
644644
Ensure resources are cleaned up when the object is destroyed.
645-
645+
646646
This finalizer ensures that all resources are properly closed when the
647647
object is garbage collected. It provides a safety net for resource cleanup
648648
in case the explicit close() method is not called.
Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
"""
2+
FSModule module provides the base class for all FunctionStream modules.
3+
4+
This module defines the abstract base class FSModule that all FunctionStream modules
5+
must inherit from. It provides a common interface for module initialization and
6+
data processing, ensuring consistency across different module implementations.
7+
"""
8+
19
from abc import ABC, abstractmethod
210
from typing import Dict, Any
311

@@ -8,32 +16,51 @@ class FSModule(ABC):
816
"""
917
Base class for all FunctionStream modules.
1018
11-
This class provides a common interface for all modules in the FunctionStream SDK.
12-
Each module must implement the process method to handle incoming data.
19+
This abstract base class provides a common interface for all modules in the
20+
FunctionStream SDK. Each module must implement the init and process methods
21+
to handle module initialization and incoming data processing.
1322
1423
Attributes:
15-
name (str): The name of the module
24+
name (str): The name of the module (to be set during initialization).
1625
"""
1726

1827
@abstractmethod
1928
def init(self, context: FSContext):
2029
"""
21-
Initialize the module with a name.
30+
Initialize the module with the provided context.
31+
32+
This method is called during module initialization to set up the module
33+
with the necessary context and configuration. Subclasses must implement
34+
this method to handle any required setup.
2235
2336
Args:
24-
name (str): The name of the module
37+
context (FSContext): The context object containing configuration and
38+
runtime information for the module.
2539
"""
40+
pass
2641

2742
@abstractmethod
2843
async def process(self, context: FSContext, data: Dict[str, Any]) -> Dict[str, Any]:
2944
"""
30-
Process incoming data.
45+
Process incoming data asynchronously.
46+
47+
This method is the core processing function that handles incoming data.
48+
Subclasses must implement this method to define the specific data processing
49+
logic for their module. The method should be asynchronous to support
50+
non-blocking operations.
3151
3252
Args:
33-
context (FSContext): The context object containing configuration and runtime information
34-
data (Dict[str, Any]): The input data to process
53+
context (FSContext): The context object containing configuration and
54+
runtime information.
55+
data (Dict[str, Any]): The input data to process. This is typically
56+
a dictionary containing the message payload
57+
and any associated metadata.
3558
3659
Returns:
37-
Union[Dict[str, Any], Awaitable[Dict[str, Any]]]: The processed data or an awaitable that will resolve to the processed data
60+
Dict[str, Any]: The processed data that should be returned as the
61+
result of the processing operation.
62+
63+
Raises:
64+
NotImplementedError: This method must be implemented by subclasses.
3865
"""
3966
raise NotImplementedError("Subclasses must implement process method")

sdks/fs-python/tests/test_context.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
from unittest.mock import Mock
6-
from datetime import datetime
6+
from datetime import datetime, timezone
77

88
import pytest
99

@@ -73,16 +73,16 @@ def test_get_metadata_default_implementation(self, context):
7373
def test_produce_default_implementation(self, context):
7474
"""Test that produce does nothing by default."""
7575
test_data = {"key": "value"}
76-
test_time = datetime.utcnow()
77-
76+
test_time = datetime.now(timezone.utc)
77+
7878
# Should not raise any exception
7979
result = context.produce(test_data, test_time)
8080
assert result is None
8181

8282
def test_produce_without_event_time(self, context):
8383
"""Test produce method without event_time parameter."""
8484
test_data = {"key": "value"}
85-
85+
8686
# Should not raise any exception
8787
result = context.produce(test_data)
8888
assert result is None

sdks/fs-python/tests/test_function.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,7 @@ async def process_func(context: FSContext, data: Dict[str, Any]) -> Dict[str, An
115115
async def test_init(self):
116116
"""Test FSFunction initialization."""
117117
with patch('function_stream.function.Config.from_yaml') as mock_from_yaml, \
118-
patch('function_stream.function.Client') as mock_client, \
119-
patch('function_stream.function.Metrics') as mock_metrics, \
120-
patch('function_stream.function.MetricsServer') as mock_metrics_server:
118+
patch('function_stream.function.Client') as mock_client:
121119
mock_config = Mock(spec=Config)
122120
mock_config.module = "test_module"
123121
mock_config.subscriptionName = "test_subscription"

0 commit comments

Comments
 (0)