Skip to content

Commit a042f7e

Browse files
committed
fix(hdfs): Touch the file when creating another retry attempt append file
1 parent 3643296 commit a042f7e

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

src/store/hdfs.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ impl HdfsStore {
240240
info!("Writing path: {}", &data_file_path);
241241
match self
242242
.write_data_and_index(
243-
filesystem,
243+
&filesystem,
244244
&data_file_path,
245245
data_bytes_holder,
246246
&index_file_path,
@@ -251,6 +251,14 @@ impl HdfsStore {
251251
Err(e) => {
252252
partition_cached_meta.reset_offset(0);
253253
partition_cached_meta.inc_retry_time();
254+
let retry_time = partition_cached_meta.retry_time;
255+
drop(partition_cached_meta);
256+
257+
let data_file_path = format!("{}_{}.data", &data_file_path_prefix, retry_time);
258+
let index_file_path = format!("{}_{}.index", &index_file_path_prefix, retry_time);
259+
filesystem.touch(&data_file_path).await?;
260+
filesystem.touch(&index_file_path).await?;
261+
254262
error!("Errors on appending data into path: {}", &data_file_path);
255263
return Err(Other(e.into()));
256264
}
@@ -265,7 +273,7 @@ impl HdfsStore {
265273

266274
async fn write_data_and_index(
267275
&self,
268-
filesystem: Arc<Box<dyn HdfsDelegator>>,
276+
filesystem: &Arc<Box<dyn HdfsDelegator>>,
269277
data_file_path: &String,
270278
data_bytes_holder: BytesMut,
271279
index_file_path: &String,

0 commit comments

Comments
 (0)