Skip to content

Commit fc57855

Browse files
author
Oliver Hsu
authored
Send entire SinkRecord value serialized as json in Scalyr event message field (#39)
* Add `JsonRecordToMessageMapping` to send entire SinkRecord value serialized as JSON in the `message` field * Add `send_entire_record` config
1 parent 429bf38 commit fc57855

File tree

12 files changed

+300
-35
lines changed

12 files changed

+300
-35
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
<version>${kafka.version}</version>
2727
<scope>provided</scope>
2828
</dependency>
29+
<dependency>
30+
<groupId>org.apache.kafka</groupId>
31+
<artifactId>connect-json</artifactId>
32+
<version>${kafka.version}</version>
33+
</dependency>
2934
<dependency>
3035
<groupId>com.scalyr</groupId>
3136
<artifactId>scalyr-client</artifactId>

src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public class ScalyrSinkConnectorConfig extends AbstractConfig {
6868
" Multiple custom application event mappings can be specified in a JSON list. Example config JSON:\n"
6969
+ "[{\"matcher\": { \"attribute\": \"app.name\", \"value\": \"customApp\"},\n" +
7070
" \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\"} }]";
71+
public static final String SEND_ENTIRE_RECORD = "send_entire_record";
72+
private static final String SEND_ENTIRE_RECORD_DOC = "If true, send the entire Kafka Connect record value serialized to JSON as the message field.";
7173

7274
public ScalyrSinkConnectorConfig(Map<String, String> parsedConfig) {
7375
super(configDef(), parsedConfig);
@@ -85,7 +87,8 @@ public static ConfigDef configDef() {
8587
.define(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, Type.INT, DEFAULT_ADD_EVENTS_RETRY_DELAY_MS, ConfigDef.Range.atLeast(100), Importance.LOW, ADD_EVENTS_RETRY_DELAY_MS_DOC)
8688
.define(BATCH_SEND_SIZE_BYTES_CONFIG, Type.INT, DEFAULT_BATCH_SEND_SIZE_BYTES, ConfigDef.Range.between(500_000, 5_500_000), Importance.LOW, BATCH_SEND_SIZE_BYTES_DOC)
8789
.define(BATCH_SEND_WAIT_MS_CONFIG, Type.INT, DEFAULT_BATCH_SEND_WAIT_MS, ConfigDef.Range.atLeast(1000), Importance.LOW, BATCH_SEND_WAIT_MS_DOC)
88-
.define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC);
90+
.define(CUSTOM_APP_EVENT_MAPPING_CONFIG, Type.STRING, null, customAppEventMappingValidator, Importance.MEDIUM, CUSTOM_APP_EVENT_MAPPING_DOC)
91+
.define(SEND_ENTIRE_RECORD, Type.BOOLEAN, false, Importance.LOW, SEND_ENTIRE_RECORD_DOC);
8992
}
9093

9194

src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public void start(Map<String, String> configProps) {
115115

116116
this.eventMapper = new EventMapper(
117117
parseEnrichmentAttrs(sinkConfig.getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)),
118-
parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG)));
118+
parseCustomAppEventMapping(sinkConfig.getString(ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG)),
119+
sinkConfig.getBoolean(ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD));
119120

120121
log.info("Started ScalyrSinkTask with config {}", configProps);
121122
}

