Skip to content

Commit 60fb430

Browse files
authored
[ISSUE #5081] Enhancement update for connectors & admin-server (#5082)
* [ISSUE #5079] Enhancement update for admin-server * fix check style error * fix check style error * [ISSUE #5081] Enhancement update for connectors & admin-server * fix check style error * fix check style error
1 parent 9701f02 commit 60fb430

File tree

43 files changed

+2366
-1277
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2366
-1277
lines changed

Diff for: eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.eventmesh.common.remote.datasource.DataSource;
3838
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
3939
import org.apache.eventmesh.common.remote.exception.ErrorCode;
40+
import org.apache.eventmesh.common.remote.job.JobType;
4041
import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq;
4142
import org.apache.eventmesh.common.utils.JsonUtils;
4243

@@ -231,6 +232,8 @@ public JobDetail getJobDetail(String jobID) {
231232
}
232233
detail.setState(state);
233234
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
235+
detail.setJobType(JobType.fromIndex(job.getJobType()));
236+
detail.setJobDesc(job.getJobDesc());
234237
return detail;
235238
}
236239

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public <T> T getConfig(ConfigInfo configInfo) throws IOException {
131131
} else {
132132
filePath = path.startsWith(FILE_PATH_PREFIX) ? path.substring(FILE_PATH_PREFIX.length()) : this.configPath + path;
133133
}
134-
134+
filePath = normalizeFilePath(filePath);
135135
if (filePath.contains(".jar")) {
136136
try (final InputStream inputStream = getClass().getResourceAsStream(Objects.requireNonNull(resourceUrl))) {
137137
if (inputStream == null) {
@@ -152,6 +152,15 @@ public <T> T getConfig(ConfigInfo configInfo) throws IOException {
152152
return (T) object;
153153
}
154154

155+
private String normalizeFilePath(String filePath) {
156+
if (System.getProperty("os.name").toLowerCase().contains("win")) {
157+
if (filePath.startsWith("/")) {
158+
filePath = filePath.substring(1);
159+
}
160+
}
161+
return filePath;
162+
}
163+
155164
private void populateConfig(Object object, Class<?> clazz, Config config)
156165
throws NoSuchFieldException, IOException, IllegalAccessException {
157166
ConfigInfo configInfo = new ConfigInfo();
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.eventmesh.connector.http.sink.config;
18+
package org.apache.eventmesh.common.config.connector.http;
1919

2020
import lombok.Data;
2121

Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.eventmesh.connector.http.sink.config;
18+
package org.apache.eventmesh.common.config.connector.http;
1919

2020
import org.apache.eventmesh.common.config.connector.SinkConfig;
2121

Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.eventmesh.connector.http.sink.config;
18+
package org.apache.eventmesh.common.config.connector.http;
1919

2020
import lombok.Data;
2121

Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.eventmesh.connector.http.sink.config;
18+
package org.apache.eventmesh.common.config.connector.http;
1919

20-
import io.vertx.core.http.HttpClientOptions;
2120

2221
import lombok.Data;
2322

@@ -29,19 +28,19 @@ public class SinkConnectorConfig {
2928
private String[] urls;
3029

3130
// keepAlive, default true
32-
private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE;
31+
private boolean keepAlive = true;
3332

3433
// timeunit: ms, default 60000ms
35-
private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent
34+
private int keepAliveTimeout = 60 * 1000; // Keep units consistent
3635

3736
// timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
3837
private int connectionTimeout = 5000;
3938

4039
// timeunit: ms, default 5000ms
41-
private int idleTimeout;
40+
private int idleTimeout = 5000;
4241

4342
// maximum number of HTTP/1 connections a client will pool, default 5
44-
private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE;
43+
private int maxConnectionPoolSize = 5;
4544

4645
// retry config
4746
private HttpRetryConfig retryConfig = new HttpRetryConfig();

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class SourceConnectorConfig {
2727

2828
private String connectorName;
2929

30-
private String path;
30+
private String path = "/";
3131

3232
private int port;
3333

@@ -51,11 +51,11 @@ public class SourceConnectorConfig {
5151
private int batchSize = 10;
5252

5353
// protocol, default CloudEvent
54-
private String protocol = "CloudEvent";
54+
private String protocol = "Common";
5555

5656
// extra config, e.g. GitHub secret
5757
private Map<String, String> extraConfig = new HashMap<>();
5858

5959
// data consistency enabled, default true
60-
private boolean dataConsistencyEnabled = true;
60+
private boolean dataConsistencyEnabled = false;
6161
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

+4-21
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package org.apache.eventmesh.common.config.connector.rdb.canal;
1919

2020
import org.apache.eventmesh.common.config.connector.SinkConfig;
21-
import org.apache.eventmesh.common.remote.job.SyncMode;
21+
22+
import java.util.Map;
2223

2324
import lombok.Data;
2425
import lombok.EqualsAndHashCode;
@@ -27,25 +28,7 @@
2728
@EqualsAndHashCode(callSuper = true)
2829
public class CanalSinkConfig extends SinkConfig {
2930

30-
// batchSize
31-
private Integer batchSize = 50;
32-
33-
// enable batch
34-
private Boolean useBatch = true;
35-
36-
// sink thread size for single channel
37-
private Integer poolSize = 5;
38-
39-
// sync mode: field/row
40-
private SyncMode syncMode;
41-
42-
private boolean isGTIDMode = true;
43-
44-
private boolean isMariaDB = true;
45-
46-
// skip sink process exception
47-
private Boolean skipException = false;
48-
49-
public SinkConnectorConfig sinkConnectorConfig;
31+
// used to convert canal full/increment/check connector config
32+
private Map<String, Object> sinkConfig;
5033

5134
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@
2626
@Data
2727
@EqualsAndHashCode(callSuper = true)
2828
public class CanalSinkFullConfig extends SinkConfig {
29-
private SinkConnectorConfig sinkConfig;
29+
private SinkConnectorConfig sinkConnectorConfig;
3030
private String zeroDate;
3131
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.rdb.canal;
19+
20+
import org.apache.eventmesh.common.remote.job.SyncMode;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class CanalSinkIncrementConfig extends CanalSinkConfig {
28+
29+
// batchSize
30+
private Integer batchSize = 50;
31+
32+
// enable batch
33+
private Boolean useBatch = true;
34+
35+
// sink thread size for single channel
36+
private Integer poolSize = 5;
37+
38+
// sync mode: field/row
39+
private SyncMode syncMode;
40+
41+
private boolean isGTIDMode = true;
42+
43+
private boolean isMariaDB = true;
44+
45+
// skip sink process exception
46+
private Boolean skipException = false;
47+
48+
public SinkConnectorConfig sinkConnectorConfig;
49+
50+
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java

+3-55
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
package org.apache.eventmesh.common.config.connector.rdb.canal;
1919

2020
import org.apache.eventmesh.common.config.connector.SourceConfig;
21-
import org.apache.eventmesh.common.remote.job.SyncConsistency;
22-
import org.apache.eventmesh.common.remote.job.SyncMode;
23-
import org.apache.eventmesh.common.remote.offset.RecordPosition;
2421

25-
import java.util.List;
22+
import java.util.Map;
2623

2724
import lombok.Data;
2825
import lombok.EqualsAndHashCode;
@@ -31,56 +28,7 @@
3128
@EqualsAndHashCode(callSuper = true)
3229
public class CanalSourceConfig extends SourceConfig {
3330

34-
private String destination;
31+
// used to convert canal full/increment/check connector config
32+
private Map<String, Object> sourceConfig;
3533

36-
private Long canalInstanceId;
37-
38-
private String desc;
39-
40-
private boolean ddlSync = true;
41-
42-
private boolean filterTableError = false;
43-
44-
private Long slaveId;
45-
46-
private Short clientId;
47-
48-
private String serverUUID;
49-
50-
private boolean isMariaDB = true;
51-
52-
private boolean isGTIDMode = true;
53-
54-
private Integer batchSize = 10000;
55-
56-
private Long batchTimeout = -1L;
57-
58-
private String tableFilter;
59-
60-
private String fieldFilter;
61-
62-
private List<RecordPosition> recordPositions;
63-
64-
// ================================= channel parameter
65-
// ================================
66-
67-
// enable remedy
68-
private Boolean enableRemedy = false;
69-
70-
// sync mode: field/row
71-
private SyncMode syncMode;
72-
73-
// sync consistency
74-
private SyncConsistency syncConsistency;
75-
76-
// ================================= system parameter
77-
// ================================
78-
79-
// Column name of the bidirectional synchronization mark
80-
private String needSyncMarkTableColumnName = "needSync";
81-
82-
// Column value of the bidirectional synchronization mark
83-
private String needSyncMarkTableColumnValue = "needSync";
84-
85-
private SourceConnectorConfig sourceConnectorConfig;
8634
}

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
@Data
2929
@EqualsAndHashCode(callSuper = true)
3030
public class CanalSourceFullConfig extends SourceConfig {
31-
private SourceConnectorConfig connectorConfig;
31+
private SourceConnectorConfig sourceConnectorConfig;
3232
private List<RecordPosition> startPosition;
3333
private int parallel;
3434
private int flushSize;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.common.config.connector.rdb.canal;
19+
20+
import org.apache.eventmesh.common.remote.job.SyncConsistency;
21+
import org.apache.eventmesh.common.remote.job.SyncMode;
22+
import org.apache.eventmesh.common.remote.offset.RecordPosition;
23+
24+
import java.util.List;
25+
26+
import lombok.Data;
27+
import lombok.EqualsAndHashCode;
28+
29+
@Data
30+
@EqualsAndHashCode(callSuper = true)
31+
public class CanalSourceIncrementConfig extends CanalSourceConfig {
32+
33+
private String destination;
34+
35+
private Long canalInstanceId;
36+
37+
private String desc;
38+
39+
private boolean ddlSync = true;
40+
41+
private boolean filterTableError = false;
42+
43+
private Long slaveId;
44+
45+
private Short clientId;
46+
47+
private String serverUUID;
48+
49+
private boolean isMariaDB = true;
50+
51+
private boolean isGTIDMode = true;
52+
53+
private Integer batchSize = 10000;
54+
55+
private Long batchTimeout = -1L;
56+
57+
private String tableFilter;
58+
59+
private String fieldFilter;
60+
61+
private List<RecordPosition> recordPositions;
62+
63+
// ================================= channel parameter
64+
// ================================
65+
66+
// enable remedy
67+
private Boolean enableRemedy = false;
68+
69+
// sync mode: field/row
70+
private SyncMode syncMode;
71+
72+
// sync consistency
73+
private SyncConsistency syncConsistency;
74+
75+
// ================================= system parameter
76+
// ================================
77+
78+
// Column name of the bidirectional synchronization mark
79+
private String needSyncMarkTableColumnName = "needSync";
80+
81+
// Column value of the bidirectional synchronization mark
82+
private String needSyncMarkTableColumnValue = "needSync";
83+
84+
private SourceConnectorConfig sourceConnectorConfig;
85+
86+
}

0 commit comments

Comments
 (0)