Skip to content

Commit 6da8458

Browse files
authored
feat: Support generating updates and deletes in data generator (#62)
* feat: Support generating updates and deletes in data generator * fix merge
1 parent e2c9377 commit 6da8458

10 files changed

Lines changed: 509 additions & 45 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/data-generation/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ futures.workspace = true
2525
duckdb = { workspace = true, features = ["bundled", "vtab", "vtab-arrow"] }
2626
object_store = { workspace = true }
2727
parquet.workspace = true
28+
rand.workspace = true
2829
serde_json.workspace = true
2930
tokio.workspace = true
3031
tracing.workspace = true
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::collections::{HashMap, HashSet};
18+
use std::hash::Hash;
19+
20+
use rand::Rng;
21+
22+
/// A primary key value that can represent single-column or composite keys.
23+
///
24+
/// The `Single` variant stores a single `i64` inline (8 bytes, no heap
25+
/// allocation), which is optimal for the common case of a single integer
26+
/// primary key. The `Composite` variant uses a `Box<[i64]>` to support
27+
/// multi-column keys with minimal inline size (16 bytes for the fat pointer).
28+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29+
pub enum PrimaryKeyValue {
30+
/// A single-column primary key (8 bytes, no heap allocation).
31+
Single(i64),
32+
/// A composite (multi-column) primary key (heap-allocated).
33+
Composite(Box<[i64]>),
34+
}
35+
36+
impl PrimaryKeyValue {
37+
/// Creates a new single-column primary key value.
38+
pub fn single(value: i64) -> Self {
39+
Self::Single(value)
40+
}
41+
42+
/// Creates a new composite primary key value from a slice.
43+
pub fn composite(values: &[i64]) -> Self {
44+
Self::Composite(values.into())
45+
}
46+
}
47+
48+
/// An indexed set that supports O(1) amortized insertion, O(1) deletion, and
49+
/// O(1) uniform random selection by index.
50+
///
51+
/// Internally uses a dense [`Vec`] for random access paired with a [`HashMap`]
52+
/// for key-to-index lookup. Deletion uses swap-remove on the `Vec` to maintain
53+
/// density, and updates the moved element's index in the map.
54+
///
55+
/// # Memory
56+
///
57+
/// Each entry is stored both in the `Vec` (for random selection) and the
58+
/// `HashMap` (for lookup by value). For `i64` keys this is roughly 48 bytes
59+
/// per entry; for large sets (hundreds of millions) this can consume several
60+
/// gigabytes of RAM.
61+
///
62+
/// # Composite key support
63+
///
64+
/// The set is generic over any key type that implements `Eq + Hash + Clone`.
65+
/// Use [`PrimaryKeyValue`] for runtime-polymorphic single/composite keys, or
66+
/// specialize to `i64` for minimal overhead when the primary key is a single
67+
/// integer column.
68+
pub struct IndexedKeySet<K: Eq + Hash + Clone> {
69+
keys: Vec<K>,
70+
index: HashMap<K, usize>,
71+
}
72+
73+
impl<K: Eq + Hash + Clone> Default for IndexedKeySet<K> {
74+
fn default() -> Self {
75+
Self::new()
76+
}
77+
}
78+
79+
impl<K: Eq + Hash + Clone> IndexedKeySet<K> {
80+
/// Creates a new, empty `IndexedKeySet`.
81+
pub fn new() -> Self {
82+
Self {
83+
keys: Vec::new(),
84+
index: HashMap::new(),
85+
}
86+
}
87+
88+
/// Creates a new `IndexedKeySet` with pre-allocated capacity.
89+
pub fn with_capacity(cap: usize) -> Self {
90+
Self {
91+
keys: Vec::with_capacity(cap),
92+
index: HashMap::with_capacity(cap),
93+
}
94+
}
95+
96+
/// Inserts a key into the set. Returns `true` if the key was newly inserted.
97+
pub fn insert(&mut self, key: K) -> bool {
98+
if self.index.contains_key(&key) {
99+
return false;
100+
}
101+
let idx = self.keys.len();
102+
self.keys.push(key.clone());
103+
self.index.insert(key, idx);
104+
true
105+
}
106+
107+
/// Removes a key from the set. Returns `true` if the key was present.
108+
///
109+
/// Uses swap-remove on the internal `Vec` so that all operations remain
110+
/// O(1) amortized. The last element in the `Vec` is moved into the vacated
111+
/// slot and its index in the `HashMap` is updated.
112+
pub fn remove(&mut self, key: &K) -> bool {
113+
let Some(idx) = self.index.remove(key) else {
114+
return false;
115+
};
116+
self.keys.swap_remove(idx);
117+
// If an element was swapped from the end into `idx`, update its index.
118+
if idx < self.keys.len() {
119+
let moved = &self.keys[idx];
120+
self.index.insert(moved.clone(), idx);
121+
}
122+
true
123+
}
124+
125+
/// Returns a uniformly random key from the set, or `None` if empty.
126+
pub fn random_key(&self, rng: &mut impl Rng) -> Option<&K> {
127+
if self.keys.is_empty() {
128+
return None;
129+
}
130+
let idx = rng.random_range(0..self.keys.len());
131+
Some(&self.keys[idx])
132+
}
133+
134+
/// Samples up to `n` distinct keys uniformly at random.
135+
///
136+
/// If `n >= len()`, returns all keys in arbitrary order. Uses rejection
137+
/// sampling, which is efficient when `n` is small relative to `len()`.
138+
pub fn sample_keys(&self, n: usize, rng: &mut impl Rng) -> Vec<K> {
139+
let len = self.keys.len();
140+
if n >= len {
141+
return self.keys.clone();
142+
}
143+
if n == 0 {
144+
return Vec::new();
145+
}
146+
let mut chosen = HashSet::with_capacity(n);
147+
while chosen.len() < n {
148+
chosen.insert(rng.random_range(0..len));
149+
}
150+
chosen.into_iter().map(|i| self.keys[i].clone()).collect()
151+
}
152+
153+
/// Returns the number of keys in the set.
154+
pub fn len(&self) -> usize {
155+
self.keys.len()
156+
}
157+
158+
/// Returns `true` if the set is empty.
159+
pub fn is_empty(&self) -> bool {
160+
self.keys.is_empty()
161+
}
162+
}
163+
164+
#[cfg(test)]
165+
mod tests {
166+
use super::*;
167+
168+
#[test]
169+
fn insert_and_len() {
170+
let mut set = IndexedKeySet::new();
171+
assert!(set.insert(1i64));
172+
assert!(set.insert(2));
173+
assert!(!set.insert(1)); // duplicate
174+
assert_eq!(set.len(), 2);
175+
}
176+
177+
#[test]
178+
fn remove_and_consistency() {
179+
let mut set = IndexedKeySet::new();
180+
for i in 0..5i64 {
181+
set.insert(i);
182+
}
183+
assert!(set.remove(&2));
184+
assert!(!set.remove(&2)); // already removed
185+
assert_eq!(set.len(), 4);
186+
187+
// All remaining keys should be findable.
188+
for key in &set.keys {
189+
assert!(set.index.contains_key(key));
190+
assert_eq!(set.keys[set.index[key]], *key);
191+
}
192+
}
193+
194+
#[test]
195+
fn random_selection() {
196+
let mut set = IndexedKeySet::new();
197+
for i in 0..100i64 {
198+
set.insert(i);
199+
}
200+
let mut rng = rand::rng();
201+
let key = set.random_key(&mut rng).unwrap();
202+
assert!((0..100).contains(key));
203+
}
204+
205+
#[test]
206+
fn sample_keys_distinct() {
207+
let mut set = IndexedKeySet::new();
208+
for i in 0..100i64 {
209+
set.insert(i);
210+
}
211+
let mut rng = rand::rng();
212+
let sampled = set.sample_keys(10, &mut rng);
213+
assert_eq!(sampled.len(), 10);
214+
215+
// All sampled keys should be distinct.
216+
let unique: HashSet<_> = sampled.iter().collect();
217+
assert_eq!(unique.len(), 10);
218+
}
219+
220+
#[test]
221+
fn composite_key() {
222+
let mut set = IndexedKeySet::<PrimaryKeyValue>::new();
223+
set.insert(PrimaryKeyValue::composite(&[1, 2]));
224+
set.insert(PrimaryKeyValue::composite(&[3, 4]));
225+
assert_eq!(set.len(), 2);
226+
assert!(set.remove(&PrimaryKeyValue::composite(&[1, 2])));
227+
assert_eq!(set.len(), 1);
228+
}
229+
}

crates/data-generation/src/dataset/mod.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17+
pub mod key_set;
1718
pub mod simple_sequence;
1819
pub mod tpch;
1920

@@ -26,6 +27,8 @@ use arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
2627
use async_trait::async_trait;
2728

2829
use crate::config::DatasetConfig;
30+
use crate::dataset::simple_sequence::SimpleSequenceDataset;
31+
use crate::dataset::tpch::TpchDataset;
2932

3033
/// Metadata about a table in a dataset.
3134
#[derive(Debug, Clone)]
@@ -134,6 +137,26 @@ impl DatasetTable {
134137
}
135138
}
136139

