@@ -142,6 +142,7 @@ impl<D: CsvDecoder + Clone> FileSource for ParallelCsvSource<D> {
142142 Ok ( Arc :: new ( ParallelCsvOpener {
143143 schema : self . schema . clone ( ) ,
144144 batch_size : self . batch_size ,
145+ boundaries : self . boundaries . clone ( ) ,
145146 state : self . state . clone ( ) ,
146147 decoder : Arc :: new ( self . decoder . clone ( ) ) ,
147148 object_store,
@@ -189,9 +190,12 @@ impl<D: CsvDecoder + Clone> fmt::Debug for ParallelCsvSource<D> {
189190 }
190191}
191192
193+ const PADDING_BYTES : usize = 64 * 1024 ; // 64 KiB: extra bytes fetched before `start` and after `end` of a partition's range; the same value is passed to `SubPartition::new`.
194+
192195struct ParallelCsvOpener {
193196 schema : SchemaRef ,
194197 batch_size : usize ,
198+ boundaries : Arc < [ usize ] > ,
195199 state : Arc < SharedState > ,
196200 decoder : Arc < dyn CsvDecoder > ,
197201 object_store : Arc < dyn ObjectStore > ,
@@ -205,6 +209,7 @@ impl FileOpener for ParallelCsvOpener {
205209 let state = self . state . clone ( ) ;
206210 let schema = self . schema . clone ( ) ;
207211 let batch_size = self . batch_size ;
212+ let boundaries = self . boundaries . clone ( ) ;
208213 let decoder = self . decoder . clone ( ) ;
209214 let object_store = self . object_store . clone ( ) ;
210215 let file_path = self . file_path . clone ( ) ;
@@ -214,26 +219,35 @@ impl FileOpener for ParallelCsvOpener {
214219 coord. partition_range ( partition_idx)
215220 } ;
216221
222+ let num_partitions = boundaries. len ( ) - 1 ;
223+ let file_end = boundaries[ num_partitions] ;
224+
225+ let fetch_start = start. saturating_sub ( PADDING_BYTES ) ;
226+ let fetch_end = ( end + PADDING_BYTES ) . min ( file_end) ;
227+
217228 Ok ( Box :: pin ( async move {
218- // phase 1: read byte range and scan for quote parity
229+ // phase 1: fetch a range with some padding on both sides
219230 let result = object_store
220231 . get_opts (
221232 & file_path,
222- GetOptions :: new ( ) . with_range ( Some ( start as u64 ..end as u64 ) ) ,
233+ GetOptions :: new ( ) . with_range ( Some ( fetch_start as u64 ..fetch_end as u64 ) ) ,
223234 )
224235 . await
225236 . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
226237
227- let bytes = result
238+ let full_bytes = result
228239 . bytes ( )
229240 . await
230241 . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
231242
243+ // phase 2: classify only the core [start, end) range
244+ let core_offset = start - fetch_start;
245+ let core_len = end - start;
232246 let classify_result = {
233- let bytes = bytes . clone ( ) ;
247+ let core_bytes = full_bytes . slice ( core_offset..core_offset + core_len ) ;
234248 tokio:: task:: spawn_blocking ( move || {
235- let mut scanner = SubPartition :: new ( 64 * 1024 ) ;
236- scanner. ingest ( & bytes ) ;
249+ let mut scanner = SubPartition :: new ( PADDING_BYTES ) ;
250+ scanner. ingest ( & core_bytes ) ;
237251 scanner. finish ( )
238252 } )
239253 . await
@@ -244,41 +258,40 @@ impl FileOpener for ParallelCsvOpener {
244258 let mut results = state. classify_results . lock ( ) . unwrap ( ) ;
245259 results[ partition_idx] = Some ( classify_result) ;
246260 }
261+
247262 state. barrier . wait ( ) . await ;
248263
249- // phase 2 : resolve quotes and produce aligned ranges
264+ // phase 3 : resolve quotes and produce aligned ranges
250265 let resolved = state
251266 . resolved
252267 . get_or_init ( || async {
253- let mut coordinator = state. coordinator . lock ( ) . unwrap ( ) ;
268+ let mut resolver = state. coordinator . lock ( ) . unwrap ( ) ;
254269 {
255270 let mut results = state. classify_results . lock ( ) . unwrap ( ) ;
256271 for ( i, result) in results. iter_mut ( ) . enumerate ( ) {
257- coordinator . submit ( i, result. take ( ) . unwrap ( ) ) ;
272+ resolver . submit ( i, result. take ( ) . unwrap ( ) ) ;
258273 }
259274 }
260- coordinator . resolve ( )
275+ resolver . resolve ( )
261276 } )
262277 . await ;
263278
264- // phase 3: read aligned range from object store, decode
279+ // phase 4: slice to get the aligned range
265280 let split = & resolved[ partition_idx] ;
266-
267- // note: this is the second object store request per parallel branch :P
268- let opts = GetOptions :: new ( ) . with_range ( Some ( split. start as u64 ..split. end as u64 ) ) ;
269- let result = object_store
270- . get_opts ( & file_path, opts)
271- . await
272- . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
273-
274- let aligned_bytes = result
275- . bytes ( )
276- . await
277- . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?;
278-
279- let batches = decoder
280- . decode ( schema. clone ( ) , batch_size, & aligned_bytes)
281- . map_err ( DataFusionError :: from) ?;
281+ let local_start = split
282+ . start
283+ . checked_sub ( fetch_start)
284+ . expect ( "resolved start before fetch_start" ) ;
285+ let local_end = split. end - fetch_start;
286+
287+ let aligned_bytes = full_bytes. slice ( local_start..local_end) ;
288+
289+ let batches = tokio:: task:: spawn_blocking ( move || {
290+ decoder. decode ( schema, batch_size, & aligned_bytes)
291+ } )
292+ . await
293+ . map_err ( |e| DataFusionError :: External ( Box :: new ( e) ) ) ?
294+ . map_err ( DataFusionError :: from) ?;
282295
283296 let stream = futures:: stream:: iter ( batches. into_iter ( ) . map ( Ok :: < _ , DataFusionError > ) ) ;
284297
0 commit comments