Skip to content

Commit 98b874b

Browse files
author
Oliver Hsu
authored
CT-278 Support wildcards and matchAll for custom app event mapping (#47)
* Short-circuit `sendEvents` if nothing to send * Log if no event mappers match the records * Add Regex support for CustomAppEventMapping * Implement `matchAll` for CustomAppEventMapping
1 parent 6a0f2b7 commit 98b874b

File tree

11 files changed

+178
-51
lines changed

11 files changed

+178
-51
lines changed

config/connect-scalyr-sink-custom-app.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@
88
"topics": "logs",
99
"api_key": "<log write api key from https://app.scalyr.com/keys>",
1010
"event_enrichment": "key1=value1,key2=value2",
11-
"custom_app_event_mapping":"[{\"matcher\": {\"attribute\": \"app.name\", \"value\": \"myapp\"}, \"eventMapping\": {\"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\", \"appField1\":\"appField1\", \"appField2\":\"nested.appField2\"}}]"
11+
"custom_app_event_mapping":"[{\"matcher\": {\"attribute\": \"app.name\", \"value\": \"myApp.*\"}, \"eventMapping\": {\"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\", \"appField1\":\"appField1\", \"appField2\":\"nested.appField2\"}}, {\"matcher\": {\"matchAll\": \"true\"}, \"eventMapping\": {\"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\"}}]"
1212
}
1313
}

config/connect-scalyr-sink.properties

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@ api_key=<log write api key from https://app.scalyr.com/keys>
1616
# Optional enrichment attributes in key=value format which are added to the Scalyr event attributes
1717
#event_enrichment=key1=value1,key2=value2
1818

19-
# Custom application event mapping - only needed for custom applications
20-
#custom_app_event_mapping=[{"matcher": {"attribute": "app.name", "value": "myapp"}, "eventMapping": {"message": "message", "logfile": "log.path", "serverHost": "host.hostname", "parser": "fields.parser", "version": "app.version", "appField1", "appField1", "appField2", "nested.appField2"}}]
19+
# Custom application event mapping - only needed for custom applications. Regex is supported for the `matcher.value`.
20+
#custom_app_event_mapping=[{"matcher": {"attribute": "app.name", "value": "myapp.*"}, "eventMapping": {"message": "message", "logfile": "log.path", "serverHost": "host.hostname", "parser": "fields.parser", "version": "app.version", "appField1": "appField1", "appField2": "nested.appField2"}}]
21+
22+
# Custom application event mapping with match all and send_entire_record=true
23+
# logfile, serverHost, and parser event mappings are still recommended, since these are special fields.
24+
#send_entire_record=true
25+
#custom_app_event_mapping=[{"matcher": {"matchAll": true}, "eventMapping": {"logfile": "log.path", "serverHost": "host.hostname", "parser": "fields.parser"}}]
2126

2227
# Advanced configuration options - these should not be modified in most cases
2328
# Compression type to use for sending log events. Valid values are: `deflate`, `none`.

