Skip to content

Commit 7785983

Browse files
committed
Fix failing tests
1 parent 1484192 commit 7785983

File tree

5 files changed

+90
-116
lines changed

5 files changed

+90
-116
lines changed

LICENSE-3rdparty.csv

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
Component,Origin,License,Copyright
22
Inflector,https://github.com/whatisinternet/inflector,BSD-2-Clause,Josh Teeter<[email protected]>
3-
RustyXML,https://github.com/Florob/RustyXML,MIT OR Apache-2.0,Florian Zeitz <[email protected]>
43
addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line Authors
54
adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink <[email protected]>, oyvindln <[email protected]>"
65
adler32,https://github.com/remram44/adler32-rs,Zlib,Remi Rampin <[email protected]>
@@ -104,11 +103,8 @@ aws-types,https://github.com/smithy-lang/smithy-rs,Apache-2.0,"AWS Rust SDK Team
104103
axum,https://github.com/tokio-rs/axum,MIT,The axum Authors
105104
axum-core,https://github.com/tokio-rs/axum,MIT,The axum-core Authors
106105
azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
107-
azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
108-
azure_identity,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
109-
azure_storage,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
110-
azure_storage_blobs,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
111-
azure_svc_blobstorage,https://github.com/azure/azure-sdk-for-rust,MIT,The azure_svc_blobstorage Authors
106+
azure_core_macros,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
107+
azure_storage_blob,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
112108
backoff,https://github.com/ihrwein/backoff,MIT OR Apache-2.0,Tibor Benke <[email protected]>
113109
backon,https://github.com/Xuanwo/backon,Apache-2.0,The backon Authors
114110
backtrace,https://github.com/rust-lang/backtrace-rs,MIT OR Apache-2.0,The Rust Project Developers
@@ -358,7 +354,6 @@ http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche <me@carllerche.
358354
http-body-util,https://github.com/hyperium/http-body,MIT,"Carl Lerche <[email protected]>, Lucio Franco <[email protected]>, Sean McArthur <[email protected]>"
359355
http-range-header,https://github.com/MarcusGrass/parse-range-headers,MIT,The http-range-header Authors
360356
http-serde,https://gitlab.com/kornelski/http-serde,Apache-2.0 OR MIT,Kornel <[email protected]>
361-
http-types,https://github.com/http-rs/http-types,MIT OR Apache-2.0,Yoshua Wuyts <[email protected]>
362357
httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur <[email protected]>
363358
httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch <[email protected]>
364359
humantime,https://github.com/chronotope/humantime,MIT OR Apache-2.0,The humantime Authors
@@ -389,7 +384,6 @@ idna_adapter,https://github.com/hsivonen/idna_adapter,Apache-2.0 OR MIT,The rust
389384
indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Authors
390385
indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors
391386
indoc,https://github.com/dtolnay/indoc,MIT OR Apache-2.0,David Tolnay <[email protected]>
392-
infer,https://github.com/bojand/infer,MIT,Bojan <[email protected]>
393387
influxdb-line-protocol,https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line_protocol,MIT OR Apache-2.0,InfluxDB IOx Project Developers
394388
inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun <[email protected]>, Félix Saparelli <[email protected]>, Cristian Kubis <[email protected]>, Frank Denis <[email protected]>"
395389
inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun <[email protected]>
@@ -404,6 +398,7 @@ ipconfig,https://github.com/liranringel/ipconfig,MIT OR Apache-2.0,Liran Ringel
404398
ipcrypt-rs,https://github.com/jedisct1/rust-ipcrypt2,ISC,Frank Denis <[email protected]>
405399
ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price <[email protected]>
406400
ipnetwork,https://github.com/achanda/ipnetwork,MIT OR Apache-2.0,"Abhishek Chanda <[email protected]>, Linus Färnstrand <[email protected]>"
401+
iri-string,https://github.com/lo48576/iri-string,MIT OR Apache-2.0,YOSHIOKA Takuma <[email protected]>
407402
is-terminal,https://github.com/sunfishcode/is-terminal,MIT,"softprops <[email protected]>, Dan Gohman <[email protected]>"
408403
is_ci,https://github.com/zkat/is_ci,ISC,Kat Marchán <[email protected]>
409404
itertools,https://github.com/rust-itertools/itertools,MIT OR Apache-2.0,bluss
@@ -621,7 +616,6 @@ rand,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Dev
621616
rand_chacha,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers, The CryptoCorrosion Contributors"
622617
rand_core,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers"
623618
rand_distr,https://github.com/rust-random/rand_distr,MIT OR Apache-2.0,The Rand Project Developers
624-
rand_hc,https://github.com/rust-random/rand,MIT OR Apache-2.0,The Rand Project Developers
625619
rand_xorshift,https://github.com/rust-random/rngs,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers"
626620
ratatui,https://github.com/ratatui/ratatui,MIT,"Florian Dehau <[email protected]>, The Ratatui Developers"
627621
raw-cpuid,https://github.com/gz/rust-cpuid,MIT,Gerd Zellweger <[email protected]>
@@ -700,7 +694,6 @@ serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar <
700694
serde_nanos,https://github.com/caspervonb/serde_nanos,MIT OR Apache-2.0,Casper Beyer <[email protected]>
701695
serde_path_to_error,https://github.com/dtolnay/path-to-error,MIT OR Apache-2.0,David Tolnay <[email protected]>
702696
serde_plain,https://github.com/mitsuhiko/serde-plain,MIT OR Apache-2.0,Armin Ronacher <[email protected]>
703-
serde_qs,https://github.com/samscott89/serde_qs,MIT OR Apache-2.0,Sam Scott <[email protected]>
704697
serde_repr,https://github.com/dtolnay/serde-repr,MIT OR Apache-2.0,David Tolnay <[email protected]>
705698
serde_spanned,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The serde_spanned Authors
706699
serde_urlencoded,https://github.com/nox/serde_urlencoded,MIT OR Apache-2.0,Anthony Ramine <[email protected]>
@@ -876,6 +869,7 @@ wasm-timer,https://github.com/tomaka/wasm-timer,MIT,Pierre Krieger <pierre.krieg
876869
web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers
877870
web-time,https://github.com/daxpedda/web-time,MIT OR Apache-2.0,The web-time Authors
878871
webbrowser,https://github.com/amodm/webbrowser-rs,MIT OR Apache-2.0,Amod Malviya @amodm
872+
webpki-roots,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-roots Authors
879873
webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Authors
880874
whoami,https://github.com/ardaku/whoami,Apache-2.0 OR BSL-1.0 OR MIT,The whoami Authors
881875
widestring,https://github.com/starkat99/widestring-rs,MIT OR Apache-2.0,Kathryn Long <[email protected]>

