1- use futures:: stream:: TryStreamExt ;
21use futures_util:: stream:: StreamExt ;
2+ use warp:: http:: HeaderMap ;
3+ use futures:: stream:: TryStreamExt ;
34use mpart_async:: server:: MultipartStream ;
45use std:: convert:: Infallible ;
56use std:: path:: { Path , PathBuf } ;
67use std:: sync:: Arc ;
7- use warp:: http:: HeaderMap ;
88
9- use super :: parse_auth_headers;
10- use super :: server_state:: ServerState ;
11- use super :: user_session:: { org_authz_with_default, AuthzError , AuthzTopic } ;
129use crate :: video_pipeline:: IncomingFile ;
1310use crate :: video_pipeline:: TranscodePreference ;
11+ use super :: parse_auth_headers;
12+ use super :: server_state:: ServerState ;
13+ use super :: user_session:: { org_authz_with_default, AuthzTopic , AuthzError } ;
1414
1515use lib_clapshot_grpc:: proto;
1616use proto:: org:: authz_user_action_request as authz_req;
1717
18+
1819/// Warp filter for multipart/form-data file upload
1920///
2021/// # Arguments
@@ -30,66 +31,45 @@ pub async fn handle_multipart_upload(
3031 mime : mime:: Mime ,
3132 hdrs : HeaderMap ,
3233 server : ServerState ,
33- body : impl warp:: Stream < Item = Result < impl bytes:: Buf , warp:: Error > > + Unpin ,
34- ) -> Result < warp:: reply:: WithStatus < String > , Infallible > {
35- let ( user_id , user_name , is_admin , mut cookies , filtered_headers , remote_error ) =
36- parse_auth_headers ( & hdrs, & server. default_user , & server. org_http_headers_regex ) ;
34+ body : impl warp:: Stream < Item = Result < impl bytes:: Buf , warp:: Error > > + Unpin )
35+ -> Result < warp:: reply:: WithStatus < String > , Infallible >
36+ {
37+ let ( user_id , user_name , is_admin , mut cookies , filtered_headers , remote_error ) = parse_auth_headers ( & hdrs, & server. default_user , & server. org_http_headers_regex ) ;
3738
3839 // If X-Remote-Error is set, return error response
3940 if let Some ( error_msg) = remote_error {
4041 return Ok ( warp:: reply:: with_status (
4142 format ! ( "Authentication Error: {}" , error_msg) ,
42- warp:: http:: StatusCode :: FORBIDDEN ,
43+ warp:: http:: StatusCode :: FORBIDDEN
4344 ) ) ;
4445 }
4546
4647 // Check from organizer if user is allowed to upload.
4748 // Allow by default if organizer is not configured or doesn't care.
4849 if let Some ( uri) = & server. organizer_uri {
49- if server
50- . organizer_has_connected
51- . load ( std:: sync:: atomic:: Ordering :: Relaxed )
52- {
50+ if server. organizer_has_connected . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
5351 let organizer = match crate :: grpc:: grpc_client:: connect ( uri. clone ( ) ) . await {
5452 Ok ( c) => Arc :: new ( tokio:: sync:: Mutex :: new ( c) ) ,
5553 Err ( e) => {
5654 tracing:: error!( "Failed to connect to organizer: {}" , e) ;
57- return Ok ( warp:: reply:: with_status (
58- "Internal error: failed to connect to organizer" . into ( ) ,
59- warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ,
60- ) ) ;
55+ return Ok ( warp:: reply:: with_status ( "Internal error: failed to connect to organizer" . into ( ) , warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ) ;
6156 }
6257 } ;
6358
6459 let org_session = proto:: org:: UserSessionData {
6560 sid : "<upload--not-set>" . to_string ( ) ,
66- user : Some ( proto:: UserInfo {
67- id : user_id. clone ( ) ,
68- name : user_name. clone ( ) ,
69- } ) ,
61+ user : Some ( proto:: UserInfo { id : user_id. clone ( ) , name : user_name. clone ( ) } ) ,
7062 is_admin,
7163 cookies : cookies. clone ( ) ,
7264 http_headers : filtered_headers,
7365 } ;
7466
75- match org_authz_with_default (
76- & org_session,
77- "upload media file" ,
78- true ,
79- & server,
80- & Some ( organizer) ,
81- true ,
82- AuthzTopic :: Other ( None , authz_req:: other_op:: Op :: UploadMediaFile ) ,
83- )
84- . await
85- {
86- Ok ( _) => { }
67+ match org_authz_with_default ( & org_session, "upload media file" , true , & server, & Some ( organizer) ,
68+ true , AuthzTopic :: Other ( None , authz_req:: other_op:: Op :: UploadMediaFile ) ) . await {
69+ Ok ( _) => { } ,
8770 Err ( AuthzError :: Denied ) => {
88- return Ok ( warp:: reply:: with_status (
89- "Permission denied" . into ( ) ,
90- warp:: http:: StatusCode :: FORBIDDEN ,
91- ) ) ;
92- }
71+ return Ok ( warp:: reply:: with_status ( "Permission denied" . into ( ) , warp:: http:: StatusCode :: FORBIDDEN ) ) ;
72+ } ,
9373 }
9474 }
9575 }
@@ -114,17 +94,9 @@ pub async fn handle_multipart_upload(
11494 let boundary = mime. get_param ( "boundary" ) . map ( |v| v. to_string ( ) ) ;
11595 let boundary = match boundary {
11696 Some ( b) => b,
117- None => {
118- return Ok ( warp:: reply:: with_status (
119- "Missing boundary" . into ( ) ,
120- warp:: http:: StatusCode :: BAD_REQUEST ,
121- ) )
122- }
97+ None => return Ok ( warp:: reply:: with_status ( "Missing boundary" . into ( ) , warp:: http:: StatusCode :: BAD_REQUEST ) ) ,
12398 } ;
124- let mut stream = MultipartStream :: new (
125- boundary,
126- body. map_ok ( |mut buf| buf. copy_to_bytes ( buf. remaining ( ) ) ) ,
127- ) ;
99+ let mut stream = MultipartStream :: new ( boundary, body. map_ok ( |mut buf| buf. copy_to_bytes ( buf. remaining ( ) ) ) ) ;
128100 let mut uploaded_file: PathBuf = PathBuf :: new ( ) ;
129101
130102 while let Ok ( Some ( mut field) ) = stream. try_next ( ) . await {
@@ -134,79 +106,55 @@ pub async fn handle_multipart_upload(
134106 Err ( e) => {
135107 let msg = format ! ( "Error getting filename: {}" , e) ;
136108 tracing:: error!( msg) ;
137- return Ok ( warp:: reply:: with_status (
138- msg,
139- warp:: http:: StatusCode :: BAD_REQUEST ,
140- ) ) ;
141- }
142- Ok ( filename) => {
109+ return Ok ( warp:: reply:: with_status ( msg, warp:: http:: StatusCode :: BAD_REQUEST ) ) ;
110+ } ,
111+ Ok ( filename) =>
112+ {
143113 let path = Path :: new ( & filename) ;
144114 if path. file_name ( ) != Some ( path. as_os_str ( ) ) {
145- return Ok ( warp:: reply:: with_status (
146- "Filename must not contain path" . into ( ) ,
147- warp:: http:: StatusCode :: BAD_REQUEST ,
148- ) ) ;
115+ return Ok ( warp:: reply:: with_status ( "Filename must not contain path" . into ( ) , warp:: http:: StatusCode :: BAD_REQUEST ) ) ;
149116 }
150117
151118 // Make a unique upload dir
152119 let uuid = uuid:: Uuid :: new_v4 ( ) ;
153- let new_dir =
154- async_std:: path:: PathBuf :: from ( & upload_dir) . join ( uuid. to_string ( ) ) ;
155- let dst = new_dir. join ( path. file_name ( ) . unwrap ( ) ) ;
120+ let new_dir = async_std:: path:: PathBuf :: from ( & upload_dir) . join ( uuid. to_string ( ) ) ;
121+ let dst = new_dir. join ( path. file_name ( ) . unwrap ( ) ) ;
156122 if dst. exists ( ) . await {
157123 tracing:: error!( "Upload dst '{}' already exists, even tough it was prefixed with uuid4. Bug??" , dst. display( ) ) ;
158- return Ok ( warp:: reply:: with_status (
159- "Internal error: file already exists" . into ( ) ,
160- warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ,
161- ) ) ;
124+ return Ok ( warp:: reply:: with_status ( "Internal error: file already exists" . into ( ) , warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ) ;
162125 }
163126 if let Err ( e) = async_std:: fs:: create_dir_all ( & new_dir) . await {
164127 tracing:: error!( "Failed to create upload dir: {}" , e) ;
165- return Ok ( warp:: reply:: with_status (
166- "Internal error: failed to create upload dir" . into ( ) ,
167- warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ,
168- ) ) ;
128+ return Ok ( warp:: reply:: with_status ( "Internal error: failed to create upload dir" . into ( ) , warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ) ;
169129 }
170130
171131 // Create the file and stream the data into it
172132 match async_std:: fs:: File :: create ( & dst) . await {
173133 Err ( e) => {
174- let msg =
175- format ! ( "Failed to create file '{}': {}" , dst. display( ) , e) ;
134+ let msg = format ! ( "Failed to create file '{}': {}" , dst. display( ) , e) ;
176135 tracing:: error!( msg) ;
177- return Ok ( warp:: reply:: with_status (
178- msg,
179- warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ,
180- ) ) ;
181- }
182- Ok ( mut f) => {
136+ return Ok ( warp:: reply:: with_status ( msg, warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ) ;
137+ } ,
138+ Ok ( mut f) =>
139+ {
183140 // Read and write in parallel
184- let ( buff_tx, mut buff_rx) =
185- tokio:: sync:: mpsc:: channel :: < bytes:: Bytes > ( 16 ) ;
141+ let ( buff_tx, mut buff_rx) = tokio:: sync:: mpsc:: channel :: < bytes:: Bytes > ( 16 ) ;
186142
187143 // Read chunks from HTTP
188144 let read_all_chunks = async move {
189145 while let Some ( chunk) = field. next ( ) . await {
190146 match chunk {
191- Ok ( data) => {
192- buff_tx. send ( data) . await . unwrap ( ) ;
193- }
194- Err ( e) => {
195- return Err ( e. to_string ( ) ) ;
196- }
197- }
198- }
199- Ok ( ( ) ) // buff_tx dropped
147+ Ok ( data) => { buff_tx. send ( data) . await . unwrap ( ) ; } ,
148+ Err ( e) => { return Err ( e. to_string ( ) ) ; }
149+ } } ; Ok ( ( ) ) // buff_tx dropped
200150 } ;
201151
202152 // Write chunks to the file
203153 let write_all_chunks = async move {
204154 while let Some ( data) = buff_rx. recv ( ) . await {
205- futures_util:: AsyncWriteExt :: write_all ( & mut f, & data)
206- . await
155+ futures_util:: AsyncWriteExt :: write_all ( & mut f, & data) . await
207156 . map_err ( |e| e. to_string ( ) ) ?;
208- }
209- Ok ( ( ) )
157+ } ; Ok ( ( ) )
210158 } ;
211159
212160 // Run both tasks in parallel, cleanup on error
@@ -215,49 +163,28 @@ pub async fn handle_multipart_upload(
215163 tracing:: error!( "Upload failed: {}" , e) ;
216164 // Remove the file & dir, since it's incomplete
217165 if let Err ( e) = async_std:: fs:: remove_file ( & dst) . await {
218- tracing:: warn!(
219- "Failed to remove incomplete upload file: {}" ,
220- e
221- ) ;
222- } else if let Err ( e) = async_std:: fs:: remove_dir ( new_dir) . await
223- {
224- tracing:: warn!(
225- "Failed to remove incomplete upload dir: {}" ,
226- e
227- ) ;
166+ tracing:: warn!( "Failed to remove incomplete upload file: {}" , e) ;
167+ } else if let Err ( e) = async_std:: fs:: remove_dir ( new_dir) . await {
168+ tracing:: warn!( "Failed to remove incomplete upload dir: {}" , e) ;
228169 }
229- return Ok ( warp:: reply:: with_status (
230- format ! ( "Upload failed: {e}" ) ,
231- warp:: http:: StatusCode :: BAD_REQUEST ,
232- ) ) ;
170+ return Ok ( warp:: reply:: with_status ( format ! ( "Upload failed: {e}" ) , warp:: http:: StatusCode :: BAD_REQUEST ) ) ;
233171 }
234- tracing:: info!( dst = dst. display( ) . to_string( ) , "File uploaded." ) ;
172+ tracing:: info!( dst= dst. display( ) . to_string( ) , "File uploaded." ) ;
235173 uploaded_file = dst. into ( ) ;
236174 }
237175 } ;
238176 }
239177 }
240- }
178+ } ,
241179 fieldname => {
242180 tracing:: info!( "Skipping UNKNOWN multipart POST field '{fieldname}'" ) ;
243- }
181+ } ,
244182 }
245183 }
246184
247- if let Err ( e) = upload_done. send ( IncomingFile {
248- file_path : uploaded_file,
249- user_id,
250- cookies,
251- transcode_preference,
252- } ) {
185+ if let Err ( e) = upload_done. send ( IncomingFile { file_path : uploaded_file, user_id, cookies, transcode_preference } ) {
253186 tracing:: error!( "Failed to send upload ok signal: {:?}" , e) ;
254- return Ok ( warp:: reply:: with_status (
255- "Internal error: failed to send upload ok signal" . into ( ) ,
256- warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ,
257- ) ) ;
187+ return Ok ( warp:: reply:: with_status ( "Internal error: failed to send upload ok signal" . into ( ) , warp:: http:: StatusCode :: INTERNAL_SERVER_ERROR ) ) ;
258188 }
259- Ok ( warp:: reply:: with_status (
260- "Ok" . into ( ) ,
261- warp:: http:: StatusCode :: OK ,
262- ) )
189+ Ok ( warp:: reply:: with_status ( "Ok" . into ( ) , warp:: http:: StatusCode :: OK ) )
263190}
0 commit comments