@@ -19,11 +19,10 @@ use std::fmt::Debug;
1919use std:: fmt:: Formatter ;
2020use std:: sync:: Arc ;
2121
22- use hdfs_native:: HdfsError ;
23- use hdfs_native:: WriteOptions ;
2422use log:: debug;
2523
2624use super :: HDFS_NATIVE_SCHEME ;
25+ use super :: core:: HdfsNativeCore ;
2726use super :: delete:: HdfsNativeDeleter ;
2827use super :: error:: parse_hdfs_error;
2928use super :: lister:: HdfsNativeLister ;
@@ -110,9 +109,37 @@ impl Builder for HdfsNativeBuilder {
110109
111110 // need to check if root dir exists, create if not
112111 Ok ( HdfsNativeBackend {
113- root,
114- client : Arc :: new ( client) ,
115- enable_append : self . config . enable_append ,
112+ core : Arc :: new ( HdfsNativeCore {
113+ info : {
114+ let am = AccessorInfo :: default ( ) ;
115+ am. set_scheme ( HDFS_NATIVE_SCHEME )
116+ . set_root ( & root)
117+ . set_native_capability ( Capability {
118+ stat : true ,
119+
120+ read : true ,
121+
122+ write : true ,
123+ write_can_append : self . config . enable_append ,
124+
125+ create_dir : true ,
126+ delete : true ,
127+
128+ list : true ,
129+
130+ rename : true ,
131+
132+ shared : true ,
133+
134+ ..Default :: default ( )
135+ } ) ;
136+
137+ am. into ( )
138+ } ,
139+ root,
140+ client : Arc :: new ( client) ,
141+ enable_append : self . config . enable_append ,
142+ } ) ,
116143 } )
117144 }
118145}
@@ -128,209 +155,67 @@ impl Builder for HdfsNativeBuilder {
128155/// Backend for hdfs-native services.
129156#[ derive( Debug , Clone ) ]
130157pub struct HdfsNativeBackend {
131- pub root : String ,
132- pub client : Arc < hdfs_native:: Client > ,
133- enable_append : bool ,
158+ core : Arc < HdfsNativeCore > ,
134159}
135160
136- /// hdfs_native::Client is thread-safe.
137- unsafe impl Send for HdfsNativeBackend { }
138- unsafe impl Sync for HdfsNativeBackend { }
139-
140161impl Access for HdfsNativeBackend {
141162 type Reader = HdfsNativeReader ;
142163 type Writer = HdfsNativeWriter ;
143164 type Lister = Option < HdfsNativeLister > ;
144165 type Deleter = oio:: OneShotDeleter < HdfsNativeDeleter > ;
145166
146167 fn info ( & self ) -> Arc < AccessorInfo > {
147- let am = AccessorInfo :: default ( ) ;
148- am. set_scheme ( HDFS_NATIVE_SCHEME )
149- . set_root ( & self . root )
150- . set_native_capability ( Capability {
151- stat : true ,
152-
153- read : true ,
154-
155- write : true ,
156- write_can_append : self . enable_append ,
157-
158- create_dir : true ,
159- delete : true ,
160-
161- list : true ,
162-
163- rename : true ,
164-
165- shared : true ,
166-
167- ..Default :: default ( )
168- } ) ;
169-
170- am. into ( )
168+ self . core . info . clone ( )
171169 }
172170
173171 async fn create_dir ( & self , path : & str , _args : OpCreateDir ) -> Result < RpCreateDir > {
174- let p = build_rooted_abs_path ( & self . root , path) ;
175-
176- self . client
177- . mkdirs ( & p, 0o777 , true )
178- . await
179- . map_err ( parse_hdfs_error) ?;
180-
172+ self . core . hdfs_create_dir ( path) . await ?;
181173 Ok ( RpCreateDir :: default ( ) )
182174 }
183175
184176 async fn stat ( & self , path : & str , _args : OpStat ) -> Result < RpStat > {
185- let p = build_rooted_abs_path ( & self . root , path) ;
186-
187- let status: hdfs_native:: client:: FileStatus = self
188- . client
189- . get_file_info ( & p)
190- . await
191- . map_err ( parse_hdfs_error) ?;
192-
193- let mode = if status. isdir {
194- EntryMode :: DIR
195- } else {
196- EntryMode :: FILE
197- } ;
198-
199- let mut metadata = Metadata :: new ( mode) ;
200- metadata
201- . set_last_modified ( Timestamp :: from_millisecond (
202- status. modification_time as i64 ,
203- ) ?)
204- . set_content_length ( status. length as u64 ) ;
205-
206- Ok ( RpStat :: new ( metadata) )
177+ let m = self . core . hdfs_stat ( path) . await ?;
178+ Ok ( RpStat :: new ( m) )
207179 }
208180
209181 async fn read ( & self , path : & str , args : OpRead ) -> Result < ( RpRead , Self :: Reader ) > {
210- let p = build_rooted_abs_path ( & self . root , path ) ;
182+ let ( f , offset , size ) = self . core . hdfs_read ( path , & args ) . await ? ;
211183
212- let f = self . client . read ( & p) . await . map_err ( parse_hdfs_error) ?;
213-
214- let r = HdfsNativeReader :: new (
215- f,
216- args. range ( ) . offset ( ) as _ ,
217- args. range ( ) . size ( ) . unwrap_or ( u64:: MAX ) as _ ,
218- ) ;
184+ let r = HdfsNativeReader :: new ( f, offset as _ , size as _ ) ;
219185
220186 Ok ( ( RpRead :: new ( ) , r) )
221187 }
222188
223189 async fn write ( & self , path : & str , args : OpWrite ) -> Result < ( RpWrite , Self :: Writer ) > {
224- let target_path = build_rooted_abs_path ( & self . root , path) ;
225- let mut initial_size = 0 ;
226-
227- let target_exists = match self . client . get_file_info ( & target_path) . await {
228- Ok ( status) => {
229- initial_size = status. length as u64 ;
230- true
231- }
232- Err ( err) => match & err {
233- HdfsError :: FileNotFound ( _) => false ,
234- _ => return Err ( parse_hdfs_error ( err) ) ,
235- } ,
236- } ;
237-
238- let f = if target_exists {
239- if args. append ( ) {
240- assert ! ( self . enable_append, "append is not enabled" ) ;
241- self . client
242- . append ( & target_path)
243- . await
244- . map_err ( parse_hdfs_error) ?
245- } else {
246- initial_size = 0 ;
247- self . client
248- . create ( & target_path, WriteOptions :: default ( ) . overwrite ( true ) )
249- . await
250- . map_err ( parse_hdfs_error) ?
251- }
252- } else {
253- initial_size = 0 ;
254- self . client
255- . create ( & target_path, WriteOptions :: default ( ) )
256- . await
257- . map_err ( parse_hdfs_error) ?
258- } ;
190+ let ( f, initial_size) = self . core . hdfs_write ( path, & args) . await ?;
259191
260192 Ok ( ( RpWrite :: new ( ) , HdfsNativeWriter :: new ( f, initial_size) ) )
261193 }
262194
263195 async fn delete ( & self ) -> Result < ( RpDelete , Self :: Deleter ) > {
264196 Ok ( (
265197 RpDelete :: default ( ) ,
266- oio:: OneShotDeleter :: new ( HdfsNativeDeleter :: new ( Arc :: new ( self . clone ( ) ) ) ) ,
198+ oio:: OneShotDeleter :: new ( HdfsNativeDeleter :: new ( Arc :: clone ( & self . core ) ) ) ,
267199 ) )
268200 }
269201
270202 async fn list ( & self , path : & str , _args : OpList ) -> Result < ( RpList , Self :: Lister ) > {
271- let p: String = build_rooted_abs_path ( & self . root , path) ;
272-
273- let isdir = match self . client . get_file_info ( & p) . await {
274- Ok ( status) => status. isdir ,
275- Err ( err) => {
276- return match & err {
277- HdfsError :: FileNotFound ( _) => Ok ( ( RpList :: default ( ) , None ) ) ,
278- _ => Err ( parse_hdfs_error ( err) ) ,
279- } ;
280- }
281- } ;
282- let current_path = if isdir {
283- if !path. ends_with ( "/" ) {
284- Some ( path. to_string ( ) + "/" )
285- } else {
286- Some ( path. to_string ( ) )
287- }
288- } else {
289- None
290- } ;
291-
292- Ok ( (
293- RpList :: default ( ) ,
294- Some ( HdfsNativeLister :: new (
295- & self . root ,
296- & self . client ,
297- & p,
298- current_path,
203+ match self . core . hdfs_list ( path) . await ? {
204+ Some ( ( p, current_path) ) => Ok ( (
205+ RpList :: default ( ) ,
206+ Some ( HdfsNativeLister :: new (
207+ & self . core . root ,
208+ & self . core . client ,
209+ & p,
210+ current_path,
211+ ) ) ,
299212 ) ) ,
300- ) )
213+ None => Ok ( ( RpList :: default ( ) , None ) ) ,
214+ }
301215 }
302216
303217 async fn rename ( & self , from : & str , to : & str , _args : OpRename ) -> Result < RpRename > {
304- let from_path = build_rooted_abs_path ( & self . root , from) ;
305- let to_path = build_rooted_abs_path ( & self . root , to) ;
306- match self . client . get_file_info ( & to_path) . await {
307- Ok ( status) => {
308- if status. isdir {
309- return Err ( Error :: new ( ErrorKind :: IsADirectory , "path should be a file" )
310- . with_context ( "input" , & to_path) ) ;
311- } else {
312- self . client
313- . delete ( & to_path, true )
314- . await
315- . map_err ( parse_hdfs_error) ?;
316- }
317- }
318- Err ( err) => match & err {
319- HdfsError :: FileNotFound ( _) => {
320- self . client
321- . create ( & to_path, WriteOptions :: default ( ) . create_parent ( true ) )
322- . await
323- . map_err ( parse_hdfs_error) ?;
324- }
325- _ => return Err ( parse_hdfs_error ( err) ) ,
326- } ,
327- } ;
328-
329- self . client
330- . rename ( & from_path, & to_path, true )
331- . await
332- . map_err ( parse_hdfs_error) ?;
333-
218+ self . core . hdfs_rename ( from, to) . await ?;
334219 Ok ( RpRename :: default ( ) )
335220 }
336221}
0 commit comments