Skip to content

Commit 47760c7

Browse files
authored
Add S3 source connector for Text and Parquet (#20)
* Add stream connector v1 * Fix reading next extent * Calculate stream offset based on UTF-8 and UTF-16 codec * Support multiple files * Remove nonterminated line handling; return empty batch to indicate stream available but not sufficient to decode * Remove nonterminated line handling; return empty batch to indicate stream available but not sufficient to decode * Add tests * Add reading from directory or files * Move test directory * Fix utf16 stream offset for skipping * Do not open stream on eof files * Add watcher for new file discovery * Add test for directory or files config, new file discovery * Test task configs * Fix reconfigure concurrent access to known files * Update test for file discovery * Separate task and connector configs * Test Watcher timeout and error; update Watcher thread pool size * watcher: make default check timeout equal to check delay * codec: add decoder interface * minor: collect to list * codec: use record only * parquet: add files, refactor [wip] * parquet: fix converter on singleton versus repeated values * parquet: update test * parquet: refactor and add test * parquet: add decoder test * parquet: hook into connect task * parquet,minor: rename methods * parquet: add missing Decoders utility * parquet: float to double for progress, and minor fixes * parquet: rename Cli * parquet: clean up cli * stream: optimize imports; remove dead code * misc: bump kafka version; cleanup * endpoint: add StorageGridS3 * stream: exactly once support by decoder type * stream: test compression * stream: parameterize extent stride * parquet: convert utf8 string as string * stream: validate connector basic config * stream: rename as extent * stream: validate configuration * endpoint: add StorageGRID S3 in config message * bugfix: rare double close * config: validate charset * minor: typos * stream: validate access keys; up log level for Watcher * stream: up log level for Task * minor: tune log level for Task and Watcher
1 parent 2146020 commit 47760c7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+4371
-1
lines changed

distribution.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525
<artifactId>instaclustr-sla-connector</artifactId>
2626
<version>0.1.3</version>
2727
</dependency>
28+
<dependency>
29+
<groupId>com.instaclustr.kafkaconnect</groupId>
30+
<artifactId>instaclustr-stream-connector</artifactId>
31+
<version>0.1.0</version>
32+
</dependency>
2833
</dependencies>
2934

3035
<build>

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
<modules>
1313
<module>s3</module>
1414
<module>sla</module>
15+
<module>stream</module>
1516
<module>distribution.xml</module>
1617
</modules>
1718

1819
<properties>
19-
<kafka.version>2.5.1</kafka.version>
20+
<kafka.version>3.9.1</kafka.version>
2021
<junit.version>4.13</junit.version>
2122
<slf4j.version>1.7.12</slf4j.version>
2223
</properties>

stream/pom.xml

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>instaclustr-connectors</artifactId>
7+
<groupId>com.instaclustr.kafkaconnect</groupId>
8+
<version>0.2.1</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>instaclustr-stream-connector</artifactId>
13+
<version>0.1.0</version>
14+
<packaging>jar</packaging>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>com.amazonaws</groupId>
19+
<artifactId>aws-java-sdk-s3</artifactId>
20+
<version>1.12.261</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.amazonaws</groupId>
24+
<artifactId>aws-java-sdk-sts</artifactId>
25+
<version>1.12.261</version>
26+
</dependency>
27+
<dependency>
28+
<groupId>ch.qos.logback</groupId>
29+
<artifactId>logback-classic</artifactId>
30+
<version>1.5.13</version>
31+
</dependency>
32+
33+
<dependency>
34+
<groupId>org.apache.parquet</groupId>
35+
<artifactId>parquet-common</artifactId>
36+
<version>1.15.1</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.parquet</groupId>
40+
<artifactId>parquet-encoding</artifactId>
41+
<version>1.15.1</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.parquet</groupId>
45+
<artifactId>parquet-column</artifactId>
46+
<version>1.15.1</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.apache.parquet</groupId>
50+
<artifactId>parquet-hadoop</artifactId>
51+
<version>1.15.1</version>
52+
</dependency>
53+
54+
<dependency>
55+
<groupId>org.apache.hadoop</groupId>
56+
<artifactId>hadoop-common</artifactId>
57+
<version>3.4.1</version>
58+
<exclusions>
59+
<exclusion>
60+
<groupId>com.fasterxml.jackson.core</groupId>
61+
<artifactId>jackson-core</artifactId>
62+
</exclusion>
63+
</exclusions>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.apache.hadoop</groupId>
67+
<artifactId>hadoop-mapreduce-client-core</artifactId>
68+
<version>3.4.1</version>
69+
<scope>runtime</scope>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.apache.parquet</groupId>
73+
<artifactId>parquet-hadoop</artifactId>
74+
<version>1.15.1</version>
75+
<classifier>tests</classifier>
76+
<scope>test</scope>
77+
</dependency>
78+
</dependencies>
79+
80+
<build>
81+
<resources>
82+
<resource>
83+
<directory>src/main/resources</directory>
84+
</resource>
85+
</resources>
86+
<plugins>
87+
<plugin>
88+
<artifactId>maven-assembly-plugin</artifactId>
89+
<version>3.2.0</version>
90+
<configuration>
91+
<descriptors>
92+
<descriptor>package.xml</descriptor>
93+
</descriptors>
94+
<archive>
95+
<manifest>
96+
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
97+
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
98+
</manifest>
99+
</archive>
100+
</configuration>
101+
<executions>
102+
<execution>
103+
<id>make-assembly</id>
104+
<phase>package</phase>
105+
<goals>
106+
<goal>single</goal>
107+
</goals>
108+
</execution>
109+
</executions>
110+
</plugin>
111+
</plugins>
112+
</build>
113+
114+
</project>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.instaclustr.kafka.connect.stream;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.stream.Stream;
6+
7+
public interface Endpoint {
8+
9+
InputStream openInputStream(String path) throws IOException;
10+
11+
RandomAccessInputStream openRandomAccessInputStream(String path) throws IOException;
12+
13+
Stream<String> listRegularFiles(String path) throws IOException;
14+
15+
long getFileSize(String path) throws IOException;
16+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.instaclustr.kafka.connect.stream;
2+
3+
import com.instaclustr.kafka.connect.stream.endpoint.S3BucketAws;
4+
import com.instaclustr.kafka.connect.stream.endpoint.LocalFile;
5+
import com.instaclustr.kafka.connect.stream.endpoint.S3BucketOntap;
6+
import com.instaclustr.kafka.connect.stream.endpoint.S3BucketStorageGrid;
7+
import org.apache.kafka.common.config.AbstractConfig;
8+
import org.apache.kafka.common.config.ConfigDef;
9+
10+
import java.util.Map;
11+
12+
public class Endpoints {
13+
public static final String ENDPOINT_TYPE = "endpoint.type";
14+
public static final String AWS_S3 = "awss3";
15+
public static final String LOCAL_FILE = "localfile";
16+
public static final String ONTAP_S3 = "ontaps3";
17+
public static final String STORAGEGRID_S3 = "storagegrids3";
18+
19+
static final ConfigDef CONFIG_DEF = new ConfigDef().define(
20+
ENDPOINT_TYPE,
21+
ConfigDef.Type.STRING,
22+
ConfigDef.NO_DEFAULT_VALUE,
23+
ConfigDef.CaseInsensitiveValidString.in(AWS_S3, LOCAL_FILE, ONTAP_S3, STORAGEGRID_S3),
24+
ConfigDef.Importance.HIGH,
25+
"Endpoint type: AwsS3, OntapS3, or StorageGridS3");
26+
27+
public static Endpoint of(Map<String, String> props) {
28+
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
29+
String endpointType = config.getString(ENDPOINT_TYPE).trim().toLowerCase();
30+
switch (endpointType) {
31+
case AWS_S3:
32+
return S3BucketAws.of(props);
33+
case LOCAL_FILE:
34+
return LocalFile.of(props);
35+
case ONTAP_S3:
36+
return S3BucketOntap.of(props);
37+
case STORAGEGRID_S3:
38+
return S3BucketStorageGrid.of(props);
39+
default:
40+
throw new UnsupportedOperationException(endpointType);
41+
}
42+
}
43+
44+
}

0 commit comments

Comments
 (0)