Skip to content

Commit a981605

Browse files
handle dummy frame in cassandra sink cluster
1 parent 69c1e7d commit a981605

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

shotover/src/transforms/cassandra/sink_cluster/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl CassandraConnection {
2727

2828
pub fn send(&mut self, requests: Vec<Message>) -> Result<(), ConnectionError> {
2929
self.pending_request_stream_ids
30-
.extend(requests.iter().map(|x| x.stream_id().unwrap()));
30+
.extend(requests.iter().filter_map(|x| x.stream_id()));
3131
self.connection.send(requests)
3232
}
3333

shotover/src/transforms/cassandra/sink_cluster/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ struct CassandraSinkCluster {
252252
impl CassandraSinkCluster {
253253
async fn send_message(&mut self, mut requests: Messages) -> Result<Messages> {
254254
if self.version.is_none() {
255-
if let Some(message) = requests.first() {
255+
if let Some(message) = requests.iter().find(|m| !m.is_dummy()) {
256256
if let Ok(Metadata::Cassandra(CassandraMetadata { version, .. })) =
257257
message.metadata()
258258
{
@@ -468,6 +468,13 @@ impl CassandraSinkCluster {
468468
responses: &mut Vec<Message>,
469469
) -> Result<()> {
470470
for mut message in requests.into_iter() {
471+
if message.is_dummy() {
472+
let mut dummy_response = Message::from_frame(Frame::Dummy);
473+
dummy_response.set_request_id(message.id());
474+
responses.push(dummy_response);
475+
continue;
476+
}
477+
471478
if self.pool.nodes().is_empty()
472479
|| !self.init_handshake_complete
473480
// system.local and system.peers must be routed to the same node otherwise the system.local node will be amongst the system.peers nodes and a node will be missing

0 commit comments

Comments
 (0)