From b21067f9d2a64b9e370ee384202653c9514a0a1f Mon Sep 17 00:00:00 2001 From: zzxp Date: Thu, 16 Jun 2022 22:04:18 +0800 Subject: [PATCH 1/9] [BugFix] params protocols and ciphers are repeated as collections --- README.md | 28 +++-- .../apache/pulsar/logstash/inputs/Pulsar.java | 113 +++++++++++------- 2 files changed, 91 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 36404a4..489c626 100644 --- a/README.md +++ b/README.md @@ -13,14 +13,24 @@ If you require features not yet available in this plugin (including client versi # Pulsar Input Configuration Options This plugin supports these configuration options. -| Settings | Input type | Required | -| ------------- |:-------------:| -----:| -| codec | string, one of ["plain","json"] | No | -| topics | array | Yes | -| subscriptionName | string | Yes | -| consumerName | string | No | -| subscriptionType | string, one of["Shared","Exclusive","Failover","Key_shared"] | No | -| subscriptionInitialPosition| string, one of["Latest","Earliest"] | No | +| Settings | Input type | Required | +|----------------------------------|:------------------------------------------------------------:|---------:| +| codec | string, one of ["plain","json"] | No | +| topics | array | Yes | +| subscriptionName | string | Yes | +| consumerName | string | No | +| subscriptionType | string, one of["Shared","Exclusive","Failover","Key_shared"] | No | +| subscriptionInitialPosition | string, one of["Latest","Earliest"] | No | +| enable_tls | boolean, one of [true, false]. default is false | No | +| tls_trust_store_path | string, required if enable_tls is set to true | No | +| tls_trust_store_password | string, default is empty | No | +| tls_key_store_path | string, default is empty | No | +| tls_key_store_password | string, default is empty | No | +| enable_tls_hostname_verification | boolean, one of [true, false]. default is false | No | +| protocols | array, ciphers list. default is TLSv1.2 | No | +| allow_tls_insecure_connection | boolean, one of [true, false].default is false | No | +| auth_plugin_class_name | string | No | +| ciphers | array, ciphers list | No | # Example @@ -50,5 +60,5 @@ https://github.com/streamnative/logstash-input-pulsar/releases 2. Install this plugin using logstash preoffline command. ``` -bin/logstash-plugin install file://{PATH_TO}/logstash-input-pulsar-2.7.1.zip +bin/logstash-plugin install file://{PATH_TO}/logstash-input-pulsar-2.10.0.0.zip ``` diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 6f5b544..66ce6ef 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -82,8 +82,8 @@ public class Pulsar implements Input { // TLS Config private static final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls"; - private static final List protocols = Arrays.asList("TLSv1.2"); - private static final List ciphers = Arrays.asList( + private static final List protocols = Arrays.asList("TLSv1.2"); + private static final List ciphers = Arrays.asList( "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", @@ -97,7 +97,7 @@ public class Pulsar implements Input { PluginConfigSpec.booleanSetting("allow_tls_insecure_connection",false); private static final PluginConfigSpec CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION = - PluginConfigSpec.booleanSetting("enable_tls_hostname_verification",true); + PluginConfigSpec.booleanSetting("enable_tls_hostname_verification",false); private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PATH = PluginConfigSpec.stringSetting("tls_trust_store_path",""); @@ -105,14 +105,20 @@ public class Pulsar implements Input { private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PASSWORD = PluginConfigSpec.stringSetting("tls_trust_store_password",""); + private static final PluginConfigSpec CONFIG_TLS_KEY_STORE_PATH = + PluginConfigSpec.stringSetting("tls_key_store_path", ""); + + private static final PluginConfigSpec CONFIG_TLS_KEY_STORE_PASSWORD = + PluginConfigSpec.stringSetting("tls_key_store_password", ""); + private static final PluginConfigSpec CONFIG_AUTH_PLUGIN_CLASS_NAME = PluginConfigSpec.stringSetting("auth_plugin_class_name",authPluginClassName); private static final PluginConfigSpec> CONFIG_CIPHERS = - PluginConfigSpec.arraySetting("ciphers", Collections.singletonList(ciphers), false, false); + PluginConfigSpec.arraySetting("ciphers", ciphers, false, false); private static final PluginConfigSpec> CONFIG_PROTOCOLS = - PluginConfigSpec.arraySetting("protocols", Collections.singletonList(protocols), false, false); + PluginConfigSpec.arraySetting("protocols", protocols, false, false); public Pulsar(String id, Configuration config, Context context) { // constructors should validate configuration options @@ -123,6 +129,13 @@ public Pulsar(String id, Configuration config, Context context) { private void createConsumer() throws PulsarClientException { try { String serviceUrl = config.get(CONFIG_SERVICE_URL); + boolean enableTls = config.get(CONFIG_ENABLE_TLS); + if (enableTls) { + // pulsar TLS + client = buildTlsPulsarClient(serviceUrl); + } else { + client = buildNotTlsPulsarClient(serviceUrl); + } String codec = config.get(CONFIG_CODEC); List topics = config.get(CONFIG_TOPICS).stream().map(Object::toString).collect(Collectors.toList()); @@ -130,41 +143,6 @@ private void createConsumer() throws PulsarClientException { String consumerName = config.get(CONFIG_CONSUMER_NAME); String subscriptionType = config.get(CONFIG_SUBSCRIPTION_TYPE); String subscriptionInitialPosition = config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION); - boolean enableTls = config.get(CONFIG_ENABLE_TLS); - if (enableTls) { - // pulsar TLS - Boolean allowTlsInsecureConnection = config.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION); - Boolean enableTlsHostnameVerification = config.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION); - String tlsTrustStorePath = config.get(CONFIG_TLS_TRUST_STORE_PATH); - Map authMap = new HashMap<>(); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, "JKS"); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, tlsTrustStorePath); - authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PW, config.get(CONFIG_TLS_TRUST_STORE_PASSWORD)); - - Set cipherSet = new HashSet<>(); - Optional.ofNullable(config.get(CONFIG_CIPHERS)).ifPresent( - cipherList -> cipherList.forEach(cipher -> cipherSet.add(String.valueOf(cipher)))); - - Set protocolSet = new HashSet<>(); - Optional.ofNullable(config.get(CONFIG_PROTOCOLS)).ifPresent( - protocolList -> protocolList.forEach(protocol -> protocolSet.add(String.valueOf(protocol)))); - - client = PulsarClient.builder() - .serviceUrl(serviceUrl) - .tlsCiphers(cipherSet) - .tlsProtocols(protocolSet) - .allowTlsInsecureConnection(allowTlsInsecureConnection) - .enableTlsHostnameVerification(enableTlsHostnameVerification) - .tlsTrustStorePath(tlsTrustStorePath) - .tlsTrustStorePassword(config.get(CONFIG_TLS_TRUST_STORE_PASSWORD)) - .authentication(config.get(CONFIG_AUTH_PLUGIN_CLASS_NAME),authMap) - .build(); - } else { - client = PulsarClient.builder() - .serviceUrl(serviceUrl) - .build(); - } - // Create a consumer ConsumerBuilder consumerBuilder = client.newConsumer() .topics(topics) @@ -183,6 +161,47 @@ private void createConsumer() throws PulsarClientException { } } + private PulsarClient buildNotTlsPulsarClient(String serviceUrl) throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + } + + private PulsarClient buildTlsPulsarClient(String serviceUrl) throws PulsarClientException { + Boolean allowTlsInsecureConnection = config.get(CONFIG_ALLOW_TLS_INSECURE_CONNECTION); + Boolean enableTlsHostnameVerification = config.get(CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION); + String tlsTrustStorePath = config.get(CONFIG_TLS_TRUST_STORE_PATH); + String tlsTrustStorePassword = config.get(CONFIG_TLS_TRUST_STORE_PASSWORD); + String tlsKeyStorePath = config.get(CONFIG_TLS_KEY_STORE_PATH); + String tlsKeyStorePassword = config.get(CONFIG_TLS_KEY_STORE_PASSWORD); + String pluginClassName = config.get(CONFIG_AUTH_PLUGIN_CLASS_NAME); + + Map authMap = new HashMap<>(); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, "JKS"); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, tlsKeyStorePath); + authMap.put(AuthenticationKeyStoreTls.KEYSTORE_PW, tlsKeyStorePassword); + + Set cipherSet = new HashSet<>(); + Optional.ofNullable(config.get(CONFIG_CIPHERS)).ifPresent( + cipherList -> cipherList.forEach(cipher -> cipherSet.add(String.valueOf(cipher)))); + + Set protocolSet = new HashSet<>(); + Optional.ofNullable(config.get(CONFIG_PROTOCOLS)).ifPresent( + protocolList -> protocolList.forEach(protocol -> protocolSet.add(String.valueOf(protocol)))); + + return PulsarClient.builder() + .serviceUrl(serviceUrl) + .tlsCiphers(cipherSet) + .tlsProtocols(protocolSet) + .allowTlsInsecureConnection(allowTlsInsecureConnection) + .enableTlsHostnameVerification(enableTlsHostnameVerification) + .tlsTrustStorePath(tlsTrustStorePath) + .tlsTrustStorePassword(tlsTrustStorePassword) + .authentication(pluginClassName,authMap) + .useKeyStoreTls(true) + .build(); + } + private SubscriptionInitialPosition getSubscriptionInitialPosition() { SubscriptionInitialPosition position; switch (config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION)) { @@ -322,7 +341,19 @@ public Collection> configSchema() { CONFIG_SUBSCRIPTION_TYPE, CONFIG_SUBSCRIPTION_INITIAL_POSITION, CONFIG_CONSUMER_NAME, - CONFIG_CODEC + CONFIG_CODEC, + + // Pulsar TLS Config + CONFIG_ENABLE_TLS, + CONFIG_TLS_TRUST_STORE_PATH, + CONFIG_TLS_TRUST_STORE_PASSWORD, + CONFIG_TLS_KEY_STORE_PATH, + CONFIG_TLS_KEY_STORE_PASSWORD, + CONFIG_PROTOCOLS, + CONFIG_ALLOW_TLS_INSECURE_CONNECTION, + CONFIG_AUTH_PLUGIN_CLASS_NAME, + CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION, + CONFIG_CIPHERS ); } From 78b1bd93be4fc5829d6c62d1ef47edcff0293089 Mon Sep 17 00:00:00 2001 From: zzxp Date: Wed, 7 Sep 2022 14:50:45 +0800 Subject: [PATCH 2/9] support topic patterns --- .../apache/pulsar/logstash/inputs/Pulsar.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 66ce6ef..4b7aa84 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -18,6 +18,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -60,7 +61,11 @@ public class Pulsar implements Input { // topic Names, array private static final PluginConfigSpec> CONFIG_TOPICS = - PluginConfigSpec.arraySetting("topics", null, false, true); + PluginConfigSpec.arraySetting("topics", new ArrayList<>(), false, false); + + // topic Pattern, array + private static final PluginConfigSpec CONFIG_TOPICS_PATTERN = + PluginConfigSpec.stringSetting("topics_pattern", null, false, false); // subscription name private static final PluginConfigSpec CONFIG_SUBSCRIPTION_NAME = @@ -143,12 +148,19 @@ private void createConsumer() throws PulsarClientException { String consumerName = config.get(CONFIG_CONSUMER_NAME); String subscriptionType = config.get(CONFIG_SUBSCRIPTION_TYPE); String subscriptionInitialPosition = config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION); + String topicsPattern = config.get(CONFIG_TOPICS_PATTERN); // Create a consumer ConsumerBuilder consumerBuilder = client.newConsumer() - .topics(topics) .subscriptionName(subscriptionName) .subscriptionType(getSubscriptionType()) .subscriptionInitialPosition(getSubscriptionInitialPosition()); + if (topicsPattern != null) { + logger.info("topics pattern : {}", topicsPattern); + consumerBuilder.topicsPattern(topicsPattern); + } else { + logger.info("topics : {}", topics); + consumerBuilder.topics(topics); + } if (consumerName != null) { consumerBuilder.consumerName(consumerName); } @@ -314,6 +326,7 @@ public void start(Consumer> consumer) { } } catch (PulsarClientException e) { + logger.error("create pulsar client error: {}", e.getMessage()); } finally { stopped = true; done.countDown(); @@ -342,6 +355,7 @@ public Collection> configSchema() { CONFIG_SUBSCRIPTION_INITIAL_POSITION, CONFIG_CONSUMER_NAME, CONFIG_CODEC, + CONFIG_TOPICS_PATTERN, // Pulsar TLS Config CONFIG_ENABLE_TLS, From 6738825c8e88398c54494fd426d73f2b4cc97cee Mon Sep 17 00:00:00 2001 From: Zhong-X Date: Sun, 18 Sep 2022 21:46:04 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E3=80=90=E6=8F=8F=E8=BF=B0=E3=80=91?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=B9=E9=87=8F=E6=8E=A5=E6=94=B6=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/pulsar/logstash/inputs/Pulsar.java | 130 ++++++++++++++++-- 1 file changed, 115 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 4b7aa84..dd005c0 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -11,10 +11,12 @@ import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import java.lang.reflect.Type; @@ -51,6 +53,8 @@ public class Pulsar implements Input { // base config private Configuration config; + private Boolean isBatchReceived; + // consumer config list // codec, plain, json @@ -83,6 +87,22 @@ public class Pulsar implements Input { private static final PluginConfigSpec CONFIG_SUBSCRIPTION_INITIAL_POSITION = PluginConfigSpec.stringSetting("subscriptionInitialPosition", "Latest"); + // is Batch Received + private static final PluginConfigSpec CONFIG_IS_BATCH_RECEIVED = + PluginConfigSpec.booleanSetting("isBatchReceived", false); + + // batch Receive Max Num + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_MAX_NUM = + PluginConfigSpec.numSetting("batchReceiveMaxNum", 50); + + // batch Receive Max Bytes Size + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE = + PluginConfigSpec.numSetting("batchReceiveMaxBytesSize", 10 * 1024); + + // batch Receive Timeout, default 100 ms + private static final PluginConfigSpec CONFIG_BATCH_RECEIVE_TIMEOUT_MS = + PluginConfigSpec.numSetting("batchReceiveTimeoutMs", 100); + // TODO: support decorate_events => true & consumer_threads => 2 & metadata // TLS Config @@ -129,6 +149,7 @@ public Pulsar(String id, Configuration config, Context context) { // constructors should validate configuration options this.id = id; this.config = config; + this.isBatchReceived = this.config.get(CONFIG_IS_BATCH_RECEIVED); } private void createConsumer() throws PulsarClientException { @@ -149,6 +170,10 @@ private void createConsumer() throws PulsarClientException { String subscriptionType = config.get(CONFIG_SUBSCRIPTION_TYPE); String subscriptionInitialPosition = config.get(CONFIG_SUBSCRIPTION_INITIAL_POSITION); String topicsPattern = config.get(CONFIG_TOPICS_PATTERN); + int batchReceiveMaxNum = config.get(CONFIG_BATCH_RECEIVE_MAX_NUM).intValue(); + int batchReceiveMaxBytesSize = config.get(CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE).intValue(); + int batchReceiveTimeoutMs = config.get(CONFIG_BATCH_RECEIVE_TIMEOUT_MS).intValue(); + // Create a consumer ConsumerBuilder consumerBuilder = client.newConsumer() .subscriptionName(subscriptionName) @@ -164,6 +189,19 @@ private void createConsumer() throws PulsarClientException { if (consumerName != null) { consumerBuilder.consumerName(consumerName); } + if (isBatchReceived) { + // create batchReceivePolicy + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder() + .maxNumMessages(batchReceiveMaxNum) + .maxNumBytes(batchReceiveMaxBytesSize) + .timeout(batchReceiveTimeoutMs, TimeUnit.MILLISECONDS) + .build(); + + consumerBuilder.batchReceivePolicy(batchReceivePolicy); + logger.info("batchReceiveMaxNum is {}, batchReceiveMaxBytesSize is {}, batchReceiveTimeoutMs is {}", + batchReceiveMaxNum, batchReceiveMaxBytesSize, batchReceiveTimeoutMs); + } + pulsarConsumer = consumerBuilder.subscribe(); logger.info("Create subscription {} on topics {} with codec {}, consumer name is {},subscription Type is {},subscriptionInitialPosition is {}", subscriptionName, topics, codec , consumerName, subscriptionType, subscriptionInitialPosition); @@ -263,7 +301,6 @@ private void closePulsarConsumer() { } } - @Override public void start(Consumer> consumer) { @@ -276,6 +313,16 @@ public void start(Consumer> consumer) { // a finite sequence of events should loop until that sequence is exhausted or until they // receive a stop request, whichever comes first. + if (!isBatchReceived) { + logger.info("start with single receive"); + startWithSingleReceive(consumer); + } else { + logger.info("start with batch receive"); + startWithBatchReceive(consumer); + } + } + + public void startWithSingleReceive(Consumer> consumer) { try { createConsumer(); Message message = null; @@ -292,25 +339,12 @@ public void start(Consumer> consumer) { msgString = new String(message.getData()); if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { - try { - Map map = gson.fromJson(msgString, gsonType); - consumer.accept(map); - } catch (Exception e) { - // json parse exception - // treat it as codec plain - logger.error("json parse exception ", e); - logger.error("message key is {}, set logging level to debug if you'd like to see message", message.getKey()); - logger.debug("message content is {}", msgString); - logger.error("default codec plain will be used "); - - consumer.accept(Collections.singletonMap("message", msgString)); - } + processMessage(consumer, msgString, message.getKey(), gson, gsonType); } else { // default codec: plain consumer.accept(Collections.singletonMap("message", msgString)); } - // Acknowledge the message so that it can be // deleted by the message broker pulsarConsumer.acknowledge(message); @@ -333,6 +367,72 @@ public void start(Consumer> consumer) { } } + public void startWithBatchReceive(Consumer> consumer) { + try { + createConsumer(); + Messages messages = null; + String msgString = null; + Gson gson = new Gson(); + Type gsonType = new TypeToken(){}.getType(); + while (!stopped) { + try { + messages = pulsarConsumer.batchReceive(); + if(messages == null){ + continue; + } + logger.info("messages had been received, size: {}", messages.size()); + + if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { + for (Message msg : messages) { + msgString = new String(msg.getData()); + processMessage(consumer, msgString, msg.getKey(), gson, gsonType); + } + } else { + // default codec: plain + for (Message msg : messages) { + msgString = new String(msg.getData()); + consumer.accept(Collections.singletonMap("message", msgString)); + } + } + + // Acknowledge the message so that it can be + // deleted by the message broker + pulsarConsumer.acknowledge(messages); + } catch (Exception e) { + + // Message failed to process, redeliver later + logger.error("consume exception ", e); + if (messages != null) { + pulsarConsumer.negativeAcknowledge(messages); + logger.error("messages failed to process, size: {}", messages.size()); + } + } + + } + } catch (PulsarClientException e) { + logger.error("create pulsar client error: {}", e.getMessage()); + } finally { + stopped = true; + done.countDown(); + } + } + + private void processMessage(Consumer> consumer, String msgString, String msgKey, Gson gson, Type gsonType) { + try { + Map map = gson.fromJson(msgString, gsonType); + consumer.accept(map); + } catch (Exception e) { + // json parse exception + // treat it as codec plain + logger.error("json parse exception ", e); + logger.error("message key is {}, set logging level to debug if you'd like to see message", msgKey); + logger.debug("message content is {}", msgString); + logger.error("default codec plain will be used "); + + consumer.accept(Collections.singletonMap("message", msgString)); + } + } + @Override public void stop() { stopped = true; // set flag to request cooperative stop of input From 911ac9582fed7f48536a63964b6cdd54c40aa79d Mon Sep 17 00:00:00 2001 From: Zhong-X Date: Mon, 19 Sep 2022 21:55:15 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E3=80=90=E6=8F=8F=E8=BF=B0=E3=80=91?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=B9=E9=87=8F=E6=B6=88=E8=B4=B9=E8=83=BD?= =?UTF-8?q?=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/pulsar/logstash/inputs/Pulsar.java | 156 ++++++++---------- 1 file changed, 72 insertions(+), 84 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index dd005c0..333b768 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -312,108 +312,96 @@ public void start(Consumer> consumer) { // events should loop indefinitely until they receive a stop request. Inputs that produce // a finite sequence of events should loop until that sequence is exhausted or until they // receive a stop request, whichever comes first. - - if (!isBatchReceived) { - logger.info("start with single receive"); - startWithSingleReceive(consumer); - } else { - logger.info("start with batch receive"); - startWithBatchReceive(consumer); + try { + createConsumer(); + if (!isBatchReceived) { + logger.info("start with single receive"); + startWithSingleReceive(consumer); + } else { + logger.info("start with batch receive"); + startWithBatchReceive(consumer); + } + } catch (PulsarClientException e) { + logger.error("create pulsar client error: {}", e.getMessage()); + } finally { + stopped = true; + done.countDown(); } } public void startWithSingleReceive(Consumer> consumer) { - try { - createConsumer(); - Message message = null; - String msgString = null; - Gson gson = new Gson(); - Type gsonType = new TypeToken(){}.getType(); - while (!stopped) { - try { - // Block and wait until a single message is available - message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); - if(message == null){ - continue; - } - msgString = new String(message.getData()); + Message message = null; + String msgString = null; + Gson gson = new Gson(); + Type gsonType = new TypeToken(){}.getType(); + while (!stopped) { + try { + // Block and wait until a single message is available + message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); + if(message == null){ + continue; + } + msgString = new String(message.getData()); - if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { - processMessage(consumer, msgString, message.getKey(), gson, gsonType); - } else { - // default codec: plain - consumer.accept(Collections.singletonMap("message", msgString)); - } + if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { + processMessage(consumer, msgString, message.getKey(), gson, gsonType); + } else { + // default codec: plain + consumer.accept(Collections.singletonMap("message", msgString)); + } - // Acknowledge the message so that it can be - // deleted by the message broker - pulsarConsumer.acknowledge(message); - } catch (Exception e) { + // Acknowledge the message so that it can be + // deleted by the message broker + pulsarConsumer.acknowledge(message); + } catch (Exception e) { - // Message failed to process, redeliver later - logger.error("consume exception ", e); - if (message != null) { - pulsarConsumer.negativeAcknowledge(message); - logger.error("message is {}:{}", message.getKey(), msgString); - } + // Message failed to process, redeliver later + logger.error("consume exception ", e); + if (message != null) { + pulsarConsumer.negativeAcknowledge(message); + logger.error("message is {}:{}", message.getKey(), msgString); } - } - } catch (PulsarClientException e) { - logger.error("create pulsar client error: {}", e.getMessage()); - } finally { - stopped = true; - done.countDown(); } } public void startWithBatchReceive(Consumer> consumer) { - try { - createConsumer(); - Messages messages = null; - String msgString = null; - Gson gson = new Gson(); - Type gsonType = new TypeToken(){}.getType(); - while (!stopped) { - try { - messages = pulsarConsumer.batchReceive(); - if(messages == null){ - continue; - } - logger.info("messages had been received, size: {}", messages.size()); - - if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { - for (Message msg : messages) { - msgString = new String(msg.getData()); - processMessage(consumer, msgString, msg.getKey(), gson, gsonType); - } - } else { - // default codec: plain - for (Message msg : messages) { - msgString = new String(msg.getData()); - consumer.accept(Collections.singletonMap("message", msgString)); - } - } - - // Acknowledge the message so that it can be - // deleted by the message broker - pulsarConsumer.acknowledge(messages); - } catch (Exception e) { + Messages messages = null; + String msgString = null; + Gson gson = new Gson(); + Type gsonType = new TypeToken(){}.getType(); + while (!stopped) { + try { + messages = pulsarConsumer.batchReceive(); + if(messages == null){ + continue; + } + logger.info("messages had been received, size: {}", messages.size()); - // Message failed to process, redeliver later - logger.error("consume exception ", e); - if (messages != null) { - pulsarConsumer.negativeAcknowledge(messages); - logger.error("messages failed to process, size: {}", messages.size()); + if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { + for (Message msg : messages) { + msgString = new String(msg.getData()); + processMessage(consumer, msgString, msg.getKey(), gson, gsonType); + } + } else { + // default codec: plain + for (Message msg : messages) { + msgString = new String(msg.getData()); + consumer.accept(Collections.singletonMap("message", msgString)); } } + // Acknowledge the message so that it can be + // deleted by the message broker + pulsarConsumer.acknowledge(messages); + } catch (Exception e) { + // Message failed to process, redeliver later + logger.error("consume exception ", e); + if (messages != null) { + pulsarConsumer.negativeAcknowledge(messages); + logger.error("messages failed to process, size: {}", messages.size()); + } } - } catch (PulsarClientException e) { - logger.error("create pulsar client error: {}", e.getMessage()); - } finally { - stopped = true; - done.countDown(); } } From 49e71c2ca56483350fd8c846f45e22e498d2efb5 Mon Sep 17 00:00:00 2001 From: Zhong-X Date: Fri, 23 Sep 2022 20:49:20 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/apache/pulsar/logstash/inputs/Pulsar.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 333b768..ad52f9f 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -455,7 +455,13 @@ public Collection> configSchema() { CONFIG_ALLOW_TLS_INSECURE_CONNECTION, CONFIG_AUTH_PLUGIN_CLASS_NAME, CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION, - CONFIG_CIPHERS + CONFIG_CIPHERS, + + // Batch Receive + CONFIG_IS_BATCH_RECEIVED, + CONFIG_BATCH_RECEIVE_MAX_NUM, + CONFIG_BATCH_RECEIVE_MAX_BYTES_SIZE, + CONFIG_BATCH_RECEIVE_TIMEOUT_MS ); } From f13f56afd126d45b232580c0b5a888a8b1d4731e Mon Sep 17 00:00:00 2001 From: zzxp Date: Sat, 24 Sep 2022 10:11:45 +0800 Subject: [PATCH 6/9] add switch --- .../apache/pulsar/logstash/inputs/Pulsar.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 4b7aa84..bd89d44 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -83,6 +83,9 @@ public class Pulsar implements Input { private static final PluginConfigSpec CONFIG_SUBSCRIPTION_INITIAL_POSITION = PluginConfigSpec.stringSetting("subscriptionInitialPosition", "Latest"); + private static final PluginConfigSpec CONFIG_IF_OUTPUT_PULSAR_ERROR = + PluginConfigSpec.booleanSetting("block_if_output_pulsar_error", false); + // TODO: support decorate_events => true & consumer_threads => 2 & metadata // TLS Config @@ -96,19 +99,19 @@ public class Pulsar implements Input { ); private static final PluginConfigSpec CONFIG_ENABLE_TLS = - PluginConfigSpec.booleanSetting("enable_tls",false); + PluginConfigSpec.booleanSetting("enable_tls", false); private static final PluginConfigSpec CONFIG_ALLOW_TLS_INSECURE_CONNECTION = - PluginConfigSpec.booleanSetting("allow_tls_insecure_connection",false); + PluginConfigSpec.booleanSetting("allow_tls_insecure_connection", false); private static final PluginConfigSpec CONFIG_ENABLE_TLS_HOSTNAME_VERIFICATION = - PluginConfigSpec.booleanSetting("enable_tls_hostname_verification",false); + PluginConfigSpec.booleanSetting("enable_tls_hostname_verification", false); private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PATH = - PluginConfigSpec.stringSetting("tls_trust_store_path",""); + PluginConfigSpec.stringSetting("tls_trust_store_path", ""); private static final PluginConfigSpec CONFIG_TLS_TRUST_STORE_PASSWORD = - PluginConfigSpec.stringSetting("tls_trust_store_password",""); + PluginConfigSpec.stringSetting("tls_trust_store_password", ""); private static final PluginConfigSpec CONFIG_TLS_KEY_STORE_PATH = PluginConfigSpec.stringSetting("tls_key_store_path", ""); @@ -117,7 +120,7 @@ public class Pulsar implements Input { PluginConfigSpec.stringSetting("tls_key_store_password", ""); private static final PluginConfigSpec CONFIG_AUTH_PLUGIN_CLASS_NAME = - PluginConfigSpec.stringSetting("auth_plugin_class_name",authPluginClassName); + PluginConfigSpec.stringSetting("auth_plugin_class_name", authPluginClassName); private static final PluginConfigSpec> CONFIG_CIPHERS = PluginConfigSpec.arraySetting("ciphers", ciphers, false, false); @@ -165,7 +168,7 @@ private void createConsumer() throws PulsarClientException { consumerBuilder.consumerName(consumerName); } pulsarConsumer = consumerBuilder.subscribe(); - logger.info("Create subscription {} on topics {} with codec {}, consumer name is {},subscription Type is {},subscriptionInitialPosition is {}", subscriptionName, topics, codec , consumerName, subscriptionType, subscriptionInitialPosition); + logger.info("Create subscription {} on topics {} with codec {}, consumer name is {},subscription Type is {},subscriptionInitialPosition is {}", subscriptionName, topics, codec, consumerName, subscriptionType, subscriptionInitialPosition); } catch (PulsarClientException e) { logger.error("pulsar client exception ", e); @@ -209,7 +212,7 @@ private PulsarClient buildTlsPulsarClient(String serviceUrl) throws PulsarClient .enableTlsHostnameVerification(enableTlsHostnameVerification) .tlsTrustStorePath(tlsTrustStorePath) .tlsTrustStorePassword(tlsTrustStorePassword) - .authentication(pluginClassName,authMap) + .authentication(pluginClassName, authMap) .useKeyStoreTls(true) .build(); } @@ -277,16 +280,18 @@ public void start(Consumer> consumer) { // receive a stop request, whichever comes first. try { + Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); createConsumer(); Message message = null; String msgString = null; Gson gson = new Gson(); - Type gsonType = new TypeToken(){}.getType(); + Type gsonType = new TypeToken() { + }.getType(); while (!stopped) { try { // Block and wait until a single message is available message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); - if(message == null){ + if (message == null) { continue; } msgString = new String(message.getData()); @@ -313,7 +318,11 @@ public void start(Consumer> consumer) { // Acknowledge the message so that it can be // deleted by the message broker - pulsarConsumer.acknowledge(message); + if (blockIfOutputPulsarError && Boolean.parseBoolean(System.getProperty("output.errors", String.valueOf(false)))) { + pulsarConsumer.negativeAcknowledge(message); + } else { + pulsarConsumer.acknowledge(message); + } } catch (Exception e) { // Message failed to process, redeliver later @@ -356,6 +365,7 @@ public Collection> configSchema() { CONFIG_CONSUMER_NAME, CONFIG_CODEC, CONFIG_TOPICS_PATTERN, + CONFIG_IF_OUTPUT_PULSAR_ERROR, // Pulsar TLS Config CONFIG_ENABLE_TLS, From 08e3f2a1410f12e97b131d5cba9af68b1c73af71 Mon Sep 17 00:00:00 2001 From: zzxp Date: Mon, 26 Sep 2022 16:26:26 +0800 Subject: [PATCH 7/9] resolve code conflicts --- .../apache/pulsar/logstash/inputs/Pulsar.java | 74 ++----------------- 1 file changed, 7 insertions(+), 67 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 01396f3..b33b1e1 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -305,73 +305,13 @@ private void closePulsarConsumer() { @Override public void start(Consumer> consumer) { - - // The start method should push Map instances to the supplied QueueWriter - // instance. Those will be converted to Event instances later in the Logstash event - // processing pipeline. - // - // Inputs that operate on unbounded streams of data or that poll indefinitely for new - // events should loop indefinitely until they receive a stop request. Inputs that produce - // a finite sequence of events should loop until that sequence is exhausted or until they - // receive a stop request, whichever comes first. - try { - Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); - createConsumer(); - Message message = null; - String msgString = null; - Gson gson = new Gson(); - Type gsonType = new TypeToken() { - }.getType(); - while (!stopped) { - try { - // Block and wait until a single message is available - message = pulsarConsumer.receive(1000, TimeUnit.MILLISECONDS); - if (message == null) { - continue; - } - msgString = new String(message.getData()); - - if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { - try { - Map map = gson.fromJson(msgString, gsonType); - consumer.accept(map); - } catch (Exception e) { - // json parse exception - // treat it as codec plain - logger.error("json parse exception ", e); - logger.error("message key is {}, set logging level to debug if you'd like to see message", message.getKey()); - logger.debug("message content is {}", msgString); - logger.error("default codec plain will be used "); - - consumer.accept(Collections.singletonMap("message", msgString)); - } - } else { - // default codec: plain - consumer.accept(Collections.singletonMap("message", msgString)); - } - } catch (Exception e) { - - // Message failed to process, redeliver later - logger.error("consume exception ", e); - if (message != null) { - pulsarConsumer.negativeAcknowledge(message); - logger.error("message is {}:{}", message.getKey(), msgString); - } - } - - if (!isBatchReceived) { - logger.info("start with single receive"); - startWithSingleReceive(consumer, blockIfOutputPulsarError); - } else { - logger.info("start with batch receive"); - startWithBatchReceive(consumer, blockIfOutputPulsarError); - } - } - } catch (PulsarClientException e) { - logger.error("create pulsar client error: {}", e.getMessage()); - } finally { - stopped = true; - done.countDown(); + Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); + if (!isBatchReceived) { + logger.info("start with single receive"); + startWithSingleReceive(consumer, blockIfOutputPulsarError); + } else { + logger.info("start with batch receive"); + startWithBatchReceive(consumer, blockIfOutputPulsarError); } } From 1cce7daa5f6f9386f31d39e3719cf339344fdbb5 Mon Sep 17 00:00:00 2001 From: zzxp Date: Mon, 26 Sep 2022 16:30:47 +0800 Subject: [PATCH 8/9] add createConsumer --- .../apache/pulsar/logstash/inputs/Pulsar.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index b33b1e1..90d8058 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -305,13 +305,18 @@ private void closePulsarConsumer() { @Override public void start(Consumer> consumer) { - Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); - if (!isBatchReceived) { - logger.info("start with single receive"); - startWithSingleReceive(consumer, blockIfOutputPulsarError); - } else { - logger.info("start with batch receive"); - startWithBatchReceive(consumer, blockIfOutputPulsarError); + try { + Boolean blockIfOutputPulsarError = config.get(CONFIG_IF_OUTPUT_PULSAR_ERROR); + createConsumer(); + if (!isBatchReceived) { + logger.info("start with single receive"); + startWithSingleReceive(consumer, blockIfOutputPulsarError); + } else { + logger.info("start with batch receive"); + startWithBatchReceive(consumer, blockIfOutputPulsarError); + } + } catch (PulsarClientException e) { + logger.error("create pulsar client errors: {}", e.getMessage()); } } From 175dbca4a540fa97f9febe0290f9e2ae01de4162 Mon Sep 17 00:00:00 2001 From: zzxp Date: Mon, 26 Sep 2022 21:32:22 +0800 Subject: [PATCH 9/9] refactor log level --- src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java index 90d8058..bfaaf16 100644 --- a/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java +++ b/src/main/java/org/apache/pulsar/logstash/inputs/Pulsar.java @@ -372,7 +372,7 @@ public void startWithBatchReceive(Consumer> consumer, Boolea if (messages == null) { continue; } - logger.info("messages had been received, size: {}", messages.size()); + logger.debug("messages had been received, size: {}", messages.size()); if (config.get(CONFIG_CODEC).equals(CODEC_JSON)) { for (Message msg : messages) {