This project provides a Benthos configuration that acts as a dynamic orchestrator. It listens for control messages on a GCP Pub/Sub subscription and, based on each message, generates and launches a new, separate Benthos instance to perform a data processing task (e.g., Kafka to Pub/Sub).
This serves as a foundational pattern for scenarios where Benthos pipelines need to be created and managed dynamically using Benthos itself.
The generated job runs as a Benthos streams process in a separate process, executing in parallel with the controller; we refer to this as the Benthos worker.
- Control Message Reception: The main orchestrator Benthos instance listens to a GCP Pub/Sub subscription (e.g., `benthos-control-sub`).
- Dynamic Configuration Generation:
  - Upon receiving a control message (JSON payload), it uses one of the Bloblang templates to construct a complete Benthos YAML configuration string.
  - This generated YAML defines a new Benthos pipeline (see the sketch after this list) that includes:
    - A source (e.g., Kafka) to consume messages.
    - A processor that applies a transformation based on the control message's `data` field.
    - A sink (e.g., GCP Pub/Sub) to publish processed messages.
  - The control message can also include a custom Bloblang mapping string in its `data` field, which will be embedded into the generated pipeline for data transformation.
- Stream Launch:
  - The generated YAML is POSTed to the Benthos streams HTTP server.
  - A Benthos pipeline is launched in the background within the streams server.
- Acknowledgement: Once the streams process completes, the control message is acknowledged.
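For illustration, a generated worker configuration might look roughly like the following. This is a minimal sketch, not the project's actual template output: the broker address is an assumption, and the real values are filled in from the control message by the Bloblang templates in `blob_configs`.

```yaml
input:
  kafka:
    addresses: ["localhost:9092"]              # assumed broker address
    topics: ["source-events-main"]             # from the control message's kafka_topic
    consumer_group: "dynamic-consumer-group-main"
    client_id: "my-dynamic-benthos-1"

pipeline:
  processors:
    - mapping: |
        # Mapping body taken from the control message's data field
        root = {"data":{"userId": this.distinct_id.string(), "key1": "val1"}}

output:
  gcp_pubsub:
    project: "<YourGCPProjectID>"
    topic: "processed-events-destination"
```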
- `config.yaml`: The configuration for the main orchestrator Benthos instance.
- `blob_configs`: The Bloblang templates used to generate configurations for the dynamically launched Benthos instances.
- Benthos: Installed and accessible in your PATH (version 4.x recommended). You can use the standalone `benthos` binary or tools that bundle it, like `rpk`.
- GCP Project:
  - A Pub/Sub topic for control messages.
  - A Pub/Sub subscription for the orchestrator to listen to (e.g., `benthos-control-sub`).
  - Appropriate GCP credentials configured for Benthos (e.g., via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable).
- Required resources for the requested job (e.g., Kafka topic, GCP Pub/Sub topic) must exist and be accessible by the dynamically launched Benthos instances.
- Ensure all prerequisites are met and environment variables are set.
- Place `config.yaml`, `blob_configs` and `observability.yaml` in the same directory.
- Navigate to that directory and run one of the following:
  - Using `rpk` (Redpanda's tool, which bundles Benthos): `rpk connect run config.yaml`
  - Or, using a standalone Benthos binary: `benthos -c config.yaml`
- Open a separate shell and run (to start the worker process): `rpk connect streams -o observability.yaml`
Publish a JSON message to the GCP Pub/Sub topic that the orchestrator is subscribed to (e.g., projects/<YourProject>/topics/your-control-topic).
Example Control Message Payload:
```json
{
  "kafka_topic": "source-events-main",
  "publish_project": "<YourGCPProjectID>",
  "publish_topic": "processed-events-destination",
  "consumer_name": "dynamic-consumer-group-main",
  "client_id": "my-dynamic-benthos-1",
  "benthos_meta_info": "ANY META DATA YOU WANT IN LOGS",
  "data": "{\"data\":{\"userId\":this.distinct_id.string(),\"key1\":\"val1\",\"key2\":\"val2\",\"metaInfo\":{\"abc\":\"23432\",\"id\":234}}}"
}
```

Output:
```json
{
  "data": {
    "metaInfo": {
      "abc": "23432",
      "id": 234
    },
    "key1": "val1",
    "key2": "val2",
    "userId": "841622982"
  }
}
```

The `data` field within your JSON control message is special. Its value must be a JSON string that itself represents the body of a Bloblang mapping. This mapping will be applied by the dynamically launched Benthos instance to each message it processes from its source (e.g., Kafka).
The orchestrator's `dynamic_k2p_config.blobl` takes this string and effectively does:

```
root.pipeline.processors = root.pipeline.processors.append({ "mapping": "root = " + this.data })
```

(Where `this.data` is the string you provide in the control message.)
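With the example `data` string shown above, the generated worker pipeline therefore ends up with a processor roughly equivalent to the following (a sketch of the effect, not the template's literal output):

```yaml
pipeline:
  processors:
    - mapping: |
        root = {"data":{"userId": this.distinct_id.string(), "key1": "val1", "key2": "val2", "metaInfo": {"abc": "23432", "id": 234}}}
```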
How to Create the `data` String:
- Define Your Target JSON Structure: Decide what the output JSON from the dynamic Benthos instance should look like for each processed message.
- Embed Bloblang Expressions for Dynamic Values: For parts of your target JSON that need to come from the source message (e.g., a field from a Kafka message), use Bloblang expressions.
  - `this.field_name`: Accesses a field from the source message.
  - `.string()`: Ensures the value is treated as a string.
  - `.number()`: Ensures the value is treated as a number.
  - `meta("kafka_key")`: Accesses Kafka message metadata.
- Format as a String Literal: The entire structure, including Bloblang expressions, must be formatted as a single JSON string value for the `data` key.
Example:
Let's say your source Kafka messages look like:
```json
{
  "distinct_id": "841622982",
  "event_name": "page_view",
  "properties": {
    "page_url": "/home"
  }
}
```

And you want to transform it to:
```json
{
  "userId": "841622982",
  "key1": "val1",
  "key2": "val2",
  "metaInfo": {
    "abc": "23432",
    "id": 234
  }
}
```

The `data` field in your control message would look like:
```json
{
  "data": "{\"data\":{\"userId\":this.distinct_id.string(),\"key1\":\"val1\",\"key2\":\"val2\",\"metaInfo\":{\"abc\":\"23432\",\"id\":234}}}"
}
```

While the current subprocess-based approach is functional, for a more robust and production-ready orchestrator we'll move to using the Benthos Streams API. This transition will provide several advantages, including better lifecycle management, observability, and resource efficiency.
Transition to Benthos Streams API:
Why this is the most impactful change:
The Benthos HTTP API (specifically the /streams endpoint) allows for creating, managing (reading status, updating), and deleting Benthos streams (pipelines) within a single, long-running Benthos process.
Key Benefits:
- Eliminates Subprocesses: No more `nohup benthos ... &`. This significantly simplifies process management and reduces system overhead.
- No Temporary Files: Configurations are sent directly via the API, removing the need for `/tmp/config_temp.yaml` and associated race condition concerns (even if currently mitigated by `max_outstanding_messages: 1`).
- Centralized Observability: Logs and metrics from all dynamically created streams can be managed and exposed by the main orchestrator Benthos instance, providing a unified view.
- Lifecycle Management: The Streams API provides endpoints to create streams, check their status (`GET /streams/{id}`, which shows uptime and activity, allowing inference of completion for finite inputs like `read_until`), update them (`PUT /streams/{id}`), and delete them (`DELETE /streams/{id}`) gracefully. This enables programmatic polling of stream status and robust lifecycle control (see the polling sketch after this list).
- Resource Efficiency: Running multiple stream configurations within one Benthos process is generally more resource-efficient than spawning many individual Benthos processes.
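As an illustration of the lifecycle-management point, an orchestrator pipeline could poll a stream's status with an `http_client` processor. This is a minimal sketch; the port and the `stream_id` metadata key are assumptions:

```yaml
pipeline:
  processors:
    # Poll the status of a previously created stream via the Streams API.
    - http_client:
        url: 'http://localhost:4195/streams/${! meta("stream_id") }'
        verb: GET
    # The response body describes the stream (e.g., active state, uptime),
    # which a subsequent mapping processor could inspect.
```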
How it would work:
- The orchestrator's main Benthos pipeline would change. Instead of `command` processors to write a file and launch a subprocess, it would use an `http_client` processor (or `http` output); see the sketch after this list.
- This `http_client` would make `POST /streams/{id}` requests to its own Benthos API endpoint (e.g., `http://localhost:4195/streams/{dynamic_stream_id}`), assuming the main orchestrator's HTTP server is enabled.
- The body of the HTTP `POST` request would be the dynamically generated Benthos configuration. Crucially, the `dynamic_k2p_config.blobl` (or a similar Bloblang mapping used for this purpose) would need to be adjusted to output a Benthos configuration in JSON format, as the Streams API endpoint expects JSON, not YAML.
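A rough sketch of what this could look like in the orchestrator's pipeline is below. The port, the `stream_id` metadata key, and the shape of the generated config are assumptions; in practice the config body would come from the adjusted Bloblang template rather than being inlined like this:

```yaml
pipeline:
  processors:
    # Build the worker configuration as a JSON document and keep a stream id in metadata.
    - mapping: |
        meta stream_id = "worker-" + uuid_v4()
        root = {
          "input": {"kafka": {"addresses": ["localhost:9092"], "topics": [this.kafka_topic]}},
          "pipeline": {"processors": [{"mapping": "root = " + this.data}]},
          "output": {"gcp_pubsub": {"project": this.publish_project, "topic": this.publish_topic}}
        }
    # POST the generated configuration to the Streams API to create the stream.
    - http_client:
        url: 'http://localhost:4195/streams/${! meta("stream_id") }'
        verb: POST
```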
Contributions are welcome! Please open an issue to discuss any changes or new features before submitting a pull request. Focus areas include implementing the "Future Enhancements" listed above, especially the transition to the Benthos Streams API.
This project is licensed under the MIT License.