Skip to content

Commit b3b6c65

Browse files
Zero Buffer Implementation and Tests (#5416)
* Zero Buffer Implementation and Tests Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]> * Moved ZeroBuffer Implementation into data-prepper-core and addressed comments Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]> * Modified ZeroBufferTests to use MockitoExtension and addressed comments Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]> --------- Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
1 parent 815ddc0 commit b3b6c65

File tree

4 files changed

+398
-0
lines changed

4 files changed

+398
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.opensearch.dataprepper.core.pipeline;
2+
3+
public interface PipelineRunner {
4+
void runAllProcessorsAndPublishToSinks();
5+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.opensearch.dataprepper.core.pipeline;
2+
3+
public interface SupportsPipelineRunner {
4+
PipelineRunner getPipelineRunner();
5+
6+
void setPipelineRunner(PipelineRunner pipelineRunner);
7+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package org.opensearch.dataprepper.core.pipeline.buffer;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
5+
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
6+
import org.opensearch.dataprepper.metrics.MetricNames;
7+
import org.opensearch.dataprepper.metrics.PluginMetrics;
8+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
9+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
10+
import org.opensearch.dataprepper.model.buffer.Buffer;
11+
import org.opensearch.dataprepper.model.CheckpointState;
12+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
13+
import org.opensearch.dataprepper.model.record.Record;
14+
import io.micrometer.core.instrument.Counter;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import java.util.Map;
20+
import java.util.concurrent.TimeoutException;
21+
22+
@DataPrepperPlugin(name = "zero", pluginType = Buffer.class)
23+
public class ZeroBuffer<T extends Record<?>> implements Buffer<T>, SupportsPipelineRunner {
24+
private static final Logger LOG = LoggerFactory.getLogger(ZeroBuffer.class);
25+
private static final String PLUGIN_COMPONENT_ID = "ZeroBuffer";
26+
private final PluginMetrics pluginMetrics;
27+
private final ThreadLocal<Collection<T>> threadLocalStore;
28+
private PipelineRunner pipelineRunner;
29+
@VisibleForTesting
30+
final String pipelineName;
31+
private final Counter writeRecordsCounter;
32+
private final Counter readRecordsCounter;
33+
34+
@DataPrepperPluginConstructor
35+
public ZeroBuffer(PipelineDescription pipelineDescription) {
36+
this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
37+
this.pipelineName = pipelineDescription.getPipelineName();
38+
this.threadLocalStore = new ThreadLocal<>();
39+
this.writeRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_WRITTEN);
40+
this.readRecordsCounter = pluginMetrics.counter(MetricNames.RECORDS_READ);
41+
}
42+
43+
@Override
44+
public void write(T record, int timeoutInMillis) throws TimeoutException {
45+
if (record == null) {
46+
throw new NullPointerException("The write record cannot be null");
47+
}
48+
49+
if (threadLocalStore.get() == null) {
50+
threadLocalStore.set(new ArrayList<>());
51+
}
52+
53+
threadLocalStore.get().add(record);
54+
writeRecordsCounter.increment();
55+
56+
getPipelineRunner().runAllProcessorsAndPublishToSinks();
57+
}
58+
59+
@Override
60+
public void writeAll(Collection<T> records, int timeoutInMillis) throws Exception {
61+
if (records == null) {
62+
throw new NullPointerException("The write records cannot be null");
63+
}
64+
65+
if (threadLocalStore.get() == null) {
66+
threadLocalStore.set(new ArrayList<>(records));
67+
} else {
68+
// Add the new records to the existing records
69+
threadLocalStore.get().addAll(records);
70+
}
71+
72+
writeRecordsCounter.increment((double) records.size());
73+
getPipelineRunner().runAllProcessorsAndPublishToSinks();
74+
}
75+
76+
@Override
77+
public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
78+
if (threadLocalStore.get() == null) {
79+
threadLocalStore.set(new ArrayList<>());
80+
}
81+
82+
Collection<T> storedRecords = threadLocalStore.get();
83+
CheckpointState checkpointState = new CheckpointState(0);
84+
if (storedRecords!= null && !storedRecords.isEmpty()) {
85+
checkpointState = new CheckpointState(storedRecords.size());
86+
threadLocalStore.remove();
87+
readRecordsCounter.increment((double) storedRecords.size());
88+
}
89+
90+
return Map.entry(storedRecords, checkpointState);
91+
}
92+
93+
@Override
94+
public void checkpoint(CheckpointState checkpointState) {}
95+
96+
@Override
97+
public boolean isEmpty() {
98+
return (this.threadLocalStore.get() == null || this.threadLocalStore.get().isEmpty());
99+
}
100+
101+
@Override
102+
public PipelineRunner getPipelineRunner() {
103+
return pipelineRunner;
104+
}
105+
106+
@Override
107+
public void setPipelineRunner(PipelineRunner pipelineRunner) {
108+
this.pipelineRunner = pipelineRunner;
109+
}
110+
}

0 commit comments

Comments
 (0)