Commit 86523a0

[FEAT] connect: createDataFrame (Eventual-Inc#3363)
1 parent d1d0fab commit 86523a0
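
The headline change: `createDataFrame` now works against Daft's Spark Connect server. A hedged sketch of the client-side flow this enables (the endpoint URL and port are illustrative assumptions, not part of the commit):

from pyspark.sql import SparkSession

# Assumption: a Daft Connect server is already listening at this address.
spark = SparkSession.builder.remote("sc://localhost:50051").getOrCreate()

# createDataFrame ships the local rows to the server, which this commit
# turns into an in-memory Daft scan rather than an unimplemented error.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
print(df.collect())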

File tree

22 files changed: +679 −51 lines changed

Cargo.lock

Lines changed: 56 additions & 8 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -200,6 +200,7 @@ daft-dsl = {path = "src/daft-dsl"}
 daft-hash = {path = "src/daft-hash"}
 daft-local-execution = {path = "src/daft-local-execution"}
 daft-logical-plan = {path = "src/daft-logical-plan"}
+daft-micropartition = {path = "src/daft-micropartition"}
 daft-scan = {path = "src/daft-scan"}
 daft-schema = {path = "src/daft-schema"}
 daft-table = {path = "src/daft-table"}

daft/dataframe/dataframe.py

Lines changed: 28 additions & 0 deletions
@@ -63,6 +63,34 @@
 ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]
 
 
+def to_logical_plan_builder(*parts: MicroPartition) -> LogicalPlanBuilder:
+    """Creates a LogicalPlanBuilder from one or more MicroPartitions.
+
+    Args:
+        parts: The MicroPartitions that we wish to convert into a Daft DataFrame.
+
+    Returns:
+        LogicalPlanBuilder: plan builder wrapping an in-memory scan of the provided partitions.
+    """
+    if not parts:
+        raise ValueError("Can't create a DataFrame from an empty list of tables.")
+
+    result_pset = LocalPartitionSet()
+
+    for i, part in enumerate(parts):
+        result_pset.set_partition_from_table(i, part)
+
+    context = get_context()
+    cache_entry = context.get_or_create_runner().put_partition_set_into_cache(result_pset)
+    size_bytes = result_pset.size_bytes()
+    num_rows = len(result_pset)
+
+    assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
+    return LogicalPlanBuilder.from_in_memory_scan(
+        cache_entry, parts[0].schema(), result_pset.num_partitions(), size_bytes, num_rows=num_rows
+    )
+
+
 class DataFrame:
     """A Daft DataFrame is a table of data. It has columns, where each column has a type and the same
     number of items (rows) as all other columns.
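
A minimal usage sketch for this helper, assuming the surrounding Daft APIs behave as the diff implies (`MicroPartition.from_pydict` and constructing a `DataFrame` directly from a builder are assumptions, not shown in this diff):

from daft.dataframe.dataframe import DataFrame, to_logical_plan_builder
from daft.table import MicroPartition

# Assumption: MicroPartition.from_pydict builds an in-memory table from columns.
part_a = MicroPartition.from_pydict({"x": [1, 2]})
part_b = MicroPartition.from_pydict({"x": [3, 4]})

# Each MicroPartition becomes one partition of the in-memory scan.
builder = to_logical_plan_builder(part_a, part_b)

# Assumption: DataFrame can be constructed from a LogicalPlanBuilder.
df = DataFrame(builder)
print(df.to_pydict())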

src/arrow2/src/io/ipc/read/common.rs

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ pub fn read_record_batch<R: Read + Seek>(
     file_size: u64,
     scratch: &mut Vec<u8>,
 ) -> Result<Chunk<Box<dyn Array>>> {
-    assert_eq!(fields.len(), ipc_schema.fields.len());
+    assert_eq!(fields.len(), ipc_schema.fields.len(), "IPC schema fields and Arrow schema fields must be the same length");
     let buffers = batch
         .buffers()
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?

src/arrow2/src/io/ipc/read/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -42,4 +42,4 @@ pub type Dictionaries = AHashMap<i64, Box<dyn Array>>;
 pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>;
 pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>;
 pub(crate) type Compression<'a> = arrow_format::ipc::BodyCompressionRef<'a>;
-pub(crate) type Version = arrow_format::ipc::MetadataVersion;
+pub type Version = arrow_format::ipc::MetadataVersion;

src/daft-connect/Cargo.toml

Lines changed: 7 additions & 2 deletions
@@ -1,26 +1,31 @@
 [dependencies]
-arrow2 = {workspace = true}
+arrow2 = {workspace = true, features = ["io_json_integration"]}
 async-stream = "0.3.6"
+color-eyre = "0.6.3"
 common-daft-config = {workspace = true}
 daft-core = {workspace = true}
 daft-dsl = {workspace = true}
 daft-local-execution = {workspace = true}
 daft-logical-plan = {workspace = true}
+daft-micropartition = {workspace = true}
 daft-scan = {workspace = true}
 daft-schema = {workspace = true}
 daft-table = {workspace = true}
 dashmap = "6.1.0"
+derive_more = {workspace = true}
 eyre = "0.6.12"
 futures = "0.3.31"
+itertools = {workspace = true}
 pyo3 = {workspace = true, optional = true}
+serde_json = {workspace = true}
 spark-connect = {workspace = true}
 tokio = {version = "1.40.0", features = ["full"]}
 tonic = "0.12.3"
 tracing = {workspace = true}
 uuid = {version = "1.10.0", features = ["v4"]}
 
 [features]
-python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python"]
+python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python", "daft-micropartition/python"]
 
 [lints]
 workspace = true

src/daft-connect/src/lib.rs

Lines changed: 13 additions & 9 deletions
@@ -5,7 +5,6 @@
 #![feature(iter_from_coroutine)]
 #![feature(stmt_expr_attributes)]
 #![feature(try_trait_v2_residual)]
-#![deny(clippy::print_stdout)]
 
 use dashmap::DashMap;
 use eyre::Context;
@@ -23,7 +22,7 @@ use spark_connect::{
     ReleaseExecuteResponse, ReleaseSessionRequest, ReleaseSessionResponse,
 };
 use tonic::{transport::Server, Request, Response, Status};
-use tracing::{debug, info};
+use tracing::info;
 use uuid::Uuid;
 
 use crate::session::Session;
@@ -325,8 +324,6 @@ impl SparkConnectService for DaftSparkConnectService {
                     result: Some(analyze_plan_response::Result::Schema(schema)),
                 };
 
-                debug!("response: {response:#?}");
-
                 Ok(Response::new(response))
             }
             _ => unimplemented_err!("Analyze plan operation is not yet implemented"),
@@ -346,7 +343,6 @@ impl SparkConnectService for DaftSparkConnectService {
         &self,
         _request: Request<InterruptRequest>,
     ) -> Result<Response<InterruptResponse>, Status> {
-        println!("got interrupt");
         unimplemented_err!("interrupt operation is not yet implemented")
     }
 
@@ -361,17 +357,26 @@ impl SparkConnectService for DaftSparkConnectService {
     #[tracing::instrument(skip_all)]
     async fn release_execute(
         &self,
-        _request: Request<ReleaseExecuteRequest>,
+        request: Request<ReleaseExecuteRequest>,
     ) -> Result<Response<ReleaseExecuteResponse>, Status> {
-        unimplemented_err!("release_execute operation is not yet implemented")
+        let request = request.into_inner();
+
+        let session = self.get_session(&request.session_id)?;
+
+        let response = ReleaseExecuteResponse {
+            session_id: session.client_side_session_id().to_string(),
+            server_side_session_id: session.server_side_session_id().to_string(),
+            operation_id: None, // todo: set but not strictly required
+        };
+
+        Ok(Response::new(response))
     }
 
     #[tracing::instrument(skip_all)]
     async fn release_session(
         &self,
         _request: Request<ReleaseSessionRequest>,
     ) -> Result<Response<ReleaseSessionResponse>, Status> {
-        println!("got release session");
         unimplemented_err!("release_session operation is not yet implemented")
     }
 
@@ -380,7 +385,6 @@ impl SparkConnectService for DaftSparkConnectService {
         &self,
         _request: Request<FetchErrorDetailsRequest>,
     ) -> Result<Response<FetchErrorDetailsResponse>, Status> {
-        println!("got fetch error details");
         unimplemented_err!("fetch_error_details operation is not yet implemented")
     }
 }
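
With `release_execute` implemented, the cleanup RPC that Spark Connect clients issue after fetching results is acknowledged instead of erroring. A hedged sketch of the observable effect (same assumed endpoint as above; whether `collect()` triggers `ReleaseExecute` depends on the client's reattachable-execution behavior):

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:50051").getOrCreate()

# After streaming results, the client sends ReleaseExecute; the server now
# answers with session metadata rather than an "unimplemented" status.
rows = spark.createDataFrame([(1,), (2,)], ["x"]).collect()
print(rows)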

src/daft-connect/src/op/execute/root.rs

Lines changed: 5 additions & 6 deletions
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, future::ready, sync::Arc};
+use std::{future::ready, sync::Arc};
 
 use common_daft_config::DaftExecutionConfig;
 use daft_local_execution::NativeExecutor;
@@ -10,6 +10,7 @@ use crate::{
     op::execute::{ExecuteStream, PlanIds},
     session::Session,
     translation,
+    translation::Plan,
 };
 
 impl Session {
@@ -31,13 +32,11 @@ impl Session {
         let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
         tokio::spawn(async move {
             let execution_fut = async {
-                let plan = translation::to_logical_plan(command)?;
-                let optimized_plan = plan.optimize()?;
+                let Plan { builder, psets } = translation::to_logical_plan(command)?;
+                let optimized_plan = builder.optimize()?;
                 let cfg = Arc::new(DaftExecutionConfig::default());
                 let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
-                let mut result_stream = native_executor
-                    .run(HashMap::new(), cfg, None)?
-                    .into_stream();
+                let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();
 
                 while let Some(result) = result_stream.next().await {
                     let result = result?;

src/daft-connect/src/translation.rs

Lines changed: 2 additions & 2 deletions
@@ -6,8 +6,8 @@ mod literal;
 mod logical_plan;
 mod schema;
 
-pub use datatype::{to_daft_datatype, to_spark_datatype};
+pub use datatype::{deser_spark_datatype, to_daft_datatype, to_spark_datatype};
 pub use expr::to_daft_expr;
 pub use literal::to_daft_literal;
-pub use logical_plan::to_logical_plan;
+pub use logical_plan::{to_logical_plan, Plan};
 pub use schema::relation_to_schema;

src/daft-connect/src/translation/datatype.rs

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@ use eyre::{bail, ensure, WrapErr};
 use spark_connect::data_type::Kind;
 use tracing::warn;
 
+mod codec;
+pub use codec::deser as deser_spark_datatype;
+
 pub fn to_spark_datatype(datatype: &DataType) -> spark_connect::DataType {
     match datatype {
         DataType::Null => spark_connect::DataType {
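
The new `codec` module, together with the `serde_json` dependency and arrow2's `io_json_integration` feature added above, suggests `deser_spark_datatype` parses Spark's JSON encoding of datatypes. For reference, a sketch of that JSON form as produced by a PySpark client (the exact wire format `deser` accepts is an assumption):

from pyspark.sql.types import LongType, StringType, StructField, StructType

schema = StructType([
    StructField("id", LongType()),
    StructField("letter", StringType()),
])

# Spark's canonical JSON encoding of a schema, e.g.:
# {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},...]}
print(schema.json())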