src/main/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfig.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,12 @@ public class ScalyrSinkConnectorConfig extends AbstractConfig {
6565
+ BATCH_SEND_SIZE_BYTES_CONFIG + " may not be reached for longer periods of time.";
6666
public static final String CUSTOM_APP_EVENT_MAPPING_CONFIG = "custom_app_event_mapping";
6767
private static final String CUSTOM_APP_EVENT_MAPPING_DOC = "JSON config describing how to map custom application nested Kafka messages to Scalyr events." +
68-
" Multiple custom application event mappings can be specified in a JSON list. Example config JSON:\n"
69-
+ "[{\"matcher\": { \"attribute\": \"app.name\", \"value\": \"customApp\"},\n" +
70-
" \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\"} }]";
68+
" Multiple custom application event mappings can be specified in a JSON list and are evaluated in the order specified in the list." +
69+
" The matcher.value supports regex to match the entire field value. Example config JSON:\n" +
70+
"[{\"matcher\": { \"attribute\": \"app.name\", \"value\": \"myApp.*\"},\n" +
71+
" \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\"} },\n" +
72+
"{\"matcher\": { \"matchAll\": true},\n" +
73+
" \"eventMapping\": { \"message\": \"message\", \"logfile\": \"log.path\", \"serverHost\": \"host.hostname\", \"parser\": \"fields.parser\"} }]";
7174
public static final String SEND_ENTIRE_RECORD = "send_entire_record";
7275
private static final String SEND_ENTIRE_RECORD_DOC = "If true, send the entire Kafka Connect record value serialized to JSON as the message field.";
7376

@@ -138,10 +141,23 @@ public static ConfigDef configDef() {
138141
if (customAppEventMappings.isEmpty()) {
139142
throw new ConfigException("No custom event mappings are defined");
140143
}
144+
int numMatchAll = 0;
141145
for (CustomAppEventMapping mapping : customAppEventMappings) {
142-
if (mapping.getMatcherFields().isEmpty() || Strings.isNullOrEmpty(mapping.getMatcherValue())) {
146+
if (!mapping.isMatchAll() && (mapping.getMatcherFields().isEmpty() || Strings.isNullOrEmpty(mapping.getMatcherValue()))) {
143147
throw new ConfigException("Custom event application mapping matcher not defined");
144148
}
149+
if (mapping.isMatchAll()) {
150+
numMatchAll++;
151+
}
152+
}
153+
154+
// Only one matchAll is allowed to avoid ambiguity
155+
if (numMatchAll > 1) {
156+
throw new ConfigException("More than one match all custom event mapping matcher is defined");
157+
}
158+
// Custom event mappings are evaluated in order. Match all should be last so that other custom event mappings are evaluated.
159+
if (numMatchAll == 1 && !customAppEventMappings.get(customAppEventMappings.size() - 1).isMatchAll()) {
160+
throw new ConfigException("Custom event mapper with matchAll=true must be listed last. Otherwise, some mappers would be ignored.");
145161
}
146162
} catch (IOException | IllegalArgumentException e) {
147163
throw new ConfigException("Invalid custom application event mapping JSON", e);

src/main/java/com/scalyr/integrations/kafka/ScalyrSinkTask.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.scalyr.integrations.kafka;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.util.concurrent.RateLimiter;
2021
import com.scalyr.api.internal.ScalyrUtil;
2122
import com.scalyr.integrations.kafka.mapping.CustomAppEventMapping;
2223
import com.scalyr.integrations.kafka.mapping.EventMapper;
@@ -39,6 +40,7 @@
3940
import java.util.Objects;
4041
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.atomic.AtomicInteger;
4244
import java.util.function.LongConsumer;
4345
import java.util.stream.Collectors;
4446

@@ -74,6 +76,8 @@ public class ScalyrSinkTask extends SinkTask {
7476

7577
private long lastBatchSendTimeMs;
7678

79+
private static final RateLimiter noRecordLogRateLimiter = RateLimiter.create(1.0/30); // 1 permit every 30 seconds so the "no records matched" warning is not logged on every put() call
80+
7781
/**
7882
* Default constructor called by Kafka Connect.
7983
*/
@@ -146,16 +150,23 @@ public void put(Collection<SinkRecord> records) {
146150
throw lastError;
147151
}
148152

153+
AtomicInteger recordCount = new AtomicInteger();
149154
records.stream()
150155
.map(eventMapper::createEvent)
151156
.filter(Objects::nonNull)
152157
.forEach(event -> {
158+
recordCount.incrementAndGet();
153159
if (eventBuffer.estimatedSerializedBytes() + event.estimatedSerializedBytes() >= batchSendSizeBytes) {
154160
sendEvents();
155161
}
156162
eventBuffer.addEvent(event);
157163
});
158164

165+
// Do not return early when recordCount == 0: events already in the buffer must still be sent once batchSendWaitMs is exceeded
166+
if (recordCount.get() == 0 && noRecordLogRateLimiter.tryAcquire()) {
167+
log.warn("No records matched an event mapper. Records not sent to Scalyr. Check the custom_app_event_mapping matcher configuration.");
168+
}
169+
159170
// Send events when batchSendWaitMs exceeded
160171
if (ScalyrUtil.currentTimeMillis() - lastBatchSendTimeMs >= batchSendWaitMs) {
161172
sendEvents();
@@ -166,6 +177,9 @@ public void put(Collection<SinkRecord> records) {
166177
* Call addEvents with EventBuffer
167178
*/
168179
private void sendEvents() {
180+
if (eventBuffer.length() == 0) {
181+
return;
182+
}
169183
PerfStats perfStats = new PerfStats(log);
170184
perfStats.recordEvents(eventBuffer);
171185
pendingAddEvents = addEventsClient.log(eventBuffer.getEvents(), pendingAddEvents).whenComplete(this::processResponse);
@@ -188,9 +202,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
188202
log.debug("Flushing data to Scalyr with the following offsets: {}", currentOffsets);
189203

190204
// Send pending events in buffer
191-
if (eventBuffer.length() > 0) {
192-
sendEvents();
193-
}
205+
sendEvents();
194206

195207
// Wait for in-flight addEvents requests to complete
196208
waitForRequestsToComplete();

src/main/java/com/scalyr/integrations/kafka/mapping/CustomAppEventMapping.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@
5454
5555
`matcher` defines an attribute to determine whether the event mapping applies to the message.
5656
The event mapping is only applied to messages where the `matcher.attribute` value matches the `matcher.value`.
57+
`matcher.value` can be a regex.
58+
59+
`matcher` can also be defined to match all events:
60+
```
61+
"matcher": {
62+
"matchAll": true
63+
}
64+
```
65+
When `matchAll=true`, `attribute` and `value` fields are not required and are ignored.
5766
5867
`eventMapping` defines the message fields that are mapped to Scalyr event attributes.
5968
The attribute is the Scalyr event key. The attribute value specifies the
@@ -78,23 +87,27 @@ public class CustomAppEventMapping {
7887
public static final String LOG_FILE = "logfile";
7988
public static final String PARSER = "parser";
8089
public static final String MESSAGE = "message";
90+
private static final String MATCHER_NOT_DEFINED_ERROR = "matcher not defined in custom application event mapping";
8191

8292
private static final Set<String> standardAttrs = ImmutableSet.of(SERVER_HOST, LOG_FILE, PARSER, MESSAGE);
8393

84-
public void setMatcher(Matcher matcher) {
94+
public CustomAppEventMapping setMatcher(Matcher matcher) {
8595
this.matcher = matcher;
96+
return this;
8697
}
8798

88-
public void setEventMapping(Map<String, String> eventMapping) {
99+
public CustomAppEventMapping setEventMapping(Map<String, String> eventMapping) {
89100
this.eventMapping = eventMapping;
101+
return this;
90102
}
91103

92-
public void setDelimiter(String fieldDelimiter) {
104+
public CustomAppEventMapping setDelimiter(String fieldDelimiter) {
93105
this.delimiter = fieldDelimiter;
106+
return this;
94107
}
95108

96109
public String getMatcherValue() {
97-
Preconditions.checkArgument(matcher != null, "matcher not defined in custom application event mapping");
110+
Preconditions.checkArgument(matcher != null, MATCHER_NOT_DEFINED_ERROR);
98111
return matcher.value;
99112
}
100113

@@ -103,10 +116,15 @@ public String getMatcherValue() {
103116
* using the delimiter as the field separator.
104117
*/
105118
public List<String> getMatcherFields() {
106-
Preconditions.checkArgument(matcher != null, "matcher not defined in custom application event mapping");
119+
Preconditions.checkArgument(matcher != null, MATCHER_NOT_DEFINED_ERROR);
107120
return splitAttrFields(matcher.attribute);
108121
}
109122

123+
public boolean isMatchAll() {
124+
Preconditions.checkArgument(matcher != null, MATCHER_NOT_DEFINED_ERROR);
125+
return matcher.matchAll;
126+
}
127+
110128
public List<String> getServerHostFields() {
111129
return getAttribute(SERVER_HOST);
112130
}
@@ -147,15 +165,23 @@ private List<String> splitAttrFields(String attrFields) {
147165
* Defines the field and value to determine whether the message matches the custom app event mapping.
148166
*/
149167
public static class Matcher {
168+
private boolean matchAll;
150169
private String attribute;
151170
private String value;
152171

153-
public void setAttribute(String attribute) {
172+
public Matcher setMatchAll(Boolean matchAll) {
173+
this.matchAll = matchAll;
174+
return this;
175+
}
176+
177+
public Matcher setAttribute(String attribute) {
154178
this.attribute = attribute;
179+
return this;
155180
}
156181

157-
public void setValue(String value) {
182+
public Matcher setValue(String value) {
158183
this.value = value;
184+
return this;
159185
}
160186

161187
@Override

src/main/java/com/scalyr/integrations/kafka/mapping/CustomAppMessageMapper.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.regex.Pattern;
2425

2526
/**
2627
* Maps custom app messages to Scalyr events using {@link CustomAppEventMapping} event mapping definition.
@@ -32,7 +33,8 @@ public class CustomAppMessageMapper implements MessageMapper {
3233
private final List<String> parserFields;
3334
private final List<String> matcherFields;
3435
private final Map<String, List<String>> additionalAttrsFields;
35-
private final String matcherValue;
36+
private final Pattern matcherRegex;
37+
private final boolean matchAll;
3638

3739
/**
3840
* CustomApplicationDefinition fields are memoized to instance variables
@@ -44,8 +46,9 @@ public CustomAppMessageMapper(CustomAppEventMapping customAppEventMapping) {
4446
serverHostFields = customAppEventMapping.getServerHostFields();
4547
parserFields = customAppEventMapping.getParserFields();
4648
matcherFields = customAppEventMapping.getMatcherFields();
49+
matchAll = customAppEventMapping.isMatchAll();
50+
matcherRegex = matchAll ? null : Pattern.compile(customAppEventMapping.getMatcherValue());
4751
additionalAttrsFields = customAppEventMapping.getAdditionalAttrFields();
48-
matcherValue = customAppEventMapping.getMatcherValue();
4952
}
5053

5154
@Override
@@ -78,7 +81,11 @@ public Map<String, Object> getAdditionalAttrs(SinkRecord record) {
7881

7982
@Override
8083
public boolean matches(SinkRecord record) {
84+
if (matchAll) {
85+
return true;
86+
}
87+
8188
Object fieldValue = FieldExtractor.getField(record.value(), matcherFields);
82-
return matcherValue.equals(fieldValue);
89+
return fieldValue == null ? false : matcherRegex.matcher(fieldValue.toString()).matches();
8390
}
8491
}

src/main/java/com/scalyr/integrations/kafka/mapping/EventMapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
/**
3535
* Converts a SinkRecord to a Scalyr Event.
3636
* Determines which {@link MessageMapper} to use and maps the SinkRecord to an Event using the
37-
* MessageMapper.
37+
* MessageMapper. MessageMappers are evaluated in the following order:
38+
* 1. Scalyr supported message sources
39+
* 2. Custom app event mappers in the order they are defined in connector config.
3840
*/
3941
public class EventMapper {
4042
private static final Logger log = LoggerFactory.getLogger(EventMapper.class);

src/test/java/com/scalyr/integrations/kafka/ScalyrSinkConnectorConfigTest.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.scalyr.integrations.kafka;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.google.common.collect.ImmutableList;
2021
import com.google.common.collect.ImmutableSet;
2122
import org.apache.kafka.common.config.ConfigDef;
2223
import org.apache.kafka.common.config.ConfigException;
@@ -211,22 +212,41 @@ public void testCustomAppEventMappingValidator() throws IOException {
211212
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
212213
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
213214

215+
// match all - parsed matchAll ok even though no matcher fields are defined
216+
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, TestValues.CUSTOM_APP_MATCH_ALL_EVENT_MAPPING_JSON);
217+
new ScalyrSinkConnectorConfig(config);
218+
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
219+
214220
// No matcher field
215-
Map<String, Object> customAppEventMapping = TestValues.createCustomAppEventMapping(".");
221+
Map<String, Object> customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
216222
customAppEventMapping.remove("matcher");
217223
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
218224
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
219225
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
220226

227+
// multiple match all
228+
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(ImmutableList.of(
229+
TestValues.createCustomAppEventMapping(".", true),
230+
TestValues.createCustomAppEventMapping(".", true))));
231+
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
232+
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
233+
234+
// match all not last custom app event mapping
235+
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(ImmutableList.of(
236+
TestValues.createCustomAppEventMapping(".", true),
237+
TestValues.createCustomAppEventMapping(".", false))));
238+
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
239+
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
240+
221241
// No event mapping when SEND_ENTIRE_RECORD=false
222-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
242+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
223243
customAppEventMapping.remove("eventMapping");
224244
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
225245
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
226246
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
227247

228248
// No event mapping when SEND_ENTIRE_RECORD=true
229-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
249+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
230250
customAppEventMapping.remove("eventMapping");
231251
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
232252
config.put(SEND_ENTIRE_RECORD, "true");
@@ -235,28 +255,28 @@ public void testCustomAppEventMappingValidator() throws IOException {
235255
config.remove(SEND_ENTIRE_RECORD);
236256

237257
// Empty matcher
238-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
258+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
239259
customAppEventMapping.put("matcher", Collections.EMPTY_MAP);
240260
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
241261
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
242262
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
243263

244264
// Empty event mapping
245-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
265+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
246266
customAppEventMapping.put("eventMapping", Collections.EMPTY_MAP);
247267
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
248268
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
249269
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
250270

251271
// No message or application attributes defined
252-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
272+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
253273
customAppEventMapping.put("eventMapping", TestUtils.makeMap("logfile", "/var/log/syslog", "serverHost", "testHost", "parser", "systemLog"));
254274
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
255275
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);
256276
config.remove(CUSTOM_APP_EVENT_MAPPING_CONFIG);
257277

258278
// Invalid fields
259-
customAppEventMapping = TestValues.createCustomAppEventMapping(".");
279+
customAppEventMapping = TestValues.createCustomAppEventMapping(".", false);
260280
customAppEventMapping.put("extraField", "bad");
261281
config.put(CUSTOM_APP_EVENT_MAPPING_CONFIG, objectMapper.writeValueAsString(Collections.singletonList(customAppEventMapping)));
262282
fails(() -> new ScalyrSinkConnectorConfig(config), ConfigException.class);

0 commit comments

Comments
 (0)