src/sinks/azure_blob/integration_tests.rs

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
use std::{
2-
io::{BufRead, BufReader},
3-
num::NonZeroU32,
4-
};
1+
use std::io::{BufRead, BufReader};
52

63
use azure_core::http::StatusCode;
74

8-
use azure_storage_blob::BlobContainerClient;
95
use bytes::{Buf, BytesMut};
106
use flate2::read::GzDecoder;
117
use futures::{Stream, StreamExt, stream};
@@ -83,7 +79,7 @@ async fn azure_blob_insert_lines_into_blob() {
8379
let blobs = config.list_blobs(blob_prefix).await;
8480
assert_eq!(blobs.len(), 1);
8581
assert!(blobs[0].clone().ends_with(".log"));
86-
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
82+
let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
8783
assert_eq!(content_type, Some(String::from("text/plain")));
8884
assert_eq!(lines, blob_lines);
8985
}
@@ -138,9 +134,9 @@ async fn azure_blob_insert_lines_into_blob_gzip() {
138134
let blobs = config.list_blobs(blob_prefix).await;
139135
assert_eq!(blobs.len(), 1);
140136
assert!(blobs[0].clone().ends_with(".log.gz"));
141-
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
142-
assert_eq!(blob.properties.content_encoding, Some(String::from("gzip")));
143-
assert_eq!(blob.properties.content_type, String::from("text/plain"));
137+
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
138+
assert_eq!(content_encoding, Some(String::from("gzip")));
139+
assert_eq!(content_type, Some(String::from("text/plain")));
144140
assert_eq!(lines, blob_lines);
145141
}
146142

