Skip to content

Commit dc7c326

Browse files
Merge pull request #4 from solarwinds/NH-139352
zstd compression and more flexible configuration
2 parents 394d047 + e0cd597 commit dc7c326

6 files changed

Lines changed: 139 additions & 49 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ on:
66
pull_request:
77
workflow_dispatch:
88

9+
env:
10+
SUFFIX: ${{ (github.head_ref || github.ref_name) == 'main' && '' || format('-{0}', github.head_ref || github.ref_name) }}
11+
912
jobs:
1013
checks:
1114
runs-on: ubuntu-latest
@@ -72,7 +75,7 @@ jobs:
7275
run: |
7376
LAYER_ARN=$(
7477
aws lambda publish-layer-version \
75-
--layer-name diet-lambda-${{ matrix.arch }} \
78+
--layer-name diet-lambda-${{ matrix.arch }}$SUFFIX \
7679
--license-info "Apache 2.0" \
7780
--compatible-architectures ${{ matrix.arch == 'x86_64' && 'x86_64' || 'arm64' }} \
7881
--zip-file fileb://${{ steps.download.outputs.download-path }}/diet-lambda-${{ matrix.arch }}.zip \
@@ -112,7 +115,7 @@ jobs:
112115
with:
113116
context: .
114117
push: true
115-
tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }}
118+
tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }}${{ env.SUFFIX }}
116119

117120
multiarch:
118121
needs: docker
@@ -134,6 +137,6 @@ jobs:
134137

135138
- run: |
136139
docker buildx imagetools create \
137-
--tag ghcr.io/${{ github.repository }}:latest \
138-
ghcr.io/${{ github.repository }}:x86_64 \
139-
ghcr.io/${{ github.repository }}:aarch64
140+
--tag ghcr.io/${{ github.repository }}:latest$SUFFIX \
141+
ghcr.io/${{ github.repository }}:x86_64$SUFFIX \
142+
ghcr.io/${{ github.repository }}:aarch64$SUFFIX

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ bytes = "1"
1616
cfg-if = "1"
1717
chrono = { version = "0.4.43", features = ["serde"] }
1818
const-hex = { version = "1.17.0", features = ["serde"] }
19+
envy = "0.4.2"
1920
flate2 = "1"
2021
futures-util = "0.3"
2122
http-body-util = { version = "0.1.3", features = [] }

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ The collector listens for `HTTP/OTLP`, `HTTP/JSON` and `gRPC/OTLP` traces, metri
88

99
This collector uses the same `SW_APM_API_TOKEN` and `SW_APM_DATA_CENTER` environment variables as the full collector. It does not support configuration via a `yaml` file, or any custom processing logic.
1010

11+
- Service name - `OTEL_SERVICE_NAME` -> `SW_APM_SERVICE_KEY` -> `AWS_LAMBDA_FUNCTION_NAME`
12+
- API token - `SW_APM_API_TOKEN` -> `SW_APM_SERVICE_KEY`
13+
- APM collector endpoint - `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER`
14+
- OTLP exporter endpoints - `SW_EXPORTER_OTLP_$signal_ENDPOINT` -> `SW_EXPORTER_OTLP_ENDPOINT` -> `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER`
15+
1116
## Docker Images
1217

1318
These are also updated on new commits to main. The collector is built on Amazon Linux and expects OpenSSL to be available in the image.

src/env.rs

Lines changed: 86 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,46 @@ use std::{
66
};
77

88
use anyhow::{Context, Error};
9+
use serde::Deserialize;
10+
11+
#[derive(Deserialize, Default)]
12+
pub struct Env {
13+
otel_service_name: Option<String>,
14+
aws_lambda_function_name: Option<String>,
15+
16+
aws_lambda_initialization_type: Option<String>,
17+
aws_lambda_runtime_api: Option<String>,
18+
sw_exporter_compression: Option<String>,
19+
20+
sw_apm_api_token: Option<String>,
21+
sw_apm_service_key: Option<String>,
22+
23+
sw_apm_data_center: Option<String>,
24+
sw_apm_collector: Option<String>,
25+
sw_exporter_otlp_endpoint: Option<String>,
26+
sw_exporter_otlp_traces_endpoint: Option<String>,
27+
sw_exporter_otlp_metrics_endpoint: Option<String>,
28+
sw_exporter_otlp_logs_endpoint: Option<String>,
29+
sw_exporter_otlp_profiles_endpoint: Option<String>,
30+
}
931

1032
pub struct Config {
1133
pub _service: String,
1234
pub token: String,
1335

1436
pub executable: String,
1537
pub managed: bool,
38+
pub compression: Compression,
1639

1740
pub urls: UrlsConfig,
1841
}
1942

