diff --git a/README.md b/README.md index 36404a4..489c626 100644 --- a/README.md +++ b/README.md @@ -13,14 +13,24 @@ If you require features not yet available in this plugin (including client versi # Pulsar Input Configuration Options This plugin supports these configuration options. -| Settings | Input type | Required | -| ------------- |:-------------:| -----:| -| codec | string, one of ["plain","json"] | No | -| topics | array | Yes | -| subscriptionName | string | Yes | -| consumerName | string | No | -| subscriptionType | string, one of["Shared","Exclusive","Failover","Key_shared"] | No | -| subscriptionInitialPosition| string, one of["Latest","Earliest"] | No | +| Settings | Input type | Required | +|----------------------------------|:------------------------------------------------------------:|---------:| +| codec | string, one of ["plain","json"] | No | +| topics | array | Yes | +| subscriptionName | string | Yes | +| consumerName | string | No | +| subscriptionType | string, one of["Shared","Exclusive","Failover","Key_shared"] | No | +| subscriptionInitialPosition | string, one of["Latest","Earliest"] | No | +| enable_tls | boolean, one of [true, false]. default is false | No | +| tls_trust_store_path | string, required if enable_tls is set to true | No | +| tls_trust_store_password | string, default is empty | No | +| tls_key_store_path | string, default is empty | No | +| tls_key_store_password | string, default is empty | No | +| enable_tls_hostname_verification | boolean, one of [true, false]. default is false | No | +| protocols | array, ciphers list. default is TLSv1.2 | No | +| allow_tls_insecure_connection | boolean, one of [true, false].default is false | No | +| auth_plugin_class_name | string | No | +| ciphers | array, ciphers list | No | # Example @@ -50,5 +60,5 @@ https://github.com/streamnative/logstash-input-pulsar/releases 2. Install this plugin using logstash preoffline command. ``` -bin/logstash-plugin install file://{PATH_TO}/logstash-input-pulsar-2.7.1.zip +bin/logstash-plugin install file://{PATH_TO}/logstash-input-pulsar-2.10.0.0.zip ``` diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 6f5b544..bfaaf16 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -11,13 +11,16 @@ import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -50,6 +53,8 @@ public class Pulsar implements Input { // base config private Configuration config; + private Boolean isBatchReceived; + // consumer config list // codec, plain, json @@ -60,7 +65,11 @@ public class Pulsar implements Input { // topic Names, array private static final PluginConfigSpec> CONFIG_TOPICS = - PluginConfigSpec.arraySetting("topics", null, false, true); + PluginConfigSpec.arraySetting("topics", new ArrayList<>(), false, false); + + // topic Pattern, array + private static final PluginConfigSpec CONFIG_TOPICS_PATTERN = + PluginConfigSpec.stringSetting("topics_pattern", null, false, false); // subscription name private static final PluginConfigSpec CONFIG_SUBSCRIPTION_NAME = @@ -78,12 +87,30 @@ public class Pulsar implements Input { private static final PluginConfigSpec CONFIG_SUBSCRIPTION_INITIAL_POSITION = PluginConfigSpec.stringSetting("subscriptionInitialPosition", "Latest"); + private static final PluginConfigSpec CONFIG_IF_OUTPUT_PULSAR_ERROR = + PluginConfigSpec.booleanSetting("block_if_output_pulsar_error", false); + // is Batch Received + private static final PluginConfigSpec CONFIG_IS_BATCH_RECEIVED = + PluginConfigSpec.booleanSetting("isBatchReceived", false); + + // batch Receive Max Num + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_MAX_NUM = + PluginConfigSpec.numSetting("batchReceiveMaxNum", 50); + + // batch Receive Max Bytes Size + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE = + PluginConfigSpec.numSetting("batchReceiveMaxBytesSize", 10 * 1024); + + // batch Receive Timeout, default 100 ms + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_TIMEOUT_MS = + PluginConfigSpec.numSetting("batchReceiveTimeoutMs", 100); + // TODO: support decorate_events => true & consumer_threads => 2 & metadata // TLS Config private static final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls"; - private static final List protocols = Arrays.asList("TLSv1.2"); - private static final List ciphers = Arrays.asList( + private static final List protocols = Arrays.asList("TLSv1.2"); + private static final List ciphers = Arrays.asList( "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", @@ -91,38 +118,52 @@ public class Pulsar implements Input { ); private static final PluginConfigSpec CONFIG_ENABLE_TLS = - PluginConfigSpec.booleanSetting("enable_tls",false); + PluginConfigSpec.booleanSetting("enable_tls", false); private static final PluginConfigSpec CONFIG_ALLOW_TLS_INSECURE_CONNECTION = - PluginConfigSpec.booleanSetting("allow_tls_insecure_connection",false); + PluginConfigSpec.booleanSetting("allow_tls_insecure_connection", false); private static final PluginConfigSpec CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION = - PluginConfigSpec.booleanSetting("enable_tls_hostname_verification",true); + PluginConfigSpec.booleanSetting("enable_tls_hostname_verification", false); private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PATH = - PluginConfigSpec.stringSetting("tls_trust_store_path",""); + PluginConfigSpec.stringSetting("tls_trust_store_path", ""); private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PASSWORD = - PluginConfigSpec.stringSetting("tls_trust_store_password",""); + PluginConfigSpec.stringSetting("tls_trust_store_password", ""); + + private static final PluginConfigSpec CONFIG_TLS_KEY_STORE_PATH = + PluginConfigSpec.stringSetting("tls_key_store_path", ""); + + private static final PluginConfigSpec CONFIG_TLS_KEY_STORE_PASSWORD = + PluginConfigSpec.stringSetting("tls_key_store_password", ""); private static final PluginConfigSpec CONFIG_AUTH_PLUGIN_CLASS_NAME = - PluginConfigSpec.stringSetting("auth_plugin_class_name",authPluginClassName); + PluginConfigSpec.stringSetting("auth_plugin_class_name", authPluginClassName); private static final PluginConfigSpec> CONFIG_CIPHERS = - PluginConfigSpec.arraySetting("ciphers", Collections.singletonList(ciphers), false, false); + PluginConfigSpec.arraySetting("ciphers", ciphers, false, false); private static final PluginConfigSpec> CONFIG_PROTOCOLS = - PluginConfigSpec.arraySetting("protocols", Collections.singletonList(protocols), false, false); + PluginConfigSpec.arraySetting("protocols", protocols, false, false); public Pulsar(String id, Configuration config, Context context) { // constructors should validate configuration options this.id = id; this.config = config; + this.isBatchReceived = this.config.get(CONFIG_IS_BATCH_RECEIVED); } private void createConsumer() throws PulsarClientException { try { String serviceUrl = config.get(CONFIG_SERVICE_URL); + boolean enableTls = config.get(CONFIG_ENABLE_TLS); + if (enableTls) { + // pulsar TLS + client = buildTlsPulsarClient(serviceUrl); + } else { + client = buildNotTlsPulsarClient(serviceUrl); + } String codec = config.get(CONFIG_CODEC); List topics = config.get(CONFIG_TOPICS).stream().map(Object::toString).collect(Collectors.toList()); @@ -130,52 +171,41 @@ private void createConsumer() throws PulsarClientException { String consumerName = config.get(CONFIG_CONSUMER_NAME); String subscriptionType = config.get(CONFIG_SUBSCRIPTION_TYPE); String subscriptionInitialPosition = config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION); - boolean enableTls = config.get(CONFIG_ENABLE_TLS); - if (enableTls) { - // pulsar TLS - Boolean allowTlsInsecureConnection = config.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION); - Boolean enableTlsHostnameVerification = config.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION); - String tlsTrustStorePath = config.get(CONFIG_TLS_TRUST_STORE_PATH); - Map authMap = new HashMap<>(); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, "JKS"); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, tlsTrustStorePath); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PW, config.get(CONFIG_TLS_TRUST_STORE_PASSWORD)); - - Set cipherSet = new HashSet<>(); - Optional.ofNullable(config.get(CONFIG_CIPHERS)).ifPresent( - cipherList -> cipherList.forEach(cipher -> cipherSet.add(String.valueOf(cipher)))); - - Set protocolSet = new HashSet<>(); - Optional.ofNullable(config.get(CONFIG_PROTOCOLS)).ifPresent( - protocolList -> protocolList.forEach(protocol -> protocolSet.add(String.valueOf(protocol)))); - - client = PulsarClient.builder() - .serviceUrl(serviceUrl) - .tlsCiphers(cipherSet) - .tlsProtocols(protocolSet) - .allowTlsInsecureConnection(allowTlsInsecureConnection) - .enableTlsHostnameVerification(enableTlsHostnameVerification) - .tlsTrustStorePath(tlsTrustStorePath) - .tlsTrustStorePassword(config.get(CONFIG_TLS_TRUST_STORE_PASSWORD)) - .authentication(config.get(CONFIG_AUTH_PLUGIN_CLASS_NAME),authMap) - .build(); - } else { - client = PulsarClient.builder() - .serviceUrl(serviceUrl) - .build(); - } + String topicsPattern = config.get(CONFIG_TOPICS_PATTERN); + int batchReceiveMaxNum = config.get(CONFIG_BATCH_RECEIVE_MAX_NUM).intValue(); + int batchReceiveMaxBytesSize = config.get(CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE).intValue(); + int batchReceiveTimeoutMs = config.get(CONFIG_BATCH_RECEIVE_TIMEOUT_MS).intValue(); // Create a consumer ConsumerBuilder consumerBuilder = client.newConsumer() - .topics(topics) .subscriptionName(subscriptionName) .subscriptionType(getSubscriptionType()) .subscriptionInitialPosition(getSubscriptionInitialPosition()); + if (topicsPattern != null) { + logger.info("topics pattern : {}", topicsPattern); + consumerBuilder.topicsPattern(topicsPattern); + } else { + logger.info("topics : {}", topics); + consumerBuilder.topics(topics); + } if (consumerName != null) { consumerBuilder.consumerName(consumerName); } + if (isBatchReceived) { + // create batchReceivePolicy + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder() + .maxNumMessages(batchReceiveMaxNum) + .maxNumBytes(batchReceiveMaxBytesSize) + .timeout(batchReceiveTimeoutMs, TimeUnit.MILLISECONDS) + .build(); + + consumerBuilder.batchReceivePolicy(batchReceivePolicy); + logger.info("batchReceiveMaxNum is {}, batchReceiveMaxBytesSize is {}, batchReceiveTimeoutMs is {}", + batchReceiveMaxNum, batchReceiveMaxBytesSize, batchReceiveTimeoutMs); + } + pulsarConsumer = consumerBuilder.subscribe(); - logger.info("Create subscription {} on topics {} with codec {}, consumer name is {},subscription Type is {},subscriptionInitialPosition is {}", subscriptionName, topics, codec , consumerName, subscriptionType, subscriptionInitialPosition); + logger.info("Create subscription {} on topics {} with codec {}, consumer name is {},subscription Type is {},subscriptionInitialPosition is {}", subscriptionName, topics, codec, consumerName, subscriptionType, subscriptionInitialPosition); } catch (PulsarClientException e) { logger.error("pulsar client exception ", e); @@ -183,6 +213,47 @@ private void createConsumer() throws PulsarClientException { } } + private PulsarClient buildNotTlsPulsarClient(String serviceUrl) throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + } + + private PulsarClient buildTlsPulsarClient(String serviceUrl) throws PulsarClientException { + Boolean allowTlsInsecureConnection = config.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION); + Boolean enableTlsHostnameVerification = config.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION); + String tlsTrustStorePath = config.get(CONFIG_TLS_TRUST_STORE_PATH); + String tlsTrustStorePassword = config.get(CONFIG_TLS_TRUST_STORE_PASSWORD); + String tlsKeyStorePath = config.get(CONFIG_TLS_KEY_STORE_PATH); + String tlsKeyStorePassword = config.get(CONFIG_TLS_KEY_STORE_PASSWORD); + String pluginClassName = config.get(CONFIG_AUTH_PLUGIN_CLASS_NAME); + + Map authMap = new HashMap<>(); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, "JKS"); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, tlsKeyStorePath); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PW, tlsKeyStorePassword); + + Set cipherSet = new HashSet<>(); + Optional.ofNullable(config.get(CONFIG_CIPHERS)).ifPresent( + cipherList -> cipherList.forEach(cipher -> cipherSet.add(String.valueOf(cipher)))); + + Set protocolSet = new HashSet<>(); + Optional.ofNullable(config.get(CONFIG_PROTOCOLS)).ifPresent( + protocolList -> protocolList.forEach(protocol -> protocolSet.add(String.valueOf(protocol)))); + + return PulsarClient.builder() + .serviceUrl(serviceUrl) + .tlsCiphers(cipherSet) + .tlsProtocols(protocolSet) + .allowTlsInsecureConnection(allowTlsInsecureConnection) + .enableTlsHostnameVerification(enableTlsHostnameVerification) + .tlsTrustStorePath(tlsTrustStorePath) + .tlsTrustStorePassword(tlsTrustStorePassword) + .authentication(pluginClassName, authMap) + .useKeyStoreTls(true) + .build(); + } + private SubscriptionInitialPosition getSubscriptionInitialPosition() { SubscriptionInitialPosition position; switch (config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION)) { @@ -232,72 +303,121 @@ private void closePulsarConsumer() { } } - @Override public void start(Consumer> consumer) { - - // The start method should push Map instances to the supplied QueueWriter - // instance. Those will be converted to Event instances later in the Logstash event - // processing pipeline. - // - // Inputs that operate on unbounded streams of data or that poll indefinitely for new - // events should loop indefinitely until they receive a stop request. Inputs that produce - // a finite sequence of events should loop until that sequence is exhausted or until they - // receive a stop request, whichever comes first. - try { + Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); createConsumer(); - Message message = null; - String msgString = null; - Gson gson = new Gson(); - Type gsonType = new TypeToken(){}.getType(); - while (!stopped) { - try { - // Block and wait until a single message is available - message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); - if(message == null){ - continue; - } - msgString = new String(message.getData()); - - if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { - try { - Map map = gson.fromJson(msgString, gsonType); - consumer.accept(map); - } catch (Exception e) { - // json parse exception - // treat it as codec plain - logger.error("json parse exception ", e); - logger.error("message key is {}, set logging level to debug if you'd like to see message", message.getKey()); - logger.debug("message content is {}", msgString); - logger.error("default codec plain will be used "); - - consumer.accept(Collections.singletonMap("message", msgString)); - } - } else { - // default codec: plain - consumer.accept(Collections.singletonMap("message", msgString)); - } + if (!isBatchReceived) { + logger.info("start with single receive"); + startWithSingleReceive(consumer, blockIfOutputPulsarError); + } else { + logger.info("start with batch receive"); + startWithBatchReceive(consumer, blockIfOutputPulsarError); + } + } catch (PulsarClientException e) { + logger.error("create pulsar client errors: {}", e.getMessage()); + } + } + public void startWithSingleReceive(Consumer> consumer, Boolean blockIfOutputPulsarError) { + Message message = null; + String msgString = null; + Gson gson = new Gson(); + Type gsonType = new TypeToken() { + }.getType(); + while (!stopped) { + try { + // Block and wait until a single message is available + message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); + if (message == null) { + continue; + } + msgString = new String(message.getData()); - // Acknowledge the message so that it can be - // deleted by the message broker + if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { + processMessage(consumer, msgString, message.getKey(), gson, gsonType); + } else { + // default codec: plain + consumer.accept(Collections.singletonMap("message", msgString)); + } + + // Acknowledge the message so that it can be + // deleted by the message broker + if (blockIfOutputPulsarError && Boolean.parseBoolean(System.getProperty("output.errors", String.valueOf(false)))) { + pulsarConsumer.negativeAcknowledge(message); + } else { pulsarConsumer.acknowledge(message); - } catch (Exception e) { + } + } catch (Exception e) { + // Message failed to process, redeliver later + logger.error("consume exception ", e); + if (message != null) { + pulsarConsumer.negativeAcknowledge(message); + logger.error("message is {}:{}", message.getKey(), msgString); + } + } + } + } - // Message failed to process, redeliver later - logger.error("consume exception ", e); - if (message != null) { - pulsarConsumer.negativeAcknowledge(message); - logger.error("message is {}:{}", message.getKey(), msgString); + public void startWithBatchReceive(Consumer> consumer, Boolean blockIfOutputPulsarError) { + Messages messages = null; + String msgString = null; + Gson gson = new Gson(); + Type gsonType = new TypeToken() { + }.getType(); + while (!stopped) { + try { + messages = pulsarConsumer.batchReceive(); + if (messages == null) { + continue; + } + logger.debug("messages had been received, size: {}", messages.size()); + + if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { + for (Message msg : messages) { + msgString = new String(msg.getData()); + processMessage(consumer, msgString, msg.getKey(), gson, gsonType); + } + } else { + // default codec: plain + for (Message msg : messages) { + msgString = new String(msg.getData()); + consumer.accept(Collections.singletonMap("message", msgString)); } } + // Acknowledge the message so that it can be + // deleted by the message broker + if (blockIfOutputPulsarError && Boolean.parseBoolean(System.getProperty("output.errors", String.valueOf(false)))) { + pulsarConsumer.negativeAcknowledge(messages); + } else { + pulsarConsumer.acknowledge(messages); + } + } catch (Exception e) { + // Message failed to process, redeliver later + logger.error("consume exception ", e); + if (messages != null) { + pulsarConsumer.negativeAcknowledge(messages); + logger.error("messages failed to process, size: {}", messages.size()); + } } - } catch (PulsarClientException e) { - } finally { - stopped = true; - done.countDown(); + } + } + + private void processMessage(Consumer> consumer, String msgString, String msgKey, Gson gson, Type gsonType) { + try { + Map map = gson.fromJson(msgString, gsonType); + consumer.accept(map); + } catch (Exception e) { + // json parse exception + // treat it as codec plain + logger.error("json parse exception ", e); + logger.error("message key is {}, set logging level to debug if you'd like to see message", msgKey); + logger.debug("message content is {}", msgString); + logger.error("default codec plain will be used "); + + consumer.accept(Collections.singletonMap("message", msgString)); } } @@ -322,7 +442,27 @@ public Collection> configSchema() { CONFIG_SUBSCRIPTION_TYPE, CONFIG_SUBSCRIPTION_INITIAL_POSITION, CONFIG_CONSUMER_NAME, - CONFIG_CODEC + CONFIG_CODEC, + CONFIG_TOPICS_PATTERN, + CONFIG_IF_OUTPUT_PULSAR_ERROR, + + // Pulsar TLS Config + CONFIG_ENABLE_TLS, + CONFIG_TLS_TRUST_STORE_PATH, + CONFIG_TLS_TRUST_STORE_PASSWORD, + CONFIG_TLS_KEY_STORE_PATH, + CONFIG_TLS_KEY_STORE_PASSWORD, + CONFIG_PROTOCOLS, + CONFIG_ALLOW_TLS_INSECURE_CONNECTION, + CONFIG_AUTH_PLUGIN_CLASS_NAME, + CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION, + CONFIG_CIPHERS, + + // Batch Receive + CONFIG_IS_BATCH_RECEIVED, + CONFIG_BATCH_RECEIVE_MAX_NUM, + CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE, + CONFIG_BATCH_RECEIVE_TIMEOUT_MS ); }