src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,31 @@
3939
public class EventMapper {
4040
private static final Logger log = LoggerFactory.getLogger(EventMapper.class);
4141
private static final List<MessageMapper> DEFAULT_MAPPERS = ImmutableList.of(new FilebeatMessageMapper());
42-
private static final List<MessageMapper> messageMappers = new ArrayList<>();
42+
private final List<MessageMapper> messageMappers = new ArrayList<>();
4343
private final Map<String, String> enrichmentAttrs;
4444
private static final RateLimiter noEventMapperLogRateLimiter = RateLimiter.create(1.0/30); // 1 permit every 30 seconds to not log
4545
@VisibleForTesting static final String DEFAULT_PARSER = "kafkaParser";
4646

4747
/**
4848
* @param enrichmentAttrs Map<String, String> of enrichment key/value pairs
4949
*/
50-
public EventMapper(Map<String, String> enrichmentAttrs, List<CustomAppEventMapping> customAppEventMappings) {
50+
public EventMapper(Map<String, String> enrichmentAttrs, List<CustomAppEventMapping> customAppEventMappings, boolean sendEntireRecord) {
5151
this.enrichmentAttrs = enrichmentAttrs;
5252
if (customAppEventMappings != null) {
5353
log.info("Adding custom event mappers {}", customAppEventMappings);
54-
customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add(new CustomAppMessageMapper(customAppEventMapping)));
54+
customAppEventMappings.forEach(customAppEventMapping -> messageMappers.add(
55+
addJsonRecordToMessageMapperIfNeeeded(new CustomAppMessageMapper(customAppEventMapping), sendEntireRecord)));
5556
}
56-
messageMappers.addAll(DEFAULT_MAPPERS);
57+
DEFAULT_MAPPERS.forEach(messageMapper -> messageMappers.add(addJsonRecordToMessageMapperIfNeeeded(messageMapper, sendEntireRecord)));
58+
}
59+
60+
/**
61+
* Add JsonRecordToMessageMapping decorator if sendEntireRecord is enabled.
62+
* @return messageMapper decorated with JsonRecordToMessageMapping when sendEntireRecord=true.
63+
* Otherwise, return original MessageMapper.
64+
*/
65+
private MessageMapper addJsonRecordToMessageMapperIfNeeeded(MessageMapper messageMapper, boolean sendEntireRecord) {
66+
return sendEntireRecord ? new JsonRecordToMessageMapping(messageMapper) : messageMapper;
5767
}
5868

5969
/**
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2020 Scalyr Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.scalyr.integrations.kafka.mapping;
17+
18+
import com.google.common.collect.ImmutableMap;
19+
import org.apache.kafka.connect.json.JsonConverter;
20+
import org.apache.kafka.connect.json.JsonConverterConfig;
21+
import org.apache.kafka.connect.sink.SinkRecord;
22+
import org.apache.kafka.connect.storage.ConverterConfig;
23+
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.Map;
26+
27+
/**
28+
* Decorator for base MessageMapper which will map the entire Record value to the message field.
29+
* This is used to send the entire Kafka record value serialized as JSON to Scalyr in the Scalyr event `message` field.
30+
* The typical use case for this is when the Kafka record value is JSON and the entire JSON needs to be sent to Scalyr.
31+
*/
32+
public class JsonRecordToMessageMapping implements MessageMapper {
33+
/** MessageMapper for the SinkRecord message format. */
34+
private final MessageMapper baseMapper;
35+
private final JsonConverter jsonConverter;
36+
37+
public JsonRecordToMessageMapping(MessageMapper baseMapper) {
38+
this.baseMapper = baseMapper;
39+
jsonConverter = new JsonConverter();
40+
jsonConverter.configure(ImmutableMap.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, ConverterConfig.TYPE_CONFIG, "value"));
41+
}
42+
43+
@Override
44+
public String getServerHost(SinkRecord record) {
45+
return baseMapper.getServerHost(record);
46+
}
47+
48+
@Override
49+
public String getLogfile(SinkRecord record) {
50+
return baseMapper.getLogfile(record);
51+
}
52+
53+
@Override
54+
public String getParser(SinkRecord record) {
55+
return baseMapper.getParser(record);
56+
}
57+
58+
@Override
59+
public String getMessage(SinkRecord record) {
60+
return new String(jsonConverter.fromConnectData(
61+
record.topic(),
62+
record.valueSchema(),
63+
record.value()),
64+
StandardCharsets.UTF_8);
65+
}
66+
67+
@Override
68+
public Map<String, Object> getAdditionalAttrs(SinkRecord record) {
69+
return baseMapper.getAdditionalAttrs(record);
70+
}
71+
72+
@Override
73+
public boolean matches(SinkRecord record) {
74+
return baseMapper.matches(record);
75+
}
76+
}

src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static com.scalyr.integrations.kafka.ScalyrSinkConnectorConfig.*;
3232
import static com.scalyr.integrations.kafka.TestUtils.fails;
3333
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertFalse;
3435
import static org.junit.Assert.assertNull;
3536
import static org.junit.Assert.assertTrue;
3637

@@ -64,7 +65,8 @@ public void testConfig() {
6465
EVENT_ENRICHMENT_CONFIG, TEST_EVENT_ENRICHMENT,
6566
BATCH_SEND_SIZE_BYTES_CONFIG, TEST_BATCH_SEND_SIZE,
6667
BATCH_SEND_WAIT_MS_CONFIG, TEST_BATCH_SEND_WAIT_MS,
67-
CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON);
68+
CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON,
69+
SEND_ENTIRE_RECORD, "true");
6870

6971
ScalyrSinkConnectorConfig connectorConfig = new ScalyrSinkConnectorConfig(config);
7072
assertEquals(TEST_SCALYR_SERVER, connectorConfig.getString(SCALYR_SERVER_CONFIG));
@@ -77,6 +79,7 @@ public void testConfig() {
7779
assertEquals(Integer.valueOf(TEST_BATCH_SEND_SIZE), connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG));
7880
assertEquals(Integer.valueOf(TEST_BATCH_SEND_WAIT_MS), connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG));
7981
assertEquals(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON, connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG));
82+
assertTrue(connectorConfig.getBoolean(SEND_ENTIRE_RECORD));
8083
}
8184

