Skip to content

Commit cf4cc25

Browse files
Commit profile script
1 parent 7bccdbc commit cf4cc25

3 files changed

Lines changed: 61 additions & 1 deletion

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/target
2-
/data/*
2+
/data/*
3+
profile.json.gz
File renamed without changes.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::fs::File;
2+
3+
use datafusion::prelude::SessionContext;
4+
use flate2::read::GzDecoder;
5+
use std::io::{BufRead, BufReader};
6+
7+
use arrow::array::{ArrayRef, RecordBatch, StringArray};
8+
use arrow::datatypes::{DataType, Field, Schema};
9+
use datafusion::logical_expr::ScalarUDF;
10+
use datafusion_variant::{JsonToVariantUdf, VariantGetUdf};
11+
use std::sync::Arc;
12+
13+
#[tokio::main]
14+
async fn main() {
15+
let mut args = std::env::args();
16+
17+
let num_rows = {
18+
args.next();
19+
let n_str = args.next().expect("expected argument specifiying num rows");
20+
21+
n_str.parse::<usize>().expect("expected number")
22+
};
23+
24+
let file_path = "data/bluesky/file_0001.json.gz";
25+
26+
// load data
27+
let file = File::open(file_path).expect("make sure to run ./bin/download_data.sh");
28+
let decoder = GzDecoder::new(file);
29+
let reader = BufReader::new(decoder);
30+
31+
let json_strings = reader
32+
.lines()
33+
.take(num_rows)
34+
.map(|l| l.unwrap())
35+
.collect::<Vec<_>>();
36+
37+
let ctx = SessionContext::new();
38+
let schema = Schema::new(vec![Field::new("json_data", DataType::Utf8, false)]);
39+
let string_array: ArrayRef = Arc::new(StringArray::from(json_strings));
40+
let batch = RecordBatch::try_new(Arc::new(schema), vec![string_array]).unwrap();
41+
42+
let provider =
43+
datafusion::datasource::MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
44+
45+
ctx.register_table("bsky", Arc::new(provider)).unwrap();
46+
47+
// register variant udfs
48+
ctx.register_udf(ScalarUDF::new_from_impl(JsonToVariantUdf::default()));
49+
ctx.register_udf(ScalarUDF::new_from_impl(VariantGetUdf::default()));
50+
51+
let _df = ctx
52+
.sql(
53+
r"
54+
select json_to_variant(bsky.json_data) from bsky
55+
",
56+
)
57+
.await
58+
.unwrap();
59+
}

0 commit comments

Comments
 (0)