140+
#[derive(Debug, Clone)]
141+
pub struct MutationConfig {
142+
pub(crate) update_ratio: f64,
143+
pub(crate) delete_ratio: f64,
144+
}
145+
146+
impl MutationConfig {
147+
pub fn new(update_ratio: f64, delete_ratio: f64) -> Self {
148+
let total = update_ratio + delete_ratio;
149+
if total > 1.0 {
150+
panic!("Mutation ratios must sum to 1.0 or less");
151+
}
152+
153+
Self {
154+
update_ratio,
155+
delete_ratio,
156+
}
157+
}
158+
}
159+
137160
#[async_trait]
138161
pub trait Dataset: Send + Sync {
139162
/// Creates a new instance of this dataset from the given configuration.
@@ -143,13 +166,12 @@ pub trait Dataset: Send + Sync {
143166
///
144167
/// The default implementation returns an error; concrete dataset types
145168
/// should override this.
146-
fn create(config: &DatasetConfig) -> anyhow::Result<Arc<dyn Dataset>>
169+
fn create(
170+
config: &DatasetConfig,
171+
mutations: &MutationConfig,
172+
) -> anyhow::Result<Arc<dyn Dataset>>
147173
where
148-
Self: Sized + 'static,
149-
{
150-
let _ = config;
151-
anyhow::bail!("create() is not implemented for this dataset type")
152-
}
174+
Self: Sized + 'static;
153175

154176
/// Returns the batch IDs that would be produced for a given table after a
155177
/// successful generation run.
@@ -264,6 +286,12 @@ pub trait Dataset: Send + Sync {
264286
}
265287
}
266288

289+
/// Returns the primary key column names for the given table.
290+
///
291+
/// The returned `Vec` may contain multiple column names for composite keys.
292+
/// Returns an empty `Vec` if the table has no defined primary key.
293+
fn primary_key(&self, table: &str) -> Vec<String>;
294+
267295
/// Returns the tables this dataset produces, including metadata, keyed by table name.
268296
fn tables(&self) -> HashMap<String, DatasetTable>;
269297

@@ -282,6 +310,22 @@ pub trait Dataset: Send + Sync {
282310

283311
#[async_trait]
284312
impl Dataset for Arc<dyn Dataset> {
313+
fn create(
314+
config: &DatasetConfig,
315+
mutations: &MutationConfig,
316+
) -> anyhow::Result<Arc<dyn Dataset>>
317+
where
318+
Self: Sized + 'static,
319+
{
320+
match config.dataset_type.as_str() {
321+
"tpch" => TpchDataset::create(config, mutations),
322+
"simple_sequence" => SimpleSequenceDataset::create(config, mutations),
323+
other => {
324+
anyhow::bail!("Unknown dataset type: {other}. Supported: tpch, simple_sequence")
325+
}
326+
}
327+
}
328+
285329
fn batch_ids(&self, table: &str) -> VecDeque<u64> {
286330
(**self).batch_ids(table)
287331
}
@@ -302,6 +346,10 @@ impl Dataset for Arc<dyn Dataset> {
302346
(**self).next_batches().await
303347
}
304348

349+
fn primary_key(&self, table: &str) -> Vec<String> {
350+
(**self).primary_key(table)
351+
}
352+
305353
fn tables(&self) -> HashMap<String, DatasetTable> {
306354
(**self).tables()
307355
}

0 commit comments

Comments
 (0)