Skip to content

Commit f97fc31

Browse files
committed
Add a serving multiplexer to server both web and grpc requests on the same port.
1 parent 32b8b4a commit f97fc31

File tree

11 files changed

+811
-59
lines changed

11 files changed

+811
-59
lines changed

Cargo.lock

Lines changed: 502 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ edition = "2021"
77
[dependencies]
88
async-std = {version = "1.12", features = ["attributes"]}
99
async-trait = "0.1.80"
10+
axum = "0.6.12"
1011
bytes = "1.6"
1112
chrono = "0.4"
1213
futures = "0.3"
14+
hyper="0.14.25"
15+
multiplex-tonic-hyper = "0.1"
1316
im = "15"
1417
prost = "0.12"
1518
rand = "0.8"
19+
reqwest = "0.12"
1620
structopt = "0.3"
1721
timer = "0.2"
1822
tokio = { version = "1.37", features = ["macros", "rt-multi-thread"] }
@@ -22,6 +26,7 @@ tracing = "0.1.40"
2226
tracing-subscriber = {version = "0.3.1", features = ["env-filter"]}
2327
tower = "0.4.13"
2428
pin-project = "1.1"
29+
querystring = "1.1"
2530

2631
[build-dependencies]
2732
tonic-build = "0.11"

src/keyvalue/http.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use axum::routing::get;
2+
use axum::Router;
3+
use hyper::{Body, StatusCode};
4+
use std::sync::Arc;
5+
use tonic::Request;
6+
7+
use crate::keyvalue::keyvalue_proto::key_value_server::KeyValue;
8+
use crate::keyvalue::keyvalue_proto::GetRequest;
9+
10+
// Provides an HTTP interface for the supplied KeyValue instance.
11+
#[derive(Clone)]
12+
pub struct HttpHandler {
13+
service: Arc<dyn KeyValue>,
14+
}
15+
16+
impl HttpHandler {
17+
pub fn new(service: Arc<dyn KeyValue>) -> Self {
18+
Self { service }
19+
}
20+
21+
/**
22+
* Installs routes that allow interacting with the keyvalue store. Example queries
23+
* include /get?key=foo.
24+
*/
25+
pub fn routes(self: Arc<Self>) -> Router {
26+
let hello = move |req: hyper::Request<Body>| async move { self.handle_get(req).await };
27+
Router::new().route("/get", get(hello))
28+
}
29+
30+
fn make_response(code: StatusCode, message: String) -> hyper::Response<Body> {
31+
hyper::Response::builder()
32+
.status(code)
33+
.body(Body::from(format!("{}\n", message)))
34+
.unwrap()
35+
}
36+
37+
fn invalid(message: String) -> hyper::Response<Body> {
38+
Self::make_response(StatusCode::BAD_REQUEST, message)
39+
}
40+
41+
async fn handle_get(&self, request: hyper::Request<Body>) -> hyper::Response<Body> {
42+
let query = match request.uri().query() {
43+
Some(q) => q,
44+
None => return HttpHandler::invalid("must pass query".to_string()),
45+
};
46+
47+
let parsed = querystring::querify(query);
48+
let key = match parsed
49+
.iter()
50+
.find(|(a, _)| a.eq(&"key"))
51+
.map(|(_, b)| b.to_string())
52+
{
53+
Some(value) => value,
54+
_ => return HttpHandler::invalid("must pass key parameter".to_string()),
55+
};
56+
57+
let request = Request::new(GetRequest {
58+
key: key.clone().as_bytes().to_vec(),
59+
version: -1,
60+
});
61+
62+
match self.service.get(request).await {
63+
Ok(p) => {
64+
let proto = p.into_inner();
65+
match proto.entry {
66+
Some(e) => match String::from_utf8(e.value) {
67+
Ok(value) => HttpHandler::make_response(
68+
StatusCode::OK,
69+
format!("{}={}, version={}", key, value, proto.version),
70+
),
71+
_ => HttpHandler::make_response(
72+
StatusCode::BAD_REQUEST,
73+
format!("failed to parse value as utf8 for key {}", key),
74+
),
75+
},
76+
None => HttpHandler::make_response(
77+
StatusCode::NOT_FOUND,
78+
format!("no value for key {}", key),
79+
),
80+
}
81+
}
82+
Err(status) => HttpHandler::make_response(
83+
StatusCode::INTERNAL_SERVER_ERROR,
84+
format!("Failed to query keyvalue store {}", status.to_string()),
85+
),
86+
}
87+
}
88+
}
89+
90+
#[cfg(test)]
91+
mod tests {
92+
use super::*;
93+
use crate::keyvalue::grpc::PutRequest;
94+
use crate::keyvalue::keyvalue_proto::{Entry, GetResponse, PutResponse};
95+
use crate::testing::TestHttpServer;
96+
use async_trait::async_trait;
97+
use tonic::{Response, Status};
98+
99+
struct FakeKeyValue {}
100+
101+
#[async_trait]
102+
impl KeyValue for FakeKeyValue {
103+
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
104+
let key = request.into_inner().key.clone();
105+
let key_string = String::from_utf8(key.clone());
106+
let entry = match key_string.expect("utf8").as_str() {
107+
"foo" => Some(Entry {
108+
key: key.clone(),
109+
value: "foo-value".to_string().into_bytes(),
110+
}),
111+
"bar" => Some(Entry {
112+
key: key.clone(),
113+
value: "bar-value".to_string().into_bytes(),
114+
}),
115+
_ => None,
116+
};
117+
118+
Ok(Response::new(GetResponse { entry, version: 0 }))
119+
}
120+
121+
async fn put(&self, _: Request<PutRequest>) -> Result<Response<PutResponse>, Status> {
122+
// We don't need any writes for now.
123+
panic!("Not implemented");
124+
}
125+
}
126+
127+
async fn make_server() -> TestHttpServer {
128+
let kv = Arc::new(FakeKeyValue {});
129+
let http = Arc::new(HttpHandler {
130+
service: kv.clone(),
131+
});
132+
let web_service = Router::new()
133+
.nest("/keyvalue", http.routes())
134+
.into_make_service();
135+
136+
TestHttpServer::run(web_service).await
137+
}
138+
139+
async fn send_request(server: &TestHttpServer, path: &str) -> reqwest::Response {
140+
let port = server.port().expect("port");
141+
let uri = format!("http://{}:{}{}", "127.0.0.1", port, path);
142+
reqwest::get(uri.as_str()).await.expect("request")
143+
}
144+
145+
#[tokio::test]
146+
async fn test_returns_value() {
147+
let server = make_server().await;
148+
let response = send_request(&server, "/keyvalue/get?key=foo").await;
149+
150+
let status = response.status().clone();
151+
let text = response.text().await.expect("text");
152+
153+
assert_eq!(reqwest::StatusCode::OK, status);
154+
assert_eq!("foo=foo-value, version=0", text.trim());
155+
}
156+
157+
#[tokio::test]
158+
async fn test_not_found() {
159+
let server = make_server().await;
160+
let response = send_request(&server, "/keyvalue/get?key=not-a-rea-key").await;
161+
let status = response.status();
162+
assert_eq!(reqwest::StatusCode::NOT_FOUND, status);
163+
}
164+
165+
#[tokio::test]
166+
async fn test_bad_path() {
167+
let server = make_server().await;
168+
let response = send_request(&server, "/INVALID/get?key=not-a-rea-key").await;
169+
let status = response.status();
170+
assert_eq!(reqwest::StatusCode::NOT_FOUND, status);
171+
}
172+
}

