
Cleanup opensearch output #947

Draft

Pablu23 wants to merge 1 commit into poc-mainloop from feat-async-output

Conversation


@Pablu23 Pablu23 commented Mar 17, 2026

Description

Cleanup and optimize Opensearch async output

Assignee

  • The changes adhere to the contribution guidelines
  • I have performed a self-review of my code
  • My changes generate no new warnings (e.g. flake8/mypy/pytest/...) other than deprecations
  • Change base branch from poc-mainloop to main, after merge from mainloop

Documentation

Code Quality

  • Patch test coverage > 95% and does not decrease
  • New code uses correct & specific type hints

How did you verify that the changes work in practice?

  • List of (preferably easy reproducible) tests including OS

Reviewer


The rendered docs for this PR can be found here.

@Pablu23 Pablu23 self-assigned this Mar 17, 2026
Comment on lines 322 to +328

     actions = (event.data for event in events)

     index = 0
-    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):  # type: ignore
-        if index >= len(events):
-            break
+    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):
         event = events[index]
         event.state.current_state = EventStateType.STORING_IN_OUTPUT
+        # This should not be possible!
+        assert index < len(events)
Pablu23 (Collaborator, Author) commented:

bulk_id = uuid.uuid4()
actions = (
    {**event.data, "_id": f"{bulk_id}_{index}"}
    for index, event in enumerate(events)
)
index = 0
async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):

    # This should not be possible!
    assert index < len(events)

    # a uuid4 string is 36 characters, plus "_", so the index starts at offset 37
    assert index == int(item["create"]["_id"][37:])
    index += 1

This proves that helpers.async_streaming_bulk yields results in the same order as the actions iterable.
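The ordering claim can also be checked in isolation with a stub in place of helpers.async_streaming_bulk; fake_streaming_bulk, main, and the plain-dict events below are made up for this sketch:

```python
import asyncio
import uuid

# Hypothetical stand-in for helpers.async_streaming_bulk: yields one
# (success, item) pair per action, in the order the actions arrive.
async def fake_streaming_bulk(actions):
    for action in actions:
        yield True, {"create": {"_id": action["_id"]}}

async def main():
    events = [{"msg": i} for i in range(5)]
    bulk_id = uuid.uuid4()
    actions = (
        {**event, "_id": f"{bulk_id}_{index}"}
        for index, event in enumerate(events)
    )
    index = 0
    async for success, item in fake_streaming_bulk(actions):
        assert index < len(events)
        # a uuid4 string is 36 characters, plus "_", so the index starts at 37
        assert index == int(item["create"]["_id"][37:])
        index += 1
    return index

print(asyncio.run(main()))  # 5 if every result arrived in order
```

Against the real client the stub would be replaced by the actual helper, but the index bookkeeping stays identical.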

Pablu23 (Collaborator, Author) commented:

This could possibly stay in the code, but I don't like generating a UUID here and setting it as an ID if OpenSearch can probably do that more performantly itself.


@mhoff mhoff left a comment


Many thanks for your work. Here are the few comments we already discussed.

-    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):  # type: ignore
-        if index >= len(events):
-            break
+    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):
mhoff (Collaborator) commented:

            # "queue_size": self.config.queue_size,
            # "thread_count": self.config.thread_count,

Please remove these and note in the docs that they are not used.

Comment on lines 339 to 345

-    # parallel_bulk often returned item that allowed item.get("_op_type")
-    # streaming_bulk usually returns {"index": {...}} / {"create": {...}}
-    op_type = item.get("_op_type") if isinstance(item, dict) else None
-    if not op_type and isinstance(item, dict) and item:
+    op_type = self.config.default_op_type
+    if "_op_type" in item:
+        op_type = item["_op_type"]
+    elif isinstance(item, dict):
+        op_type = next(iter(item.keys()))
mhoff (Collaborator) commented:

Please simplify this code, as we are only using the async_streaming_bulk interface right now and don't need the backwards compatibility.
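A possible shape of that simplification, assuming every item yielded by async_streaming_bulk is a dict wrapping the result under its op type, e.g. {"create": {"_id": "...", "status": 201}} (the helper name get_op_type is made up for this sketch):

```python
# Hypothetical simplification: with only async_streaming_bulk in play,
# the single top-level key of each result item is the op type itself.
def get_op_type(item: dict) -> str:
    # e.g. {"create": {"_id": "abc", "status": 201}} -> "create"
    return next(iter(item))

print(get_op_type({"create": {"_id": "abc", "status": 201}}))  # create
```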

-    error_info = (
-        item.get(op_type, {}) if isinstance(item.get(op_type), dict) else {}
-    )
+    if op_type in item and isinstance(item[op_type], dict):
mhoff (Collaborator) commented:

We can statically assume item to be a dict, which simplifies this code quite a bit.
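Under that static assumption the extraction collapses to a plain dict lookup; get_error_info below is a hypothetical helper name for this sketch:

```python
# Sketch assuming item is always a dict of the form {op_type: details},
# as yielded by async_streaming_bulk, so no isinstance checks are needed.
def get_error_info(item: dict, op_type: str) -> dict:
    # missing op_type simply yields an empty error payload
    return item.get(op_type, {})

print(get_error_info({"create": {"status": 400}}, "create"))  # {'status': 400}
```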

-    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):  # type: ignore
-        if index >= len(events):
-            break
+    async for success, item in helpers.async_streaming_bulk(client, actions, **kwargs):
mhoff (Collaborator) commented:

Please add a follow-up ticket for us: we might want to send the chunks concurrently in the future, depending on where we identify actual performance bottlenecks.
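For the follow-up ticket, a rough sketch of what concurrent chunking could look like; send_chunk and send_concurrently are hypothetical names, and the sleep stands in for the actual bulk request:

```python
import asyncio

# Hypothetical stand-in for one bulk request over a chunk of actions.
async def send_chunk(chunk: list[dict]) -> int:
    await asyncio.sleep(0)  # placeholder for network I/O
    return len(chunk)

# Split the actions into fixed-size chunks and send them concurrently
# instead of streaming them through one sequential bulk call.
async def send_concurrently(actions: list[dict], chunk_size: int) -> int:
    chunks = [
        actions[i : i + chunk_size]
        for i in range(0, len(actions), chunk_size)
    ]
    results = await asyncio.gather(*(send_chunk(c) for c in chunks))
    return sum(results)

print(asyncio.run(send_concurrently([{"n": i} for i in range(10)], 4)))  # 10
```

Note that concurrent chunks give up the per-index ordering guarantee the current loop relies on, so the event/result correlation would need the explicit `_id` scheme discussed above.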
