gRPC calls may hang indefinitely in the event of a server fault #672

Problem to Solve

Suppose there is some (any) kind of issue affecting the server, or the connection to it. In a recent incident, a TypeDB Cloud cluster node stopped responding to the user_token gRPC request. This meant that TypeDB.cloudDriver (in our case using either the Java driver or the Rust driver) would hang indefinitely rather than throwing an error.

Proposed Solution

The most obvious solution would be to add a timeout to gRPC calls in the Rust driver. This would need to be done with care, since long-running queries are legitimate and must not be cut off by a blanket deadline.
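
For illustration only (the helper name and error handling below are ours, not the driver's API), a per-call deadline could be applied just to short unary calls such as user_token, while query streams bypass it entirely. A minimal sketch using tokio::time::timeout:

use std::future::Future;
use std::time::Duration;

use tokio::time::timeout;

// Hypothetical helper: bound a single unary gRPC call with a deadline.
// Long-running query streams would deliberately not go through this wrapper.
async fn unary_with_deadline<T, E, F>(call: F, limit: Duration) -> Result<T, String>
where
    F: Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    match timeout(limit, call).await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(err)) => Err(format!("gRPC call failed: {err:?}")),
        // No response arrived within the deadline.
        Err(_elapsed) => Err(format!("gRPC call timed out after {limit:?}")),
    }
}

The token request in renew_token could then look like unary_with_deadline(self.grpc.user_token(req), Duration::from_secs(30)).await (the 30-second figure is arbitrary), surfacing an error instead of hanging. tonic also exposes Endpoint::timeout, which applies a deadline per request on the channel, but whether that behaves acceptably for streaming query calls would need checking.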

Additional Information

We ran the following test...

let connection = Connection::new_cloud_with_translation(
    [
        ("address1", "localhost:1729"),
        ("address2", "localhost:1730"),
        ("address3", "localhost:1731"),
    ]
    .into(),
    Credential::without_tls("username", "password"),
)
.unwrap();

... with the following modifications to our source code (added println! statements):

/* connection/network/transmitter/rpc.rs */

    pub(in crate::connection) fn start_cloud(
        address: Address,
        credential: Credential,
        runtime: &BackgroundRuntime,
    ) -> Result<Self> {
        println!("{}", address.clone().to_string());
        let (request_sink, request_source) = unbounded_async();
        let (shutdown_sink, shutdown_source) = unbounded_async();
        runtime.run_blocking(async move {
            println!("a");
            let (channel, call_credentials) = open_callcred_channel(address, credential)?;
            println!("b");
            let rpc = RPCStub::new(channel, Some(call_credentials)).await;
            println!("c");
            tokio::spawn(Self::dispatcher_loop(rpc, request_source, shutdown_source));
            Ok::<(), Error>(())
        })?;
        Ok(Self { request_sink, shutdown_sink })
    }

/* connection/network/stub.rs */
    pub(super) async fn new(channel: Channel, call_credentials: Option<Arc<CallCredentials>>) -> Self {
        println!("d");
        let mut this = Self { grpc: GRPC::new(channel), call_credentials };
        println!("e");
        if let Err(err) = this.renew_token().await {
            warn!("{err:?}");
        }
        println!("f");
        this
    }

    async fn renew_token(&mut self) -> Result {
        if let Some(call_credentials) = &self.call_credentials {
            trace!("renewing token...");
            println!("g");
            call_credentials.reset_token();
            let req = user::token::Req { username: call_credentials.username().to_owned() };
            trace!("sending token request...");
            println!("h");
            let token = self.grpc.user_token(req).await?.into_inner().token;
            println!("i");
            call_credentials.set_token(token);
            trace!("renewed token");
            println!("j");
        }
        Ok(())
    }

This produced the following output ...

running 1 test
localhost:1730
a
b
d
e
g
h
test integration::network::address_translation has been running for over 60 seconds

The output stops after "h" and never reaches "i", which indicates that the test hung at self.grpc.user_token(req).await in renew_token.

Naturally, you'd need a broken server to actually reproduce the issue; we hypothesise that it stops responding when there are too many concurrent connections to it.

Pausing the server at a breakpoint might also work.
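
Another option for reproducing the hang locally (a sketch on our assumptions, not something taken from the driver's test suite) is a "tarpit" that accepts TCP connections on one of the configured addresses but never sends a byte back. Depending on where the handshake stalls, the driver may block at channel open rather than exactly at user_token, but the indefinite wait is the same. Run this in place of the node at localhost:1730 and point the test above at it:

use std::net::TcpListener;
use std::thread;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    // Accept connections on the address the driver will pick, but never read
    // from or write to them, so any call without a deadline waits forever.
    let listener = TcpListener::bind("127.0.0.1:1730")?;
    for stream in listener.incoming() {
        let stream = stream?;
        thread::spawn(move || {
            let _keep_alive = stream; // hold the socket open without responding
            loop {
                thread::sleep(Duration::from_secs(60));
            }
        });
    }
    Ok(())
}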
