Skip to content

Commit 480e30c

Browse files
committed
feat(hdfs): Print more logs on fs failure operations
1 parent 3c59aed commit 480e30c

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

src/error.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ pub enum WorkerError {
8585

8686
#[error("urpc stream message type not found")]
8787
STREAM_MESSAGE_TYPE_NOT_FOUND,
88+
89+
#[error("{0}. error: {1}")]
90+
HDFS_IO_ERROR(String, anyhow::Error),
8891
}
8992

9093
impl From<AcquireError> for WorkerError {
@@ -114,12 +117,21 @@ impl From<FromUtf8Error> for WorkerError {
114117
#[cfg(test)]
115118
mod tests {
116119

117-
use anyhow::Result;
120+
use crate::error::WorkerError::HDFS_IO_ERROR;
121+
use anyhow::{Error, Result};
118122

119123
#[test]
120124
pub fn error_test() -> Result<()> {
121125
// bail macro means it will return directly.
122126
// bail!(WorkerError::APP_PURGE_EVENT_SEND_ERROR("error_test_app_id".into(), None));
123127
Ok(())
124128
}
129+
130+
#[test]
131+
pub fn hdfs_io_test() -> Result<()> {
132+
let e = Error::from(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"));
133+
let raw = format!("{}", HDFS_IO_ERROR("".to_string(), e));
134+
assert_eq!(". error: oh no!", raw);
135+
Ok(())
136+
}
125137
}

src/store/hdfs.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,39 @@ impl HdfsStore {
184184
let parent_path_str = format!("{}/", parent_dir.to_str().unwrap());
185185
debug!("creating dir: {}", parent_path_str.as_str());
186186

187-
&filesystem.create_dir(parent_path_str.as_str()).await?;
187+
&filesystem
188+
.create_dir(parent_path_str.as_str())
189+
.await
190+
.map_err(|e| {
191+
error!("Errors on creating dir of {}", parent_path_str.as_str());
192+
e
193+
})?;
188194

189195
let data_file_complete_path = format!("{}_{}.data", &data_file_path_prefix, 0);
190196
let index_file_complete_path =
191197
format!("{}_{}.index", &index_file_path_prefix, 0);
192198

193199
// setup the file
194-
&filesystem.touch(&data_file_complete_path).await?;
195-
&filesystem.touch(&index_file_complete_path).await?;
200+
&filesystem
201+
.touch(&data_file_complete_path)
202+
.await
203+
.map_err(|e| {
204+
error!(
205+
"Errors on touching file of {}",
206+
data_file_complete_path.as_str()
207+
);
208+
e
209+
})?;
210+
&filesystem
211+
.touch(&index_file_complete_path)
212+
.await
213+
.map_err(|e| {
214+
error!(
215+
"Errors on touching file of {}",
216+
index_file_complete_path.as_str()
217+
);
218+
e
219+
})?;
196220

197221
self.partition_cached_meta
198222
.insert(data_file_path_prefix.to_owned(), Default::default());
@@ -290,7 +314,11 @@ impl HdfsStore {
290314
"hdfs writing [data] with {} bytes. path: {}",
291315
data_len, &data_file_path
292316
))
293-
.await?;
317+
.await
318+
.map_err(|e| {
319+
error!("Errors on appending data into path: {}", &data_file_path);
320+
e
321+
})?;
294322
let index_bytes = index_bytes_holder.freeze();
295323
let index_len = index_bytes.len();
296324
filesystem
@@ -299,7 +327,11 @@ impl HdfsStore {
299327
"hdfs writing [index] with {} bytes. path: {}",
300328
index_len, &index_file_path
301329
))
302-
.await?;
330+
.await
331+
.map_err(|e| {
332+
error!("Errors on appending index into path: {}", &index_file_path);
333+
e
334+
})?;
303335
Ok(())
304336
}
305337
}

0 commit comments

Comments
 (0)