Skip to content

Commit 4533690

Browse files
committed
feat: Support generating updates and deletes in data generator
1 parent f57d3b6 commit 4533690

10 files changed

Lines changed: 489 additions & 46 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/data-generation/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ futures.workspace = true
2525
duckdb = { workspace = true, features = ["bundled", "vtab", "vtab-arrow"] }
2626
object_store = { workspace = true }
2727
parquet.workspace = true
28+
rand.workspace = true
2829
serde_json.workspace = true
2930
tokio.workspace = true
3031
tracing.workspace = true
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::collections::{HashMap, HashSet};
18+
use std::hash::Hash;
19+
20+
use rand::Rng;
21+
22+
/// A primary key value that can represent single-column or composite keys.
///
/// The `Single` variant holds one `i64` directly inside the enum, so building a
/// single-column key never touches the heap. The `Composite` variant owns a
/// `Box<[i64]>` (a pointer plus a length, with no spare-capacity word) holding
/// one value per key column.
///
/// Note that an enum value is always as large as its largest variant, so a
/// `Single` key occupies the same inline space as a `Composite` one. What
/// `Single` saves is the heap allocation, not inline size.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PrimaryKeyValue {
    /// A single-column primary key (stored inline, no heap allocation).
    Single(i64),
    /// A composite (multi-column) primary key (heap-allocated).
    Composite(Box<[i64]>),
}

impl PrimaryKeyValue {
    /// Creates a new single-column primary key value.
    #[must_use]
    pub const fn single(value: i64) -> Self {
        Self::Single(value)
    }

