@@ -18,7 +18,7 @@ use arrow::pyarrow::PyArrowType;
 use chrono::{DateTime, Duration, FixedOffset, Utc};
 use datafusion_ffi::table_provider::FFI_TableProvider;
 use delta_kernel::expressions::Scalar;
-use delta_kernel::schema::StructField;
+use delta_kernel::schema::{MetadataValue, StructField};
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
@@ -63,13 +63,6 @@ use error::DeltaError;
 use futures::future::join_all;
 use tracing::log::*;
 
-use pyo3::exceptions::{PyRuntimeError, PyValueError};
-use pyo3::prelude::*;
-use pyo3::pybacked::PyBackedStr;
-use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
-use serde_json::{Map, Value};
-use uuid::Uuid;
-
 use crate::error::DeltaProtocolError;
 use crate::error::PythonError;
 use crate::features::TableFeatures;
@@ -78,6 +71,14 @@ use crate::merge::PyMergeBuilder;
 use crate::query::PyQueryBuilder;
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;
+use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
+use deltalake::protocol::DeltaOperation::UpdateFieldMetadata;
+use pyo3::exceptions::{PyRuntimeError, PyValueError};
+use pyo3::prelude::*;
+use pyo3::pybacked::PyBackedStr;
+use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
+use serde_json::{Map, Value};
+use uuid::Uuid;
 
 #[cfg(all(target_family = "unix", not(target_os = "emscripten")))]
 use jemallocator::Jemalloc;
@@ -1521,6 +1522,43 @@ impl RawDeltaTable {
         }
     }
 
+    #[pyo3(signature = (field_name, metadata, commit_properties=None, post_commithook_properties=None))]
+    pub fn set_column_metadata(
+        &self,
+        py: Python,
+        field_name: &str,
+        metadata: HashMap<String, String>,
+        commit_properties: Option<PyCommitProperties>,
+        post_commithook_properties: Option<PyPostCommitHookProperties>,
+    ) -> PyResult<()> {
+        let table = py.allow_threads(|| {
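+            // Build the field-metadata update from the table's log store and a
+            // clone of its current state; every value is stored as a string entry.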
+            let mut cmd = UpdateFieldMetadataBuilder::new(self.log_store()?, self.cloned_state()?);
+
+            cmd = cmd.with_field_name(field_name).with_metadata(
+                metadata
+                    .iter()
+                    .map(|(k, v)| (k.clone(), MetadataValue::String(v.clone())))
+                    .collect(),
+            );
+
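+            // Fold optional commit properties and post-commit hook settings
+            // into the commit, when the caller supplied any.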
+            if let Some(commit_properties) =
+                maybe_create_commit_properties(commit_properties, post_commithook_properties)
+            {
+                cmd = cmd.with_commit_properties(commit_properties)
+            }
+
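+            // LakeFS-backed log stores route the commit through their custom execute handler.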
+            if self.log_store()?.name() == "LakeFSLogStore" {
+                cmd = cmd.with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}))
+            }
+
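+            // Drive the operation to completion on the shared tokio runtime,
+            // mapping failures to Python exceptions.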
+            rt().block_on(cmd.into_future())
+                .map_err(PythonError::from)
+                .map_err(PyErr::from)
+        })?;
+        self.set_state(table.state)?;
+        Ok(())
+    }
+
     fn __datafusion_table_provider__<'py>(
         &self,
         py: Python<'py>,
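
For reference, the operation this binding wraps can also be driven from Rust directly. Below is a minimal sketch, not taken from this PR: it assumes an already-loaded `DeltaTable` and a hypothetical `id` column, uses only the builder calls visible in the diff above (`with_field_name`, `with_metadata`, `into_future`), and assumes the builder's future resolves to the updated `DeltaTable`, as the binding's use of `table.state` suggests.

```rust
use std::collections::HashMap;
use std::future::IntoFuture;

use delta_kernel::schema::MetadataValue;
use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
use deltalake::{DeltaTable, DeltaTableError};

// Attach a `comment` metadata entry to the hypothetical `id` column.
async fn set_column_comment(table: DeltaTable) -> Result<DeltaTable, DeltaTableError> {
    let mut metadata = HashMap::new();
    metadata.insert(
        "comment".to_string(),
        MetadataValue::String("primary key".to_string()),
    );

    // Same builder the Python binding uses: log store plus a clone of the current state.
    let cmd = UpdateFieldMetadataBuilder::new(table.log_store(), table.snapshot()?.clone())
        .with_field_name("id")
        .with_metadata(metadata);

    // Awaiting the builder commits the metadata change and yields the updated table.
    cmd.into_future().await
}
```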