Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions tests/integration/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def test_regression_soft_delete_schemas_should_be_registered(
config.admin_metadata_max_age = 2
config.group_id = group_id
config.topic_name = topic_name
config.producer_acks = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add reasoning in commit message, why this change?


master_coordinator = MasterCoordinator(config=config)
master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
Expand Down Expand Up @@ -113,10 +114,10 @@ async def test_regression_soft_delete_schemas_should_be_registered(
value=json_encode(value, binary=True),
)
producer.flush()
msg = future.result()

schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5)
msg = future.result(timeout=2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some justification for this to the commit message, I have no idea why we would add a timeout here, as we are flushing right before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, don't have a hard reason for this , but generally I think it is resource intensive sometimes.
And timeout on the future object is provided by the api to handle such cases where processes are slow.


seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5)
assert seen is True
schemas = database.find_subject_schemas(subject=Subject(subject), include_deleted=True)
assert len(schemas) == 1, "Deleted schemas must have been registered"

Expand All @@ -141,7 +142,7 @@ async def test_regression_soft_delete_schemas_should_be_registered(
value=json_encode(value, binary=True),
)
producer.flush()
msg = future.result()
msg = future.result(timeout=2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we have flushed, waiting should not change anything. Have we actually observed something that would hint at this solving a problem we have? (I.e., I think this would raise if the future is not complete when calling .result(). Have we seen such exceptions?)


seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5)
assert seen is True
Expand All @@ -166,6 +167,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw(
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.admin_metadata_max_age = 2
config.group_id = group_id
config.producer_acks = 1

master_coordinator = MasterCoordinator(config=config)
master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
Expand Down Expand Up @@ -200,10 +202,10 @@ async def test_regression_config_for_inexisting_object_should_not_throw(
value=json_encode(value, binary=True),
)
producer.flush()
msg = future.result()
msg = future.result(timeout=2)
Comment on lines -203 to +205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.


schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5)

seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5)
assert seen is True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we stop asserting here?

assert (
database.find_subject(subject=Subject(subject)) is not None
), "The above message should be handled gracefully"
Expand Down
Loading