[Bug]: YAML Transform Provider with AnomalyDetection #35788

@charlespnh

Description

What happened?

I have a custom anomaly detection model trained with the PyOD library, and I want to use it as part of my YAML pipeline. Beam's anomaly detection module offers a way to use a custom offline model (see this example notebook).

I implemented my own transform following that example and exposed it through a provider:

import apache_beam as beam
from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory
from apache_beam.ml.anomaly.transforms import AnomalyDetection


class KNN(beam.PTransform):
  def __init__(self, model_artifact_path):
    self.model_artifact_path = model_artifact_path
    self.model = PyODFactory.create_detector(
        self.model_artifact_path,
        model_id="knn",
    )

  def expand(self, pcoll):
    return (
        pcoll
        | beam.Map(lambda x: x.embedding)
        | AnomalyDetection(detector=self.model)
        | beam.Map(lambda x: beam.Row(
            example=x.example,
            predictions=[pred.__dict__ for pred in x.predictions]))
    )
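
For reference, this transform is shipped to the provider as ./dist/transform_provider-0.1.0.tar.gz. A minimal setup.py along these lines would produce such a package (a hypothetical sketch only; the actual package layout is not included in this report):

# setup.py -- hypothetical minimal packaging sketch for transform_provider;
# the real package behind transform_provider-0.1.0.tar.gz may differ.
from setuptools import setup

setup(
    name="transform_provider",
    version="0.1.0",
    # Expose the module defining the KNN PTransform that the YAML providers
    # section below references as "transform_provider.KNN".
    py_modules=["transform_provider"],
    install_requires=["apache_beam[gcp]", "pyod"],
)

Building it with python setup.py sdist (or python -m build --sdist) yields the tarball referenced under providers.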

The following YAML pipeline:

pipeline:
  type: chain
  transforms:
    - type: ReadFromBigQuery
      name: ReadFromBigQuery
      config:
        table: "apache-beam-testing.charlesnguyen.test"
        fields: [embedding]

    - type: KNN
      name: KNN
      config:
        model_artifact_path: "gs://apache-beam-testing-charlesng/batch-log-analysis/knn_model.pkl"
#        model_artifact_path: ./knn_model.pkl

    - type: LogForTesting


providers:
  - type: pythonPackage
    config:
      packages:
        - ./dist/transform_provider-0.1.0.tar.gz
    transforms:
      KNN: "transform_provider.KNN"

options:
  yaml_experimental_features: [ 'ML' ]

... gives the error:

  File "/Users/charlesnguyen/Code/beam/sdks/python/apache_beam/runners/runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/charlesnguyen/Code/beam/sdks/python/apache_beam/transforms/external.py", line 824, in expand
    response = service.Expand(request)
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/charlesnguyen/.virtualenvs/env/lib/python3.11/site-packages/grpc/_channel.py", line 1181, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/charlesnguyen/.virtualenvs/env/lib/python3.11/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"                                                                                                                                             debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Socket closed", grpc_status:14, created_time:"2025-08-05T14:59:02.47977-04:00"}"
>

However, if I have knn_model.pkl available locally and pass the local path as the model_artifact_path argument instead of the GCS path, the pipeline works.
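
For what it's worth, a possible interim workaround, assuming the failure is specific to reading the gs:// artifact inside the expansion service, would be to stage the model to a local temp file and keep passing a local path to PyODFactory (load_detector_from_gcs below is a hypothetical helper, not part of Beam's API):

# Hypothetical workaround sketch: copy the pickled model from GCS to a local
# temp file using Beam's FileSystems (which understands gs:// paths), then
# hand the local path to PyODFactory, mirroring the case that already works.
import tempfile

from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory


def load_detector_from_gcs(model_artifact_path, model_id="knn"):
  with FileSystems.open(model_artifact_path) as remote_file:
    with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as local_file:
      local_file.write(remote_file.read())
      local_path = local_file.name
  return PyODFactory.create_detector(local_path, model_id=model_id)

KNN.__init__ above could call this helper instead of passing the gs:// path directly to PyODFactory.create_detector.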

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
