
Added parsing OTel JSON trace to Span from Kafka topic #5590


Closed

Conversation


@Mamol27 Mamol27 commented Apr 9, 2025

Description

Parsing of OpenTelemetry traces in JSON format into Span has been implemented to support trace handling via Kafka.
We had to implement this because the client requires all data to be sent exclusively through Kafka.
To support this, a new MessageFormat was added.
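
A minimal sketch of the parsing step this adds, assuming the Kafka record value is an OTLP/JSON payload for ExportTraceServiceRequest (the class below is illustrative, not the PR's actual code; Data Prepper Spans are then built from the protobuf model):

```java
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ResourceSpans;

import java.util.List;

// Illustrative only: parse an OTLP/JSON payload consumed from a Kafka record
// into the OTel protobuf trace model.
public final class OtelJsonTraceParser {

    public List<ResourceSpans> parse(String kafkaRecordValue) throws InvalidProtocolBufferException {
        ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
        // JsonFormat maps the OTLP/JSON fields onto the generated protobuf builder.
        JsonFormat.parser().ignoringUnknownFields().merge(kafkaRecordValue, builder);
        return builder.build().getResourceSpansList();
    }
}
```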

Issues Resolved

Resolves #5446

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@kkondaka
Collaborator

@Mamol27 Thank you for your submission. I am wondering if it makes sense to use a "JSON" message format (because the input is JSON) and then have something like a "codec" that converts the JSON to an OTel trace.
Secondly, I am not sure you need JsonOtelTracePojo.java. That should be the default behavior when using OTelProtoStandardCodec.OTelProtoEncoder.convertToResourceSpans.

Collaborator

@KarstenSchnitter KarstenSchnitter left a comment


I think there is an easier way, leveraging the protobuf JsonFormat. This allows more code reuse in the mapping of the OTel data, which would be desirable for the project since it lowers maintenance effort.

I can only recommend using the protobuf binary representation over JSON for storage in Kafka. Protobuf offers much higher serialization and deserialization throughput than JSON.
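
To illustrate the throughput point, a hedged sketch of the binary round-trip using only the generated protobuf API:

```java
import com.google.protobuf.InvalidProtocolBufferException;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;

// Illustrative: store the compact protobuf wire format in Kafka instead of JSON text.
final class BinaryTraceRoundTrip {

    // Producer side: serialize to the protobuf wire format before writing to Kafka.
    static byte[] toKafkaValue(ExportTraceServiceRequest request) {
        return request.toByteArray();
    }

    // Consumer side: deserialize directly from the record value; no text parsing involved.
    static ExportTraceServiceRequest fromKafkaValue(byte[] recordValue)
            throws InvalidProtocolBufferException {
        return ExportTraceServiceRequest.parseFrom(recordValue);
    }
}
```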

Collaborator

@KarstenSchnitter KarstenSchnitter left a comment


Thanks for providing this change. This PR contains a lot of whitespace changes. Please check your code formatter configuration.

@Mamol27
Author

Mamol27 commented Apr 15, 2025

Thanks for providing this change. This PR contains a lot of whitespace changes. Please check your code formatter configuration.

Where can I find the code formatting settings for your project?

@KarstenSchnitter
Collaborator

KarstenSchnitter commented Apr 15, 2025

Thanks for providing this change. This PR contains a lot of whitespace changes. Please check your code formatter configuration.

Where can I find the code formatting settings for your project?

Have a look at the formatting section in the developer guide.

@Mamol27
Author

Mamol27 commented Apr 24, 2025

@KarstenSchnitter Hi, could you help me? I ran the IT tests in KafkaBufferIT on my local machine, and they all passed except for two:

read_decrypts_data_from_the_predefined_key and write_and_read_encrypted

Both of those failed with:

org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in org.opensearch.dataprepper.plugins.kafka.buffer.ReadBufferHelper that uses [Ljava.util.Map$Entry;, [Ljava.util.Map$Entry;org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer was not fulfilled within 30 seconds.

Could you advise what I should do? This isn’t the same error I see in GitHub Actions.
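
For context, this error comes from Awaitility's polling assertion: it throws ConditionTimeoutException when the supplied lambda never returns true within the configured window. A hedged sketch of the pattern (not the actual KafkaBufferIT code):

```java
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;

// Hedged sketch of the Awaitility pattern behind the error message:
// poll the condition until it holds, or fail after 30 seconds.
final class ReadBufferAwait {

    static void awaitRecordsRead(Collection<Map.Entry<String, Object>> expected,
                                 Collection<Map.Entry<String, Object>> actual) {
        await().atMost(Duration.ofSeconds(30))
               .until(() -> actual.containsAll(expected));
    }
}
```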

@KarstenSchnitter
Collaborator

I have no idea what might cause this issue. Hopefully one of the other maintainers can help with that.

Mamol27 and others added 12 commits May 6, 2025 17:24
…-project#5516)

* ENH: support default credentials in dynamic transformation

Signed-off-by: George Chen <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…oject#5508)

* Jackson Log/Metric/Span classes for OTEL standard specification compliant implementation

Signed-off-by: Krishna Kondaka <[email protected]>

* Added tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests. Addressed comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…earch-project#5470)

Add option to process only metadata of objects in S3 scan mode

Signed-off-by: Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…pensearch-project#5528)

* FIX: invalid Jakarta annotation on enum type

Signed-off-by: George Chen <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…pensearch-project#5502)

Change set allows using the kafka.schema.registry_url setting to set
non-standard endpoints when using a mock or fake AWS glue registry
(opensearch-project#5377).

Signed-off-by: Shane Schisler <[email protected]>
Co-authored-by: Shane Schisler <[email protected]>
Signed-off-by: mamol27 <[email protected]>
* FIX: data type of type in actions

Signed-off-by: George Chen <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…epper (opensearch-project#5524)

* Add support for generating Standard(OTEL) conformant events in DataPrepper

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
kkondaka and others added 21 commits May 6, 2025 17:24
* Fix for Issue 5531

Signed-off-by: Kondaka <[email protected]>

* Added unit test

Signed-off-by: Kondaka <[email protected]>

---------

Signed-off-by: Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…pensearch-project#5537)

* ENH: use default credentials for aws glue

Signed-off-by: George Chen <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…5540)

* Add circuit breaker support for lambda processor

Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…ltiple times. The recent addition of the PipelineRunner was getting the full list of processors from the Pipeline. This includes multiple instances of processors which caused them to be invoked multiple times. The solution is to use the list of Processors provided to any given PipelineRunnerImpl instance. For now, this also disables the support for SupportsPipelineRunner buffers in order to get the fix out now. Additionally, this improves the timing of the Connected_SingleExtraSinkIT test since this appears to be flaky locally for me. (opensearch-project#5545)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…te arrays when creating the request for the Lambda function. This uses the "unsafe" methods on SdkBytes. However, these are fine here because the byte[] is not accessible anywhere else for modification. This additionally adds some new unit tests to verify the actual behavior of the InMemoryBuffer as well. (opensearch-project#5547)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…te arrays when handling the response from the Lambda function. This uses the existing InputStream from SdkBytes. It also converts the "null" string to UTF-8 once to perform byte[] comparison on the response. (opensearch-project#5548)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mamol27 <[email protected]>
… their dependencies which may lead to inconsistent dependencies in testing than runtime. This updates these to use the dependencies as used by the project-at-large. (opensearch-project#5518)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…can be deployed with all the necessary Gatling dependencies and run stand-alone. (opensearch-project#5552)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…dition or bulk request/operation failures (opensearch-project#5525)

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: mamol27 <[email protected]>
* Add DLQ support to CloudWatchLogs sink

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…oject#5507)

Add ml processor for offline batch inference

Signed-off-by: Xun Zhang <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…#5539)

* Support trace groups in standard otel proto codec

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments. Verified the functionality for standard logs/traces/metrics

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Added string mappings for resource and scope

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: mamol27 <[email protected]>
…pensearch-project#5541)

feat: Add configurable stream read constraints for JSON input codec

- Introduced a configurable option for setting the maximum event length in the JSON input codec.
- Updated JsonDecoder to accept max_event_length parameter and apply it to StreamReadConstraints.
- Added validation to ensure the maximum event length is within acceptable limits.
Fixes opensearch-project#5466

* Set maxEventLength default to null for safer usage.
* Set min value of maxEventLength to Jackson's default

Signed-off-by: Pallempati Saketh <[email protected]>
Signed-off-by: mamol27 <[email protected]>
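
As an aside on the stream-read-constraints commit above, a minimal sketch of how a max_event_length setting can be applied through Jackson's StreamReadConstraints (the wrapper class is illustrative; the Jackson calls are the standard 2.15+ API):

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative sketch: apply a configured maximum event length to Jackson's
// stream read constraints so oversized JSON events fail fast during parsing.
final class ConstrainedJsonMapper {

    static ObjectMapper withMaxEventLength(int maxEventLength) {
        StreamReadConstraints constraints = StreamReadConstraints.builder()
                .maxStringLength(maxEventLength) // caps the largest JSON string value
                .build();
        JsonFactory factory = JsonFactory.builder()
                .streamReadConstraints(constraints)
                .build();
        return new ObjectMapper(factory);
    }
}
```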
Signed-off-by: mamol27 <[email protected]>
@Mamol27 Mamol27 force-pushed the parsing-otel-json-trace-kafka branch from 1efcb2c to da5fa63 on May 6, 2025 14:24
@Mamol27 Mamol27 closed this May 6, 2025
@Mamol27 Mamol27 deleted the parsing-otel-json-trace-kafka branch May 6, 2025 15:02
Successfully merging this pull request may close these issues: OTel Trace from kafka source with otel_trace_raw (#5446)