src/keyvalue/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
// cluster. Users of this module are expected to run the service in their grpc
33
// server and/or use the generated grpc client code to make requests.
44

5-
pub use crate::keyvalue::store::{MapStore};
5+
pub use crate::keyvalue::store::{MapStore, Store};
6+
pub use http::HttpHandler;
67
pub use service::KeyValueService;
78

89
pub mod grpc {
910
pub use crate::keyvalue::keyvalue_proto::key_value_client::KeyValueClient;
1011
pub use crate::keyvalue::keyvalue_proto::key_value_server::KeyValueServer;
11-
pub use crate::keyvalue::keyvalue_proto::{PutRequest};
12+
pub use crate::keyvalue::keyvalue_proto::PutRequest;
1213
}
1314

15+
pub(in crate::keyvalue) mod http;
1416
#[path = "generated/keyvalue_proto.rs"]
1517
pub(in crate::keyvalue) mod keyvalue_proto;
1618
pub(in crate::keyvalue) mod service;

src/keyvalue/service.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use async_std::sync::{Arc, Mutex};
2+
use async_trait::async_trait;
23
use bytes::Bytes;
34
use prost::Message;
45
use tonic::{Request, Response, Status};
@@ -56,7 +57,7 @@ impl KeyValueService {
5657
}
5758
}
5859

