Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ml_inference processor for offline batch inference #5507

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

Zhangxunmt
Copy link

@Zhangxunmt Zhangxunmt commented Mar 6, 2025

Description

Adding a new ml_inference processor to interact with ml-commons plugin in OpenSearch for ML related applications.

Some examples that work well:

ml-batch-job-sagemaker-pipeline:
  source:
    s3:
      codec:
        ndjson:
      compression: none
      aws:
        region: "us-east-1"
      default_bucket_owner: <your aws account>
      scan:
        scheduling:
          interval: PT6M
        buckets:
          - bucket:
              name: "<your bucket>"
              data_selection: metadata_only
              filter:
                include_prefix:
                  - <your input prefix>
          - bucket:
              name: "offlinebatch"
              data_selection: data_only
              filter:
                include_prefix: 
                  - <your output prefix>

  buffer:
    bounded_blocking:
      buffer_size: 2048 # max number of records the buffer accepts
      batch_size: 512 # max number of records the buffer drains after each read

  processor:
    - ml_inference:
        host: "<your AOS endpoint url>"
        aws_sigv4: true
        action_type: "batch_predict"
        service_name: "sagemaker|bedrock"
        model_id: "<your model id used in vector search>"
        output_path: "s3://offlinebatch/sagemaker/output"
        aws:
          region: "us-east-1"
        ml_when: /bucket == "offlinebatch"
    - copy_values:
        entries:
          - to_key: chapter
            from_key: /content/0
          - to_key: title
            from_key: /content/1
          - to_key: chapter_embedding
            from_key: /SageMakerOutput/0
          - to_key: title_embedding
            from_key: /SageMakerOutput/1
    - delete_entries:
        with_keys: [content, SageMakerOutput]
  
  route:
      - ml-ingest-route: "/chapter != null and /title != null"

  sink:
    - opensearch:
        hosts: ["<your AOS endpoint url>"]
        aws_sigv4: true
        index: "test-nlp-index"
        routes: [ml-ingest-route]
        username: "<you username>"
        password: "<your password>"
        
ml-batch-job-bedrock-pipeline:
  source:
    s3:
      codec:
        ndjson:
      compression: none
      aws:
        region: "us-east-1"
      default_bucket_owner: <your aws account>
      scan:
        scheduling:
          interval: PT2M
        buckets:
          - bucket:
              name: "<your bucket>"
              data_selection: metadata_only
              filter:
                include_prefix:
                  - bedrock-multisource/my_batch
                exclude_suffix:
                  - .out
          - bucket:
              name: "<your bucket>"
              data_selection: data_only
              filter:
                include_prefix:
                  - bedrock-multisource/output-multisource/
                exclude_suffix:
                  - manifest.json.out

  buffer:
    bounded_blocking:
      buffer_size: 2048 # max number of records the buffer accepts
      batch_size: 512 # max number of records the buffer drains after each read

  processor:
    - ml_inference:
        host: "<your AOS endpoint url>"
        aws_sigv4: true
        action_type: "batch_predict"
        service_name: "bedrock"
        model_id: "<your model id used in vector search>"
        output_path: "s3://offlinebatch/bedrock-multisource/output-multisource/"
        aws:
          region: "us-east-1"
        ml_when: /bucket == "offlinebatch"
    - copy_values:
        entries:
          - to_key: chapter
            from_key: /modelInput/inputText
          - to_key: chapter_embedding
            from_key: /modelOutput/embedding
    - delete_entries:
        with_keys: [modelInput, modelOutput, recordId, s3]

  route:
      - ml-ingest-route: "/chapter != null and /chapter_embedding != null"

  sink:
    - opensearch:
        hosts: ["<your AOS endpoint url>"]
        aws_sigv4: true
        index: "my-nlp-index-bedrock"
        routes: [ml-ingest-route]

Issues Resolved

#5470
#5433
#5509

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