8285
/**
@@ -97,6 +100,7 @@ public void testConfigDefaults() {
97100
assertEquals(DEFAULT_BATCH_SEND_SIZE_BYTES, connectorConfig.getInt(BATCH_SEND_SIZE_BYTES_CONFIG).intValue());
98101
assertEquals(DEFAULT_BATCH_SEND_WAIT_MS, connectorConfig.getInt(BATCH_SEND_WAIT_MS_CONFIG).intValue());
99102
assertNull(connectorConfig.getString(CUSTOM_APP_EVENT_MAPPING_CONFIG));
103+
assertFalse(connectorConfig.getBoolean(SEND_ENTIRE_RECORD));
100104
}
101105

102106
/**
@@ -175,6 +179,10 @@ public void testInvalidConfigValues() {
175179
config.put(ADD_EVENTS_RETRY_DELAY_MS_CONFIG, "1");
176180
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
177181
config.remove(ADD_EVENTS_RETRY_DELAY_MS_CONFIG);
182+
183+
config.put(SEND_ENTIRE_RECORD, "invalidBoolean");
184+
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
185+
config.remove(SEND_ENTIRE_RECORD);
178186
}
179187

180188
/**
@@ -254,7 +262,7 @@ public void testConfigDef() {
254262
final ImmutableSet<String> configs = ImmutableSet.of(SCALYR_SERVER_CONFIG, SCALYR_API_CONFIG,
255263
COMPRESSION_TYPE_CONFIG, COMPRESSION_LEVEL_CONFIG, ADD_EVENTS_TIMEOUT_MS_CONFIG,
256264
ADD_EVENTS_RETRY_DELAY_MS_CONFIG, EVENT_ENRICHMENT_CONFIG,
257-
BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG);
265+
BATCH_SEND_SIZE_BYTES_CONFIG, BATCH_SEND_WAIT_MS_CONFIG, CUSTOM_APP_EVENT_MAPPING_CONFIG, SEND_ENTIRE_RECORD);
258266

259267
ConfigDef configDef = ScalyrSinkConnectorConfig.configDef();
260268
assertEquals(configs.size(), configDef.names().size());

src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void testTaskConfigs() {
7979
scalyrSinkConnector.start(config);
8080
List<Map<String, String>> taskConfigs = scalyrSinkConnector.taskConfigs(numTaskConfigs);
8181
assertEquals(numTaskConfigs, taskConfigs.size());
82-
taskConfigs.forEach(taskConfig -> TestUtils.verifyMap(config, taskConfig));
82+
taskConfigs.forEach(taskConfig -> assertEquals(config, taskConfig));
8383
}
8484

8585
/**

src/test/java/com/scalyr/integrations/kafka/ScalyrSinkTaskTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@
4343
import java.util.Map;
4444
import java.util.stream.Collectors;
4545
import java.util.stream.IntStream;
46+
import java.util.stream.Stream;
4647

4748
import static com.scalyr.integrations.kafka.AddEventsClientTest.EVENTS;
4849
import static com.scalyr.integrations.kafka.TestUtils.fails;
4950
import static org.junit.Assert.assertEquals;
5051
import static org.junit.Assert.assertNotNull;
5152
import static org.junit.Assert.assertTrue;
53+
import static org.junit.Assume.assumeFalse;
5254

5355
/**
5456
* Test ScalyrSinkTask
@@ -58,6 +60,7 @@ public class ScalyrSinkTaskTest {
5860

5961
private ScalyrSinkTask scalyrSinkTask;
6062
private final TriFunction<Integer, Integer, Integer, Object> recordValue;
63+
private final boolean sendEntireRecord;
6164

6265
private static final String topic = "test-topic";
6366
private static final int partition = 0;
@@ -67,15 +70,19 @@ public class ScalyrSinkTaskTest {
6770
private static final int ADD_EVENTS_OVERHEAD_BYTES = (int)(TestValues.MIN_BATCH_SEND_SIZE_BYTES * 0.2); // 20% overhead
6871

6972
/**
70-
* Create test parameters for each SinkRecordValueCreator type.
73+
* Create test parameters for each SinkRecordValueCreator type and send_entire_record combination.
74+
* Object[] = {TriFunction<Integer, Integer, Integer, Object> recordValue, send_entire_record boolean}
7175
*/
7276
@Parameterized.Parameters
7377
public static Collection<Object[]> testParams() {
74-
return TestUtils.multipleRecordValuesTestParams();
78+
return TestUtils.multipleRecordValuesTestParams().stream()
79+
.flatMap(recordValue -> Stream.of(new Object[] {recordValue[0], false}, new Object[] {recordValue[0], true}))
80+
.collect(Collectors.toList());
7581
}
7682

