Skip to content

Commit 8a3439e

Browse files
committed
dekaf: Fix connector_tags for Dekaf
This got broken in #2018. It was throwing errors like this: ``` agent::connector_tags: connector Spec RPC failed error=parsing local config Caused by: missing field `variant` at line 1 column 56 image=ghcr.io/estuary/dekaf-generic:v1 ``` Now that we don't bypass `Runtime::serve_materialize` for Dekaf anymore, we need to pass a valid `DekafConfig`.
1 parent 375da01 commit 8a3439e

File tree

1 file changed

+16
-5
lines changed

1 file changed

+16
-5
lines changed

Diff for: crates/agent/src/connector_tags.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -253,17 +253,28 @@ async fn spec_materialization(
253253
) -> anyhow::Result<ConnectorSpec> {
254254
use proto_flow::materialize;
255255

256-
let connector_type = if image.starts_with(models::DEKAF_IMAGE_NAME_PREFIX) {
257-
flow::materialization_spec::ConnectorType::Dekaf as i32
256+
let (connector_type, config_json) = if image.starts_with(models::DEKAF_IMAGE_NAME_PREFIX) {
257+
let variant = &image
258+
[models::DEKAF_IMAGE_NAME_PREFIX.len()..image.len() - models::DEKAF_IMAGE_TAG.len()];
259+
260+
(
261+
flow::materialization_spec::ConnectorType::Dekaf as i32,
262+
serde_json::to_string(
263+
&serde_json::json!({"variant": variant.to_string(), "config": {}}),
264+
)
265+
.unwrap(),
266+
)
258267
} else {
259-
flow::materialization_spec::ConnectorType::Image as i32
268+
(
269+
flow::materialization_spec::ConnectorType::Image as i32,
270+
serde_json::to_string(&serde_json::json!({"image": image, "config": {}})).unwrap(),
271+
)
260272
};
261273

262274
let req = materialize::Request {
263275
spec: Some(materialize::request::Spec {
264276
connector_type,
265-
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}}))
266-
.unwrap(),
277+
config_json,
267278
}),
268279
..Default::default()
269280
};

0 commit comments

Comments
 (0)