BulkBeing commented Dec 19, 2025

The pending and partitions handlers are not defined as async in our abstract class:

def pending_handler(self) -> PendingResponse:
    """
    The simple source always returns zero to indicate there is no pending record.
    """
    pass

@abstractmethod
def partitions_handler(self) -> PartitionsResponse:
    """
    The simple source always returns zero to indicate there is no pending record.
    """
    pass

Also, there is no handler() method on the Sourcer abstract class.

Implementation:

import asyncio
import logging
from datetime import UTC, datetime

from pynumaflow.shared.asynciter import NonBlockingIterator
from pynumaflow.sourcer import (
    AckRequest,
    Message,
    NackRequest,
    Offset,
    PartitionsResponse,
    PendingResponse,
    ReadRequest,
    Sourcer,
    get_default_partitions,
)

logger = logging.getLogger(__name__)


class AsyncSource(Sourcer):
    """
    Simple async source that emits **1 message every 2 seconds**.

    Notes:
    - Numaflow may request more than 1 record via ReadRequest.num_records; this
      implementation intentionally emits at most 1 record per read cycle to keep
      the cadence stable.
    - It also blocks producing new data while there are un-acked offsets.
    """

    def __init__(self) -> None:
        self._next_emit_ts: float = 0.0
        self._read_idx: int = 0
        self._to_ack: set[bytes] = set()
        self._nacked: set[bytes] = set()

    async def read_handler(self, datum: ReadRequest, output: NonBlockingIterator):
        if self._to_ack:
            return

        loop = asyncio.get_running_loop()
        now = loop.time()
        if now < self._next_emit_ts:
            await asyncio.sleep(self._next_emit_ts - now)

        if self._nacked:
            offset_bytes = self._nacked.pop()
        else:
            offset_bytes = str(self._read_idx).encode()
            self._read_idx += 1

        payload = f"source_message:{offset_bytes.decode()}".encode()
        await output.put(
            Message(
                payload=payload,
                offset=Offset.offset_with_default_partition_id(offset_bytes),
                event_time=datetime.now(UTC),
                headers={"content-type": "text/plain"},
            )
        )
        self._to_ack.add(offset_bytes)
        self._next_emit_ts = loop.time() + 2.0
        logger.info("User Defined Source emitted offset=%s", offset_bytes.decode())

    def ack_handler(self, ack_request: AckRequest):
        for offset in ack_request.offsets:
            self._to_ack.discard(offset.offset)

    def nack_handler(self, nack_request: NackRequest):
        for offset in nack_request.offsets:
            self._to_ack.discard(offset.offset)
            self._nacked.add(offset.offset)

    def pending_handler(self) -> PendingResponse:
        # We don't have an external backlog; treat as no pending.
        return PendingResponse(count=0)

    def partitions_handler(self) -> PartitionsResponse:
        return PartitionsResponse(partitions=get_default_partitions())

Seeing error:

[2025-12-19T04:38:17+0000] INFO [] module=server: Async GRPC Server listening on: unix:///var/run/numaflow/source.sock with max threads: 4
2025-12-19 04:38:18 CRITICAL PartitionsFn Error
Traceback (most recent call last):
File "/app/.venv/lib/python3.14/site-packages/pynumaflow/sourcer/servicer/async_servicer.py", line 227, in PartitionsFn
partitions = await self.__source_partitions_handler()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'PartitionsResponse' object can't be awaited
[2025-12-19T04:38:18+0000] CRITICAL [] module=async_servicer: PartitionsFn Error
Traceback (most recent call last):
File "/app/.venv/lib/python3.14/site-packages/pynumaflow/sourcer/servicer/async_servicer.py", line 227, in PartitionsFn
partitions = await self.__source_partitions_handler()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'PartitionsResponse' object can't be awaited
2025-12-19 04:38:18 CRITICAL Traceback (most recent call last):
File "/app/.venv/lib/python3.14/site-packages/pynumaflow/sourcer/servicer/async_servicer.py", line 227, in PartitionsFn
partitions = await self.__source_partitions_handler()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'PartitionsResponse' object can't be awaited
[2025-12-19T04:38:18+0000] CRITICAL [] module=server: Traceback (most recent call last):
File "/app/.venv/lib/python3.14/site-packages/pynumaflow/sourcer/servicer/async_servicer.py", line 227, in PartitionsFn
partitions = await self.__source_partitions_handler()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'PartitionsResponse' object can't be awaited
2025-12-19 04:38:18 CRITICAL 'PartitionsResponse' object can't be awaited
[2025-12-19T04:38:18+0000] CRITICAL [] module=server: 'PartitionsResponse' object can't be awaited
2025-12-19 04:38:18 INFO Killing process: Got exception UDF_EXECUTION_ERROR(udsource): TypeError("'PartitionsResponse' object can't be awaited")
[2025-12-19T04:38:18+0000] INFO [] module=server: Killing process: Got exception UDF_EXECUTION_ERROR(udsource): TypeError("'PartitionsResponse' object can't be awaited")
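
The failure can be reproduced without pynumaflow. The sketch below is illustrative only: `FakePartitionsResponse`, `SyncHandlerSource`, `AsyncHandlerSource`, and `call_like_servicer` are hypothetical stand-ins, and the servicer call shape is an assumption inferred from the traceback (the handler's return value is awaited unconditionally). It shows that a plain `def` handler raises `TypeError` when awaited, while declaring the same handler with `async def` in the subclass works.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakePartitionsResponse:
    """Hypothetical stand-in for pynumaflow's PartitionsResponse."""

    partitions: list[int]


class SyncHandlerSource:
    # Mirrors the failing implementation: a plain `def` handler.
    def partitions_handler(self) -> FakePartitionsResponse:
        return FakePartitionsResponse(partitions=[0])


class AsyncHandlerSource:
    # Same handler declared with `async def`; calling it returns a coroutine.
    async def partitions_handler(self) -> FakePartitionsResponse:
        return FakePartitionsResponse(partitions=[0])


async def call_like_servicer(source) -> FakePartitionsResponse:
    # Assumed call shape: the result of the handler call is always awaited.
    return await source.partitions_handler()


def run(source):
    """Return the handler result, or the TypeError raised by the await."""
    try:
        return asyncio.run(call_like_servicer(source))
    except TypeError as err:
        return err


if __name__ == "__main__":
    print(type(run(SyncHandlerSource())).__name__)
    print(run(AsyncHandlerSource()))
```

The exact wording of the `TypeError` message varies between Python versions, so the sketch only checks the exception type.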

Signed-off-by: Sreekanth <[email protected]>
vigith commented Dec 19, 2025

Shouldn't we make them (pending and partitions) async?

@BulkBeing

> Shouldn't we make them (pending and partitions) async?

Ideally, yes. It will be an API change. Also, tests such as tests/source/test_async_source_err.py::TestAsyncServerErrorScenario::test_pending_error already assume these are async methods.
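
To illustrate why this counts as an API change: `abc` only checks that abstract methods are overridden, not whether the override is a coroutine function, so an existing source with plain `def` handlers would still instantiate and only fail once awaited. The sketch below uses hypothetical names (`SketchSourcer`, `sync_overrides`, `GoodSource`, `LegacySource`) and simplified return types. It is not pynumaflow's `Sourcer`, and the start-up check is just one possible way to surface the mismatch early, not something this PR is known to add.

```python
import asyncio
import inspect
from abc import ABC, abstractmethod


class SketchSourcer(ABC):
    """Hypothetical base class with async abstract handlers."""

    @abstractmethod
    async def pending_handler(self) -> int: ...

    @abstractmethod
    async def partitions_handler(self) -> list[int]: ...


def sync_overrides(source: SketchSourcer) -> list[str]:
    """Return the names of handlers overridden with a plain `def`."""
    names = ("pending_handler", "partitions_handler")
    return [
        name
        for name in names
        if not inspect.iscoroutinefunction(getattr(source, name))
    ]


class GoodSource(SketchSourcer):
    async def pending_handler(self) -> int:
        return 0

    async def partitions_handler(self) -> list[int]:
        return [0]


class LegacySource(SketchSourcer):
    # Instantiation succeeds: ABC does not enforce that overrides are async.
    def pending_handler(self) -> int:
        return 0

    def partitions_handler(self) -> list[int]:
        return [0]


if __name__ == "__main__":
    print(sync_overrides(GoodSource()))
    print(sync_overrides(LegacySource()))
    print(asyncio.run(GoodSource().pending_handler()))
```

A server could run a check like `sync_overrides` once at start-up and report the offending handler names, rather than failing on the first RPC.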

@kohlisid

+1 let's keep them async for consistency

codecov bot commented Dec 19, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 93.88%. Comparing base (5500636) to head (6b9dd50).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #301      +/-   ##
==========================================
+ Coverage   93.85%   93.88%   +0.02%     
==========================================
  Files          66       66              
  Lines        3009     3007       -2     
  Branches      155      155              
==========================================
- Hits         2824     2823       -1     
+ Misses        135      134       -1     
  Partials       50       50              


_LOGGER.critical("PendingFn Error", exc_info=True)
await handle_async_error(context, err, ERR_UDF_EXCEPTION_STRING)
return
raise

Suggested change
raise

handle_async_error will pkill, so I don't think you need return or raise.
