
Commit b84390d (parent: e042c48)

More perf adjustments.

4 files changed: 26 additions & 10 deletions

lib/marin/src/marin/processing/classification/deduplication/fuzzy.py

Lines changed: 5 additions & 1 deletion
@@ -95,7 +95,11 @@ def compute_minhash_lsh_batches(batch: pa.RecordBatch) -> Iterator[dict]:
             doc_id_val = doc_id.as_py()
             for b in doc_buckets.as_py():
                 counters.increment("minhash/buckets")
-                yield {"bucket": str(b), "id": doc_id_val}
+                # Reinterpret u64 as signed int64 so Arrow infers int64 instead of
+                # failing on values >= 2^63. The bucket is only a grouping key so
+                # the sign bit doesn't matter.
+                bucket = b if b < (1 << 63) else b - (1 << 64)
+                yield {"bucket": bucket, "id": doc_id_val}
 
     ctx = ZephyrContext(
         name="fuzzy-dedup",

lib/zephyr/src/zephyr/plan.py

Lines changed: 3 additions & 3 deletions
@@ -291,11 +291,11 @@ def _merge_key(row: dict) -> Any:
 
     is_gen = inspect.isgeneratorfunction(reducer_fn)
     for start, end, key_value in _find_group_boundaries(key_col):
-        group_items = item_col[start:end].to_pylist()
+        group_items = (item_col[i].as_py() for i in range(start, end))
         if is_gen:
-            yield from reducer_fn(key_value, iter(group_items))
+            yield from reducer_fn(key_value, group_items)
         else:
-            yield reducer_fn(key_value, iter(group_items))
+            yield reducer_fn(key_value, group_items)
 
 
 def _reduce_gen(
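
The change matters when groups are large: the old slice-plus-`to_pylist()` path converts every element of a group to a Python object before the reducer sees any of them, while the generator converts lazily, one `.as_py()` call per element actually consumed. A small illustration with toy data and a hypothetical `first_item` reducer (neither is from plan.py):

```python
import pyarrow as pa

item_col = pa.array(["a", "b", "c", "d", "e"])
start, end = 1, 4  # one group's boundaries within the sorted column

# Old behavior: materialize the whole group up front.
eager = item_col[start:end].to_pylist()  # ['b', 'c', 'd']

# New behavior: convert elements one at a time, only when the reducer asks.
lazy = (item_col[i].as_py() for i in range(start, end))

def first_item(key, items):
    # Hypothetical reducer that only needs the first element of its group.
    return {"key": key, "first": next(iter(items))}

print(first_item("g1", lazy))  # {'key': 'g1', 'first': 'b'}
```

Since the generator is already an iterator, the `iter(...)` wrapper around `group_items` is also dropped in the calls to `reducer_fn`.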

lib/zephyr/src/zephyr/shuffle.py

Lines changed: 2 additions & 2 deletions
@@ -656,7 +656,7 @@ def _ensure_writer(chunk_schema: pa.Schema) -> pa.Schema:
         seg_file = _segment_path(parquet_path, seg_idx)
         seg_paths.append(seg_file)
         ensure_parent_dir(seg_file)
-        writer = pq.ParquetWriter(seg_file, schema)
+        writer = pq.ParquetWriter(seg_file, schema, compression="zstd", compression_level=1)
     elif chunk_schema != schema:
         _flush_pending()
         writer.close()
@@ -668,7 +668,7 @@ def _ensure_writer(chunk_schema: pa.Schema) -> pa.Schema:
         seg_file = _segment_path(parquet_path, seg_idx)
         seg_paths.append(seg_file)
         ensure_parent_dir(seg_file)
-        writer = pq.ParquetWriter(seg_file, schema)
+        writer = pq.ParquetWriter(seg_file, schema, compression="zstd", compression_level=1)
         logger.info(
             "[shard %d] Schema evolved after %d chunks; starting segment %d",
             source_shard,
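
For context, a minimal sketch of the new writer configuration (the file name and toy table are illustrative, not from shuffle.py). zstd at level 1 typically shrinks shuffle segments noticeably compared with the snappy default while keeping encode speed high, which suits files that are written once and read back once:

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "bucket": pa.array([1, -5, 7], type=pa.int64()),
    "id": ["doc-0", "doc-1", "doc-2"],
})

# Low-level zstd: cheap to encode, still a solid size win for intermediate data.
writer = pq.ParquetWriter("segment-000.parquet", table.schema,
                          compression="zstd", compression_level=1)
writer.write_table(table)
writer.close()
```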

rust/dupekit/src/minhash_ops.rs

Lines changed: 16 additions & 4 deletions
@@ -4,17 +4,23 @@ use pyo3::prelude::*;
 use rand::{Rng, SeedableRng};
 use rand_pcg::Pcg64;
 use regex::Regex;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use xxhash_rust::xxh3;
 
+static WHITESPACE_RE: OnceLock<Regex> = OnceLock::new();
+
+fn whitespace_regex() -> &'static Regex {
+    WHITESPACE_RE.get_or_init(|| Regex::new(r"\s+").unwrap())
+}
+
 /// Clean text using the SlimPajama text cleaning process.
 /// 1. Lowercase
 /// 2. Remove punctuation
 /// 3. Replace multiple whitespace with single space
 /// 4. Trim
 pub fn clean_text(arr: &StringArray) -> PyResult<Arc<StringArray>> {
     let mut builder = StringBuilder::with_capacity(arr.len(), arr.len() * 50);
-    let whitespace_re = Regex::new(r"\s+").map_err(|e| PyValueError::new_err(e.to_string()))?;
+    let whitespace_re = whitespace_regex();
     let punctuation: &[char] = &[
         '!', '"', '#', '$', '%', '&', '\'', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<',
         '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~',
@@ -73,9 +79,15 @@ pub fn compute_minhash(
         let hash = xxh3::xxh3_64(text.as_bytes()) as u128;
         update_signature(&mut signature, hash, &coeffs);
     } else {
+        // Reusable buffer for encoding char windows to bytes, avoiding
+        // a String allocation per ngram.
+        let mut ngram_buf = Vec::with_capacity(ngram_size * 4);
         for window in chars.windows(ngram_size) {
-            let s: String = window.iter().collect();
-            let hash = xxh3::xxh3_64(s.as_bytes()) as u128;
+            ngram_buf.clear();
+            for &ch in window {
+                ngram_buf.extend_from_slice(ch.encode_utf8(&mut [0; 4]).as_bytes());
+            }
+            let hash = xxh3::xxh3_64(&ngram_buf) as u128;
             update_signature(&mut signature, hash, &coeffs);
         }
     }
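
The same compile-once idea is easy to see outside Rust. A rough Python analogue (not the dupekit API; `clean_text` here is a toy re-implementation of the documented cleanup steps): the regex is built once at module scope and every call reuses it, just as the `OnceLock` keeps a single compiled `\s+` matcher for the whole process.

```python
import re
import string

# Compiled once at import time; every clean_text call reuses the same matcher.
_WHITESPACE_RE = re.compile(r"\s+")

def clean_text(text: str) -> str:
    # SlimPajama-style cleanup: lowercase, drop punctuation, collapse whitespace, trim.
    no_punct = text.lower().translate(str.maketrans("", "", string.punctuation))
    return _WHITESPACE_RE.sub(" ", no_punct).strip()

print(clean_text("Hello,   WORLD!!"))  # hello world
```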
