Skip to content

Commit 5738ad6

Browse files
gushang-hubcorgy-w
authored and committed
[Feature][Connector-V2] Grok config is supported for file reading
1 parent f22d4eb commit 5738ad6

File tree

15 files changed

+378
-18
lines changed

15 files changed

+378
-18
lines changed

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
4040
<dom4j.version>2.1.4</dom4j.version>
4141
<jaxen.version>2.0.0</jaxen.version>
42+
<java.grok>0.1.9</java.grok>
4243
</properties>
4344

4445
<dependencyManagement>
@@ -158,6 +159,11 @@
158159
<artifactId>jaxen</artifactId>
159160
<version>${jaxen.version}</version>
160161
</dependency>
162+
<dependency>
163+
<groupId>io.krakens</groupId>
164+
<artifactId>java-grok</artifactId>
165+
<version>${java.grok}</version>
166+
</dependency>
161167
</dependencies>
162168

163169
<build>

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java

+19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
2626

2727
import java.util.List;
28+
import java.util.Map;
2829

2930
public class BaseSourceConfigOptions {
3031
public static final Option<FileFormat> FILE_FORMAT_TYPE =
@@ -40,6 +41,24 @@ public class BaseSourceConfigOptions {
4041
.noDefaultValue()
4142
.withDescription("The file path of source files");
4243

44+
/**
 * Option {@code file_path_rule}: selects how the configured source path is interpreted.
 * Defaults to {@link FilePathRule#NONE}.
 */
public static final Option<FilePathRule> FILE_PATH_RULE =
45+
Options.key("file_path_rule")
46+
.objectType(FilePathRule.class)
47+
.defaultValue(FilePathRule.NONE)
48+
.withDescription("The file path rule of source files");
49+
50+
/**
 * Option {@code grok_pattern}: a string-to-string map with no default value.
 * NOTE(review): presumably maps custom Grok pattern names to their definitions - confirm
 * against the read strategy that consumes this option.
 */
public static final Option<Map<String, String>> GROK_PATTERN =
51+
Options.key("grok_pattern")
52+
.mapType()
53+
.noDefaultValue()
54+
.withDescription("The grok pattern of source files");
55+
56+
/**
 * Option {@code grok_rule}: an object bound to {@link FilePathRule.GrokRule}, with no
 * default value.
 */
public static final Option<FilePathRule.GrokRule> GROK_RULE =
57+
Options.key("grok_rule")
58+
.objectType(FilePathRule.GrokRule.class)
59+
.noDefaultValue()
60+
.withDescription("The grok rule of source files");
61+
4362
public static final Option<String> FIELD_DELIMITER =
4463
Options.key("field_delimiter")
4564
.stringType()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.config;
19+
20+
import lombok.Data;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
public enum FilePathRule {
26+
NONE,
27+
GROK;
28+
29+
@Data
30+
public static class GrokRule {
31+
private Map<String, String> patterns;
32+
private Map<String, TimeScope> timeScopes;
33+
private Map<String, List<String>> enumRules;
34+
}
35+
36+
@Data
37+
public static class TimeScope {
38+
private String start;
39+
private String end;
40+
}
41+
}

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

+142-10
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2626
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2727
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
28+
import org.apache.seatunnel.common.utils.JsonUtils;
2829
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
2930
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
3031
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
32+
import org.apache.seatunnel.connectors.seatunnel.file.config.FilePathRule;
3133
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
3234
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
3335

@@ -37,6 +39,9 @@
3739
import org.apache.commons.compress.compressors.gzip.GzipParameters;
3840
import org.apache.hadoop.fs.FileStatus;
3941

42+
import io.krakens.grok.api.Grok;
43+
import io.krakens.grok.api.GrokCompiler;
44+
import io.krakens.grok.api.Match;
4045
import lombok.extern.slf4j.Slf4j;
4146

4247
import java.io.ByteArrayInputStream;
@@ -70,6 +75,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
7075
protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0];
7176
protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0];
7277
protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0];
78+
private static final String STATIC_PATH_PATTERN = "(/[^%]+)";
7379

7480
protected HadoopConf hadoopConf;
7581
protected SeaTunnelRowType seaTunnelRowType;
@@ -86,11 +92,37 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
8692
BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.defaultValue();
8793

