Description
This is probably just me not understanding how things are supposed to work.
I have created a user-defined source based on the async source example. It sets up a REST API that accepts requests, executes database queries, and generates Numaflow messages for a pipeline to work from.
I am not sure what the read_handler function should return when there aren't any results to pass on (this could be just because we are waiting for another REST request).
I tried just breaking out of the iterator, but that resulted in a "Readiness probe" failure, after which K8s restarted the pod.
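A minimal sketch of the idle case described above, to make the question concrete. It deliberately does not import pynumaflow: `FakeReadRequest`, `QueueBackedSource`, and `submit` are hypothetical names, and the `num_records` and `timeout_ms` fields are assumptions about what a read request carries. The handler waits for queued work up to the time budget and then ends the call with however many messages it has, possibly none. Whether Numaflow accepts such an empty read without failing the probe is exactly what is unclear here.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeReadRequest:
    """Hypothetical stand-in for the SDK's read request (not a pynumaflow class)."""
    num_records: int
    timeout_ms: int


class QueueBackedSource:
    """Buffers payloads submitted by a REST handler and hands them out per read call."""

    def __init__(self):
        self.pending = asyncio.Queue()

    def submit(self, payload):
        # Would be called by the REST endpoint once the database query has run.
        self.pending.put_nowait(payload)

    async def read_handler(self, datum):
        # Yield at most num_records payloads; stop early when the time budget is spent.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + datum.timeout_ms / 1000
        for _ in range(datum.num_records):
            remaining = deadline - loop.time()
            if remaining <= 0:
                return
            try:
                payload = await asyncio.wait_for(self.pending.get(), timeout=remaining)
            except asyncio.TimeoutError:
                return  # nothing queued: end this call without yielding more
            yield payload
```

In this sketch an idle call simply finishes as an empty async iterator rather than blocking forever or raising.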
To Reproduce
Steps to reproduce the behavior:
- Modify the async-source example.py so that the read_handler returns after some number of messages, rather than running forever.
Quick and dirty:
From:
for x in range(datum.num_records):
To:
for x in range(self.read_idx, datum.num_records):
- Build the image
- Deploy the pipeline
- Monitor the deployment (k9s)
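The effect of the loop change in the first step can be illustrated in isolation. This is a rough simulation, not the example's code: `records_per_call` is a hypothetical helper, it assumes the index is incremented once per yielded message and persists across calls (as `self.read_idx` appears to in the example), and it ignores acknowledgements entirely.

```python
def records_per_call(num_records, calls):
    """Count how many messages each read call would yield with the modified loop."""
    read_idx = 0  # persists across calls, standing in for self.read_idx
    counts = []
    for _ in range(calls):
        produced = 0
        # Modified loop: the range starts at read_idx instead of 0.
        for _x in range(read_idx, num_records):
            produced += 1
            read_idx += 1
        counts.append(produced)
    return counts


print(records_per_call(num_records=5, calls=3))  # [5, 0, 0]
```

Under these assumptions, every call after the first yields nothing, which is the "no results to pass on" situation that triggers the failure.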
Expected behavior
I expected the source to stop producing messages, so that the pipeline would flush all the queues and then wait for more work (which will never come in this test case, but could in the REST API scenario described above).
Environment
- Kubernetes: v1.27.6+k3s1
- Numaflow: quay.io/numaproj/numaflow:v1.1.1
- Numalogic: unknown (please advise where I might find this information)
- Numaflow-python: 0.6.0