Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector #8581

Open
wants to merge 23 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ccb9e27
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Jan 23, 2025
5bd1f29
Merge branch 'dev' into connector-activemq-source
Jan 23, 2025
9b58e76
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Jan 24, 2025
1533c52
Merge branch 'dev' into connector-activemq-source
Feb 6, 2025
322cb6a
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 6, 2025
5ccaf07
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 6, 2025
415cc3c
Merge branch 'dev' into connector-activemq-source
Feb 7, 2025
2df5eb0
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 7, 2025
a1e6971
Merge branch 'dev' into connector-activemq-source
Feb 7, 2025
ab2f741
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 7, 2025
a06d094
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 7, 2025
e87209e
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 7, 2025
58b298f
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 8, 2025
cedb136
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 8, 2025
a02f2d0
[feature][connectors-v2][ActiveMQ] add ActiveMQ source connector
Feb 8, 2025
d4e8ce5
Merge branch 'dev' into connector-activemq-source
Mar 4, 2025
898160c
[feature][connectors-v2][ActiveMQ] add message acknowledge mechanism
Mar 4, 2025
dc1387f
[feature][connectors-v2][ActiveMQ] add message acknowledge mechanism
Mar 7, 2025
afd3cfc
Merge branch 'dev' into connector-activemq-source
Mar 7, 2025
d94be4f
Merge branch 'dev' into connector-activemq-source
Mar 19, 2025
1e1ab10
Merge branch 'refs/heads/dev' into connector-activemq-source
Mar 20, 2025
ec925cf
[feature][connectors-v2][ActiveMQ] add blocking queue and thread pool…
Mar 20, 2025
265262a
[feature][connectors-v2][ActiveMQ] add documents commit
Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions docs/en/connector-v2/sink/Activemq.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ Used to write data to Activemq.

| name | type | required | default value |
|-------------------------------------|---------|----------|---------------|
| host | string | no | - |
| port | int | no | - |
| virtual_host | string | no | - |
| username | string | no | - |
| password | string | no | - |
| queue_name | string | yes | - |
Expand All @@ -32,14 +29,6 @@ Used to write data to Activemq.
| warnAboutUnstartedConnectionTimeout | boolean | no | - |
| closeTimeout | int | no | - |

### host [string]

the default host to use for connections

### port [int]

the default port to use for connections

### username [string]

the AMQP user name to use when connecting to the broker
Expand Down
122 changes: 122 additions & 0 deletions docs/en/connector-v2/source/Activemq.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Activemq

> Activemq source connector

## Description

Read data from the ActiveMQ queue.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------------------------------|---------|----------|---------------|
| username | string | no | - |
| password | string | no | - |
| queue_name | string | yes | - |
| uri | string | yes | - |
| schema | config | yes | - |
| format | string | no | json |
| field.delimiter | string | no | , |
| check_for_duplicate | boolean | no | - |
| client_id | boolean | no | - |
| disable_timeStamps_by_default | boolean | no | - |
| dispatch_async | boolean | no | - |
| warnAboutUnstartedConnectionTimeout | boolean | no | - |
| closeTimeout | int | no | - |
| use_correlation_id | boolean | no | - |

### username [string]

the AMQP user name to use when connecting to the broker

### password [string]

the password to use when connecting to the broker

### uri [string]

convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host

### queue_name [string]

the queue to write the message to

### schema [config]

The structure of the data, including field names and field types.

### format [string]

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field.delimiter" option.

### field.delimiter [string]

Customize the field delimiter for data format

### check_for_duplicate [boolean]

will check for duplucate messages

### client_id [string]

client id

### disable_timeStamps_by_default [boolean]

disables timestamp for slight performance boost

### close_timeout [boolean]

Sets the timeout, in milliseconds, before a close is considered complete.

### dispatch_async [boolean]

Should the broker dispatch messages asynchronously to the consumer

### use_correlation_id [boolean]

Whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments).

### warn_about_unstarted_connection_timeout [int]

The timeout, in milliseconds, from the time of connection creation to when a warning is generated

## Example

simple:

```hocon
source {
ActiveMQ {
uri="tcp://activemq-host:61616"
username = "admin"
password = "admin"
queue_name = "sourceQueue"
format = json
schema = {
fields {
id = bigint
c_string = string
c_boolean = boolean
c_tinyint = tinyint
}
}
}
}
```

## Changelog

### next version

- Add Activemq Source Connector

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.sink.ActiveMQ = connector-activemq
seatunnel.source.ActiveMQ = connector-activemq
seatunnel.source.Prometheus = connector-prometheus
seatunnel.sink.Prometheus = connector-prometheus
seatunnel.source.Qdrant = connector-qdrant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.activemq.client;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig;
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorException;

Expand All @@ -29,34 +29,21 @@
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import java.nio.charset.StandardCharsets;

import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CONSUMER_EXPIRY_CHECK_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;

