-
Notifications
You must be signed in to change notification settings - Fork 596
Use the TransformationPolicy API directly as rustformation config #12803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this work for requestion transformations that get run on both on_request_headers and on_request_body? Does it only apply the transformation once? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The on_request_body is not implemented yet and will be next. Basically, if there is a body, we will ask envoy to buffer the entire body first because calling us with on_request_body and only do the transformation there and not in on_request_headers. That's kind of the behavior of the C++ filter (except it buffer the body bytes inside the filter instead of having envoy to do that) but it buffer regardless if we use the body or not. We cannot buffer the body ourselves here because the buffer_bytes limit is not exposed by the rust sdk and I will see if we can skip body buffering if not needed. Stay tuned. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,41 +1,46 @@ | ||
| use envoy_proxy_dynamic_modules_rust_sdk::*; | ||
| use lazy_static::lazy_static; | ||
| use serde::{Deserialize, Serialize}; | ||
| use serde::Deserialize; | ||
| use std::collections::HashMap; | ||
| use std::ops::Deref; | ||
| use transformations::{LocalTransformationConfig, TransformationOps}; | ||
|
|
||
| #[cfg(test)] | ||
| use mockall::*; | ||
|
|
||
| lazy_static! { | ||
| static ref EMPTY_MAP: HashMap<String, String> = HashMap::new(); | ||
| } | ||
| #[derive(Serialize, Deserialize, Clone)] | ||
| pub struct PerRouteConfig { | ||
| #[serde(default)] | ||
| request_headers_setter: Vec<(String, String)>, | ||
| #[serde(default)] | ||
| response_headers_setter: Vec<(String, String)>, | ||
| #[derive(Deserialize, Clone)] | ||
| pub struct FilterConfig { | ||
| transformations: LocalTransformationConfig, | ||
| } | ||
|
|
||
| impl PerRouteConfig { | ||
| pub fn new(config: &str) -> Option<Self> { | ||
| let per_route_config: PerRouteConfig = match serde_json::from_str(config) { | ||
| Ok(cfg) => cfg, | ||
| Err(err) => { | ||
| envoy_log_error!("Error parsing per route config: {config} {err}"); | ||
| return None; | ||
| } | ||
| }; | ||
| Some(per_route_config) | ||
| struct EnvoyTransformationOps<'a> { | ||
| envoy_filter: &'a mut dyn EnvoyHttpFilter, | ||
| } | ||
|
|
||
| impl TransformationOps for EnvoyTransformationOps<'_> { | ||
| fn set_request_header(&mut self, key: &str, value: &[u8]) -> bool { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do these need to match some envoy style for the transformations? Or can we return an error here with the failure message if it doesn't apply? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the envoy rust sdk functions, so have to follow the same function signature. However, we can return error in the main transform_request_header() if this return false. I actually plan to do that because in the trasnformation crate, I intentionally not to have any envoy specific logic and dependency in there, so I don't have access to the |
||
| self.envoy_filter.set_request_header(key, value) | ||
| } | ||
| fn remove_request_header(&mut self, key: &str) -> bool { | ||
| self.envoy_filter.remove_request_header(key) | ||
| } | ||
| fn set_response_header(&mut self, key: &str, value: &[u8]) -> bool { | ||
| self.envoy_filter.set_response_header(key, value) | ||
| } | ||
| fn remove_response_header(&mut self, key: &str) -> bool { | ||
| self.envoy_filter.remove_response_header(key) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Serialize, Deserialize, Clone)] | ||
| pub struct FilterConfig { | ||
| #[serde(default)] | ||
| request_headers_setter: Vec<(String, String)>, | ||
| #[serde(default)] | ||
| response_headers_setter: Vec<(String, String)>, | ||
| impl Deref for FilterConfig { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this? Can we just expose calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. correct, technically we don't need this. I don't remember what I was trying to do but end up not doing that. This can be removed later with some minor change as there are only a few places using this. |
||
| type Target = LocalTransformationConfig; | ||
|
|
||
| fn deref(&self) -> &Self::Target { | ||
| &self.transformations | ||
| } | ||
| } | ||
|
|
||
| impl FilterConfig { | ||
|
|
@@ -44,19 +49,23 @@ impl FilterConfig { | |
| /// filter_config is the filter config from the Envoy config here: | ||
| /// https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/dynamic_modules/v3/dynamic_modules.proto#envoy-v3-api-msg-extensions-dynamic-modules-v3-dynamicmoduleconfig | ||
| pub fn new(filter_config: &str) -> Option<Self> { | ||
| let filter_config: FilterConfig = match serde_json::from_str(filter_config) { | ||
| // TODO(nfuden): Handle optional configuration entries more cleanly. Currently all values are required to be present | ||
| let config: LocalTransformationConfig = match serde_json::from_str(filter_config) { | ||
| Ok(cfg) => cfg, | ||
| Err(err) => { | ||
| // TODO(nfuden): Dont panic if there is incorrect configuration | ||
| // Dont panic if there is incorrect configuration | ||
| envoy_log_error!("Error parsing filter config: {filter_config} {err}"); | ||
| return None; | ||
| } | ||
| }; | ||
| Some(filter_config) | ||
| Some(FilterConfig { | ||
| transformations: config, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // Since PerRouteConfig is the same as the FilterConfig, for now just just a type alias | ||
| pub type PerRouteConfig = FilterConfig; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does FilterConfig have the same envoy error log when it fails to parse the input config? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Currently, the control-plane in kgateway actually never use FilterConfig and always use PerRouteConfig. |
||
|
|
||
| impl<EHF: EnvoyHttpFilter> HttpFilterConfig<EHF> for FilterConfig { | ||
| /// This is called for each new HTTP filter. | ||
| fn new_http_filter(&mut self, _envoy: &mut EHF) -> Box<dyn HttpFilter<EHF>> { | ||
|
|
@@ -140,35 +149,39 @@ impl Filter { | |
| } | ||
|
|
||
| fn transform_request_headers<EHF: EnvoyHttpFilter>(&self, envoy_filter: &mut EHF) { | ||
| let setters = match self.get_per_route_config() { | ||
| Some(config) => &config.request_headers_setter, | ||
| None => &self.filter_config.request_headers_setter, | ||
| let set = match self.get_per_route_config() { | ||
| Some(config) => &config.request.as_ref().map(|r| &r.set), | ||
| None => &self.filter_config.request.as_ref().map(|r| &r.set), | ||
| }; | ||
|
|
||
| transformations::jinja::transform_request_headers( | ||
| setters, | ||
| &self.env, | ||
| self.get_request_headers_map(), | ||
| |key, value| envoy_filter.set_request_header(key, value), | ||
| ); | ||
| if let Some(setters) = set { | ||
| transformations::jinja::transform_request_headers( | ||
| setters, | ||
| &self.env, | ||
| self.get_request_headers_map(), | ||
| EnvoyTransformationOps { envoy_filter }, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| fn transform_response_headers<EHF: EnvoyHttpFilter>(&self, envoy_filter: &mut EHF) { | ||
| let setters = match self.get_per_route_config() { | ||
| Some(config) => &config.response_headers_setter, | ||
| None => &self.filter_config.response_headers_setter, | ||
| let set = match self.get_per_route_config() { | ||
| Some(config) => &config.response.as_ref().map(|r| &r.set), | ||
| None => &self.filter_config.response.as_ref().map(|r| &r.set), | ||
| }; | ||
|
|
||
| // TODO(nfuden): find someone who knows rust to see if we really need this Hash map for serialization | ||
| let response_headers_map = self.create_headers_map(envoy_filter.get_response_headers()); | ||
|
|
||
| transformations::jinja::transform_response_headers( | ||
| setters, | ||
| &self.env, | ||
| self.get_request_headers_map(), | ||
| &response_headers_map, | ||
| |key, value| envoy_filter.set_response_header(key, value), | ||
| ); | ||
| if let Some(setters) = set { | ||
| // TODO(nfuden): find someone who knows rust to see if we really need this Hash map for serialization | ||
| let response_headers_map = self.create_headers_map(envoy_filter.get_response_headers()); | ||
|
|
||
| transformations::jinja::transform_response_headers( | ||
| setters, | ||
| &self.env, | ||
| self.get_request_headers_map(), | ||
| &response_headers_map, | ||
| EnvoyTransformationOps { envoy_filter }, | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -241,29 +254,28 @@ mod tests { | |
| let mut envoy_filter = envoy_proxy_dynamic_modules_rust_sdk::MockEnvoyHttpFilter::default(); | ||
|
|
||
| // construct the filter config | ||
| // most upstream tests start with the filter itself but we are tryign to add heavier logic | ||
| // to the config factory strat rather than running it on header calls | ||
| let mut filter_conf = FilterConfig { | ||
| request_headers_setter: vec![ | ||
| ( | ||
| "X-substring".to_string(), | ||
| "{{substring(\"ENVOYPROXY something\", 5, 10) }}".to_string(), | ||
| ), | ||
| ( | ||
| "X-substring-no-3rd".to_string(), | ||
| "{{substring(\"ENVOYPROXY something\", 5) }}".to_string(), | ||
| ), | ||
| ( | ||
| "X-donor-header-contents".to_string(), | ||
| "{{ header(\"x-donor\") }}".to_string(), | ||
| ), | ||
| ( | ||
| "X-donor-header-substringed".to_string(), | ||
| "{{ substring( header(\"x-donor\"), 0, 7)}}".to_string(), | ||
| ), | ||
| ], | ||
| response_headers_setter: vec![("X-Bar".to_string(), "foo".to_string())], | ||
| }; | ||
| // most upstream tests start with the filter itself but we are trying to add heavier logic | ||
| // to the config factory start rather than running it on header calls | ||
| let json_str = r#" | ||
| { | ||
| "request": { | ||
| "set": [ | ||
| { "name": "X-substring", "value": "{{substring(\"ENVOYPROXY something\", 5, 10) }}" }, | ||
| { "name": "X-substring-no-3rd", "value": "{{substring(\"ENVOYPROXY something\", 5) }}" }, | ||
| { "name": "X-donor-header-contents", "value": "{{ header(\"x-donor\") }}" }, | ||
| { "name": "X-donor-header-substringed", "value": "{{ substring( header(\"x-donor\"), 0, 7)}}" } | ||
| ] | ||
| }, | ||
| "response": { | ||
| "set": [ | ||
| { "name": "X-Bar", "value": "foo" } | ||
| ] | ||
| }, | ||
| "foo": "This is a fake field to make sure the parser will ignore an new fields from the control plane for compatibility" | ||
| } | ||
| "#; | ||
| let mut filter_conf = | ||
| FilterConfig::new(json_str).expect("Failed to parse filter config json: {json_str}"); | ||
| let mut filter = filter_conf.new_http_filter(&mut envoy_filter); | ||
|
|
||
| envoy_filter | ||
|
|
@@ -356,13 +368,22 @@ mod tests { | |
| // construct the filter config | ||
| // most upstream tests start with the filter itself but we are trying to add heavier logic | ||
| // to the config factory start rather than running it on header calls | ||
| let mut filter_conf = FilterConfig { | ||
| request_headers_setter: vec![( | ||
| "X-if-truth".to_string(), | ||
| "{%- if true -%}supersuper{% endif %}".to_string(), | ||
| )], | ||
| response_headers_setter: vec![("X-Bar".to_string(), "foo".to_string())], | ||
| }; | ||
| let json_str = r#" | ||
| { | ||
| "request": { | ||
| "set": [ | ||
| { "name": "X-if-truth", "value": "{%- if true -%}supersuper{% endif %}" } | ||
| ] | ||
| }, | ||
| "response": { | ||
| "set": [ | ||
| { "name": "X-Bar", "value": "foo" } | ||
| ] | ||
| } | ||
| } | ||
| "#; | ||
| let mut filter_conf = | ||
| FilterConfig::new(json_str).expect("Failed to parse filter config json: {json_str}"); | ||
| let mut filter = filter_conf.new_http_filter(&mut envoy_filter); | ||
|
|
||
| envoy_filter | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| use crate::TransformationOps; | ||
| use minijinja::value::Rest; | ||
| use minijinja::{context, Environment, State}; | ||
| use serde::Deserialize; | ||
|
|
@@ -87,15 +88,17 @@ pub fn new_jinja_env() -> Environment<'static> { | |
| env | ||
| } | ||
|
|
||
| pub fn transform_request_headers<F>( | ||
| pub fn transform_request_headers<T: TransformationOps>( | ||
| setters: &Vec<(String, String)>, | ||
| env: &Environment<'static>, | ||
| request_headers_map: &HashMap<String, String>, | ||
| mut set_request_header: F, | ||
| ) where | ||
| F: FnMut(&str, &[u8]) -> bool, | ||
| { | ||
| mut ops: T, | ||
| ) { | ||
| for (key, value) in setters { | ||
| if value.is_empty() { | ||
| ops.remove_request_header(key); | ||
| continue; | ||
| } | ||
| let tmpl = env.template_from_str(value).unwrap(); | ||
| let rendered = tmpl.render( | ||
| context!(headers => request_headers_map, request_headers => request_headers_map), | ||
|
|
@@ -106,20 +109,22 @@ pub fn transform_request_headers<F>( | |
| } else { | ||
| eprintln!("Error rendering template: {}", rendered.err().unwrap()); | ||
| } | ||
| set_request_header(key, rendered_str.as_bytes()); | ||
| ops.set_request_header(key, rendered_str.as_bytes()); | ||
| } | ||
| } | ||
|
|
||
| pub fn transform_response_headers<F>( | ||
| pub fn transform_response_headers<T: TransformationOps>( | ||
| setters: &Vec<(String, String)>, | ||
| env: &Environment<'static>, | ||
| request_headers_map: &HashMap<String, String>, | ||
| response_headers_map: &HashMap<String, String>, | ||
| mut set_response_header: F, | ||
| ) where | ||
| F: FnMut(&str, &[u8]) -> bool, | ||
| { | ||
| mut ops: T, | ||
| ) { | ||
| for (key, value) in setters { | ||
| if value.is_empty() { | ||
| ops.remove_response_header(key); | ||
| continue; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not related to your code- but I think template_from_str can return an error? Can we check that with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch. yeah, I should check all the unwrap() call. I think originally as a POC, we don't care much if it just crash. I will change this. |
||
| let tmpl = env.template_from_str(value).unwrap(); | ||
| let rendered = tmpl.render( | ||
| context!(headers => response_headers_map, request_headers => request_headers_map), | ||
|
|
@@ -130,6 +135,6 @@ pub fn transform_response_headers<F>( | |
| } else { | ||
| eprintln!("Error rendering template: {}", rendered.err().unwrap()); | ||
| } | ||
| set_response_header(key, rendered_str.as_bytes()); | ||
| ops.set_response_header(key, rendered_str.as_bytes()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| use serde::de::{self, Deserializer}; | ||
| use serde::Deserialize; | ||
| use serde_with::serde_as; | ||
| use serde_json::Value; | ||
| type Strng = String; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What was this alias for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's another thing originally, I was trying to mimic the config structure of agentgateway's transformation, in there, it uses ArcStr but I ended up not using that because it also require a fork from John's repo for some other lib to work with ArcStr. So, I just alias that to normal String here. It's really not necessary now and I probably will remove it. |
||
|
|
||
| pub mod jinja; | ||
|
|
@@ -12,17 +13,54 @@ pub struct LocalTransformationConfig { | |
| pub response: Option<LocalTransform>, | ||
| } | ||
|
|
||
| #[serde_as] | ||
| #[derive(Default, Clone, Deserialize)] | ||
| pub struct LocalTransform { | ||
| #[serde(default)] | ||
| #[serde_as(as = "serde_with::Map<_, _>")] | ||
| #[serde(deserialize_with = "deserialize_name_value")] | ||
| pub add: Vec<(Strng, Strng)>, | ||
| #[serde(default)] | ||
| #[serde_as(as = "serde_with::Map<_, _>")] | ||
| #[serde(deserialize_with = "deserialize_name_value")] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a very good point. I was mimicing agentgateway and was thinking if we have the same interface, we can pull code from there to here and vice versa but that's really not easily feasible, so no reason to do this. |
||
| pub set: Vec<(Strng, Strng)>, | ||
| #[serde(default)] | ||
| pub remove: Vec<Strng>, | ||
| #[serde(default)] | ||
| pub body: Option<Strng>, | ||
| pub body: Option<BodyTransform>, | ||
| } | ||
|
|
||
| #[derive(Default, Clone, Deserialize)] | ||
| pub struct BodyTransform { | ||
| #[serde(default)] | ||
| pub parse_as: Strng, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh... serde can map json string to enum, nice! I will do that. |
||
| #[serde(default)] | ||
| pub value: String, | ||
| } | ||
|
|
||
| fn deserialize_name_value<'de, D>(deserializer: D) -> Result<Vec<(Strng, Strng)>, D::Error> | ||
| where | ||
| D: Deserializer<'de>, | ||
| { | ||
| let raw: Vec<Value> = Deserialize::deserialize(deserializer)?; | ||
| let mut result = Vec::new(); | ||
|
|
||
| for item in raw { | ||
| if let Some(name) = item.get("name") { | ||
| let header_name = name.as_str().unwrap().to_string(); | ||
| let mut header_value = String::new(); | ||
| if let Some(value) = item.get("value") { | ||
| header_value = value.as_str().unwrap().to_string(); | ||
| } | ||
| result.push((header_name, header_value)); | ||
| } else { | ||
| return Err(de::Error::custom("missing name in header item")); | ||
| } | ||
| } | ||
|
|
||
| Ok(result) | ||
| } | ||
|
|
||
| pub trait TransformationOps { | ||
| fn set_request_header(&mut self, key: &str, value: &[u8]) -> bool; | ||
| fn remove_request_header(&mut self, key: &str) -> bool; | ||
| fn set_response_header(&mut self, key: &str, value: &[u8]) -> bool; | ||
| fn remove_response_header(&mut self, key: &str) -> bool; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added in my last PR but turns out it's not needed, so removing this.