Skip to content

Commit b6bfdbc

Browse files
committed
fix(hdfs): Append another retry file when appending complete failed
1 parent 4c64a88 commit b6bfdbc

File tree

3 files changed

+238
-41
lines changed

3 files changed

+238
-41
lines changed

src/config.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl MemoryStoreConfig {
6666

6767
// =========================================================
6868

69-
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
69+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
7070
pub struct HdfsStoreConfig {
7171
#[serde(default = "as_default_max_concurrency")]
7272
pub max_concurrency: usize,
@@ -82,6 +82,16 @@ fn as_default_partition_write_max_concurrency() -> usize {
8282
20
8383
}
8484

85+
impl Default for HdfsStoreConfig {
86+
fn default() -> Self {
87+
Self {
88+
max_concurrency: as_default_max_concurrency(),
89+
partition_write_max_concurrency: as_default_partition_write_max_concurrency(),
90+
kerberos_security_config: None,
91+
}
92+
}
93+
}
94+
8595
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
8696
pub struct KerberosSecurityConfig {
8797
pub keytab_path: String,

src/semaphore_with_index.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl<'a> SemaphorePermitWithIndex<'a> {
2424
impl<'a> Drop for SemaphorePermitWithIndex<'a> {
2525
fn drop(&mut self) {
2626
let mut index_container = self.index_ref.lock();
27-
index_container.push_back(self.index);
27+
index_container.push_front(self.index);
2828
}
2929
}
3030

@@ -59,7 +59,6 @@ impl SemaphoreWithIndex {
5959
#[cfg(test)]
6060
mod test {
6161
use crate::semaphore_with_index::SemaphoreWithIndex;
62-
use std::sync::atomic::Ordering::SeqCst;
6362

6463
#[tokio::test]
6564
async fn test() -> anyhow::Result<()> {

0 commit comments

Comments
 (0)