8894
protected Pattern pattern;
95+
protected final GrokCompiler grokCompiler = GrokCompiler.newInstance();
96+
protected Grok grok;
8997

9098
@Override
9199
public void init(HadoopConf conf) {
92100
this.hadoopConf = conf;
93101
this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
102+
if (pluginConfig.hasPath(BaseSourceConfigOptions.FILE_PATH_RULE.key())) {
103+
FilePathRule filePathRule =
104+
pluginConfig.getEnum(
105+
FilePathRule.class, BaseSourceConfigOptions.FILE_PATH_RULE.key());
106+
switch (filePathRule) {
107+
case GROK:
108+
Map<String, Object> unwrapped =
109+
pluginConfig
110+
.getObject(BaseSourceConfigOptions.GROK_PATTERN.key())
111+
.unwrapped();
112+
Map<String, String> grokPatternMap = new LinkedHashMap<>();
113+
for (Map.Entry<String, Object> entry : unwrapped.entrySet()) {
114+
grokPatternMap.put(entry.getKey(), entry.getValue().toString());
115+
}
116+
this.grokCompiler.register(grokPatternMap);
117+
grok =
118+
grokCompiler.compile(
119+
pluginConfig.getString(
120+
BaseSourceConfigOptions.FILE_PATH.key()));
121+
break;
122+
default:
123+
break;
124+
}
125+
}
94126
}
95127

