1
- use crate :: dapr:: dapr:: proto:: { common:: v1 as common_v1, runtime:: v1 as dapr_v1} ;
2
- use prost_types:: Any ;
3
1
use std:: collections:: HashMap ;
4
- use tonic:: Streaming ;
5
- use tonic:: { transport:: Channel as TonicChannel , Request } ;
6
2
7
- use crate :: error:: Error ;
8
3
use async_trait:: async_trait;
4
+ use futures:: StreamExt ;
5
+ use prost_types:: Any ;
9
6
use serde:: { Deserialize , Serialize } ;
7
+ use tokio:: io:: AsyncRead ;
8
+ use tonic:: codegen:: tokio_stream;
9
+ use tonic:: { transport:: Channel as TonicChannel , Request } ;
10
+ use tonic:: { Status , Streaming } ;
11
+
12
+ use crate :: dapr:: dapr:: proto:: { common:: v1 as common_v1, runtime:: v1 as dapr_v1} ;
13
+ use crate :: error:: Error ;
10
14
11
15
#[ derive( Clone ) ]
12
16
pub struct Client < T > ( T ) ;
@@ -379,6 +383,78 @@ impl<T: DaprInterface> Client<T> {
379
383
} ;
380
384
self . 0 . unsubscribe_configuration ( request) . await
381
385
}
386
+
387
+ /// Encrypt binary data using Dapr. returns Vec<StreamPayload> to be used in decrypt method
388
+ ///
389
+ /// # Arguments
390
+ ///
391
+ /// * `payload` - ReaderStream to the data to encrypt
392
+ /// * `request_option` - Encryption request options.
393
+ pub async fn encrypt < R > (
394
+ & mut self ,
395
+ payload : ReaderStream < R > ,
396
+ request_options : EncryptRequestOptions ,
397
+ ) -> Result < Vec < StreamPayload > , Status >
398
+ where
399
+ R : AsyncRead + Send ,
400
+ {
401
+ // have to have it as a reference for the async move below
402
+ let request_options = & Some ( request_options) ;
403
+ let requested_items: Vec < EncryptRequest > = payload
404
+ . 0
405
+ . enumerate ( )
406
+ . fold ( vec ! [ ] , |mut init, ( i, bytes) | async move {
407
+ let stream_payload = StreamPayload {
408
+ data : bytes. unwrap ( ) . to_vec ( ) ,
409
+ seq : 0 ,
410
+ } ;
411
+ if i == 0 {
412
+ init. push ( EncryptRequest {
413
+ options : request_options. clone ( ) ,
414
+ payload : Some ( stream_payload) ,
415
+ } ) ;
416
+ } else {
417
+ init. push ( EncryptRequest {
418
+ options : None ,
419
+ payload : Some ( stream_payload) ,
420
+ } ) ;
421
+ }
422
+ init
423
+ } )
424
+ . await ;
425
+ self . 0 . encrypt ( requested_items) . await
426
+ }
427
+
428
+ /// Decrypt binary data using Dapr. returns Vec<u8>.
429
+ ///
430
+ /// # Arguments
431
+ ///
432
+ /// * `encrypted` - Encrypted data usually returned from encrypted, Vec<StreamPayload>
433
+ /// * `options` - Decryption request options.
434
+ pub async fn decrypt (
435
+ & mut self ,
436
+ encrypted : Vec < StreamPayload > ,
437
+ options : DecryptRequestOptions ,
438
+ ) -> Result < Vec < u8 > , Status > {
439
+ let requested_items: Vec < DecryptRequest > = encrypted
440
+ . iter ( )
441
+ . enumerate ( )
442
+ . map ( |( i, item) | {
443
+ if i == 0 {
444
+ DecryptRequest {
445
+ options : Some ( options. clone ( ) ) ,
446
+ payload : Some ( item. clone ( ) ) ,
447
+ }
448
+ } else {
449
+ DecryptRequest {
450
+ options : None ,
451
+ payload : Some ( item. clone ( ) ) ,
452
+ }
453
+ }
454
+ } )
455
+ . collect ( ) ;
456
+ self . 0 . decrypt ( requested_items) . await
457
+ }
382
458
}
383
459
384
460
#[ async_trait]
@@ -420,6 +496,11 @@ pub trait DaprInterface: Sized {
420
496
& mut self ,
421
497
request : UnsubscribeConfigurationRequest ,
422
498
) -> Result < UnsubscribeConfigurationResponse , Error > ;
499
+
500
+ async fn encrypt ( & mut self , payload : Vec < EncryptRequest > )
501
+ -> Result < Vec < StreamPayload > , Status > ;
502
+
503
+ async fn decrypt ( & mut self , payload : Vec < DecryptRequest > ) -> Result < Vec < u8 > , Status > ;
423
504
}
424
505
425
506
#[ async_trait]
@@ -535,6 +616,51 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
535
616
. await ?
536
617
. into_inner ( ) )
537
618
}
619
+
620
+ /// Encrypt binary data using Dapr. returns Vec<StreamPayload> to be used in decrypt method
621
+ ///
622
+ /// # Arguments
623
+ ///
624
+ /// * `payload` - ReaderStream to the data to encrypt
625
+ /// * `request_option` - Encryption request options.
626
+ async fn encrypt (
627
+ & mut self ,
628
+ request : Vec < EncryptRequest > ,
629
+ ) -> Result < Vec < StreamPayload > , Status > {
630
+ let request = Request :: new ( tokio_stream:: iter ( request) ) ;
631
+ let stream = self . encrypt_alpha1 ( request) . await ?;
632
+ let mut stream = stream. into_inner ( ) ;
633
+ let mut return_data = vec ! [ ] ;
634
+ while let Some ( resp) = stream. next ( ) . await {
635
+ if let Ok ( resp) = resp {
636
+ if let Some ( data) = resp. payload {
637
+ return_data. push ( data)
638
+ }
639
+ }
640
+ }
641
+ Ok ( return_data)
642
+ }
643
+
644
+ /// Decrypt binary data using Dapr. returns Vec<u8>.
645
+ ///
646
+ /// # Arguments
647
+ ///
648
+ /// * `encrypted` - Encrypted data usually returned from encrypted, Vec<StreamPayload>
649
+ /// * `options` - Decryption request options.
650
+ async fn decrypt ( & mut self , request : Vec < DecryptRequest > ) -> Result < Vec < u8 > , Status > {
651
+ let request = Request :: new ( tokio_stream:: iter ( request) ) ;
652
+ let stream = self . decrypt_alpha1 ( request) . await ?;
653
+ let mut stream = stream. into_inner ( ) ;
654
+ let mut data = vec ! [ ] ;
655
+ while let Some ( resp) = stream. next ( ) . await {
656
+ if let Ok ( resp) = resp {
657
+ if let Some ( mut payload) = resp. payload {
658
+ data. append ( payload. data . as_mut ( ) )
659
+ }
660
+ }
661
+ }
662
+ Ok ( data)
663
+ }
538
664
}
539
665
540
666
/// A request from invoking a service
@@ -614,6 +740,19 @@ pub type UnsubscribeConfigurationResponse = dapr_v1::UnsubscribeConfigurationRes
614
740
/// A tonic based gRPC client
615
741
pub type TonicClient = dapr_v1:: dapr_client:: DaprClient < TonicChannel > ;
616
742
743
+ /// Encryption gRPC request
744
+ pub type EncryptRequest = crate :: dapr:: dapr:: proto:: runtime:: v1:: EncryptRequest ;
745
+
746
+ /// Decrypt gRPC request
747
+ pub type DecryptRequest = crate :: dapr:: dapr:: proto:: runtime:: v1:: DecryptRequest ;
748
+
749
+ /// Encryption request options
750
+ pub type EncryptRequestOptions = crate :: dapr:: dapr:: proto:: runtime:: v1:: EncryptRequestOptions ;
751
+
752
+ /// Decryption request options
753
+ pub type DecryptRequestOptions = crate :: dapr:: dapr:: proto:: runtime:: v1:: DecryptRequestOptions ;
754
+
755
+ type StreamPayload = crate :: dapr:: dapr:: proto:: common:: v1:: StreamPayload ;
617
756
impl < K > From < ( K , Vec < u8 > ) > for common_v1:: StateItem
618
757
where
619
758
K : Into < String > ,
@@ -626,3 +765,11 @@ where
626
765
}
627
766
}
628
767
}
768
+
769
+ pub struct ReaderStream < T > ( tokio_util:: io:: ReaderStream < T > ) ;
770
+
771
+ impl < T : AsyncRead > ReaderStream < T > {
772
+ pub fn new ( data : T ) -> Self {
773
+ ReaderStream ( tokio_util:: io:: ReaderStream :: new ( data) )
774
+ }
775
+ }
0 commit comments