@@ -4,21 +4,24 @@ use crate::{
44} ;
55use csi_driver:: {
66 context:: { CreateParams , CreateSnapshotParams , PublishParams , QuiesceFsCandidate } ,
7- node:: internal:: { node_plugin_client:: NodePluginClient , FreezeFsRequest , UnfreezeFsRequest } ,
7+ node:: internal:: {
8+ node_plugin_client:: NodePluginClient , ForceUnstageVolumeRequest , FreezeFsRequest ,
9+ UnfreezeFsRequest ,
10+ } ,
811} ;
912use rpc:: csi:: { volume_content_source:: Type , Topology as CsiTopology , * } ;
1013use stor_port:: types:: v0:: openapi:: {
1114 models,
1215 models:: {
13- AffinityGroup , LabelledTopology , NodeSpec , NodeStatus , Pool , PoolStatus , PoolTopology ,
14- SpecStatus , Volume , VolumeShareProtocol ,
16+ AffinityGroup , AppNode , LabelledTopology , NodeSpec , NodeStatus , Pool , PoolStatus ,
17+ PoolTopology , SpecStatus , Volume , VolumeShareProtocol ,
1518 } ,
1619} ;
17- use utils:: { dsp_created_by_key, DSP_OPERATOR } ;
20+ use utils:: { dsp_created_by_key, DEFAULT_REQ_TIMEOUT , DSP_OPERATOR } ;
1821
1922use regex:: Regex ;
20- use std:: { collections:: HashMap , str:: FromStr } ;
21- use tonic:: { Code , Request , Response , Status } ;
23+ use std:: { collections:: HashMap , str:: FromStr , time :: Duration } ;
24+ use tonic:: { transport :: Uri , Code , Request , Response , Status } ;
2225use tracing:: { debug, error, instrument, trace, warn} ;
2326use uuid:: Uuid ;
2427use volume_capability:: AccessType ;
@@ -31,13 +34,15 @@ const SNAPSHOT_NAME_PATTERN: &str =
3134#[ derive( Debug ) ]
3235pub ( crate ) struct CsiControllerSvc {
3336 create_volume_limiter : std:: sync:: Arc < tokio:: sync:: Semaphore > ,
37+ force_unstage_volume : bool ,
3438}
3539impl CsiControllerSvc {
3640 pub ( crate ) fn new ( cfg : & CsiControllerConfig ) -> Self {
3741 Self {
3842 create_volume_limiter : std:: sync:: Arc :: new ( tokio:: sync:: Semaphore :: new (
3943 cfg. create_volume_limit ( ) ,
4044 ) ) ,
45+ force_unstage_volume : cfg. force_unstage_volume ( ) ,
4146 }
4247 }
4348 async fn create_volume_permit ( & self ) -> Result < tokio:: sync:: SemaphorePermit , tonic:: Status > {
@@ -91,12 +96,30 @@ fn volume_app_node(volume: &Volume) -> Option<String> {
9196 }
9297}
9398
94- #[ tracing:: instrument]
99+ /// Create a new endpoint that connects to the provided Uri.
100+ /// This endpoint has default connect and request timeouts.
101+ fn tonic_endpoint ( endpoint : String ) -> Result < tonic:: transport:: Endpoint , Status > {
102+ let uri =
103+ Uri :: try_from ( endpoint) . map_err ( |error| Status :: invalid_argument ( error. to_string ( ) ) ) ?;
104+
105+ let timeout = humantime:: parse_duration ( DEFAULT_REQ_TIMEOUT ) . unwrap ( ) ;
106+ Ok ( tonic:: transport:: Endpoint :: from ( uri)
107+ . connect_timeout ( timeout)
108+ . timeout ( std:: time:: Duration :: from_secs ( 30 ) )
109+ . http2_keep_alive_interval ( Duration :: from_secs ( 5 ) )
110+ . keep_alive_timeout ( Duration :: from_secs ( 10 ) )
111+ . concurrency_limit ( utils:: DEFAULT_GRPC_CLIENT_CONCURRENCY ) )
112+ }
113+
114+ #[ tracing:: instrument( err, skip_all) ]
95115async fn issue_fs_freeze ( endpoint : String , volume_id : String ) -> Result < ( ) , Status > {
96116 trace ! ( "Issuing fs freeze" ) ;
97- let mut client = NodePluginClient :: connect ( format ! ( "http://{endpoint}" ) )
117+ let channel = tonic_endpoint ( format ! ( "http://{endpoint}" ) ) ?
118+ . connect ( )
98119 . await
99- . map_err ( |error| Status :: failed_precondition ( error. to_string ( ) ) ) ?;
120+ . map_err ( |error| Status :: unavailable ( error. to_string ( ) ) ) ?;
121+ let mut client = NodePluginClient :: new ( channel) ;
122+
100123 match client
101124 . freeze_fs ( Request :: new ( FreezeFsRequest {
102125 volume_id : volume_id. clone ( ) ,
@@ -112,12 +135,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Stat
112135 }
113136}
114137
115- #[ tracing:: instrument]
138+ #[ tracing:: instrument( err , skip_all ) ]
116139async fn issue_fs_unfreeze ( endpoint : String , volume_id : String ) -> Result < ( ) , Status > {
117140 trace ! ( "Issuing fs unfreeze" ) ;
118- let mut client = NodePluginClient :: connect ( format ! ( "http://{endpoint}" ) )
141+ let channel = tonic_endpoint ( format ! ( "http://{endpoint}" ) ) ?
142+ . connect ( )
119143 . await
120- . map_err ( |error| Status :: failed_precondition ( error. to_string ( ) ) ) ?;
144+ . map_err ( |error| Status :: unavailable ( error. to_string ( ) ) ) ?;
145+ let mut client = NodePluginClient :: new ( channel) ;
146+
121147 match client
122148 . unfreeze_fs ( Request :: new ( UnfreezeFsRequest {
123149 volume_id : volume_id. clone ( ) ,
@@ -133,6 +159,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
133159 }
134160}
135161
162+ #[ tracing:: instrument( err, skip_all) ]
163+ async fn force_unstage ( app_node : AppNode , volume_id : String ) -> Result < ( ) , Status > {
164+ tracing:: info!(
165+ "Issuing cleanup for volume: {} to node {}, to endpoint {}" ,
166+ volume_id,
167+ app_node. id,
168+ app_node. spec. endpoint
169+ ) ;
170+ let channel = tonic_endpoint ( format ! ( "http://{}" , app_node. spec. endpoint) ) ?
171+ . connect ( )
172+ . await
173+ . map_err ( |error| Status :: unavailable ( error. to_string ( ) ) ) ?;
174+ let mut client = NodePluginClient :: new ( channel) ;
175+
176+ client
177+ . force_unstage_volume ( Request :: new ( ForceUnstageVolumeRequest {
178+ volume_id : volume_id. clone ( ) ,
179+ } ) )
180+ . await
181+ . map ( |_| ( ) )
182+ }
183+
136184/// Get share URI for existing volume object and the node where the volume is published.
137185fn get_volume_share_location ( volume : & Volume ) -> Option < ( String , String ) > {
138186 volume
@@ -518,13 +566,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
518566 error ! ( "{}" , m) ;
519567 return Err ( Status :: internal ( m) ) ;
520568 }
521- } ,
569+ }
522570 _ => {
523-
524571 // Check for node being cordoned.
525572 fn cordon_check ( spec : Option < & NodeSpec > ) -> bool {
526573 if let Some ( spec) = spec {
527- return spec. cordondrainstate . is_some ( )
574+ return spec. cordondrainstate . is_some ( ) ;
528575 }
529576 false
530577 }
@@ -538,13 +585,19 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
538585 // then let the control-plane decide where to place the target. Node should not be cordoned.
539586 Ok ( node) if node. state . as_ref ( ) . map ( |n| n. status ) . unwrap_or ( NodeStatus :: Unknown ) != NodeStatus :: Online || cordon_check ( node. spec . as_ref ( ) ) => {
540587 Ok ( None )
541- } ,
588+ }
542589 // For 1-replica volumes, don't pre-select the target node. This will allow the
543590 // control-plane to pin the target to the replica node.
544591 Ok ( _) if volume. spec . num_replicas == 1 => Ok ( None ) ,
545592 Ok ( _) => Ok ( Some ( node_id. as_str ( ) ) ) ,
546593 } ?;
547594
595+ // Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
596+ if self . force_unstage_volume {
597+ let app_node = RestApiClient :: get_client ( ) . get_app_node ( & args. node_id ) . await ?;
598+ force_unstage ( app_node, volume_id. to_string ( ) ) . await ?;
599+ }
600+
548601 // Volume is not published.
549602 let v = RestApiClient :: get_client ( )
550603 . publish_volume ( & volume_id, target_node, protocol, args. node_id . clone ( ) , & publish_context)
0 commit comments