Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions custom-transforms-example/config/topology.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ sources:
listen_addr: "127.0.0.1:6379"
chain:
- ValkeyGetRewrite:
name: "valkey-get-rewrite"
result: "Rewritten value"
- ValkeySinkSingle:
name: "valkey-sink"
remote_address: "127.0.0.1:1111"
connect_timeout_ms: 3000
11 changes: 11 additions & 0 deletions custom-transforms-example/src/valkey_get_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ use shotover::transforms::{DownChainProtocol, TransformContextBuilder, UpChainPr
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct ValkeyGetRewriteConfig {
pub name: String,
pub result: String,
}

const NAME: &str = "ValkeyGetRewrite";
#[typetag::serde(name = "ValkeyGetRewrite")]
#[async_trait(?Send)]
impl TransformConfig for ValkeyGetRewriteConfig {
fn get_name(&self) -> &str {
&self.name
}

async fn get_builder(
&self,
_transform_context: TransformContextConfig,
Expand All @@ -34,6 +39,12 @@ impl TransformConfig for ValkeyGetRewriteConfig {
fn down_chain_protocol(&self) -> DownChainProtocol {
DownChainProtocol::SameAsUpChain
}

fn get_sub_chain_configs(
&self,
) -> Vec<(&shotover::config::chain::TransformChainConfig, String)> {
vec![]
}
}

pub struct ValkeyGetRewriteBuilder {
Expand Down
65 changes: 52 additions & 13 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,32 @@ Debug transforms can be temporarily used to test how your Shotover configuration

Future transforms won't be added to the public API while in alpha. But in these early days we have chosen to publish these alpha transforms to demonstrate the direction we want to take the project.

## Transform Naming

Every transform in the topology configuration must have a user-provided `name` field. This name is used in logging, metrics, and error messages to identify specific transforms.

```yaml
chain:
- DebugPrinter:
name: "print-inbound"
```

Sub-chains inside transforms are configured as a bare list. The chain name is derived from the transform's `name` (ParallelMap appends an index).

Tee's `SubchainOnMismatch.name` defines the mismatch chain name (and is validated for uniqueness) instead of deriving it from the parent transform name.

```yaml
chain:
- Tee:
name: "tee-primary"
behavior:
SubchainOnMismatch:
name: "mismatch-chain"
chain:
- NullSink:
name: "mismatch-sink"
```

## Transforms

| Transform | Terminating | Implementation Status |
Expand Down Expand Up @@ -374,14 +400,16 @@ This transform emits a metrics [counter](user-guide/observability.md#counter) na
This transform will drop any messages it receives and return an empty response.

```yaml
- NullSink
- NullSink:
name: "null-sink"
```

### ParallelMap

This transform will send messages in a single batch in parallel across multiple instances of the chain.

If we have a parallelism of 3 then we would have 3 instances of the chain: C1, C2, C3. If the batch then contains messages M1, M2, M3, M4. Then the messages would be sent as follows:
The `parallelism` field determines how many instances of the chain are created. The parallel chain instances are named using the transform's `name` — at runtime they are named `{name}[0]`, `{name}[1]`, etc.
If we have parallelism of 3 then we would have 3 instances of the chain: C1, C2, C3. If the batch then contains messages M1, M2, M3, M4. Then the messages would be sent as follows:

* M1 would be sent to C1
* M2 would be sent to C2
Expand All @@ -390,16 +418,19 @@ If we have a parallelism of 3 then we would have 3 instances of the chain: C1, C

```yaml
- ParallelMap:
# Number of duplicate chains to send messages through.
parallelism: 1
name: "my-parallel-map"
# Number of parallel chain instances (named "my-parallel-map[0]", "my-parallel-map[1]").
parallelism: 2
# if true then responses will be returned in the same as order as the queries went out.
# if it is false then response may return in any order.
ordered_results: true
# The chain that messages are sent through
chain:
- QueryCounter:
name: "DR chain"
name: "query-counter"
counter_name: "DR chain"
- ValkeySinkSingle:
name: "valkey-sink"
remote_address: "127.0.0.1:6379"
connect_timeout_ms: 3000
```
Expand All @@ -411,11 +442,12 @@ The log can be accessed via the [Shotover metrics](user-guide/configuration.md#o

```yaml
- QueryCounter:
# this name will be logged with the query count
name: "DR chain"
name: "query-counter"
# this counter_name will be logged with the query count
counter_name: "DR chain"
```

This transform emits a metrics [counter](user-guide/observability.md#counter) named `query_count` with the label `name` defined as the name from the config, in the example it will be `DR chain`.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `query_count` with the label `name` defined as the `counter_name` from the config, in the example it will be `DR chain`.

### QueryTypeFilter

Expand Down Expand Up @@ -560,6 +592,7 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul

```yaml
- Tee:
name: "my-tee"
# Ignore responses returned by the sub chain
behavior: Ignore

Expand All @@ -574,11 +607,15 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
# If the responses returned by the sub chain do not equal the responses returned by down-chain,
# then the original message is also sent down the SubchainOnMismatch sub chain.
# This is useful for logging failed messages.
# behavior:
# behavior:
# SubchainOnMismatch:
# - QueryTypeFilter:
# DenyList: [Read]
# - NullSink
# name: "mismatch-chain"
# chain:
# - QueryTypeFilter:
# name: "mismatch-filter"
# DenyList: [Read]
# - NullSink:
# name: "mismatch-sink"

# The port that the HTTP API will listen on.
# When this field is not provided the HTTP API will not be run.
Expand All @@ -592,8 +629,10 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
# The sub chain to send duplicate messages through
chain:
- QueryTypeFilter:
name: "tee-filter"
DenyList: [Read]
- NullSink
- NullSink:
name: "tee-sink"
```

This transform emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`.
Expand Down
7 changes: 5 additions & 2 deletions shotover-proxy/benches/windsock/cassandra/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,19 @@ impl CassandraBench {
}

fn generate_topology_yaml(&self, host_address: String, cassandra_address: String) -> String {
let mut transforms = vec![];
let mut transforms: Vec<Box<dyn TransformConfig>> = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
name: "debug-force-encode".to_string(),
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}));
}

match self.topology {
CassandraTopology::Cluster3 => {
transforms.push(Box::new(CassandraSinkClusterConfig {
name: "cassandra-sink-cluster".to_string(),
first_contact_points: vec![cassandra_address],
tls: None,
connect_timeout_ms: 3000,
Expand All @@ -429,6 +431,7 @@ impl CassandraBench {
}
CassandraTopology::Single => {
transforms.push(Box::new(CassandraSinkSingleConfig {
name: "cassandra-sink-single".to_string(),
address: cassandra_address,
tls: None,
connect_timeout_ms: 3000,
Expand Down
7 changes: 5 additions & 2 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,25 @@ impl KafkaBench {
}

fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String {
let mut transforms = vec![];
let mut transforms: Vec<Box<dyn TransformConfig>> = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
name: "debug-force-encode".to_string(),
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}));
}

transforms.push(match self.topology {
KafkaTopology::Single => Box::new(KafkaSinkSingleConfig {
name: "kafka-sink-single".to_string(),
destination_port: 9192,
connect_timeout_ms: 3000,
read_timeout: None,
tls: None,
}),
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
name: "kafka-sink-cluster".to_string(),
connect_timeout_ms: 3000,
read_timeout: None,
check_shotover_peers_delay_ms: Some(3000),
Expand Down
7 changes: 5 additions & 2 deletions shotover-proxy/benches/windsock/valkey/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,19 @@ impl ValkeyBench {
Encryption::None => None,
};

let mut transforms = vec![];
let mut transforms: Vec<Box<dyn TransformConfig>> = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
name: "debug-force-encode".to_string(),
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}));
}

match self.topology {
ValkeyTopology::Cluster3 => {
transforms.push(Box::new(ValkeySinkClusterConfig {
name: "valkey-sink-cluster".to_string(),
first_contact_points: vec![valkey_address],
direct_destination: None,
tls: tls_connector,
Expand All @@ -123,6 +125,7 @@ impl ValkeyBench {
}
ValkeyTopology::Single => {
transforms.push(Box::new(ValkeySinkSingleConfig {
name: "valkey-sink-single".to_string(),
address: valkey_address,
tls: tls_connector,
connect_timeout_ms: 3000,
Expand Down
6 changes: 4 additions & 2 deletions shotover-proxy/config/topology.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ sources:
# A DebugPrinter transform, reports an INFO log for every message that passes through this transform.
# You should delete this transform and add as many other transforms in this chain as you need.
# For a list of possible transforms: https://shotover.io/docs/latest/transforms/#transforms-1
- DebugPrinter
- DebugPrinter:
name: "debug-printer"

# A NullSink transform, drops all messages it receives.
# You will want to replace this with a sink transform to send the message to a database.
# For a list of possible transforms: https://shotover.io/docs/latest/transforms/#transforms-1
- NullSink
- NullSink:
name: "null-sink"
4 changes: 2 additions & 2 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn passthrough_cassandra_down() {
format!("Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed.

Caused by:
0: CassandraSinkSingle transform failed
0: cassandra-sink-single transform failed (chain: cassandra)
1: Failed to connect to destination 127.0.0.1:9043
2: Connection refused (os error {CONNECTION_REFUSED_OS_ERROR})"
));
Expand All @@ -170,7 +170,7 @@ Caused by:

Caused by:
0: Chain failed to send and/or receive messages, the connection will now be closed.
1: CassandraSinkSingle transform failed
1: cassandra-sink-single transform failed (chain: cassandra)
2: Failed to connect to destination 127.0.0.1:9043
3: Connection refused (os error {CONNECTION_REFUSED_OS_ERROR})"#,
))
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ async fn cluster_sasl_scram_over_mtls_nodejs_and_python() {
r"connection was unexpectedly terminated\s+",
r"Caused by:\s+",
r"0: Chain failed to send and/or receive messages, the connection will now be closed\.\s+",
r"1: KafkaSinkCluster transform failed\s+",
r"1: kafka-sink-cluster transform failed \(chain: kafka\)\s+",
r"2: Failed to receive responses \(without sending requests\)\s+",
r"3: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted\.\s+",
r"4: Failed to receive from ControlConnection\s+",
Expand Down
16 changes: 8 additions & 8 deletions shotover-proxy/tests/runner/runner_int_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Caused by:
Topology errors
valkey source:
valkey chain:
Non-terminating transform \"DebugPrinter\" is last in chain. Last transform must be terminating.
Non-terminating transform \"debug-printer\" is last in chain. Last transform must be terminating.
")])
.await;
}
Expand All @@ -110,7 +110,7 @@ Caused by:
Topology errors
valkey source:
valkey chain:
Terminating transform \"NullSink\" is not last in chain. Terminating transform must be last in chain.
Terminating transform \"null-sink-1\" is not last in chain. Terminating transform must be last in chain.
")])
.await;
}
Expand All @@ -129,15 +129,15 @@ Caused by:
Topology errors
valkey1 source:
valkey1 chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
Terminating transform "sink-1" is not last in chain. Terminating transform must be last in chain.
Terminating transform "sink-2" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "debug" is last in chain. Last transform must be terminating.
valkey2 source:
valkey2 chain:
ParallelMap:
parallel_map_chain chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
pmap[0] chain:
Terminating transform "p-sink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "p-debug" is last in chain. Last transform must be terminating.
"#),
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ sources:
listen_addr: "127.0.0.1:9042"
chain:
- CassandraSinkSingle:
name: "cassandra-sink-single"
remote_address: "127.0.0.1:9043"
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.1:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
shotover_nodes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.2:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4"
shotover_nodes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.3:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b"
shotover_nodes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.1:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a"
shotover_nodes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.2:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "3c3c4e2d-ba74-4f76-b52e-fb5bcee6a9f4"
shotover_nodes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.3:9042"
chain:
- CassandraSinkCluster:
name: "cassandra-sink-cluster"
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
local_shotover_host_id: "fa74d7ec-1223-472b-97de-04a32ccdb70b"
shotover_nodes:
Expand Down
Loading
Loading