Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions packages/pynumaflow/pynumaflow/sourcer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(
payload: bytes,
offset: Offset,
event_time: datetime,
keys: list[str] = None,
keys: Optional[list[str]] = None,
headers: Optional[dict[str, str]] = None,
user_metadata: Optional[UserMetadata] = None,
):
Expand Down Expand Up @@ -261,12 +261,6 @@ class Sourcer(metaclass=ABCMeta):

"""

def __call__(self, *args, **kwargs):
"""
Allow to call handler function directly if class instance is sent
"""
return self.handler(*args, **kwargs)

@abstractmethod
async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
"""
Expand All @@ -278,28 +272,28 @@ async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
pass

@abstractmethod
def ack_handler(self, ack_request: AckRequest):
async def ack_handler(self, ack_request: AckRequest):
"""
The ack handler is used to acknowledge the offsets that have been read
"""
pass

@abstractmethod
def nack_handler(self, nack_request: NackRequest):
async def nack_handler(self, nack_request: NackRequest):
"""
The nack handler is used to negatively acknowledge the offsets on the source
"""
pass

@abstractmethod
def pending_handler(self) -> PendingResponse:
async def pending_handler(self) -> PendingResponse:
"""
The simple source always returns zero to indicate there is no pending record.
"""
pass

@abstractmethod
def partitions_handler(self) -> PartitionsResponse:
async def partitions_handler(self) -> PartitionsResponse:
"""
The simple source always returns zero to indicate there is no pending record.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def PendingFn(
except BaseException as err:
_LOGGER.critical("PendingFn Error", exc_info=True)
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
return
raise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise

handle_async_error will pkill, i don't think you need return or raise

resp = source_pb2.PendingResponse.Result(count=count.count)
return source_pb2.PendingResponse(result=resp)

Expand All @@ -228,7 +228,7 @@ async def PartitionsFn(
except BaseException as err:
_LOGGER.critical("PartitionsFn Error", exc_info=True)
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
return
raise
resp = source_pb2.PartitionsResponse.Result(partitions=partitions.partitions)
return source_pb2.PartitionsResponse(result=resp)

Expand Down