Skip to content

Conversation

@Gsantomaggio
Copy link
Member

- Closes #91
- Add and example to show how to use it

Signed-off-by: Gabriele Santomaggio <[email protected]>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements support for using datetime objects as consumer offset specifications in stream queues, addressing issue #91. The implementation allows consumers to start consuming messages from a specific timestamp rather than just using fixed positions like "first", "last", or numeric offsets.

Key Changes:

  • Added datetime support to StreamConsumerOptions offset specification
  • Implemented timestamp conversion logic in the _offset method to convert Python datetime to AMQP timestamps
  • Added a comprehensive example demonstrating datetime-based offset consumption

Reviewed changes

Copilot reviewed 4 out of 5 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
rabbitmq_amqp_python_client/qpid/proton/_data.py Adds datetime import for type handling
rabbitmq_amqp_python_client/entities.py Extends offset specification to support datetime objects with timestamp conversion logic
examples/stream_consumer_offset_datetime/example_stream_consumer_offset_datetime.py New example demonstrating datetime offset functionality with two-phase publishing pattern
examples/README.md Documents the new datetime offset example
.gitignore Adds .DS_Store to ignored files (macOS system file)
Comments suppressed due to low confidence (1)

rabbitmq_amqp_python_client/entities.py:291

  • The docstring for this method is outdated and doesn't reflect the new datetime parameter. It should be updated to include datetime as one of the accepted types in the Args section.
        """
        Set the offset specification for the stream.

        Args:
            offset_specification: Either an OffsetSpecification enum value or
                                an integer offset
        """

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)
logger.info("Publishing first set of messages...")
# publish with a filter of apple
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment contains outdated information. The comment says 'publish with a filter of apple' but the code doesn't apply any filter in this publishing loop. This comment should be removed or updated to accurately describe what's happening.

Suggested change
# publish with a filter of apple
# publish the first set of messages

Copilot uses AI. Check for mistakes.
Comment on lines +301 to +308
if isinstance(offset_specification, datetime):
# convert datetime to milliseconds since epoch
milliseconds_since_epoch = int(offset_specification.timestamp() * 1000)
# Create a timestamp instance
ts = timestamp(milliseconds_since_epoch)
self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described(
symbol(STREAM_OFFSET_SPEC), ts
)
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new datetime offset functionality lacks test coverage. Consider adding tests to verify that datetime offsets work correctly, including edge cases like timestamps before the stream's first message, timestamps in the future, and timestamps between published messages.

Copilot uses AI. Check for mistakes.

break

#
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this empty comment line. It serves no purpose and reduces code cleanliness.

Suggested change
#

Copilot uses AI. Check for mistakes.

logger.info("Publishing done. Waiting before publishing next set of messages...")
time.sleep(3) # wait 3 seconds before publishing next set of messages
starting_form_here = datetime.datetime.now()
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name contains a typo. Should be 'starting_from_here' instead of 'starting_form_here'.

Copilot uses AI. Check for mistakes.
message_handler=MyMessageHandler(),
# the consumer will only receive messages starting
consumer_options=StreamConsumerOptions(
offset_specification=starting_form_here
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name contains a typo. Should be 'starting_from_here' instead of 'starting_form_here'.

Copilot uses AI. Check for mistakes.
consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
# the consumer will only receive messages starting
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is incomplete. It should complete the sentence explaining what the consumer will only receive.

Suggested change
# the consumer will only receive messages starting
# the consumer will only receive messages starting from the specified datetime offset

Copilot uses AI. Check for mistakes.
#

import uuid
from datetime import datetime
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'datetime' is not used.

Suggested change
from datetime import datetime

Copilot uses AI. Check for mistakes.
try:
consumer.run()
except KeyboardInterrupt:
pass
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
pass
logger.info("Keyboard interrupt received, stopping consumer loop.")
break

Copilot uses AI. Check for mistakes.
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio merged commit c37b01e into main Jan 7, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

the timestamp offset support for consumer

2 participants