96128
@Override
@@ -106,7 +138,17 @@ boolean checkFileType(String path) {
106138

107139
@Override
108140
public List<String> getFileNamesByPath(String path) throws IOException {
141+
FilePathRule filePathRule = FilePathRule.NONE;
142+
if (pluginConfig.hasPath(BaseSourceConfigOptions.FILE_PATH_RULE.key())) {
143+
filePathRule =
144+
pluginConfig.getEnum(
145+
FilePathRule.class, BaseSourceConfigOptions.FILE_PATH_RULE.key());
146+
}
109147
ArrayList<String> fileNames = new ArrayList<>();
148+
if (filePathRule == FilePathRule.GROK) {
149+
150+
path = extractStaticPath(path);
151+
}
110152
FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
111153
for (FileStatus fileStatus : stats) {
112154
if (fileStatus.isDirectory()) {
@@ -118,17 +160,27 @@ public List<String> getFileNamesByPath(String path) throws IOException {
118160
if (!fileStatus.getPath().getName().equals("_SUCCESS")
119161
&& !fileStatus.getPath().getName().startsWith(".")) {
120162
String filePath = fileStatus.getPath().toString();
121-
if (!readPartitions.isEmpty()) {
122-
for (String readPartition : readPartitions) {
123-
if (filePath.contains(readPartition)) {
124-
fileNames.add(filePath);
125-
this.fileNames.add(filePath);
126-
break;
163+
164+
switch (filePathRule) {
165+
case GROK:
166+
Match match = grok.match(filePath);
167+
Map<String, Object> captureMap = match.capture();
168+
Map<String, Object> grokRuleMap =
169+
pluginConfig
170+
.getObject(BaseSourceConfigOptions.GROK_RULE.key())
171+
.unwrapped();
172+
FilePathRule.GrokRule grokRule =
173+
JsonUtils.parseObject(
174+
JsonUtils.toJsonString(grokRuleMap),
175+
FilePathRule.GrokRule.class);
176+
if (isValidCapture(captureMap, grokRule)) {
177+
addFileNameIfMatches(filePath, fileNames);
127178
}
128-
}
129-
} else {
130-
fileNames.add(filePath);
131-
this.fileNames.add(filePath);
179+
break;
180+
case NONE:
181+
default:
182+
addFileNameIfMatches(filePath, fileNames);
183+
break;
132184
}
133185
}
134186
}
@@ -137,6 +189,86 @@ public List<String> getFileNamesByPath(String path) throws IOException {
137189
return fileNames;
138190
}
139191

192+
private void addFileNameIfMatches(String filePath, List<String> fileNames) {
193+
if (!readPartitions.isEmpty()) {
194+
for (String readPartition : readPartitions) {
195+
if (filePath.contains(readPartition)) {
196+
fileNames.add(filePath);
197+
this.fileNames.add(filePath);
198+
break;
199+
}
200+
}
201+
} else {
202+
fileNames.add(filePath);
203+
this.fileNames.add(filePath);
204+
}
205+
}
206+
207+
private boolean isValidCapture(Map<String, Object> captureMap, FilePathRule.GrokRule grokRule) {
208+
if (grokRule.getPatterns() != null) {
209+
for (Map.Entry<String, String> entry : grokRule.getPatterns().entrySet()) {
210+
String key = entry.getKey();
211+
String regex = entry.getValue();
212+
String actualValue = (String) captureMap.get(key);
213+
if (actualValue == null) {
214+
return false;
215+
}
216+
Pattern pattern = Pattern.compile(regex);
217+
Matcher matcher = pattern.matcher(actualValue);
218+
if (!matcher.matches()) {
219+
return false;
220+
}
221+
}
222+
}
223+
224+
if (grokRule.getTimeScopes() != null) {
225+
for (Map.Entry<String, FilePathRule.TimeScope> entry :
226+
grokRule.getTimeScopes().entrySet()) {
227+
String key = entry.getKey();
228+
FilePathRule.TimeScope timeScope = entry.getValue();
229+
String actualValue = (String) captureMap.get(key);
230+
if (actualValue == null) {
231+
return false;
232+
}
233+
try {
234+
long actualTime = Long.parseLong(actualValue);
235+
long startTime = Long.parseLong(timeScope.getStart());
236+
long endTime = Long.parseLong(timeScope.getEnd());
237+
if (actualTime < startTime || actualTime > endTime) {
238+
return false;
239+
}
240+
} catch (NumberFormatException e) {
241+
return false;
242+
}
243+
}
244+
}
245+
246+
if (grokRule.getEnumRules() != null) {
247+
for (Map.Entry<String, List<String>> entry : grokRule.getEnumRules().entrySet()) {
248+
String key = entry.getKey();
249+
List<String> expectedValues = entry.getValue();
250+
String actualValue = (String) captureMap.get(key);
251+
if (actualValue == null || !expectedValues.contains(actualValue)) {
252+
return false;
253+
}
254+
}
255+
}
256+
257+
return true;
258+
}
259+
260+
private static String extractStaticPath(String inputPath) {
261+
Pattern pattern = Pattern.compile(STATIC_PATH_PATTERN);
262+
Matcher matcher = pattern.matcher(inputPath);
263+
264+
StringBuilder staticPath = new StringBuilder();
265+
while (matcher.find()) {
266+
staticPath.append(matcher.group(1));
267+
}
268+
269+
return staticPath.toString();
270+
}
271+
140272
@Override
141273
public void setPluginConfig(Config pluginConfig) {
142274
this.pluginConfig = pluginConfig;
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.file.writer;
18+
package org.apache.seatunnel.connectors.seatunnel.file.reader;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.file.writer;
18+
package org.apache.seatunnel.connectors.seatunnel.file.reader;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.file.writer;
18+
package org.apache.seatunnel.connectors.seatunnel.file.reader;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -122,7 +122,7 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
122122
@Test
123123
public void testParquetReadProjection1() throws Exception {
124124
URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");
125-
URL conf = OrcReadStrategyTest.class.getResource("/test_read_parquet.conf");
125+
URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet.conf");
126126
Assertions.assertNotNull(resource);
127127
Assertions.assertNotNull(conf);
128128
String path = Paths.get(resource.toURI()).toString();
@@ -151,7 +151,7 @@ public void testParquetReadProjection1() throws Exception {
151151
@Test
152152
public void testParquetReadProjection2() throws Exception {
153153
URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
154-
URL conf = OrcReadStrategyTest.class.getResource("/test_read_parquet2.conf");
154+
URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet2.conf");
155155
Assertions.assertNotNull(resource);
156156
Assertions.assertNotNull(conf);
157157
String path = Paths.get(resource.toURI()).toString();
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.file.writer;
18+
package org.apache.seatunnel.connectors.seatunnel.file.reader;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

0 commit comments

Comments
 (0)