18
18
19
19
use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
20
20
use crate :: utils:: arrow:: get_field;
21
- use anyhow:: { anyhow, Error as AnyError } ;
22
21
use serde:: { Deserialize , Serialize } ;
23
22
use std:: str;
24
23
25
24
use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
26
- use std:: { collections:: HashMap , sync:: Arc } ;
25
+ use std:: {
26
+ collections:: { HashMap , HashSet } ,
27
+ sync:: Arc ,
28
+ } ;
29
+
27
30
#[ derive( Debug , Clone , PartialEq , Serialize , Deserialize ) ]
28
31
pub struct StaticSchema {
29
32
fields : Vec < SchemaFields > ,
@@ -54,13 +57,12 @@ pub struct Fields {
54
57
}
55
58
56
59
#[ derive( Default , Debug , Clone , PartialEq , Serialize , Deserialize ) ]
57
-
58
60
pub struct Metadata { }
59
61
pub fn convert_static_schema_to_arrow_schema (
60
62
static_schema : StaticSchema ,
61
63
time_partition : & str ,
62
64
custom_partition : Option < & String > ,
63
- ) -> Result < Arc < Schema > , AnyError > {
65
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
64
66
let mut parsed_schema = ParsedSchema {
65
67
fields : Vec :: new ( ) ,
66
68
metadata : HashMap :: new ( ) ,
@@ -83,11 +85,17 @@ pub fn convert_static_schema_to_arrow_schema(
83
85
84
86
for partition in & custom_partition_list {
85
87
if !custom_partition_exists. contains_key ( * partition) {
86
- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
88
+ return Err ( StaticSchemaError :: MissingCustomPartition (
89
+ partition. to_string ( ) ,
90
+ ) ) ;
87
91
}
88
92
}
89
93
}
94
+
95
+ let mut existing_field_names: HashSet < String > = HashSet :: new ( ) ;
96
+
90
97
for mut field in static_schema. fields {
98
+ validate_field_names ( & field. name , & mut existing_field_names) ?;
91
99
if !time_partition. is_empty ( ) && field. name == time_partition {
92
100
time_partition_exists = true ;
93
101
field. data_type = "datetime" . to_string ( ) ;
@@ -127,29 +135,24 @@ pub fn convert_static_schema_to_arrow_schema(
127
135
parsed_schema. fields . push ( parsed_field) ;
128
136
}
129
137
if !time_partition. is_empty ( ) && !time_partition_exists {
130
- return Err ( anyhow ! {
131
- format!(
132
- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
133
- ) ,
134
- } ) ;
138
+ return Err ( StaticSchemaError :: MissingTimePartition (
139
+ time_partition. to_string ( ) ,
140
+ ) ) ;
135
141
}
136
142
add_parseable_fields_to_static_schema ( parsed_schema)
137
143
}
138
144
139
145
fn add_parseable_fields_to_static_schema (
140
146
parsed_schema : ParsedSchema ,
141
- ) -> Result < Arc < Schema > , AnyError > {
147
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
142
148
let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
143
149
for field in parsed_schema. fields . iter ( ) {
144
150
let field = Field :: new ( field. name . clone ( ) , field. data_type . clone ( ) , field. nullable ) ;
145
151
schema. push ( Arc :: new ( field) ) ;
146
152
}
147
153
148
154
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
149
- return Err ( anyhow ! (
150
- "field {} is a reserved field" ,
151
- DEFAULT_TIMESTAMP_KEY
152
- ) ) ;
155
+ return Err ( StaticSchemaError :: ReservedKey ( DEFAULT_TIMESTAMP_KEY ) ) ;
153
156
} ;
154
157
155
158
// add the p_timestamp field to the event schema to the 0th index
@@ -176,3 +179,57 @@ fn default_dict_id() -> i64 {
176
179
fn default_dict_is_ordered ( ) -> bool {
177
180
false
178
181
}
182
+
183
+ fn validate_field_names (
184
+ field_name : & str ,
185
+ existing_fields : & mut HashSet < String > ,
186
+ ) -> Result < ( ) , StaticSchemaError > {
187
+ if field_name. is_empty ( ) {
188
+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
189
+ }
190
+
191
+ if !existing_fields. insert ( field_name. to_string ( ) ) {
192
+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
193
+ }
194
+
195
+ Ok ( ( ) )
196
+ }
197
+
198
+ #[ derive( Debug , thiserror:: Error ) ]
199
+ pub enum StaticSchemaError {
200
+ #[ error(
201
+ "custom partition field {0} does not exist in the schema for the static schema logstream"
202
+ ) ]
203
+ MissingCustomPartition ( String ) ,
204
+
205
+ #[ error(
206
+ "time partition field {0} does not exist in the schema for the static schema logstream"
207
+ ) ]
208
+ MissingTimePartition ( String ) ,
209
+
210
+ #[ error( "field {0:?} is a reserved field" ) ]
211
+ ReservedKey ( & ' static str ) ,
212
+
213
+ #[ error( "field name cannot be empty" ) ]
214
+ EmptyFieldName ,
215
+
216
+ #[ error( "duplicate field name: {0}" ) ]
217
+ DuplicateField ( String ) ,
218
+ }
219
+
220
+ #[ cfg( test) ]
221
+ mod tests {
222
+ use super :: * ;
223
+ #[ test]
224
+ fn empty_field_names ( ) {
225
+ let mut existing_field_names: HashSet < String > = HashSet :: new ( ) ;
226
+ assert ! ( validate_field_names( "" , & mut existing_field_names) . is_err( ) ) ;
227
+ }
228
+
229
+ #[ test]
230
+ fn duplicate_field_names ( ) {
231
+ let mut existing_field_names: HashSet < String > = HashSet :: new ( ) ;
232
+ let _ = validate_field_names ( "test_field" , & mut existing_field_names) ;
233
+ assert ! ( validate_field_names( "test_field" , & mut existing_field_names) . is_err( ) ) ;
234
+ }
235
+ }
0 commit comments