Skip to content

Commit fe9cfb5

Browse files
remove chain names given it is not needed in the genral case
1 parent ed7be70 commit fe9cfb5

File tree

2 files changed

+30
-92
lines changed

2 files changed

+30
-92
lines changed

docs/src/transforms.md

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@ chain:
3434
name: "my-null-sink"
3535
```
3636
37-
Sub-chains inside transforms (like Tee or ValkeyCache) use a named format with `name` and `transforms` keys:
37+
Sub-chains inside transforms are configured as a bare list. The chain name is derived from the transform's `name` (ParallelMap appends an index), and Tee's `SubchainOnMismatch.name` can override the mismatch chain name.
3838

3939
```yaml
4040
chain:
41-
name: "my-sub-chain"
42-
transforms:
43-
- NullSink:
44-
name: "sub-sink"
41+
- NullSink:
42+
name: "sub-sink"
4543
```
4644

4745
## Transforms
@@ -606,7 +604,7 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
606604
# behavior:
607605
# SubchainOnMismatch:
608606
# name: "mismatch-chain"
609-
# transforms:
607+
# chain:
610608
# - QueryTypeFilter:
611609
# name: "mismatch-filter"
612610
# DenyList: [Read]
@@ -622,15 +620,13 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
622620
# The number of message batches that the tee can hold onto in its buffer of messages to send.
623621
# If they arent sent quickly enough and the buffer is full then tee will drop new incoming messages.
624622
buffer_size: 10000
625-
# The sub chain to send duplicate messages through (named sub-chain format)
623+
# The sub chain to send duplicate messages through
626624
chain:
627-
name: "tee-chain"
628-
transforms:
629-
- QueryTypeFilter:
630-
name: "tee-filter"
631-
DenyList: [Read]
632-
- NullSink:
633-
name: "tee-sink"
625+
- QueryTypeFilter:
626+
name: "tee-filter"
627+
DenyList: [Read]
628+
- NullSink:
629+
name: "tee-sink"
634630
```
635631
636632
This transform emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`.

shotover/src/config/chain.rs

Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::transforms::{
33
DownChainProtocol, TransformBuilder, TransformConfig, TransformContextConfig, UpChainProtocol,
44
};
55
use anyhow::{Result, anyhow};
6-
use serde::de::{Deserializer, MapAccess, SeqAccess, Visitor};
6+
use serde::de::{Deserializer, SeqAccess, Visitor};
77
use serde::ser::{SerializeSeq, Serializer};
88
use serde::{Deserialize, Serialize};
99
use std::fmt::{self, Debug};
@@ -16,34 +16,18 @@ pub struct NamedTransformEntry {
1616

1717
#[derive(Debug)]
1818
pub struct TransformChainConfig {
19-
pub name: Option<String>,
2019
pub transforms: Vec<NamedTransformEntry>,
2120
}
2221

2322
impl TransformChainConfig {
2423
pub fn new_unnamed(transforms: Vec<NamedTransformEntry>) -> Self {
25-
TransformChainConfig {
26-
name: None,
27-
transforms,
28-
}
29-
}
30-
31-
pub fn new_named(name: String, transforms: Vec<NamedTransformEntry>) -> Self {
32-
TransformChainConfig {
33-
name: Some(name),
34-
transforms,
35-
}
24+
TransformChainConfig { transforms }
3625
}
3726

3827
pub async fn get_builder(
3928
&self,
4029
mut transform_context: TransformContextConfig,
4130
) -> Result<TransformChainBuilder> {
42-
let chain_name = self
43-
.name
44-
.clone()
45-
.unwrap_or_else(|| transform_context.chain_name.clone());
46-
4731
let mut builders: Vec<(Box<dyn TransformBuilder>, String)> = Vec::new();
4832
let mut upchain_protocol = transform_context.up_chain_protocol;
4933
for entry in &self.transforms {
@@ -70,15 +54,15 @@ impl TransformChainConfig {
7054
DownChainProtocol::Terminating => upchain_protocol,
7155
}
7256
}
73-
Ok(TransformChainBuilder::new(builders, chain_name.leak()))
57+
Ok(TransformChainBuilder::new(
58+
builders,
59+
transform_context.chain_name.leak(),
60+
))
7461
}
7562

7663
/// Recursively collects all chain names from this chain config and any nested sub-chains.
7764
pub fn collect_all_chain_names(&self) -> Vec<String> {
7865
let mut names = Vec::new();
79-
if let Some(ref name) = self.name {
80-
names.push(name.clone());
81-
}
8266
for entry in &self.transforms {
8367
names.extend(entry.config.get_extra_chain_names(&entry.name));
8468
for sub_chain in entry.config.get_sub_chain_configs() {
@@ -103,9 +87,18 @@ impl TransformChainConfig {
10387

10488
// Recurse into sub-chains. For unnamed sub-chains the chain name
10589
// is derived from the transform name at runtime, so use that here.
106-
for sub_chain in entry.config.get_sub_chain_configs() {
107-
let sub_chain_name = sub_chain.name.as_deref().unwrap_or(&entry.name);
108-
errors.extend(sub_chain.validate_names(sub_chain_name));
90+
let sub_chain_configs = entry.config.get_sub_chain_configs();
91+
let extra_chain_names = entry.config.get_extra_chain_names(&entry.name);
92+
if extra_chain_names.len() == sub_chain_configs.len() {
93+
for (sub_chain, sub_chain_name) in
94+
sub_chain_configs.iter().zip(extra_chain_names.iter())
95+
{
96+
errors.extend(sub_chain.validate_names(sub_chain_name));
97+
}
98+
} else {
99+
for sub_chain in sub_chain_configs {
100+
errors.extend(sub_chain.validate_names(&entry.name));
101+
}
109102
}
110103
}
111104

@@ -157,7 +150,7 @@ impl<'de> Deserialize<'de> for TransformChainConfig {
157150
where
158151
D: Deserializer<'de>,
159152
{
160-
deserializer.deserialize_any(TransformChainConfigVisitor)
153+
deserializer.deserialize_seq(TransformChainConfigVisitor)
161154
}
162155
}
163156

@@ -167,7 +160,7 @@ impl<'de> Visitor<'de> for TransformChainConfigVisitor {
167160
type Value = TransformChainConfig;
168161

169162
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
170-
formatter.write_str("a list of named transforms or a map with 'name' and 'transforms'")
163+
formatter.write_str("a list of named transforms")
171164
}
172165

173166
fn visit_seq<S>(self, mut seq: S) -> std::result::Result<Self::Value, S::Error>
@@ -181,57 +174,6 @@ impl<'de> Visitor<'de> for TransformChainConfigVisitor {
181174
entries.push(entry);
182175
}
183176
Ok(TransformChainConfig {
184-
name: None,
185-
transforms: entries,
186-
})
187-
}
188-
189-
fn visit_map<M>(self, mut map: M) -> std::result::Result<Self::Value, M::Error>
190-
where
191-
M: MapAccess<'de>,
192-
{
193-
let mut name: Option<String> = None;
194-
let mut transforms_value: Option<serde_yaml::Value> = None;
195-
196-
while let Some(key) = map.next_key::<String>()? {
197-
match key.as_str() {
198-
"name" => {
199-
name = Some(map.next_value()?);
200-
}
201-
"transforms" => {
202-
transforms_value = Some(map.next_value()?);
203-
}
204-
other => {
205-
return Err(serde::de::Error::unknown_field(
206-
other,
207-
&["name", "transforms"],
208-
));
209-
}
210-
}
211-
}
212-
213-
let name = name.ok_or_else(|| serde::de::Error::missing_field("name"))?;
214-
let transforms_value =
215-
transforms_value.ok_or_else(|| serde::de::Error::missing_field("transforms"))?;
216-
217-
let transforms_seq = match transforms_value {
218-
serde_yaml::Value::Sequence(seq) => seq,
219-
_ => {
220-
return Err(serde::de::Error::custom(
221-
"'transforms' must be a list of transform entries",
222-
));
223-
}
224-
};
225-
226-
let mut entries = Vec::new();
227-
for value in transforms_seq {
228-
let entry =
229-
deserialize_named_transform_entry(value).map_err(serde::de::Error::custom)?;
230-
entries.push(entry);
231-
}
232-
233-
Ok(TransformChainConfig {
234-
name: Some(name),
235177
transforms: entries,
236178
})
237179
}

0 commit comments

Comments
 (0)