43+
#[derive(Clone, Copy)]
44+
pub enum Compression {
45+
Gzip,
46+
Zstd,
47+
}
48+
2049
pub struct UrlsConfig {
2150
pub settings: String,
2251
pub exporters: ExportersUrlsConfig,
@@ -65,28 +94,55 @@ impl Config {
6594
const LOCAL_HOST: &str = "sandbox.localdomain";
6695

6796
pub fn parse() -> Result<Arc<Self>, Error> {
68-
let service_key = env::var("SW_APM_SERVICE_KEY").ok();
69-
let service_key = service_key.as_ref().and_then(|s| s.split_once(':'));
97+
let Env {
98+
otel_service_name,
99+
aws_lambda_function_name,
70100

71-
let service_name = env::var("OTEL_SERVICE_NAME")
72-
.ok()
101+
aws_lambda_initialization_type,
102+
aws_lambda_runtime_api,
103+
sw_exporter_compression,
104+
105+
sw_apm_api_token,
106+
sw_apm_service_key,
107+
108+
sw_apm_data_center,
109+
sw_apm_collector,
110+
sw_exporter_otlp_endpoint,
111+
sw_exporter_otlp_traces_endpoint,
112+
sw_exporter_otlp_metrics_endpoint,
113+
sw_exporter_otlp_logs_endpoint,
114+
sw_exporter_otlp_profiles_endpoint,
115+
} = envy::from_env().unwrap_or_default();
116+
117+
let service_key = sw_apm_service_key.as_ref().and_then(|s| s.split_once(':'));
118+
119+
let service_name = otel_service_name
73120
.or_else(|| service_key.map(|(name, _)| name.to_string()))
74-
.or_else(|| env::var("AWS_LAMBDA_FUNCTION_NAME").ok())
121+
.or(aws_lambda_function_name)
75122
.context("missing service name")?;
76123

77-
let api_token = env::var("SW_APM_API_TOKEN")
78-
.ok()
79-
.or_else(|| service_key.map(|(_, token)| token.to_string())).unwrap_or_else(|| {
124+
let managed =
125+
aws_lambda_initialization_type.is_some_and(|v| v == "lambda-managed-instances");
126+
let api_host = aws_lambda_runtime_api.unwrap_or_else(|| Self::API_HOST.to_string());
127+
128+
let api_token = sw_apm_api_token
129+
.or_else(|| service_key.map(|(_, token)| token.to_string()))
130+
.unwrap_or_else(|| {
80131
eprintln!("Missing SolarWinds APM API token. Please set the `SW_APM_API_TOKEN` environment variable to enable sampling.");
81132
"missing".to_string()
82133
});
83134

84-
let data_center = env::var("SW_APM_DATA_CENTER")
85-
.ok()
86-
.unwrap_or_else(|| "na-01".to_string());
135+
let data_center = sw_apm_data_center.unwrap_or_else(|| "na-01".to_string());
136+
let mut collector = sw_apm_collector
137+
.unwrap_or_else(|| format!("https://apm.collector.{data_center}.cloud.solarwinds.com"));
138+
let mut exporter = sw_exporter_otlp_endpoint
139+
.unwrap_or_else(|| collector.replace("apm.collector", "otel.collector"));
87140

88-
let api_host =
89-
env::var("AWS_LAMBDA_RUNTIME_API").unwrap_or_else(|_| Self::API_HOST.to_string());
141+
for url in [&mut collector, &mut exporter] {
142+
if !url.starts_with("https://") && !url.starts_with("http://") {
143+
*url = format!("https://{url}");
144+
}
145+
}
90146

91147
let executable = env::current_exe()
92148
.ok()
@@ -96,31 +152,26 @@ impl Config {
96152
})
97153
.unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string());
98154

99-
let managed = env::var("AWS_LAMBDA_INITIALIZATION_TYPE")
100-
.is_ok_and(|v| v == "lambda-managed-instances");
155+
let compression = sw_exporter_compression
156+
.and_then(|c| match c.to_lowercase().trim() {
157+
"gzip" | "gz" => Some(Compression::Gzip),
158+
"zstd" => Some(Compression::Zstd),
159+
_ => None,
160+
})
161+
.unwrap_or(Compression::Gzip);
101162

102163
Ok(Arc::new(Self {
103164
urls: UrlsConfig {
104-
settings: format!(
105-
"https://apm.collector.{data_center}.cloud.solarwinds.com/v1/settings/{service_name}/{service_name}",
106-
),
165+
settings: format!("{collector}/v1/settings/{service_name}/{service_name}",),
107166
exporters: ExportersUrlsConfig {
108-
traces: format!(
109-
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
110-
Self::TRACES_ROUTE
111-
),
112-
metrics: format!(
113-
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
114-
Self::METRICS_ROUTE
115-
),
116-
logs: format!(
117-
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
118-
Self::LOGS_ROUTE
119-
),
120-
profiles: format!(
121-
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
122-
Self::PROFILES_ROUTE
123-
),
167+
traces: sw_exporter_otlp_traces_endpoint
168+
.unwrap_or_else(|| format!("{exporter}{}", Self::TRACES_ROUTE)),
169+
metrics: sw_exporter_otlp_metrics_endpoint
170+
.unwrap_or_else(|| format!("{exporter}{}", Self::METRICS_ROUTE)),
171+
logs: sw_exporter_otlp_logs_endpoint
172+
.unwrap_or_else(|| format!("{exporter}{}", Self::LOGS_ROUTE)),
173+
profiles: sw_exporter_otlp_profiles_endpoint
174+
.unwrap_or_else(|| format!("{exporter}{}", Self::PROFILES_ROUTE)),
124175
},
125176
extension: ExtensionUrlsConfig {
126177
register: format!(
@@ -151,6 +202,7 @@ impl Config {
151202

152203
executable,
153204
managed,
205+
compression,
154206
}))
155207
}
156208
}

src/exporter.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::{
33
};
44

55
use anyhow::Error;
6-
use async_compression::{Level, tokio::bufread::GzipEncoder};
6+
use async_compression::{
7+
Level,
8+
tokio::bufread::{GzipEncoder, ZstdEncoder},
9+
};
710
use bytes::BytesMut;
811
use futures_util::TryStreamExt;
912
use http_body_util::{BodyExt, StreamBody};
@@ -26,6 +29,7 @@ use opentelemetry_proto::tonic::{
2629
};
2730
use prost::Message;
2831
use tokio::{
32+
io::AsyncRead,
2933
sync::{mpsc, watch},
3034
task::JoinSet,
3135
time::{self, MissedTickBehavior},
@@ -37,7 +41,7 @@ use uuid::Uuid;
3741

3842
use crate::{
3943
ServiceRequest,
40-
env::Config,
44+
env::{Compression, Config},
4145
util::{Client, body, flatten},
4246
};
4347

@@ -185,6 +189,7 @@ async fn send<R>(
185189
mut client: FollowRedirect<Client>,
186190
url: String,
187191
token: String,
192+
compression: Compression,
188193
instance_id: Uuid,
189194
attributes: Arc<[KeyValue]>,
190195
) -> Result<(), Error>
@@ -202,13 +207,23 @@ where
202207
let mut buf = BytesMut::with_capacity(request.encoded_len());
203208
request.encode(&mut buf)?;
204209

205-
let compressed = StreamBody::new(
206-
ReaderStream::new(GzipEncoder::with_quality(
207-
Cursor::new(buf),
208-
Level::Precise(6),
209-
))
210-
.map_ok(Frame::data),
211-
);
210+
let (boxed, encoding): (Box<dyn AsyncRead + Unpin + Send>, &str) = match compression {
211+
Compression::Zstd => (
212+
Box::new(ZstdEncoder::with_quality(
213+
Cursor::new(buf),
214+
Level::Precise(4),
215+
)),
216+
"zstd",
217+
),
218+
Compression::Gzip => (
219+
Box::new(GzipEncoder::with_quality(
220+
Cursor::new(buf),
221+
Level::Precise(6),
222+
)),
223+
"gzip",
224+
),
225+
};
226+
let compressed = StreamBody::new(ReaderStream::new(boxed).map_ok(Frame::data));
212227

213228
future::poll_fn(|cx| client.poll_ready(cx)).await?;
214229
let response = client
@@ -217,7 +232,7 @@ where
217232
.method("POST")
218233
.uri(url)
219234
.header(CONTENT_TYPE, "application/x-protobuf")
220-
.header(CONTENT_ENCODING, "gzip")
235+
.header(CONTENT_ENCODING, encoding)
221236
.header(AUTHORIZATION, format!("Bearer {token}"))
222237
.header(USER_AGENT, Config::USER_AGENT)
223238
.body(body(compressed))?,
@@ -296,6 +311,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
296311
state.client.clone(),
297312
config.urls.exporters.traces.clone(),
298313
config.token.clone(),
314+
config.compression,
299315
state.instance_id,
300316
state.attributes.clone(),
301317
));
@@ -305,6 +321,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
305321
state.client.clone(),
306322
config.urls.exporters.metrics.clone(),
307323
config.token.clone(),
324+
config.compression,
308325
state.instance_id,
309326
state.attributes.clone(),
310327
));
@@ -314,6 +331,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
314331
state.client.clone(),
315332
config.urls.exporters.logs.clone(),
316333
config.token.clone(),
334+
config.compression,
317335
state.instance_id,
318336
state.attributes.clone(),
319337
));
@@ -324,6 +342,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
324342
state.client.clone(),
325343
config.urls.exporters.profiles.clone(),
326344
config.token.clone(),
345+
config.compression,
327346
state.instance_id,
328347
state.attributes.clone(),
329348
));

0 commit comments

Comments
 (0)