77-
public ScalyrSinkTaskTest(TriFunction<Integer, Integer, Integer, Object> recordValue) {
83+
public ScalyrSinkTaskTest(TriFunction<Integer, Integer, Integer, Object> recordValue, boolean sendEntireRecord) {
7884
this.recordValue = recordValue;
85+
this.sendEntireRecord = sendEntireRecord;
7986
// Print test params
8087
Object data = recordValue.apply(1, 1, 1);
8188
System.out.println("Executing test with " + (data instanceof Struct ? "schema" : "schemaless") + " recordValue: " + data);
@@ -119,7 +126,7 @@ public void testPut() throws Exception {
119126
*/
120127
private void putAndVerifyRecords(MockWebServer server) throws InterruptedException, java.io.IOException {
121128
// Add multiple server responses for `put` batch exceeds `batch_send_size_bytes`
122-
IntStream.range(0, 2).forEach(i ->
129+
IntStream.range(0, 4).forEach(i ->
123130
server.enqueue(new MockResponse().setResponseCode(200).setBody(TestValues.ADD_EVENTS_RESPONSE_SUCCESS)));
124131

125132
// put SinkRecords
@@ -141,7 +148,7 @@ private void verifyRecords(MockWebServer server, List<SinkRecord> records) throw
141148

142149
EventMapper eventMapper = new EventMapper(
143150
scalyrSinkTask.parseEnrichmentAttrs(new ScalyrSinkConnectorConfig(createConfig()).getList(ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG)),
144-
CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON));
151+
CustomAppEventMapping.parseCustomAppEventMappingConfig(TestValues.CUSTOM_APP_EVENT_MAPPING_JSON), sendEntireRecord);
145152

146153
List<Event> origEvents = records.stream()
147154
.map(eventMapper::createEvent)
@@ -190,6 +197,7 @@ public void testPutFlushCycles() throws Exception {
190197
*/
191198
@Test
192199
public void testPutErrorHandling() {
200+
assumeFalse(sendEntireRecord);
193201
final int numRequests = 3;
194202
int requestCount = 0;
195203
TestUtils.MockSleep mockSleep = new TestUtils.MockSleep();
@@ -228,6 +236,7 @@ public void testPutErrorHandling() {
228236
*/
229237
@Test
230238
public void testIgnoreInputTooLongError() throws Exception {
239+
assumeFalse(sendEntireRecord);
231240
TestUtils.MockSleep mockSleep = new TestUtils.MockSleep();
232241
this.scalyrSinkTask = new ScalyrSinkTask(mockSleep.sleep);
233242
MockWebServer server = new MockWebServer();
@@ -258,6 +267,7 @@ public void testIgnoreInputTooLongError() throws Exception {
258267
*/
259268
@Test
260269
public void testPutEventBufferingSendSize() throws Exception {
270+
assumeFalse(sendEntireRecord);
261271
final int numRecords = (TestValues.MIN_BATCH_EVENTS / 2) + 1;
262272
ScalyrUtil.setCustomTimeNs(0); // Set custom time and never advance so batchSendWaitMs will not be met
263273

@@ -366,6 +376,7 @@ public void testFlushSendsEventsInBuffer() throws Exception {
366376
*/
367377
@Test
368378
public void testSinglePutExceedsBatchBytesSize() throws Exception {
379+
assumeFalse(sendEntireRecord);
369380
final int numExpectedSends = 4;
370381

371382
MockWebServer server = new MockWebServer();
@@ -394,6 +405,7 @@ public void testSinglePutExceedsBatchBytesSize() throws Exception {
394405
*/
395406
@Test
396407
public void testLargeMsgMixedWithSmallMsgs() throws Exception {
408+
assumeFalse(sendEntireRecord);
397409
final int numExpectedSends = 3;
398410

399411
MockWebServer server = new MockWebServer();
@@ -526,6 +538,7 @@ private Map<String, String> createConfig() {
526538
ScalyrSinkConnectorConfig.SCALYR_API_CONFIG, TestValues.API_KEY_VALUE,
527539
ScalyrSinkConnectorConfig.COMPRESSION_TYPE_CONFIG, CompressorFactory.NONE,
528540
ScalyrSinkConnectorConfig.EVENT_ENRICHMENT_CONFIG, TestValues.ENRICHMENT_VALUE,
529-
ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON);
541+
ScalyrSinkConnectorConfig.CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_EVENT_MAPPING_JSON,
542+
ScalyrSinkConnectorConfig.SEND_ENTIRE_RECORD, Boolean.toString(sendEntireRecord));
530543
}
531544
}

0 commit comments

Comments
 (0)