1 change: 1 addition & 0 deletions crates/oxbow-lambda-shared/Cargo.toml
@@ -20,3 +20,4 @@ urlencoding = "=2"
url = { workspace = true }

oxbow = { path = "../oxbow" }
oxbow-sqs = { path = "../oxbow-sqs" }
35 changes: 35 additions & 0 deletions crates/oxbow-lambda-shared/src/lib.rs
@@ -238,6 +238,41 @@ fn into_object_meta(s3object: &S3Object, prune_prefix: Option<&str>) -> ObjectMe
}
}

/// Simple function for extracting the necessary [S3EventRecord] structs from a given [SqsEvent]
///
/// This utilizes the `UNWRAP_SNS_ENVELOPE` environment variable to handle SNS-encoded bucket
/// notifications
pub fn extract_records_from(
events: impl IntoIterator<Item = SqsEvent>,
) -> DeltaResult<Vec<S3EventRecord>> {
let mut records = vec![];
for event in events {
let pieces = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event)?,
Err(_) => s3_from_sqs(event)?,
};
records.extend(pieces);
}
Ok(records_with_url_decoded_keys(&records))
}

/// Convert an [oxbow_sqs::Message] into an [SqsMessage]
///
/// The [SqsMessage] and [oxbow_sqs::Message] structs are mostly identical except one comes
/// from Lambda triggers and the other directly from SQS.
pub fn convert_from_sqs(message: oxbow_sqs::Message) -> aws_lambda_events::sqs::SqsMessage {
aws_lambda_events::sqs::SqsMessage {
message_id: message.message_id,
receipt_handle: message.receipt_handle,
body: message.body,
md5_of_body: message.md5_of_body,
md5_of_message_attributes: message.md5_of_message_attributes,
// Translating message_attributes structs is an exercise for later
//attributes: message.message_attributes.unwrap_or_default()
..Default::default()
}
}

#[cfg(test)]
mod tests {
use super::*;
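For orientation, here is a minimal sketch (editorial, not part of this diff; `records_from_buffered` is a hypothetical helper name) of how the two newly exported helpers can be combined when additional messages are pulled from SQS outside of the initial Lambda trigger:

use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::s3::S3EventRecord;
use deltalake::DeltaResult;
use oxbow_lambda_shared::{convert_from_sqs, extract_records_from};

/// Turn raw messages fetched via `oxbow-sqs` into the same `S3EventRecord`s
/// that the initial Lambda trigger would produce.
fn records_from_buffered(messages: Vec<oxbow_sqs::Message>) -> DeltaResult<Vec<S3EventRecord>> {
    let event = SqsEvent {
        records: messages.into_iter().map(convert_from_sqs).collect(),
    };
    extract_records_from(vec![event])
}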
1 change: 1 addition & 0 deletions crates/oxbow-sqs/Cargo.toml
@@ -22,6 +22,7 @@ tracing-subscriber = { workspace = true }
aws-config = "1.5.16"
aws-sdk-sqs = "1.58.0"
url = { workspace = true }
stats_alloc = "0.1.10"

[dev-dependencies]
serial_test = "3"
46 changes: 42 additions & 4 deletions crates/oxbow-sqs/src/lib.rs
@@ -13,9 +13,11 @@
use aws_sdk_sqs::types::DeleteMessageBatchRequestEntry;
pub use aws_sdk_sqs::types::Message;
use stats_alloc::Region;
use tracing::log::*;
use url::Url;

use std::alloc::System;
use std::time::{Duration, Instant};

// How can I handle this:
@@ -51,8 +53,8 @@ impl Default for ConsumerConfig {

/// A [TimedConsumer] helps consume from Amazon SQS up until a certain threshold of time, typically
/// used to stop consuming messages once a certain portion of the Lambda's runtime has elapsed
#[derive(Clone, Debug)]
pub struct TimedConsumer {
#[derive(Debug)]
pub struct TimedConsumer<'a> {
config: ConsumerConfig,
/// The [Instant] when this consumer was created. This value is used to determine when to
/// finalize based on the configured deadline
@@ -66,9 +68,14 @@ pub struct TimedConsumer {
pub limit: Option<u64>,
// Collection of entries needed to delete messages upon finalization
receive_handles: Vec<DeleteMessageBatchRequestEntry>,

// Optional memory limit
memory_limit_region: Option<Region<'a, System>>,
memory_limit_bytes: Option<usize>,
bytes_consumed: usize,
}

impl TimedConsumer {
impl<'a> TimedConsumer<'a> {
/// Initialize the [TimedConsumer]
///
/// `config` is a provided [aws_config::SdkConfig] to be used for interacting with AWS services like SQS
@@ -86,6 +93,10 @@ impl TimedConsumer {
start: Instant::now(),
limit: None,
receive_handles: vec![],

memory_limit_region: None,
memory_limit_bytes: None,
bytes_consumed: 0,
}
}

@@ -123,6 +134,25 @@ impl TimedConsumer {
return Ok(None);
}

if let Some(ref mut region) = self.memory_limit_region {
let stats = region.change_and_reset();
self.bytes_consumed += stats.bytes_allocated - stats.bytes_deallocated;
info!(
"current memory usage: {} bytes but {} was allocated since last check",
self.bytes_consumed, stats.bytes_allocated
);

if let Some(limit) = self.memory_limit_bytes {
if self.bytes_consumed >= limit {
info!(
"Memory limit of {}MB reached, stopping consumption",
limit / 1024 / 1024
);
return Ok(None);
}
}
}

let visibility_timeout: i32 = self
.deadline
.as_secs()
@@ -187,9 +217,17 @@ impl TimedConsumer {
self.receive_handles = vec![];
Ok(())
}

pub fn set_memory_limit(&mut self, mut region: Region<'a, System>, limit: usize) {
let stats = region.change_and_reset();
self.bytes_consumed = stats.bytes_allocated - stats.bytes_deallocated;

self.memory_limit_region = Some(region);
self.memory_limit_bytes = Some(limit);
}
}

impl std::ops::Drop for TimedConsumer {
impl std::ops::Drop for TimedConsumer<'_> {
fn drop(&mut self) {
if !self.receive_handles.is_empty() {
error!(
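A minimal sketch (editorial, not part of this diff) of how the new `set_memory_limit` is expected to be wired up, assuming the `stats_alloc` instrumented global allocator from that crate's documentation; `apply_memory_limit` and `megabytes` are illustrative names:

use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM};
use std::alloc::System;

// Route all allocations through stats_alloc so a Region can observe them.
#[global_allocator]
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;

fn apply_memory_limit(consumer: &mut oxbow_sqs::TimedConsumer<'static>, megabytes: usize) {
    // The Region tracks allocation deltas from this point forward; the consumer
    // resets it on each `next()` call and stops once the byte budget is exceeded.
    let region = Region::new(GLOBAL);
    consumer.set_memory_limit(region, megabytes * 1024 * 1024);
}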
68 changes: 12 additions & 56 deletions lambdas/file-loader/src/main.rs
@@ -4,7 +4,6 @@
///
use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::s3::S3EventRecord;
use aws_lambda_events::sqs::SqsMessage;
use deltalake::DeltaResult;
use deltalake::arrow::json::reader::{Decoder, ReaderBuilder};
use deltalake::writer::{DeltaWriter, record_batch::RecordBatchWriter};
@@ -52,7 +51,7 @@ async fn main() -> Result<(), Error> {
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let table_uri = std::env::var("DELTA_TABLE_URI").expect("Failed to get `DELTA_TABLE_URI`");
debug!("Receiving event: {event:?}");
let mut region = Region::new(GLOBAL);
let region = Region::new(GLOBAL);
let mut records = extract_records_from(vec![event.payload])?;

info!("Processing {} bucket notifications", records.len());
@@ -69,9 +68,6 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let mut writer = RecordBatchWriter::for_table(&table)?;
let schema = writer.arrow_schema();
info!("Schema for destination table: {schema:?}");
let stats = region.change_and_reset();
let mut bytes_consumed: usize = stats.bytes_allocated - stats.bytes_deallocated;
info!("table opened, current memory usage: {}", bytes_consumed);

let config = aws_config::load_from_env().await;
let client = aws_sdk_s3::Client::new(&config);
@@ -90,6 +86,16 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
.expect("Failed to compute a 64-bit deadline"),
),
);
if let Ok(bytes_to_consume) = std::env::var("BUFFER_MORE_MBYTES_ALLOWED") {
consumer.set_memory_limit(
region,
bytes_to_consume
.parse::<usize>()
.expect("BUFFER_MORE_MBYTES_ALLOWED must be parseable as a uint64")
* 1024
* 1024,
);
}

loop {
let next_up = consumer.next().await?;
@@ -140,25 +146,6 @@
}
}
}
let stats = region.change_and_reset();
bytes_consumed += stats.bytes_allocated - stats.bytes_deallocated;
info!(
"loop of writes, current memory usage: {bytes_consumed} but {} was allocated during the loop",
stats.bytes_allocated
);

if let Ok(bytes_to_consume) = std::env::var("BUFFER_MORE_MBYTES_ALLOWED") {
let mbytes_to_consume: usize = str::parse(&bytes_to_consume)
.expect("BUFFER_MORE_BYTES_ALLOWED must be parseable as a uint64");

info!(
"Allocated {bytes_consumed} bytes thus far... I can only have {mbytes_to_consume}MB"
);
if bytes_consumed >= (mbytes_to_consume * 1024 * 1024) {
info!("Finalizing after consuming {bytes_consumed} bytes of memory");
break;
}
}
}

info!("Attempting to flush.. {table:?}");
@@ -170,24 +157,6 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
Ok(())
}

/// Simple function for extracting the necessary [S3EventRecord] structs from a given [SqsEvent]
///
/// This utilizes the `UNWRAP_SNS_ENVELOPE` environment variable to handle SNS-encoded bucket
/// notifications
fn extract_records_from(
events: impl IntoIterator<Item = SqsEvent>,
) -> DeltaResult<Vec<S3EventRecord>> {
let mut records = vec![];
for event in events {
let pieces = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event)?,
Err(_) => s3_from_sqs(event)?,
};
records.extend(pieces);
}
Ok(records_with_url_decoded_keys(&records))
}

/// Extract the suffix from the given [S3EventRecord] for matching and data loading
fn suffix_from_record(record: &S3EventRecord) -> RecordType {
if let Some(key) = record.s3.object.key.as_ref()
@@ -203,20 +172,6 @@ fn suffix_from_record(record: &S3EventRecord) -> RecordType {
RecordType::Unknown
}

/// Convert an [oxbow_sqs::Message] into an [SqsMessage]
fn convert_from_sqs(message: oxbow_sqs::Message) -> SqsMessage {
SqsMessage {
message_id: message.message_id,
receipt_handle: message.receipt_handle,
body: message.body,
md5_of_body: message.md5_of_body,
md5_of_message_attributes: message.md5_of_message_attributes,
// Translating message_attributes structs is an exercise for later
//attributes: message.message_attributes.unwrap_or_default()
..Default::default()
}
}

/// Deserialize the bytes which have been provided by the input and write them into the
/// [RecordBatchWriter]
///
@@ -270,6 +225,7 @@ async fn deserialize_bytes(
mod tests {
use super::*;
use aws_lambda_events::s3::{S3Entity, S3EventRecord, S3Object};
use aws_lambda_events::sqs::SqsMessage;
use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use deltalake::datafusion::prelude::SessionContext;
29 changes: 29 additions & 0 deletions lambdas/parquet-concat/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "parquet-concat"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true

[dependencies]
bytes = "*"
lambda_runtime = { version = "0.11" }
oxbow = { path = "../../crates/oxbow" }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
oxbow-sqs = { path = "../../crates/oxbow-sqs" }

anyhow = { workspace = true }
aws_lambda_events = { workspace = true, default-features = false, features = ["sqs"] }
deltalake = { workspace = true }
tokio = { workspace = true }
tokio-stream = { version = "0.1.17" }
aws-config = "1.5.10"
aws-sdk-s3 = "1.60.0"
object_store = { version = "0.12.1", features = ["cloud"]}
parquet = { version = "55.0.0", features = ["async", "object_store"] }
stats_alloc = "0.1"
tracing-subscriber = { workspace = true }
uuid = { version = "1.6.1", features = ["v4"] }

[dev-dependencies]
tempfile = "3.8"
57 changes: 57 additions & 0 deletions lambdas/parquet-concat/README.adoc
@@ -0,0 +1,57 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro

= Parquet Concat

The `parquet-concat` Lambda is triggered by S3 Event Notifications and is responsible for reading multiple Parquet files and concatenating them together.

This helps speed up oxbow ingestion by reducing the number of small table versions.

toc::[]

== Environment Variables

|===

| Name | Default Value | Notes

| `INPUT_BUCKET`
|
| S3 bucket name where input Parquet files are located

| `INPUT_PREFIX`
|
| S3 prefix/path that should be removed from the input

| `OUTPUT_BUCKET`
|
| S3 bucket name where consolidated Parquet files will be written

| `OUTPUT_PREFIX`
|
| S3 prefix/path where consolidated files will be stored

| `BUFFER_MORE_QUEUE_URL`
|
| SQS queue URL for buffering additional messages beyond the initial Lambda trigger

| `BUFFER_MORE_MBYTES_ALLOWED`
|
| Memory limit in MB for processing additional messages (optional)

| `RUST_LOG`
| `error`
| Log level (default: `error`). Can be scoped to specific modules, e.g. `oxbow=debug`

| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications first go to SNS and are then routed to SQS for Oxbow

|===
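As an editorial aside (not part of the new README), the `BUFFER_MORE_MBYTES_ALLOWED` row above is expressed in megabytes; below is a sketch of the conversion to the byte limit handed to `TimedConsumer::set_memory_limit`, mirroring the file-loader handler earlier in this diff (`memory_limit_bytes` is an illustrative helper name):

/// Read the optional megabyte budget from the environment and convert it into bytes.
fn memory_limit_bytes() -> Option<usize> {
    std::env::var("BUFFER_MORE_MBYTES_ALLOWED").ok().map(|mbytes| {
        mbytes
            .parse::<usize>()
            .expect("BUFFER_MORE_MBYTES_ALLOWED must be parseable as a uint64")
            * 1024
            * 1024
    })
}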
