diff --git a/.github/workflows/on_push_master.yml b/.github/workflows/on_push_master.yml
index 6cccfcb..4db9c58 100644
--- a/.github/workflows/on_push_master.yml
+++ b/.github/workflows/on_push_master.yml
@@ -22,6 +22,8 @@ jobs:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
+ - name: Lint with Gradle
+ run: ./gradlew spotlessCheck
- name: Build and analyze
run: ./gradlew build
- uses: actions/upload-artifact@v4
diff --git a/.spotless/HEADER b/.spotless/HEADER
new file mode 100644
index 0000000..eaa97d7
--- /dev/null
+++ b/.spotless/HEADER
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
\ No newline at end of file
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..d788d20
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,137 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+We as members, contributors, and leaders pledge to make participation in our
+community a harassment-free experience for everyone, regardless of age, body
+size, visible or invisible disability, ethnicity, sex characteristics, gender
+identity and expression, level of experience, education, socio-economic status,
+nationality, personal appearance, race, caste, color, religion, or sexual
+identity and orientation.
+
+We pledge to act and interact in ways that contribute to an open, welcoming,
+diverse, inclusive, and healthy community.
+
+## Our Standards
+
+Examples of behavior that contributes to a positive environment for our
+community include:
+
+* Demonstrating empathy and kindness toward other people
+* Being respectful of differing opinions, viewpoints, and experiences
+* Giving and gracefully accepting constructive feedback
+* Accepting responsibility and apologizing to those affected by our mistakes,
+ and learning from the experience
+* Focusing on what is best not just for us as individuals, but for the overall
+ community
+
+Examples of unacceptable behavior include:
+
+* The use of sexualized language or imagery, and sexual attention or advances of
+ any kind
+* Trolling, insulting or derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or email address,
+ without their explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+
+## Enforcement Responsibilities
+
+Community leaders are responsible for clarifying and enforcing our standards of
+acceptable behavior and will take appropriate and fair corrective action in
+response to any behavior that they deem inappropriate, threatening, offensive,
+or harmful.
+
+Community leaders have the right and responsibility to remove, edit, or reject
+comments, commits, code, wiki edits, issues, and other contributions that are
+not aligned to this Code of Conduct, and will communicate reasons for moderation
+decisions when appropriate.
+
+## Scope
+
+This Code of Conduct applies within all community spaces, and also applies when
+an individual is officially representing the community in public spaces.
+Examples of representing our community include using an official email address,
+posting via an official social media account, or acting as an appointed
+representative at an online or offline event.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported to the community leaders responsible for enforcement at
+the contact form available at
+[https://opensource.michelin.io](https://opensource.michelin.io).
+All complaints will be reviewed and investigated promptly and fairly.
+
+All community leaders are obligated to respect the privacy and security of the
+reporter of any incident.
+
+## Enforcement Guidelines
+
+Community leaders will follow these Community Impact Guidelines in determining
+the consequences for any action they deem in violation of this Code of Conduct:
+
+### 1. Correction
+
+**Community Impact**: Use of inappropriate language or other behavior deemed
+unprofessional or unwelcome in the community.
+
+**Consequence**: A private, written warning from community leaders, providing
+clarity around the nature of the violation and an explanation of why the
+behavior was inappropriate. A public apology may be requested.
+
+### 2. Warning
+
+**Community Impact**: A violation through a single incident or series of
+actions.
+
+**Consequence**: A warning with consequences for continued behavior. No
+interaction with the people involved, including unsolicited interaction with
+those enforcing the Code of Conduct, for a specified period of time. This
+includes avoiding interactions in community spaces as well as external channels
+like social media. Violating these terms may lead to a temporary or permanent
+ban.
+
+### 3. Temporary Ban
+
+**Community Impact**: A serious violation of community standards, including
+sustained inappropriate behavior.
+
+**Consequence**: A temporary ban from any sort of interaction or public
+communication with the community for a specified period of time. No public or
+private interaction with the people involved, including unsolicited interaction
+with those enforcing the Code of Conduct, is allowed during this period.
+Violating these terms may lead to a permanent ban.
+
+### 4. Permanent Ban
+
+**Community Impact**: Demonstrating a pattern of violation of community
+standards, including sustained inappropriate behavior, harassment of an
+individual, or aggression toward or disparagement of classes of individuals.
+
+**Consequence**: A permanent ban from any sort of public interaction within the
+community.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage],
+version 2.1, available at
+[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
+
+Community Impact Guidelines were inspired by
+[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
+
+For answers to common questions about this code of conduct, see the FAQ at
+[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
+[https://www.contributor-covenant.org/translations][translations].
+
+[homepage]: https://www.contributor-covenant.org
+
+[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
+
+[Mozilla CoC]: https://github.com/mozilla/diversity
+
+[FAQ]: https://www.contributor-covenant.org/faq
+
+[translations]: https://www.contributor-covenant.org/translations
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..c38e45c
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,55 @@
+# Contributing to Michelin Connect Plugins
+
+Welcome to our contribution guide.
+This guide will help you understand the process and expectations for contributing.
+
+## Getting Started
+
+### Issues
+
+If you want to report a bug, request a feature, or suggest an improvement, please open an issue on
+the [GitHub repository](https://github.com/michelin/michelin-connect-plugins/issues)
+and fill out the appropriate template.
+
+If you find an existing issue that matches your problem, please:
+
+- Add your reproduction details to the existing issue instead of creating a duplicate.
+- Use reactions (e.g., 👍) on the issue to signal that it affects more
+ users. [GitHub reactions](https://github.blog/news-insights/product-news/add-reactions-to-pull-requests-issues-and-comments/)
+ help maintainers prioritize issues based on user impact.
+
+If no open issue addresses your problem, please open a new one and include:
+
+- A clear title and detailed description of the issue.
+- Relevant environment details (e.g., version, OS, configurations).
+- A code sample or executable test case demonstrating the expected behavior that is not occurring.
+
+### Pull Requests
+
+To contribute to Michelin Connect Plugins:
+
+- Fork the repository to your own GitHub account
+- Clone the project to your machine
+- Create a branch from the `main` branch
+- Make your changes and commit them to your branch
+- Push your changes to your fork
+- Open a pull request to the `main` branch of the Michelin Connect Plugins repository so that we can review your changes
+
+## Style Guide
+
+### Code Style
+
+We maintain a consistent code style using [Spotless](https://github.com/diffplug/spotless/tree/main/plugin-gradle).
+For Java code, we follow the [Palantir](https://github.com/palantir/palantir-java-format) style.
+
+To check for formatting issues, run:
+
+```bash
+./gradlew spotlessCheck
+```
+
+To automatically fix formatting issues and add missing file headers, run:
+
+```bash
+./gradlew spotlessApply
+```
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 37b924f..01e66e1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -9,6 +9,8 @@
plugins {
// Apply the java-library plugin for API and implementation separation.
id 'java-library'
+ // Add spotless plugin
+ id("com.diffplug.spotless") version "7.2.1"
}
repositories {
@@ -41,3 +43,30 @@ test {
// Use JUnit Platform for unit tests.
useJUnitPlatform()
}
+
+spotless {
+ java {
+ target "src/main/java/**/*.java", "src/test/java/**/*.java"
+ palantirJavaFormat("2.58.0").style("PALANTIR").formatJavadoc(true)
+ removeUnusedImports()
+ formatAnnotations()
+ importOrder()
+ trimTrailingWhitespace()
+ endWithNewline()
+ licenseHeaderFile {
+ file ".spotless/HEADER"
+ }
+ }
+ groovyGradle {
+ target "*.gradle"
+ importOrder()
+ removeSemicolons()
+ leadingTabsToSpaces()
+ }
+ yaml {
+ target "src/main/resources/*.yml", "src/test/resources/**/*.yml"
+ jackson()
+ .feature('ORDER_MAP_ENTRIES_BY_KEYS', true)
+ .yamlFeature('WRITE_DOC_START_MARKER', false)
+ }
+}
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 69a9715..3ae1e2f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/semantic-build-versioning.gradle b/semantic-build-versioning.gradle
index f4dbaac..db62fbc 100644
--- a/semantic-build-versioning.gradle
+++ b/semantic-build-versioning.gradle
@@ -1 +1 @@
-tagPrefix = 'v'
\ No newline at end of file
+tagPrefix = 'v'
diff --git a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java
index 0fc6852..03bce3f 100644
--- a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java
+++ b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java
@@ -1,14 +1,28 @@
/*
- * This Java source file was generated by the Gradle 'init' task.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package com.michelin.kafka.config.providers;
-import org.apache.kafka.common.config.ConfigData;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.common.config.types.Password;
-
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.*;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
@@ -16,11 +30,11 @@
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.security.NoSuchAlgorithmException;
-import java.security.spec.InvalidKeySpecException;
-import java.util.*;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.types.Password;
/**
* AES256 encrypted Kafka config provider.
@@ -33,43 +47,39 @@ public class AES256ConfigProvider implements ConfigProvider {
private static final String AES_KEY_CONFIG = "key";
private static final String SALT_CONFIG = "salt";
- /**
- * The AES encryption algorithm.
- */
+ /** The AES encryption algorithm. */
private static final String ENCRYPT_ALGO = "AES/GCM/NoPadding";
- /**
- * The authentication tag length.
- */
+ /** The authentication tag length. */
private static final int TAG_LENGTH_BIT = 128;
- /**
- * The Initial Value length.
- */
+ /** The Initial Value length. */
private static final int IV_LENGTH_BYTE = 12;
- /**
- * The NS4KAFKA prefix.
- */
+ /** The NS4KAFKA prefix. */
private static final String NS4KAFKA_PREFIX = "NS4K";
- /**
- * Definition of accepted parameters: key and salt.
- */
+ /** Definition of accepted parameters: key and salt. */
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(AES_KEY_CONFIG, ConfigDef.Type.PASSWORD, ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.NonNullValidator(), ConfigDef.Importance.HIGH, "The AES256 key.")
- .define(SALT_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "The AES256 salt.");
-
- /**
- * Represents the aes256 key
- */
+ .define(
+ AES_KEY_CONFIG,
+ ConfigDef.Type.PASSWORD,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonNullValidator(),
+ ConfigDef.Importance.HIGH,
+ "The AES256 key.")
+ .define(
+ SALT_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.HIGH,
+ "The AES256 salt.");
+
+ /** Represents the aes256 key */
private Password aesKey;
- /**
- * Represents the aes256 salt
- */
+ /** Represents the aes256 salt */
private String salt;
@Override
@@ -107,8 +117,8 @@ public void close() {
* Decrypt text with the right algorithm.
*
* @param encryptedText The text to decrypt.
- * @param aesKey The encryption key.
- * @param salt The encryption salt.
+ * @param aesKey The encryption key.
+ * @param salt The encryption salt.
* @return The encrypted password.
*/
public static String decrypt(final String encryptedText, final Password aesKey, final String salt) {
@@ -133,19 +143,21 @@ public static String decrypt(final String encryptedText, final Password aesKey,
* Decrypt text with the given key and salt encoded by the aes256 api.
*
* @param encryptedText The text to decrypt.
- * @param aesKey The encryption key.
- * @param salt The encryption salt.
+ * @param aesKey The encryption key.
+ * @param salt The encryption salt.
* @return The encrypted password.
*/
- public static String decryptAESFromPreviousAPI(final String encryptedText, final Password aesKey, final String salt) {
+ public static String decryptAESFromPreviousAPI(
+ final String encryptedText, final Password aesKey, final String salt) {
if (encryptedText == null || encryptedText.isEmpty()) {
return encryptedText;
}
try {
- final var ivspec = new IvParameterSpec(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
+ final var ivspec = new IvParameterSpec(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
final var factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
- final var spec = new PBEKeySpec(aesKey.value().toCharArray(), salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
+ final var spec =
+ new PBEKeySpec(aesKey.value().toCharArray(), salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
final var tmp = factory.generateSecret(spec);
final var secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");
final var cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING");
@@ -161,8 +173,8 @@ public static String decryptAESFromPreviousAPI(final String encryptedText, final
* Decrypt text with the given key and salt encoded by NS4Kafka.
*
* @param encryptedText The text to decrypt.
- * @param aesKey The encryption key.
- * @param salt The encryption salt.
+ * @param aesKey The encryption key.
+ * @param salt The encryption salt.
* @return The encrypted password.
*/
public static String decryptAESFromNs4Kafka(final String encryptedText, final Password aesKey, final String salt) {
@@ -190,15 +202,14 @@ public static String decryptAESFromNs4Kafka(final String encryptedText, final Pa
}
}
-
/**
* Gets the secret key derived AES 256 bits key
*
- * @param key The encryption key
+ * @param key The encryption key
* @param salt The encryption salt
* @return The encryption secret key.
* @throws NoSuchAlgorithmException No such algorithm exception.
- * @throws InvalidKeySpecException Invalid key spec exception.
+ * @throws InvalidKeySpecException Invalid key spec exception.
*/
private static SecretKey getAESSecretKey(final String key, final String salt)
throws NoSuchAlgorithmException, InvalidKeySpecException {
diff --git a/src/main/java/com/michelin/kafka/connect/transforms/ExpandJsonHeaders.java b/src/main/java/com/michelin/kafka/connect/transforms/ExpandJsonHeaders.java
index 290c90c..93addca 100644
--- a/src/main/java/com/michelin/kafka/connect/transforms/ExpandJsonHeaders.java
+++ b/src/main/java/com/michelin/kafka/connect/transforms/ExpandJsonHeaders.java
@@ -1,5 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -10,24 +31,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.Iterator;
-import java.util.Map;
-
/**
- *
Kafka Connect Single Message Transform (SMT) that takes an existing JSON header
- * and expands each key-value pair into separate individual Kafka message headers.
- * The original JSON header is removed after expansion.
+ * Kafka Connect Single Message Transform (SMT) that takes an existing JSON header and expands each key-value pair into
+ * separate individual Kafka message headers. The original JSON header is removed after expansion.
*
- * This transform is useful when you have JSON content in a header that you want
- * to split into multiple headers for better message routing and filtering.
+ * This transform is useful when you have JSON content in a header that you want to split into multiple headers for
+ * better message routing and filtering.
*
*
Configuration:
+ *
*
- * - header.field: Header name containing the JSON map (default: "headers")
- *
+ * header.field: Header name containing the JSON map (default: "headers")
+ *
*/
public class ExpandJsonHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
@@ -37,8 +52,12 @@ public class ExpandJsonHeaders> implements Transforma
private static final String HEADER_FIELD_DEFAULT = "headers";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(HEADER_FIELD_CONFIG, ConfigDef.Type.STRING, HEADER_FIELD_DEFAULT,
- ConfigDef.Importance.HIGH, "Header name containing the JSON map");
+ .define(
+ HEADER_FIELD_CONFIG,
+ ConfigDef.Type.STRING,
+ HEADER_FIELD_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ "Header name containing the JSON map");
private String headerField;
private ObjectMapper objectMapper;
@@ -53,11 +72,10 @@ public void configure(Map props) {
}
/**
- * Applies the transformation to expand a JSON header into individual headers.
- * Extracts key-value pairs from the specified JSON header field, adds them as
- * separate headers, and removes the original JSON header. If the header is
- * missing or invalid, the original record is returned unchanged.
-
+ * Applies the transformation to expand a JSON header into individual headers. Extracts key-value pairs from the
+ * specified JSON header field, adds them as separate headers, and removes the original JSON header. If the header
+ * is missing or invalid, the original record is returned unchanged.
+ *
* @param currentRecord the Kafka Connect record to transform
* @return the transformed record with expanded headers
*/
@@ -98,15 +116,14 @@ public R apply(R currentRecord) {
}
return currentRecord.newRecord(
- currentRecord.topic(),
- currentRecord.kafkaPartition(),
- currentRecord.keySchema(),
- currentRecord.key(),
- currentRecord.valueSchema(),
- currentRecord.value(),
- currentRecord.timestamp(),
- headers
- );
+ currentRecord.topic(),
+ currentRecord.kafkaPartition(),
+ currentRecord.keySchema(),
+ currentRecord.key(),
+ currentRecord.valueSchema(),
+ currentRecord.value(),
+ currentRecord.timestamp(),
+ headers);
}
@Override
diff --git a/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java b/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java
index b8518bf..19070a2 100644
--- a/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java
+++ b/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java
@@ -1,11 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.ConnectException;
@@ -14,20 +41,10 @@
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.Date;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-
/**
- * Fork from https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
- * to support timestamps microseconds by default.
+ * Fork from <a href="https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java">Kafka
+ * TimestampConverter</a> to support timestamps microseconds by default.
*
 * @param <R> Type of the record.
* @author Michelin
@@ -35,9 +52,10 @@
public abstract class TimestampMicrosConverter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC =
"Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types."
- + "Applies to individual fields or to the entire value."
- + "Use the concrete transformation type designed for the record key (" + TimestampMicrosConverter.Key.class.getName() + ") "
- + "or value (" + TimestampMicrosConverter.Value.class.getName() + ").";
+ + "Applies to individual fields or to the entire value."
+ + "Use the concrete transformation type designed for the record key ("
+ + TimestampMicrosConverter.Key.class.getName() + ") "
+ + "or value (" + TimestampMicrosConverter.Value.class.getName() + ").";
public static final String FIELD_CONFIG = "field";
private static final String FIELD_DEFAULT = "";
@@ -62,50 +80,59 @@ public abstract class TimestampMicrosConverter> imple
private static final String UNIX_PRECISION_NANOS = "nanoseconds";
private static final String UNIX_PRECISION_SECONDS = "seconds";
- private static final Set<String> VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP));
+ private static final Set<String> VALID_TYPES =
+ new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP));
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
- public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema();
- public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema();
+ public static final Schema OPTIONAL_DATE_SCHEMA =
+ org.apache.kafka.connect.data.Date.builder().optional().schema();
+ public static final Schema OPTIONAL_TIMESTAMP_SCHEMA =
+ Timestamp.builder().optional().schema();
public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema();
- /**
- * Definition of accepted parameters: field, target.type, format and unix.precision.
- */
+ /** Definition of accepted parameters: field, target.type, format and unix.precision. */
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH,
+ .define(
+ FIELD_CONFIG,
+ ConfigDef.Type.STRING,
+ FIELD_DEFAULT,
+ ConfigDef.Importance.HIGH,
"The field containing the timestamp, or empty if the entire value is a timestamp")
- .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+ .define(
+ TARGET_TYPE_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.ValidString.in(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP),
ConfigDef.Importance.HIGH,
"The desired timestamp representation: string, unix, Date, Time, or Timestamp")
- .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM,
- "A SimpleDateFormat-compatible format for the timestamp. " +
- "Used to generate the output when type=string or used to parse the input if the input is a string.")
- .define(UNIX_PRECISION_CONFIG, ConfigDef.Type.STRING, UNIX_PRECISION_DEFAULT,
+ .define(
+ FORMAT_CONFIG,
+ ConfigDef.Type.STRING,
+ FORMAT_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ "A SimpleDateFormat-compatible format for the timestamp. "
+ + "Used to generate the output when type=string or used to parse the input if the input is a string.")
+ .define(
+ UNIX_PRECISION_CONFIG,
+ ConfigDef.Type.STRING,
+ UNIX_PRECISION_DEFAULT,
ConfigDef.ValidString.in(
UNIX_PRECISION_NANOS, UNIX_PRECISION_MICROS,
UNIX_PRECISION_MILLIS, UNIX_PRECISION_SECONDS),
ConfigDef.Importance.LOW,
- "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. " +
- "Used to generate the output when type=unix or used to parse the input if the input is a Long." +
- "Note: This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components.");
+ "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. "
+ + "Used to generate the output when type=unix or used to parse the input if the input is a Long."
+ + "Note: This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components.");
private interface TimestampTranslator {
- /**
- * Convert from the type-specific format to the universal java.util.Date format
- */
+ /** Convert from the type-specific format to the universal java.util.Date format */
Date toRaw(final Config pConfig, final Object pOrig);
- /**
- * Get the schema for this format.
- */
+ /** Get the schema for this format. */
Schema typeSchema(final boolean pIsOptional);
- /**
- * Convert from the universal java.util.Date format to the type-specific format
- */
+ /** Convert from the universal java.util.Date format to the type-specific format */
Object toType(final Config pConfig, final Date pOrig);
}
@@ -121,8 +148,10 @@ public Date toRaw(final Config pConfig, final Object pOrig) {
try {
return pConfig.format.parse((String) pOrig);
} catch (final ParseException e) {
- throw new DataException("Could not parse timestamp: value (" + pOrig + ") does not match pattern ("
- + pConfig.format.toPattern() + ")", e);
+ throw new DataException(
+ "Could not parse timestamp: value (" + pOrig + ") does not match pattern ("
+ + pConfig.format.toPattern() + ")",
+ e);
}
}
@@ -150,9 +179,11 @@ public Date toRaw(final Config pConfig, final Object pOrig) {
case UNIX_PRECISION_SECONDS:
return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
case UNIX_PRECISION_MICROS:
- return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
+ return TimestampMicros.toLogical(
+ TimestampMicros.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
case UNIX_PRECISION_NANOS:
- return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
+ return TimestampMicros.toLogical(
+ TimestampMicros.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
case UNIX_PRECISION_MILLIS:
default:
return TimestampMicros.toLogical(TimestampMicros.SCHEMA, unixTime);
@@ -257,7 +288,8 @@ private static final class Config {
private final SimpleDateFormat format;
private final String unixPrecision;
- private Config(final String pField, final String pType, final SimpleDateFormat pFormat, final String pUnixPrecision) {
+ private Config(
+ final String pField, final String pType, final SimpleDateFormat pFormat, final String pUnixPrecision) {
field = pField;
type = pType;
format = pFormat;
@@ -279,10 +311,11 @@ public void configure(final Map pConfigs) {
if (!VALID_TYPES.contains(type)) {
throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". Valid values are "
- + String.join(", ", VALID_TYPES) + ".");
+ + String.join(", ", VALID_TYPES) + ".");
}
if (type.equals(TYPE_STRING) && formatPattern.trim().isEmpty()) {
- throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps");
+ throw new ConfigException(
+ "TimestampConverter requires format option to be specified when using string timestamps");
}
SimpleDateFormat format = null;
if (formatPattern != null && !formatPattern.trim().isEmpty()) {
@@ -290,8 +323,10 @@ public void configure(final Map pConfigs) {
format = new SimpleDateFormat(formatPattern);
format.setTimeZone(UTC);
} catch (IllegalArgumentException e) {
- throw new ConfigException("TimestampConverter requires a SimpleDateFormat-compatible pattern for string timestamps: "
- + formatPattern, e);
+ throw new ConfigException(
+ "TimestampConverter requires a SimpleDateFormat-compatible pattern for string timestamps: "
+ + formatPattern,
+ e);
}
}
config = new Config(field, type, format, unixPrecision);
@@ -333,7 +368,14 @@ protected Object operatingValue(final R pRecord) {
@Override
protected R newRecord(final R pRecord, final Schema pUpdatedSchema, final Object pUpdatedValue) {
- return pRecord.newRecord(pRecord.topic(), pRecord.kafkaPartition(), pUpdatedSchema, pUpdatedValue, pRecord.valueSchema(), pRecord.value(), pRecord.timestamp());
+ return pRecord.newRecord(
+ pRecord.topic(),
+ pRecord.kafkaPartition(),
+ pUpdatedSchema,
+ pUpdatedValue,
+ pRecord.valueSchema(),
+ pRecord.value(),
+ pRecord.timestamp());
}
}
@@ -350,7 +392,14 @@ protected Object operatingValue(final R pRecord) {
@Override
protected R newRecord(final R pRecord, final Schema pUpdatedSchema, final Object pUpdatedValue) {
- return pRecord.newRecord(pRecord.topic(), pRecord.kafkaPartition(), pRecord.keySchema(), pRecord.key(), pUpdatedSchema, pUpdatedValue, pRecord.timestamp());
+ return pRecord.newRecord(
+ pRecord.topic(),
+ pRecord.kafkaPartition(),
+ pRecord.keySchema(),
+ pRecord.key(),
+ pUpdatedSchema,
+ pUpdatedValue,
+ pRecord.timestamp());
}
}
@@ -375,7 +424,11 @@ private R applyWithSchema(final R pRecord) {
final var builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (final Field field : schema.fields()) {
if (field.name().equals(config.field)) {
- builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
+ builder.field(
+ field.name(),
+ TRANSLATORS
+ .get(config.type)
+ .typeSchema(field.schema().isOptional()));
} else {
builder.field(field.name(), field.schema());
}
@@ -426,9 +479,7 @@ private R applySchemaless(final R pRecord) {
return newRecord(pRecord, null, updatedValue);
}
- /**
- * Determine the type/format of the timestamp based on the schema
- */
+ /** Determine the type/format of the timestamp based on the schema */
private static String timestampTypeFromSchema(final Schema pSchema) {
if (Timestamp.LOGICAL_NAME.equals(pSchema.name())) {
return TYPE_TIMESTAMP;
@@ -446,9 +497,7 @@ private static String timestampTypeFromSchema(final Schema pSchema) {
throw new ConnectException("Schema " + pSchema + " does not correspond to a known timestamp type format");
}
- /**
- * Infer the type/format of the timestamp based on the raw Java type
- */
+ /** Infer the type/format of the timestamp based on the raw Java type */
private static String inferTimestampType(final Object pTimestamp) {
// Note that we can't infer all types, e.g. Date/Time/Timestamp all have the same runtime representation as a
// java.util.Date
@@ -459,13 +508,14 @@ private static String inferTimestampType(final Object pTimestamp) {
} else if (pTimestamp instanceof String) {
return TYPE_STRING;
}
- throw new DataException("TimestampConverter does not support " + pTimestamp.getClass() + " objects as timestamps");
+ throw new DataException(
+ "TimestampConverter does not support " + pTimestamp.getClass() + " objects as timestamps");
}
/**
* Convert the given timestamp to the target timestamp format.
*
- * @param pTimestamp the input timestamp, may be null
+ * @param pTimestamp the input timestamp, may be null
* @param pTimestampFormat the format of the timestamp, or null if the format should be inferred
* @return the converted timestamp
*/
@@ -505,14 +555,16 @@ public static SchemaBuilder builder() {
public static long fromLogical(final Schema pSchema, final java.util.Date pValue) {
if (!LOGICAL_NAME.equals(pSchema.name())) {
- throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
+ throw new DataException(
+ "Requested conversion of TimestampMicros object but the schema does not match.");
}
return ChronoUnit.MILLIS.between(Instant.EPOCH, pValue.toInstant());
}
public static java.util.Date toLogical(final Schema pSchema, final long pValue) {
if (!LOGICAL_NAME.equals(pSchema.name())) {
- throw new DataException("Requested conversion of TimestampMicros object but the schema does not match.");
+ throw new DataException(
+ "Requested conversion of TimestampMicros object but the schema does not match.");
}
return Date.from(Instant.EPOCH.plus(pValue, ChronoUnit.MICROS));
}
diff --git a/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java b/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java
index 3e2da83..8e57e2b 100644
--- a/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java
+++ b/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java
@@ -1,15 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms.predicates;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.regex.Pattern;
-
/**
* Kafka Connect custom header value filter.
*
@@ -17,23 +34,35 @@
* @author Michelin
*/
public class HeaderValueMatches<R extends ConnectRecord<R>> implements Predicate<R> {
- public static final String OVERVIEW_DOC = "A predicate which is true for records with a header's value that matches the configured regular expression.";
+ public static final String OVERVIEW_DOC =
+ "A predicate which is true for records with a header's value that matches the configured regular expression.";
private static final String HEADER_CONFIG = "header.name";
private static final String PATTERN_CONFIG = "pattern";
private static final String MISSING_HEADER_CONFIG = "missing.header.behavior";
- /**
- * Definition of accepted parameters: header.name, pattern and missing.header.behavior.
- */
+ /** Definition of accepted parameters: header.name, pattern and missing.header.behavior. */
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(HEADER_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
- new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "The header name.")
- .define(PATTERN_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+ .define(
+ HEADER_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.MEDIUM,
+ "The header name.")
+ .define(
+ PATTERN_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.CompositeValidator.of(new ConfigDef.NonEmptyString(), new RegexValidator()),
- ConfigDef.Importance.MEDIUM, "A Java regular expression for matching against the value of a record's header.")
- .define(MISSING_HEADER_CONFIG, ConfigDef.Type.BOOLEAN, "false",
- ConfigDef.Importance.LOW, "Predicate behavior when header is missing [true/false]. Default to false");
+ ConfigDef.Importance.MEDIUM,
+ "A Java regular expression for matching against the value of a record's header.")
+ .define(
+ MISSING_HEADER_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ "false",
+ ConfigDef.Importance.LOW,
+ "Predicate behavior when header is missing [true/false]. Default to false");
private String headerName;
private Pattern pattern;
@@ -57,7 +86,6 @@ public void configure(final Map pConfigs) {
this.missingHeaderBehavior = config.getBoolean(MISSING_HEADER_CONFIG);
}
-
@Override
public boolean test(final R pRecord) {
final var headerIterator = pRecord.headers().allWithName(headerName);
@@ -94,9 +122,9 @@ public void close() {
@Override
public String toString() {
- return "HasHeader{" +
- "headerName='" + headerName + "'," +
- "pattern='" + pattern + "'," +
- "missingHeaderBehavior='" + missingHeaderBehavior + "'}";
+ return "HeaderValueMatches{" + "headerName='"
+ + headerName + "'," + "pattern='"
+ + pattern + "'," + "missingHeaderBehavior='"
+ + missingHeaderBehavior + "'}";
}
}
diff --git a/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java b/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java
index a3fb5cd..b3069d1 100644
--- a/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java
+++ b/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java
@@ -1,16 +1,30 @@
/*
- * This Java source file was generated by the Gradle 'init' task.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package com.michelin.kafka.config.providers;
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.HashMap;
import java.util.HashSet;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
class AES256ConfigProviderTest {
@Test
@@ -51,13 +65,15 @@ void DecryptSuccess() {
configs.put("salt", "salt-aaaabbbbccccdddd");
configProvider.configure(configs);
- // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT, originalPassword);
+ // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT,
+ // originalPassword);
// System.out.println(encoded);
final var rightKeys = new HashSet();
rightKeys.add(encodedPassword);
- assertEquals(originalPassword, configProvider.get("", rightKeys).data().get(encodedPassword));
+ assertEquals(
+ originalPassword, configProvider.get("", rightKeys).data().get(encodedPassword));
}
}
@@ -73,7 +89,8 @@ void DecryptSuccessBothAlgorithm() {
configs.put("salt", "salt-aaaabbbbccccdddd");
configProvider.configure(configs);
- // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT, originalPassword);
+ // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT,
+ // originalPassword);
// System.out.println(encoded);
final var rightKeys = new HashSet();
diff --git a/src/test/java/com/michelin/kafka/connect/transforms/ExpandJsonHeadersTest.java b/src/test/java/com/michelin/kafka/connect/transforms/ExpandJsonHeadersTest.java
index 7e221f0..b9c1656 100644
--- a/src/test/java/com/michelin/kafka/connect/transforms/ExpandJsonHeadersTest.java
+++ b/src/test/java/com/michelin/kafka/connect/transforms/ExpandJsonHeadersTest.java
@@ -1,5 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.HashMap;
+import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
@@ -8,11 +30,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.*;
-
class ExpandJsonHeadersTest {
private ExpandJsonHeaders transform;
@@ -51,8 +68,7 @@ void testExpandJsonHeaderWithMultipleFields() {
headers.addString("headers", "{\"userId\":\"user123\",\"requestId\":\"req456\",\"source\":\"web\"}");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -81,8 +97,7 @@ void testExpandJsonHeaderWithCustomFieldName() {
headers.addString("metadata", "{\"version\":\"1.0\",\"type\":\"event\"}");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -109,8 +124,7 @@ void testExpandEmptyJsonHeader() {
headers.addString("headers", "{}");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -132,8 +146,7 @@ void testMissingHeaderField() {
headers.addString("other_header", "some_value");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -154,8 +167,7 @@ void testInvalidJsonHeader() {
headers.addString("headers", "invalid json {");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -178,8 +190,7 @@ void testNonObjectJsonHeader() {
headers.addString("headers", "[\"item1\", \"item2\"]");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
@@ -199,11 +210,11 @@ void testComplexJsonValues() {
// Create record with complex JSON values (nested objects, arrays, etc.)
Headers headers = new ConnectHeaders();
- headers.addString("headers", "{\"simple\":\"value\",\"number\":42,\"boolean\":true,\"nested\":{\"key\":\"value\"}}");
+ headers.addString(
+ "headers", "{\"simple\":\"value\",\"number\":42,\"boolean\":true,\"nested\":{\"key\":\"value\"}}");
SourceRecord record = new SourceRecord(
- null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers
- );
+ null, null, "test-topic", null, null, null, Schema.STRING_SCHEMA, "test-value", null, headers);
// Apply transform
SourceRecord transformed = transform.apply(record);
diff --git a/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java b/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java
index 4852eb9..b004adb 100644
--- a/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java
+++ b/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java
@@ -1,11 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@@ -13,6 +25,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
class TimestampConverterTest {
private final TimestampMicrosConverter.Value smt = new TimestampMicrosConverter.Value<>();
@@ -27,7 +44,9 @@ void shouldConvertTimestampMicroSecondsWithSchema() {
final var schemaWithTimestampMicros = SchemaBuilder.struct()
.field("id", SchemaBuilder.int8())
- .field("timestamp", SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros"))
+ .field(
+ "timestamp",
+ SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros"))
.optional()
.build();
@@ -37,11 +56,11 @@ void shouldConvertTimestampMicroSecondsWithSchema() {
.put("id", (byte) 1)
.put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now));
- final var record = new SourceRecord(
- null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros
- );
+ final var record =
+ new SourceRecord(null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros);
final var transformedRecord = smt.apply(record);
- Assertions.assertEquals(ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Struct) transformedRecord.value()).get("timestamp"));
+ Assertions.assertEquals(
+ ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Struct) transformedRecord.value()).get("timestamp"));
}
@Test
@@ -56,7 +75,9 @@ void shouldConvertTimestampToStringMicroSecondsWithSchema() {
final var schemaWithTimestampMicros = SchemaBuilder.struct()
.field("id", SchemaBuilder.int8())
- .field("timestamp", SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros"))
+ .field(
+ "timestamp",
+ SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros"))
.optional()
.build();
@@ -68,11 +89,11 @@ void shouldConvertTimestampToStringMicroSecondsWithSchema() {
.put("id", (byte) 1)
.put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now));
- final var record = new SourceRecord(
- null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros
- );
+ final var record =
+ new SourceRecord(null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros);
final var transformedRecord = smt.apply(record);
- Assertions.assertEquals(formatter.format(Date.from(now)), ((Struct) transformedRecord.value()).get("timestamp"));
+ Assertions.assertEquals(
+ formatter.format(Date.from(now)), ((Struct) transformedRecord.value()).get("timestamp"));
}
@Test
@@ -89,10 +110,9 @@ void shouldConvertTimestampMicroSecondsWhenNoSchema() {
dataWithoutSchema.put("id", (byte) 1);
dataWithoutSchema.put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now));
- final var record = new SourceRecord(
- null, null, "test", 0, null, dataWithoutSchema
- );
+ final var record = new SourceRecord(null, null, "test", 0, null, dataWithoutSchema);
final var transformedRecord = smt.apply(record);
- Assertions.assertEquals(ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Map) transformedRecord.value()).get("timestamp"));
+ Assertions.assertEquals(
+ ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Map) transformedRecord.value()).get("timestamp"));
}
}
diff --git a/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java b/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java
index 6d0c277..4524e1b 100644
--- a/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java
+++ b/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java
@@ -1,14 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.michelin.kafka.connect.transforms.predicates;
-import org.apache.kafka.connect.header.ConnectHeaders;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
class HeaderValueMatchesTest {
private final HeaderValueMatches predicate = new HeaderValueMatches<>();
@@ -26,8 +43,7 @@ void ValueMatches() {
headers.add("my-header", "fixed", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertTrue(predicate.test(record));
}
@@ -45,8 +61,7 @@ void ValueRegexMatches() {
headers.add("license-plate", "CG-768-AP", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertTrue(predicate.test(record));
}
@@ -64,8 +79,7 @@ void ByteArrayValueRegexMatches() {
headers.add("license-plate", "CG-768-AP".getBytes(StandardCharsets.UTF_8), null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertTrue(predicate.test(record));
}
@@ -82,8 +96,7 @@ void valueNull() {
headers.add("my-header", null, null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertFalse(predicate.test(record));
}
@@ -101,8 +114,7 @@ void ValueNotMatching() {
headers.add("my-header", "OTHER", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertFalse(predicate.test(record));
}
@@ -120,8 +132,7 @@ void MissingHeaderDefaultBehavior() {
headers.add("other-header", "OTHER", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertFalse(predicate.test(record));
}
@@ -140,8 +151,7 @@ void MissingHeaderOverriddenBehavior() {
headers.add("other-header", "OTHER", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertTrue(predicate.test(record));
}
@@ -162,8 +172,7 @@ void MultipleHeadersWithMatchingValue() {
headers.add("my-header", "fixed", null);
headers.add("unrelated-header", null, null);
- final var record = new SourceRecord(null, null, "test", 0,
- null, null, null, null, 0L, headers);
+ final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers);
assertTrue(predicate.test(record));
}