    /// Creates a new composite primary key value by copying `values` into a
    /// freshly allocated boxed slice.
    #[must_use]
    pub fn composite(values: &[i64]) -> Self {
        Self::Composite(values.into())
    }
}
47+
48+
/// An indexed set that supports O(1) amortized insertion, O(1) deletion, and
/// O(1) uniform random selection by index.
///
/// Internally uses a dense [`Vec`] for random access paired with a [`HashMap`]
/// for key-to-index lookup. Deletion uses swap-remove on the `Vec` to maintain
/// density, and updates the moved element's index in the map.
///
/// # Memory
///
/// Each entry is stored both in the `Vec` (for random selection) and the
/// `HashMap` (for lookup by value). For `i64` keys this is roughly 48 bytes
/// per entry; for large sets (hundreds of millions) this can consume several
/// gigabytes of RAM.
///
/// # Composite key support
///
/// The set is generic over any key type that implements `Eq + Hash + Clone`.
/// Use [`PrimaryKeyValue`] for runtime-polymorphic single/composite keys, or
/// specialize to `i64` for minimal overhead when the primary key is a single
/// integer column.
///
/// # Trait implementations
///
/// `Debug` and `Clone` are derived; cloning copies both the `Vec` and the
/// `HashMap`, so it costs time and memory proportional to the set's size.
#[derive(Debug, Clone)]
pub struct IndexedKeySet<K: Eq + Hash + Clone> {
    /// Dense storage of every key; position `i` is what random selection draws.
    keys: Vec<K>,
    /// Maps each key to its current position in `keys`.
    index: HashMap<K, usize>,
}
72+
73+
impl<K: Eq + Hash + Clone> Default for IndexedKeySet<K> {
74+
fn default() -> Self {
75+
Self::new()
76+
}
77+
}
78+
79+
impl<K: Eq + Hash + Clone> IndexedKeySet<K> {
80+
/// Creates a new, empty `IndexedKeySet`.
81+
pub fn new() -> Self {
82+
Self {
83+
keys: Vec::new(),
84+
index: HashMap::new(),
85+
}
86+
}
87+
88+
/// Creates a new `IndexedKeySet` with pre-allocated capacity.
89+
pub fn with_capacity(cap: usize) -> Self {
90+
Self {
91+
keys: Vec::with_capacity(cap),
92+
index: HashMap::with_capacity(cap),
93+
}
94+
}
95+
96+
/// Inserts a key into the set. Returns `true` if the key was newly inserted.
97+
pub fn insert(&mut self, key: K) -> bool {
98+
if self.index.contains_key(&key) {
99+
return false;
100+
}
101+
let idx = self.keys.len();
102+
self.keys.push(key.clone());
103+
self.index.insert(key, idx);
104+
true
105+
}
106+
107+
/// Removes a key from the set. Returns `true` if the key was present.
108+
///
109+
/// Uses swap-remove on the internal `Vec` so that all operations remain
110+
/// O(1) amortized. The last element in the `Vec` is moved into the vacated
111+
/// slot and its index in the `HashMap` is updated.
112+
pub fn remove(&mut self, key: &K) -> bool {
113+
let Some(idx) = self.index.remove(key) else {
114+
return false;
115+
};
116+
self.keys.swap_remove(idx);
117+
// If an element was swapped from the end into `idx`, update its index.
118+
if idx < self.keys.len() {
119+
let moved = &self.keys[idx];
120+
self.index.insert(moved.clone(), idx);
121+
}
122+
true
123+
}
124+
125+
/// Returns a uniformly random key from the set, or `None` if empty.
126+
pub fn random_key(&self, rng: &mut impl Rng) -> Option<&K> {
127+
if self.keys.is_empty() {
128+
return None;
129+
}
130+
let idx = rng.random_range(0..self.keys.len());
131+
Some(&self.keys[idx])
132+
}
133+
134+
/// Samples up to `n` distinct keys uniformly at random.
135+
///
136+
/// If `n >= len()`, returns all keys in arbitrary order. Uses rejection
137+
/// sampling, which is efficient when `n` is small relative to `len()`.
138+
pub fn sample_keys(&self, n: usize, rng: &mut impl Rng) -> Vec<K> {
139+
let len = self.keys.len();
140+
if n >= len {
141+
return self.keys.clone();
142+
}
143+
if n == 0 {
144+
return Vec::new();
145+
}
146+
let mut chosen = HashSet::with_capacity(n);
147+
while chosen.len() < n {
148+
chosen.insert(rng.random_range(0..len));
149+
}
150+
chosen.into_iter().map(|i| self.keys[i].clone()).collect()
151+
}
152+
153+
/// Returns the number of keys in the set.
154+
pub fn len(&self) -> usize {
155+
self.keys.len()
156+
}
157+
158+
/// Returns `true` if the set is empty.
159+
pub fn is_empty(&self) -> bool {
160+
self.keys.is_empty()
161+
}
162+
}
163+
164+
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the dense `Vec` and the lookup map describe the same set:
    /// equal sizes, and every key mapped to the position it actually occupies.
    fn assert_consistent(set: &IndexedKeySet<i64>) {
        assert_eq!(set.keys.len(), set.index.len());
        for (pos, key) in set.keys.iter().enumerate() {
            assert_eq!(set.index.get(key), Some(&pos));
        }
    }

    #[test]
    fn insert_and_len() {
        let mut set = IndexedKeySet::new();
        assert!(set.insert(1i64));
        assert!(set.insert(2));
        assert!(!set.insert(1)); // duplicate
        assert_eq!(set.len(), 2);
        assert_consistent(&set);
    }

    #[test]
    fn default_is_empty() {
        let set = IndexedKeySet::<i64>::default();
        assert!(set.is_empty());
        assert_eq!(set.len(), 0);
    }

    #[test]
    fn remove_and_consistency() {
        let mut set = IndexedKeySet::new();
        for i in 0..5i64 {
            set.insert(i);
        }
        assert!(set.remove(&2));
        assert!(!set.remove(&2)); // already removed
        assert_eq!(set.len(), 4);

        // All remaining keys should be findable at their recorded position.
        assert_consistent(&set);
    }

    #[test]
    fn remove_last_and_only_element() {
        let mut set = IndexedKeySet::new();
        for i in 0..3i64 {
            set.insert(i);
        }

        // Key 2 occupies the last slot, so nothing is swapped into its place.
        assert!(set.remove(&2));
        assert_eq!(set.len(), 2);
        assert_consistent(&set);

        // Key 0 is first; key 1 gets swapped into slot 0.
        assert!(set.remove(&0));
        assert_consistent(&set);

        // Removing the only remaining element empties the set.
        assert!(set.remove(&1));
        assert!(set.is_empty());
        assert_consistent(&set);
        assert!(!set.remove(&1));
    }

    #[test]
    fn random_selection() {
        let mut set = IndexedKeySet::new();
        for i in 0..100i64 {
            set.insert(i);
        }
        let mut rng = rand::rng();
        let key = set.random_key(&mut rng).unwrap();
        assert!((0..100).contains(key));
    }

    #[test]
    fn random_key_on_empty_set() {
        let set = IndexedKeySet::<i64>::new();
        let mut rng = rand::rng();
        assert!(set.random_key(&mut rng).is_none());
    }

    #[test]
    fn sample_keys_distinct() {
        let mut set = IndexedKeySet::new();
        for i in 0..100i64 {
            set.insert(i);
        }
        let mut rng = rand::rng();
        let sampled = set.sample_keys(10, &mut rng);
        assert_eq!(sampled.len(), 10);

        // All sampled keys should be distinct.
        let unique: HashSet<_> = sampled.iter().collect();
        assert_eq!(unique.len(), 10);
    }

    #[test]
    fn sample_keys_boundaries() {
        let mut set = IndexedKeySet::new();
        for i in 0..10i64 {
            set.insert(i);
        }
        let mut rng = rand::rng();

        assert!(set.sample_keys(0, &mut rng).is_empty());

        // Asking for exactly `len()` keys, or more, yields every key once.
        for n in [10, 25] {
            let mut all = set.sample_keys(n, &mut rng);
            all.sort_unstable();
            assert_eq!(all, (0..10).collect::<Vec<i64>>());
        }
    }

    #[test]
    fn sample_keys_nearly_all() {
        let mut set = IndexedKeySet::new();
        for i in 0..100i64 {
            set.insert(i);
        }
        let mut rng = rand::rng();
        let sampled = set.sample_keys(99, &mut rng);
        assert_eq!(sampled.len(), 99);

        let unique: HashSet<_> = sampled.iter().collect();
        assert_eq!(unique.len(), 99);
        assert!(sampled.iter().all(|key| (0..100).contains(key)));
    }

    #[test]
    fn composite_key() {
        let mut set = IndexedKeySet::<PrimaryKeyValue>::new();
        set.insert(PrimaryKeyValue::composite(&[1, 2]));
        set.insert(PrimaryKeyValue::composite(&[3, 4]));
        assert_eq!(set.len(), 2);
        assert!(set.remove(&PrimaryKeyValue::composite(&[1, 2])));
        assert_eq!(set.len(), 1);
    }
}