@@ -168,12 +164,9 @@ async fn azure_blob_insert_json_into_blob_gzip() {
168164
let blobs = config.list_blobs(blob_prefix).await;
169165
assert_eq!(blobs.len(), 1);
170166
assert!(blobs[0].clone().ends_with(".log.gz"));
171-
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
172-
assert_eq!(blob.properties.content_encoding, Some(String::from("gzip")));
173-
assert_eq!(
174-
blob.properties.content_type,
175-
String::from("application/x-ndjson")
176-
);
167+
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
168+
assert_eq!(content_encoding, Some(String::from("gzip")));
169+
assert_eq!(content_type, Some(String::from("application/x-ndjson")));
177170
let expected = events
178171
.iter()
179172
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
@@ -204,7 +197,7 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached() {
204197
assert_eq!(blobs.len(), 3);
205198
let response = stream::iter(blobs)
206199
.fold(Vec::new(), |mut acc, blob| async {
207-
let (_, lines) = config.get_blob(blob).await;
200+
let (_, _, lines) = config.get_blob(blob).await;
208201
acc.push(lines);
209202
acc
210203
})
@@ -260,32 +253,24 @@ impl AzureBlobSinkConfig {
260253
)
261254
.unwrap();
262255

263-
// Use new SDK pager to fetch first page and collect blob names.
256+
// Iterate pager results and collect blob names, filtering by prefix client-side.
264257
let mut pager = client
265258
.list_blobs(None)
266259
.expect("Failed to start list blobs pager");
267-
let page = pager
268-
.next()
269-
.await
270-
.expect("Failed to fetch blobs")
271-
.into_body();
272-
273-
// Best-effort extraction of names from the page body.
274-
// Depending on SDK struct names, this may need tweaking:
275-
// ListBlobsFlatSegmentResponse { segment: { blob_items: [{ name, .. }, ..] }, .. }
276-
let names = page
277-
.segment
278-
.blob_items
279-
.into_iter()
280-
.map(|b| b.name)
281-
.filter(|name| name.starts_with(&prefix))
282-
.collect::<Vec<_>>();
260+
let mut names = Vec::new();
261+
while let Some(result) = pager.next().await {
262+
let item = result.expect("Failed to fetch blobs");
263+
if let Some(name) = item.name.and_then(|bn| bn.content)
264+
&& name.starts_with(&prefix)
265+
{
266+
names.push(name);
267+
}
268+
}
283269

284270
names
285271
}
286272

287273
pub async fn get_blob(&self, blob: String) -> (Option<String>, Option<String>, Vec<String>) {
288-
use azure_storage_blob::clients::BlobClient as _;
289274
let client = azure_common::config::build_client(
290275
self.connection_string.clone().into(),
291276
self.container_name.clone(),
@@ -295,27 +280,39 @@ impl AzureBlobSinkConfig {
295280
let blob_client = client.blob_client(&blob);
296281

297282
// Fetch properties to obtain content-type and content-encoding
298-
let props = blob_client
283+
let props_resp = blob_client
299284
.get_properties(None)
300285
.await
301-
.expect("Failed to get blob properties")
302-
.into_body();
303-
304-
let content_type = props.content_type.clone();
305-
let content_encoding = props.content_encoding.clone();
286+
.expect("Failed to get blob properties");
287+
let headers = props_resp.headers();
288+
let content_type = headers.iter().find_map(|(name, value)| {
289+
let key = name.as_str();
290+
if key.eq_ignore_ascii_case("content-type") {
291+
Some(value.as_str().to_string())
292+
} else {
293+
None
294+
}
295+
});
296+
let content_encoding = headers.iter().find_map(|(name, value)| {
297+
let key = name.as_str();
298+
if key.eq_ignore_ascii_case("content-encoding") {
299+
Some(value.as_str().to_string())
300+
} else {
301+
None
302+
}
303+
});
306304

307305
// Download the full blob content
308306
let downloaded = blob_client
309307
.download(None)
310308
.await
311309
.expect("Failed to download blob");
312-
let data = downloaded
310+
let body_bytes = downloaded
313311
.into_body()
314-
.data
315312
.collect()
316313
.await
317-
.expect("Failed to read blob body")
318-
.to_vec();
314+
.expect("Failed to read blob body");
315+
let data = body_bytes.to_vec();
319316

320317
(content_type, content_encoding, self.get_blob_content(data))
321318
}
@@ -344,7 +341,7 @@ impl AzureBlobSinkConfig {
344341
let response = match result {
345342
Ok(_) => Ok(()),
346343
Err(error) => match error.http_status() {
347-
Some(status) if status.as_u16() == StatusCode::Conflict => Ok(()),
344+
Some(StatusCode::Conflict) => Ok(()),
348345
_ => Err(error),
349346
},
350347
};

src/sinks/azure_common/connection_string.rs

Lines changed: 38 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ SAS handling
3333
Examples:
3434
- Access key connection string:
3535
"DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64key==;EndpointSuffix=core.windows.net"
36-
Container URL: https://myacct.blob.core.windows.net/logs
37-
Blob URL: https://myacct.blob.core.windows.net/logs/file.txt
36+
Container URL: <https://myacct.blob.core.windows.net/logs>
37+
Blob URL: <https://myacct.blob.core.windows.net/logs/file.txt>
3838
3939
- SAS connection string:
40-
"BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&..."
41-
Container URL (with SAS): https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&...
42-
Blob URL (with SAS): https://myacct.blob.core.windows.net/logs/file.txt?sv=2022-11-02&ss=b&...
40+
"BlobEndpoint=<https://myacct.blob.core.windows.net/>;SharedAccessSignature=sv=2022-11-02&ss=b&..."
41+
Container URL (with SAS): <https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&...>
42+
Blob URL (with SAS): <https://myacct.blob.core.windows.net/logs/file.txt?sv=2022-11-02&ss=b&...>
4343
4444
- Azurite/dev storage:
4545
"UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1"
46-
Container URL: http://127.0.0.1:10000/devstoreaccount1/logs
46+
Container URL: <http://127.0.0.1:10000/devstoreaccount1/logs>
4747
*/
4848

4949
use std::collections::HashMap;
@@ -87,7 +87,7 @@ pub enum Auth {
8787
}
8888

8989
/// A parsed Azure Storage connection string and helpers to compose URLs for containers/blobs.
90-
#[derive(Debug, Clone)]
90+
#[derive(Debug, Clone, Default)]
9191
pub struct ParsedConnectionString {
9292
pub account_name: Option<String>,
9393
pub account_key: Option<String>,
@@ -99,21 +99,6 @@ pub struct ParsedConnectionString {
9999
pub development_storage_proxy_uri: Option<String>,
100100
}
101101

102-
impl Default for ParsedConnectionString {
103-
fn default() -> Self {
104-
Self {
105-
account_name: None,
106-
account_key: None,
107-
shared_access_signature: None,
108-
default_endpoints_protocol: None,
109-
endpoint_suffix: None,
110-
blob_endpoint: None,
111-
use_development_storage: false,
112-
development_storage_proxy_uri: None,
113-
}
114-
}
115-
}
116-
117102
impl ParsedConnectionString {
118103
/// Parse a connection string into a `ParsedConnectionString`.
119104
///
@@ -135,28 +120,23 @@ impl ParsedConnectionString {
135120
}
136121

137122
// Build the structure from the parsed map.
138-
let mut parsed = ParsedConnectionString::default();
139-
140-
parsed.account_name = map.get("accountname").cloned();
141-
parsed.account_key = map.get("accountkey").cloned();
142-
parsed.shared_access_signature = map
143-
.get("sharedaccesssignature")
144-
.map(|s| normalize_sas(s.as_str()));
145-
146-
parsed.default_endpoints_protocol = map
147-
.get("defaultendpointsprotocol")
148-
.map(|s| s.to_ascii_lowercase());
149-
150-
parsed.endpoint_suffix = map.get("endpointsuffix").cloned();
151-
152-
parsed.blob_endpoint = map.get("blobendpoint").cloned();
153-
154-
parsed.use_development_storage = map
155-
.get("usedevelopmentstorage")
156-
.map(|v| v.eq_ignore_ascii_case("true"))
157-
.unwrap_or(false);
158-
159-
parsed.development_storage_proxy_uri = map.get("developmentstorageproxyuri").cloned();
123+
let parsed = ParsedConnectionString {
124+
account_name: map.get("accountname").cloned(),
125+
account_key: map.get("accountkey").cloned(),
126+
shared_access_signature: map
127+
.get("sharedaccesssignature")
128+
.map(|s| normalize_sas(s.as_str())),
129+
default_endpoints_protocol: map
130+
.get("defaultendpointsprotocol")
131+
.map(|s| s.to_ascii_lowercase()),
132+
endpoint_suffix: map.get("endpointsuffix").cloned(),
133+
blob_endpoint: map.get("blobendpoint").cloned(),
134+
use_development_storage: map
135+
.get("usedevelopmentstorage")
136+
.map(|v| v.eq_ignore_ascii_case("true"))
137+
.unwrap_or(false),
138+
development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(),
139+
};
160140

161141
Ok(parsed)
162142
}
@@ -256,15 +236,18 @@ impl ParsedConnectionString {
256236

257237
/// Build a blob URL, optionally appending SAS if present.
258238
pub fn blob_url(&self, container: &str, blob: &str) -> Result<String, ConnectionStringError> {
259-
let container_url = self.container_url(container)?;
239+
// Build the base container URL without SAS, then append the blob path,
240+
// and finally append the SAS so it appears after the full path.
241+
let base = self.blob_account_endpoint()?;
242+
let container_no_sas = format!("{}/{}", trim_trailing_slash(&base), container);
243+
let blob_full = format!(
244+
"{}/{}",
245+
trim_trailing_slash(&container_no_sas),
246+
encode_path_segment(blob)
247+
);
260248
Ok(append_query_segment(
261-
&format!(
262-
"{}/{}",
263-
trim_trailing_slash(&container_url),
264-
encode_path_segment(blob)
265-
),
266-
// container_url already handled SAS; if it already had query args, append with '&'
267-
None, // SAS already appended at container level if present
249+
&blob_full,
250+
self.shared_access_signature.as_deref(),
268251
))
269252
}
270253
}
@@ -278,7 +261,7 @@ fn normalize_sas(s: &str) -> String {
278261
fn append_query_segment(base_url: &str, sas: Option<&str>) -> String {
279262
match sas {
280263
None => base_url.to_string(),
281-
Some(q) if q.is_empty() => base_url.to_string(),
264+
Some("") => base_url.to_string(),
282265
Some(q) => {
283266
let sep = if base_url.contains('?') { '&' } else { '?' };
284267
format!("{base_url}{sep}{q}")
@@ -288,8 +271,8 @@ fn append_query_segment(base_url: &str, sas: Option<&str>) -> String {
288271

289272
/// Trim exactly one trailing slash from a string, if present.
290273
fn trim_trailing_slash(s: &str) -> String {
291-
if s.ends_with('/') {
292-
s[..s.len() - 1].to_string()
274+
if let Some(stripped) = s.strip_suffix('/') {
275+
stripped.to_string()
293276
} else {
294277
s.to_string()
295278
}

0 commit comments

Comments
 (0)