59-
#[tonic::async_trait]
60+
#[async_trait]
6061
impl KeyValue for KeyValueService {
6162
#[instrument(fields(server=%self.name),skip(self,request))]
6263
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
@@ -141,7 +142,7 @@ mod tests {
141142
use crate::keyvalue::store::MapStore;
142143
use crate::raft::raft_proto::EntryId;
143144
use crate::raft::StateMachine;
144-
use crate::testing::TestServer;
145+
use crate::testing::TestRpcServer;
145146

146147
use super::*;
147148

@@ -151,7 +152,7 @@ mod tests {
151152
store: Arc<Mutex<MapStore>>,
152153
}
153154

154-
#[tonic::async_trait]
155+
#[async_trait]
155156
impl Client for FakeRaftClient {
156157
async fn commit(&self, payload: &[u8]) -> Result<EntryId, Status> {
157158
let copy = payload.to_vec();
@@ -176,7 +177,7 @@ mod tests {
176177
async fn test_get() {
177178
let service = create_service();
178179
let store = service.store.clone();
179-
let server = TestServer::run(KeyValueServer::new(service)).await;
180+
let server = TestRpcServer::run(KeyValueServer::new(service)).await;
180181

181182
store
182183
.lock()
@@ -204,7 +205,7 @@ mod tests {
204205
async fn test_put() {
205206
let service = create_service();
206207
let store = service.store.clone();
207-
let server = TestServer::run(KeyValueServer::new(service)).await;
208+
let server = TestRpcServer::run(KeyValueServer::new(service)).await;
208209

209210
let request = PutRequest {
210211
key: "foo".as_bytes().to_vec(),

src/keyvalue/store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
extern crate bytes;
22
extern crate im;
33

4-
use std::collections::VecDeque;
54
use bytes::Bytes;
65
use im::HashMap;
7-
use prost::Message;
86
use keyvalue_proto::{Entry, Operation, Snapshot};
7+
use prost::Message;
8+
use std::collections::VecDeque;
99

1010
use crate::keyvalue::keyvalue_proto;
1111
use crate::keyvalue::keyvalue_proto::operation::Op::Set;

src/main.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,29 @@
44
extern crate structopt;
55
extern crate tracing;
66

7-
use std::error::Error;
8-
use std::time::Duration;
9-
107
use async_std::sync::{Arc, Mutex};
8+
use axum::Router;
119
use futures::future::join5;
1210
use futures::future::join_all;
11+
use multiplex_tonic_hyper::MakeMultiplexer;
1312
use rand::seq::SliceRandom;
13+
use std::error::Error;
14+
use std::net::SocketAddr;
15+
use std::time::Duration;
1416
use structopt::StructOpt;
1517
use tokio::time::{sleep, Instant};
18+
use tower::make::Shared;
1619
use tracing::{debug, error, info, info_span, Instrument};
1720
use tracing_subscriber::EnvFilter;
1821

19-
use raft::raft_proto;
20-
use raft::{Diagnostics, FailureOptions, Options, RaftImpl};
21-
use raft_proto::Server;
22-
2322
use crate::keyvalue::grpc::KeyValueClient;
2423
use crate::keyvalue::grpc::KeyValueServer;
2524
use crate::keyvalue::grpc::PutRequest;
2625
use crate::keyvalue::{KeyValueService, MapStore};
26+
use crate::raft::raft_proto;
27+
use crate::raft::{Diagnostics, FailureOptions, Options, RaftImpl};
2728
use crate::raft_proto::raft_server::RaftServer;
29+
use crate::raft_proto::Server;
2830

2931
mod keyvalue;
3032
mod raft;
@@ -60,6 +62,12 @@ fn server(host: &str, port: i32, name: &str) -> Server {
6062
}
6163
}
6264

65+
fn make_address(address: &Server) -> SocketAddr {
66+
format!("[{}]:{}", address.host, address.port)
67+
.parse()
68+
.unwrap()
69+
}
70+
6371
//#[instrument(skip(all,diagnostics))]
6472
async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<Diagnostics>>) {
6573
let server = address.name.to_string();
@@ -74,8 +82,8 @@ async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<
7482
let server_diagnostics = diagnostics.lock().await.get_server(&address);
7583

7684
// Set up the grpc service for the key-value store.
77-
let keyvalue = KeyValueService::new(server.as_str(), &address, kv_store.clone());
78-
let kv_grpc = KeyValueServer::new(keyvalue);
85+
let kv1 = KeyValueService::new(server.as_str(), &address, kv_store.clone());
86+
let kv_grpc = KeyValueServer::new(kv1);
7987

8088
// Set up the grpc service for the raft participant.
8189
let raft = RaftImpl::new(
@@ -89,19 +97,27 @@ async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<
8997
raft.start().await;
9098
let raft_grpc = RaftServer::new(raft);
9199

92-
// Put it all together and serve.
93-
info!("Created raft and key-value service");
94-
let serve = tonic::transport::Server::builder()
95-
.add_service(raft_grpc)
96-
.add_service(kv_grpc)
97-
.serve(
98-
format!("[{}]:{}", address.host, address.port)
99-
.parse()
100-
.unwrap(),
101-
)
102-
.await;
103-
104-
match serve {
100+
// Set up the webservice serving the contents of the kvstore.
101+
// TODO(dino): See if we can reuse/share the "kv1" instance above.
102+
let kv2 = KeyValueService::new(server.as_str(), &address, kv_store.clone());
103+
let kv_http = Arc::new(keyvalue::HttpHandler::new(Arc::new(kv2)));
104+
let web_service = Router::new()
105+
.nest("/keyvalue", kv_http.routes())
106+
.into_make_service();
107+
108+
// Put the pieces together to serve on a single port.
109+
let grpc_service = Shared::new(
110+
tonic::transport::Server::builder()
111+
.add_service(raft_grpc)
112+
.add_service(kv_grpc)
113+
.into_service(),
114+
);
115+
let multiplexer = MakeMultiplexer::new(grpc_service, web_service);
116+
let serve = hyper::Server::bind(&make_address(&address)).serve(multiplexer);
117+
118+
info!("Started server (http, grpc) on port {}", address.port);
119+
120+
match serve.await {
105121
Ok(()) => info!("Serving terminated successfully"),
106122
Err(message) => error!("Serving terminated unsuccessfully: {}", message),
107123
}

0 commit comments

Comments
 (0)