crates/data-generation/src/dataset/mod.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17+
pub mod key_set;
1718
pub mod simple_sequence;
1819
pub mod tpch;
1920

@@ -26,6 +27,8 @@ use arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
2627
use async_trait::async_trait;
2728

2829
use crate::config::DatasetConfig;
30+
use crate::dataset::simple_sequence::SimpleSequenceDataset;
31+
use crate::dataset::tpch::TpchDataset;
2932

3033
/// Metadata about a table in a dataset.
3134
#[derive(Debug, Clone)]
@@ -148,6 +151,26 @@ impl DatasetTable {
148151
}
149152
}
150153

154+
/// Ratios controlling how much of the generated data consists of updates and
/// deletes.
///
/// Values built through [`MutationConfig::new`] satisfy: each ratio lies in
/// `0.0..=1.0` and the two together do not exceed `1.0`.
// NOTE(review): presumably the remaining `1.0 - update_ratio - delete_ratio`
// share is plain inserts, and the ratios apply per generated row. Confirm
// against the dataset generators that consume this config.
#[derive(Debug, Clone)]
pub struct MutationConfig {
    /// Share of generated data that should be updates, in `0.0..=1.0`.
    pub(crate) update_ratio: f64,
    /// Share of generated data that should be deletes, in `0.0..=1.0`.
    pub(crate) delete_ratio: f64,
}

impl MutationConfig {
    /// Creates a mutation configuration from an update ratio and a delete ratio.
    ///
    /// # Panics
    ///
    /// Panics if either ratio is NaN or outside `0.0..=1.0`, or if the two
    /// ratios sum to more than `1.0`.
    pub fn new(update_ratio: f64, delete_ratio: f64) -> Self {
        // `RangeInclusive::contains` is false for NaN, so these checks reject
        // NaN as well as negative and oversized values. Checking each ratio
        // separately also stops a negative value from masking an oversized one
        // in the sum check below.
        assert!(
            (0.0..=1.0).contains(&update_ratio),
            "update_ratio must be between 0.0 and 1.0, got {update_ratio}"
        );
        assert!(
            (0.0..=1.0).contains(&delete_ratio),
            "delete_ratio must be between 0.0 and 1.0, got {delete_ratio}"
        );
        assert!(
            update_ratio + delete_ratio <= 1.0,
            "Mutation ratios must sum to 1.0 or less"
        );

        Self {
            update_ratio,
            delete_ratio,
        }
    }
}
173+
151174
#[async_trait]
152175
pub trait Dataset: Send + Sync {
153176
/// Creates a new instance of this dataset from the given configuration.
@@ -157,13 +180,9 @@ pub trait Dataset: Send + Sync {
157180
///
158181
/// There is no default implementation; every concrete dataset type must
159182
/// provide its own.
160-
fn create(config: &DatasetConfig) -> anyhow::Result<Arc<dyn Dataset>>
183+
fn create(config: &DatasetConfig, mutations: &MutationConfig) -> anyhow::Result<Arc<dyn Dataset>>
161184
where
162-
Self: Sized + 'static,
163-
{
164-
let _ = config;
165-
anyhow::bail!("create() is not implemented for this dataset type")
166-
}
185+
Self: Sized + 'static;
167186

168187
/// Returns the batch IDs that would be produced for a given table after a
169188
/// successful generation run.
@@ -278,6 +297,12 @@ pub trait Dataset: Send + Sync {
278297
}
279298
}
280299

300+
/// Returns the primary key column names for the given table.
301+
///
302+
/// The returned `Vec` may contain multiple column names for composite keys.
303+
/// Returns an empty `Vec` if the table has no defined primary key.
304+
fn primary_key(&self, table: &str) -> Vec<String>;
305+
281306
/// Returns the tables this dataset produces, including metadata, keyed by table name.
282307
fn tables(&self) -> HashMap<String, DatasetTable>;
283308

@@ -297,6 +322,17 @@ pub trait Dataset: Send + Sync {
297322

298323
#[async_trait]
299324
impl Dataset for Arc<dyn Dataset> {
325+
fn create(config: &DatasetConfig, mutations: &MutationConfig) -> anyhow::Result<Arc<dyn Dataset>>
326+
where
327+
Self: Sized + 'static,
328+
{
329+
match config.dataset_type.as_str() {
330+
"tpch" => TpchDataset::create(config, mutations),
331+
"simple_sequence" => SimpleSequenceDataset::create(config, mutations),
332+
other => anyhow::bail!("Unknown dataset type: {other}. Supported: tpch, simple_sequence"),
333+
}
334+
}
335+
300336
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
301337
(**self).batch_ids(table)
302338
}
@@ -317,6 +353,10 @@ impl Dataset for Arc<dyn Dataset> {
317353
(**self).next_batches().await
318354
}
319355

356+
fn primary_key(&self, table: &str) -> Vec<String> {
357+
(**self).primary_key(table)
358+
}
359+
320360
fn tables(&self) -> HashMap<String, DatasetTable> {
321361
(**self).tables()
322362
}

0 commit comments

Comments
 (0)