Skip to content

Commit 068925c

Browse files
committed
add canal
1 parent cb1b7b8 commit 068925c

File tree

38 files changed

+4627
-0
lines changed

38 files changed

+4627
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
List canal = [
19+
"com.alibaba.otter:canal.instance.manager:$canal_version",
20+
"com.alibaba.otter:canal.parse:$canal_version",
21+
"com.alibaba.otter:canal.server:$canal_version"
22+
]
23+
24+
dependencies {
25+
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
26+
implementation project(":eventmesh-common")
27+
implementation canal
28+
implementation "com.alibaba:druid:1.2.6"
29+
// implementation "org.apache.ddlutils:ddlutils:1.0"
30+
compileOnly 'org.projectlombok:lombok'
31+
annotationProcessor 'org.projectlombok:lombok'
32+
testImplementation "org.mockito:mockito-core"
33+
testImplementation "org.mockito:mockito-junit-jupiter"
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
canal_version=1.1.7
18+
pluginType=connector
19+
pluginName=MySQL
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (C) 2010-2101 Alibaba Group Holding Limited.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.eventmesh.connector.canal;
18+
19+
import org.apache.commons.beanutils.ConversionException;
20+
import org.apache.commons.beanutils.Converter;
21+
import org.apache.commons.beanutils.converters.ArrayConverter;
22+
import org.apache.commons.beanutils.converters.ByteConverter;
23+
24+
25+
public class ByteArrayConverter implements Converter {
26+
27+
public static final Converter SQL_BYTES = new ByteArrayConverter(null);
28+
private static final Converter converter = new ArrayConverter(byte[].class, new ByteConverter());
29+
30+
protected final Object defaultValue;
31+
protected final boolean useDefault;
32+
33+
public ByteArrayConverter() {
34+
this.defaultValue = null;
35+
this.useDefault = false;
36+
}
37+
38+
public ByteArrayConverter(Object defaultValue) {
39+
this.defaultValue = defaultValue;
40+
this.useDefault = true;
41+
}
42+
43+
public Object convert(Class type, Object value) {
44+
if (value == null) {
45+
if (useDefault) {
46+
return (defaultValue);
47+
} else {
48+
throw new ConversionException("No value specified");
49+
}
50+
}
51+
52+
if (value instanceof byte[]) {
53+
return (value);
54+
}
55+
56+
// BLOB类型,canal直接存储为String("ISO-8859-1")
57+
if (value instanceof String) {
58+
try {
59+
return ((String) value).getBytes("ISO-8859-1");
60+
} catch (Exception e) {
61+
throw new ConversionException(e);
62+
}
63+
}
64+
65+
return converter.convert(type, value); // byteConvertor进行转化
66+
}
67+
}

0 commit comments

Comments
 (0)