Skip to content

[Feature][Connector-V2] Add Apache InLong Connector Support #9250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from

Conversation

luchunliang
Copy link

@luchunliang luchunliang commented Apr 29, 2025

Description

Background​​
Apache InLong is a one-stop data integration framework that supports efficient streaming/batch data collection, aggregation, and distribution from diverse sources.

Apache InLong primarily focuses on ultra-large-scale data integration but lacks support for diverse data sources. In contrast, SeaTunnel provides extensive data source compatibility. By adding an ​​InLong Sink Connector​​ to SeaTunnel, users can reuse SeaTunnel's rich data sources to feed data into InLong, bridging the gap between versatile data ingestion and large-scale integration capabilities, and gain seamless integration with the InLong ecosystem, enhancing real-time data pipeline capabilities for scenarios like log/metrics synchronization and multi-system data distribution.

​​Proposal​​
Implement an Apache InLong Connector for SeaTunnel with the following features:

  • ​​InLong Sink Connector​​: Write processed data to InLong clusters for downstream consumption or governance.
  • Support core InLong configurations (e.g., groupId, streamId, authentication).

​​Module Structure​​

  • Create a new module: seatunnel-connectors/connector-inlong, referencing existing message queue connectors (e.g., Pulsar).
  • Leverage the InLong Java SDK (e.g., inlong-sdk-java) or direct API calls.

Usage Scenario

Distributing SeaTunnel-processed results through InLong to downstream systems (e.g., real-time dashboards, risk engines).
Suggested Solution

Purpose of this pull request

Add Apache InLong Connector Support

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for Apache InLong as a new sink connector in SeaTunnel, enabling users to write processed data into InLong clusters. Key changes include the implementation of core sink writer logic with retry and resend mechanisms, definition of state and commit objects, and integration with InLong’s Java SDK.

  • Added a new module for the InLong sink connector with complete sink writer, committer, and factory implementations.
  • Introduced utility methods and thread management for sending data with retry logic.
  • Provided unit tests and configuration properties for InLong connector functionality.

Reviewed Changes

Copilot reviewed 15 out of 17 changed files in this pull request and generated no comments.

Show a summary per file
File Description
seatunnel-connectors-v2/connector-inlong/src/test/java/org/apache/seatunnel/connectors/seatunnel/inlong/InlongFactoryTest.java Basic unit tests for the InLongSinkFactory.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/util/InlongUtil.java Utility class for sleep methods with logging.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/state/*.java New state and commit info classes required for sink state management.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/SenderMessage.java Defined the message container for batch sending.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongThreadFactory.java New thread factory implementation for the connector’s internal threads.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java Core sink writer implementation featuring message batching, retry, and resend logic.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkFactory.java Factory to create the InLong sink and define required options.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkCommitter.java Minimal committer implementation returning empty commit info.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSink.java Main sink implementation integrating writer, state serializer, and committer.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/exception/*.java New exception and error code classes for error handling in the connector.
seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/config/*.java Configuration property definitions and semantics enums for the InLong sink connector.
Files not reviewed (2)
  • plugin-mapping.properties: Language not supported
  • seatunnel-connectors-v2/connector-inlong/pom.xml: Language not supported
Comments suppressed due to low confidence (2)

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:290

  • The current logic removes an extra element from rowQue when a message exceeds batchSendLen, which may unintentionally drop the subsequent message. Consider removing the redundant rowQue.remove() call so that only the oversized message is dropped.
if (peekMessageLength > batchSendLen) { LOG.warn(...); rowQue.remove(); break; }

seatunnel-connectors-v2/connector-inlong/src/main/java/org/apache/seatunnel/connectors/seatunnel/inlong/sink/InlongSinkWriter.java:247

  • In the retry loop for sending messages, the code does not break out after exceeding MAX_RETRY, which may lead to an infinite loop when sending continually fails. Consider adding a break or alternative error handling to prevent the potential deadlock.
if (retry > MAX_RETRY) { ... }

@Hisoka-X
Copy link
Member

Thanks @luchunliang . Please add e2e test case.

@Hisoka-X Hisoka-X linked an issue Apr 30, 2025 that may be closed by this pull request
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Connector-V2] Add Apache InLong Connector Support
2 participants