@Slf4j
@AllArgsConstructor
public class ActivemqClient {
private final ReadonlyConfig config;
private final ActivemqConfig config;
private final ActiveMQConnectionFactory connectionFactory;
private final Connection connection;

public ActivemqClient(ReadonlyConfig config) {
public ActivemqClient(ActivemqConfig config) {
this.config = config;
try {
this.connectionFactory = getConnectionFactory();
Expand All @@ -73,52 +60,52 @@ public ActivemqClient(ReadonlyConfig config) {
}

public ActiveMQConnectionFactory getConnectionFactory() {
log.info("broker url : " + config.get(URI));
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(config.get(URI));
log.info("broker url : " + config.getUri());
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(config.getUri());

if (config.get(ALWAYS_SESSION_ASYNC) != null) {
factory.setAlwaysSessionAsync(config.get(ALWAYS_SESSION_ASYNC));
if (config.getAlwaysSessionAsync() != null) {
factory.setAlwaysSessionAsync(config.getAlwaysSessionAsync());
}

if (config.get(CLIENT_ID) != null) {
factory.setClientID(config.get(CLIENT_ID));
if (config.getClientID() != null) {
factory.setClientID(config.getClientID());
}

if (config.get(ALWAYS_SYNC_SEND) != null) {
factory.setAlwaysSyncSend(config.get(ALWAYS_SYNC_SEND));
if (config.getAlwaysSyncSend() != null) {
factory.setAlwaysSyncSend(config.getAlwaysSyncSend());
}

if (config.get(CHECK_FOR_DUPLICATE) != null) {
factory.setCheckForDuplicates(config.get(CHECK_FOR_DUPLICATE));
if (config.getCheckForDuplicate() != null) {
factory.setCheckForDuplicates(config.getCheckForDuplicate());
}

if (config.get(CLOSE_TIMEOUT) != null) {
factory.setCloseTimeout(config.get(CLOSE_TIMEOUT));
if (config.getCloseTimeout() != null) {
factory.setCloseTimeout(config.getCloseTimeout());
}

if (config.get(CONSUMER_EXPIRY_CHECK_ENABLED) != null) {
factory.setConsumerExpiryCheckEnabled(config.get(CONSUMER_EXPIRY_CHECK_ENABLED));
if (config.getConsumerExpiryCheckEnabled() != null) {
factory.setConsumerExpiryCheckEnabled(config.getConsumerExpiryCheckEnabled());
}
if (config.get(DISPATCH_ASYNC) != null) {
factory.setDispatchAsync(config.get(DISPATCH_ASYNC));
if (config.getDispatchAsync() != null) {
factory.setDispatchAsync(config.getDispatchAsync());
}

if (config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT) != null) {
if (config.getWarnAboutUnstartedConnectionTimeout() != null) {
factory.setWarnAboutUnstartedConnectionTimeout(
config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT));
config.getWarnAboutUnstartedConnectionTimeout());
}

if (config.get(NESTED_MAP_AND_LIST_ENABLED) != null) {
factory.setNestedMapAndListEnabled(config.get(NESTED_MAP_AND_LIST_ENABLED));
if (config.getNestedMapAndListEnabled() != null) {
factory.setNestedMapAndListEnabled(config.getNestedMapAndListEnabled());
}

return factory;
}

public void write(byte[] msg) {
try {
this.connection.start();
Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(config.get(QUEUE_NAME));
Session session = this.getSession();
Destination destination = session.createQueue(config.getQueueName());
MessageProducer producer = session.createProducer(destination);
String messageBody = new String(msg, StandardCharsets.UTF_8);
TextMessage objectMessage = session.createTextMessage(messageBody);
Expand All @@ -129,11 +116,31 @@ public void write(byte[] msg) {
ActivemqConnectorErrorCode.SEND_MESSAGE_FAILED,
String.format(
"Cannot send AMQ message %s at %s",
config.get(QUEUE_NAME), config.get(CLIENT_ID)),
config.getQueueName(), config.getClientID()),
e);
}
}

public Session getSession() {
try {
this.connection.start();
return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
throw new ActivemqConnectorException(
ActivemqConnectorErrorCode.CLOSE_SESSION_FAILED, e);
}
}

public MessageConsumer getConsumer() {
try {
return this.getSession()
.createConsumer(this.getSession().createQueue(config.getQueueName()));
} catch (JMSException e) {
throw new ActivemqConnectorException(
ActivemqConnectorErrorCode.INITIALIZE_CONSUME_FAILED, e);
}
}

public void close() {
try {
if (connection != null) {
Expand All @@ -143,13 +150,13 @@ public void close() {
throw new ActivemqConnectorException(
ActivemqConnectorErrorCode.CLOSE_CONNECTION_FAILED,
String.format(
"Error while closing AMQ connection with %s", config.get(QUEUE_NAME)));
"Error while closing AMQ connection with %s", config.getQueueName()));
}
}

private Connection createConnection(ReadonlyConfig config) throws JMSException {
if (config.get(USERNAME) != null && config.get(PASSWORD) != null) {
return connectionFactory.createConnection(config.get(USERNAME), config.get(PASSWORD));
private Connection createConnection(ActivemqConfig config) throws JMSException {
if (config.getUsername() != null && config.getPassword() != null) {
return connectionFactory.createConnection(config.getUsername(), config.getPassword());
}
return connectionFactory.createConnection();
}
Expand Down
Loading
Loading