Skip to content

Commit 339913c

Browse files
authored
acks: send sequence-0 ack for 0-window batches (#479)
* acks: send sequence-0 ack for 0-window batches Ensures that at least one ACK is sent in reply to each window, including those with 0-length, by emitting an empty batch when we receive a 0-window frame and sending a sequence-0 ACK in reply when we process an empty batch. This enables upstream users of the protocol to (ab)use a 0-length window frame and an _ensured_ in-protocol response to validate that we are alive. Note: 0-sequence ACKs are already used as application-level keep-alives between when the `ConnectionHandler` receives bytes and when `BeatsHandler` finishes processing a batch, and therefore should already be handled gracefully by any existing clients. * version, changelog
1 parent 4889fd4 commit 339913c

File tree

8 files changed

+69
-3
lines changed

8 files changed

+69
-3
lines changed

.ci/run.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ env
55

66
set -ex
77

8+
bundle exec rake test:java
89
bundle exec rspec --format=documentation
910

1011
bundle exec rake test:integration:setup

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 6.7.0
2+
- PROTOCOL: adds explicit support for receiving a 0-length window to encapsulate an empty batch. Empty batches are acknowledged with the same 0-sequence ACK's that are used as keep-alives during processing. [#479](https://github.com/logstash-plugins/logstash-input-beats/pull/479)
3+
14
## 6.6.4
25
- [DOC] Fix misleading `enrich/source_data` input beats documentation about the Logstash host. [#478](https://github.com/logstash-plugins/logstash-input-beats/pull/478)
36

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6.6.4
1+
6.7.0

lib/tasks/test.rake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ end
2121
require "logstash/devutils/rake"
2222

2323
namespace :test do
24+
task :java do
25+
exit(1) unless system './gradlew test'
26+
end
2427
namespace :integration do
2528
task :setup do
2629
Rake::Task["test:integration:setup:filebeat"].invoke

src/main/java/org/logstash/beats/BeatsHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) {
4545
logger.debug(format("Received a new payload"));
4646
}
4747
try {
48+
if (batch.isEmpty()) {
49+
logger.debug("Sending 0-seq ACK for empty batch");
50+
writeAck(ctx, batch.getProtocol(), 0);
51+
}
4852
for (Message message : batch) {
4953
if (logger.isDebugEnabled()) {
5054
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));

src/main/java/org/logstash/beats/BeatsParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
112112
logger.warn("New window size received but the current batch was not complete, sending the current batch");
113113
out.add(batch);
114114
batchComplete();
115+
} else if (batch.getBatchSize() == 0) {
116+
logger.debug("New window size 0 received, sending an empty batch");
117+
out.add(batch);
118+
batchComplete();
115119
}
116120

117121
transition(States.READ_HEADER);

src/test/java/org/logstash/beats/BeatsHandlerTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,19 @@ public void testAcksLastMessageInBatch() {
121121
embeddedChannel.writeInbound(batch);
122122
assertEquals(messageCount, spyListener.getLastMessages().size());
123123
Ack ack = embeddedChannel.readOutbound();
124-
assertEquals(ack.getProtocol(), Protocol.VERSION_1);
125-
assertEquals(ack.getSequence(), startSequenceNumber + messageCount - 1);
124+
assertEquals(Protocol.VERSION_1, ack.getProtocol());
125+
assertEquals(startSequenceNumber + messageCount - 1, ack.getSequence());
126+
embeddedChannel.close();
127+
}
128+
129+
@Test
130+
public void testAcksZeroSequenceForEmptyBatch() {
131+
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new BeatsHandler(spyListener));
132+
embeddedChannel.writeInbound(new V2Batch());
133+
assertEquals(0, spyListener.getLastMessages().size());
134+
Ack ack = embeddedChannel.readOutbound();
135+
assertEquals(Protocol.VERSION_2, ack.getProtocol());
136+
assertEquals(0, ack.getSequence());
126137
embeddedChannel.close();
127138
}
128139
}

src/test/java/org/logstash/beats/BeatsParserTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.isA;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertTrue;
2324

2425

2526
public class BeatsParserTest {
@@ -135,6 +136,45 @@ public void testShouldNotCrashOnGarbageData() {
135136
sendPayloadToParser(randomBufferData);
136137
}
137138

139+
@Test
140+
public void testV1EmptyWindowEmitsEmptyBatch() {
141+
Batch decodedBatch = decodeBatch(new V1Batch());
142+
143+
assertNotNull(decodedBatch);
144+
assertTrue(decodedBatch.isEmpty());
145+
assertEquals(0, decodedBatch.getBatchSize());
146+
assertEquals(0, decodedBatch.size());
147+
}
148+
@Test
149+
public void testV2EmptyWindowEmitsEmptyBatch() {
150+
Batch decodedBatch = decodeBatch(new V2Batch());
151+
152+
assertNotNull(decodedBatch);
153+
assertTrue(decodedBatch.isEmpty());
154+
assertEquals(0, decodedBatch.getBatchSize());
155+
assertEquals(0, decodedBatch.size());
156+
}
157+
158+
159+
@Test
160+
public void testV1CompressedFrameEmptyWindowEmitsEmptyBatch() {
161+
Batch decodedBatch = decodeCompressedBatch(new V1Batch());
162+
163+
assertNotNull(decodedBatch);
164+
assertTrue(decodedBatch.isEmpty());
165+
assertEquals(0, decodedBatch.getBatchSize());
166+
assertEquals(0, decodedBatch.size());
167+
}
168+
@Test
169+
public void testV2CompressedFrameEmptyWindowEmitsEmptyBatch() {
170+
Batch decodedBatch = decodeCompressedBatch(new V2Batch());
171+
172+
assertNotNull(decodedBatch);
173+
assertTrue(decodedBatch.isEmpty());
174+
assertEquals(0, decodedBatch.getBatchSize());
175+
assertEquals(0, decodedBatch.size());
176+
}
177+
138178

139179
@Test
140180
public void testNegativeJsonPayloadShouldRaiseAnException() throws JsonProcessingException {

0 commit comments

Comments
 (0)