private String findCommonPrefix(Collection<Record<Event>> records) {
List<String> keys = new ArrayList<>();
for (Record<Event> record : records) {
keys.add(record.getData().getJsonNode().get("key").asText());
Copy link
Member

@dlvenable dlvenable Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to have a very rigid expectation on having a key named key. If you need this, it should be configurable by users.

Also, prefer:

record.getData().get(eventKey, String.class)

You can get an EventKey in the constructor. See #4636 for an example of how you can use this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the "input_key" to allow CX to configure the key.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for using the EventKey. I'm still unsure what input_key is doing though. What does this represent exactly? Are these the keys for obtaining the bucket name and the S3 key?

We should have more concrete names for the configuration. For example: bucket_key makes sense. But, that would also tend toward key_key which is confusing!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input_key here represents the S3 file uri. Currently the S3 scan in meta data mode sends data like this
{"bucket":"offlinebatch","length":158893,"time":1736279088.000000000,"key":"bedrock-multisource/my_batch.jsonl"}
So the input_key would be "key" in this S3 Scan mode, and that's why it's listed as "input_key: key" in the pipeline yaml file. I think it makes sense to change to "object_key"? Basically that defines the key name in the json record which has the S3 uri to be processed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the S3 Scan uses the "key" as the name of the S3Uri from the metadata scan, I use "key" as the default field to read S3Uris if this input_key is not provided in the pipeline. Otherwise, it uses the EventKey to read it from the input_key. Please let me know if you'd want a different name instead of input_key.

@dlvenable
Copy link
Member

@Zhangxunmt , Thank you for this great processor!

We will also need some unit tests. I'm ok accepting this PR without them as long as we have the @Experimental annotation.

@Zhangxunmt
Copy link
Author

Zhangxunmt commented Mar 12, 2025

@Zhangxunmt , Thank you for this great processor!

We will also need some unit tests. I'm ok accepting this PR without them as long as we have the @Experimental annotation.

dlvenable Thanks David for the comments. Looks like there're no major concerns. I will add the remaining UTs soon and the @experimental annotation.

@Zhangxunmt Zhangxunmt force-pushed the main branch 5 times, most recently from 0b384c9 to 9ce8a77 Compare March 24, 2025 22:26
@Zhangxunmt Zhangxunmt force-pushed the main branch 3 times, most recently from 5ae7861 to c670ca5 Compare March 25, 2025 18:28
@Zhangxunmt Zhangxunmt changed the title add ml processor for offline batch inference add ml-inference processor for offline batch inference Mar 25, 2025
@Zhangxunmt Zhangxunmt changed the title add ml-inference processor for offline batch inference add ml_inference processor for offline batch inference Mar 25, 2025
@Zhangxunmt Zhangxunmt force-pushed the main branch 3 times, most recently from d0aa269 to 0108602 Compare March 25, 2025 20:00
mlBatchJobCreator.createMLBatchJob(recordsToMlCommons);
numberOfMLProcessorSuccessCounter.increment();
} catch (Exception e) {
LOG.error(NOISY, e.getMessage(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still unresolved.

try {
mlBatchJobCreator.createMLBatchJob(recordsToMlCommons);
numberOfMLProcessorSuccessCounter.increment();
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still unresolved.

private String findCommonPrefix(Collection<Record<Event>> records) {
List<String> keys = new ArrayList<>();
for (Record<Event> record : records) {
keys.add(record.getData().getJsonNode().get("key").asText());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for using the EventKey. I'm still unsure what input_key is doing though. What does this represent exactly? Are these the keys for obtaining the bucket name and the S3 key?

We should have more concrete names for the configuration. For example: bucket_key makes sense. But, that would also tend toward key_key which is confusing!

@Zhangxunmt
Copy link
Author

Zhangxunmt commented Mar 31, 2025

@dlvenable Please review the latest commit for the updates to requested changes, after a rebase to the main. The Gradle Builds somehow fail due to unrelated tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants