Skip to content

[Feature][Connector-V2] Grok config is supported for file reading #8376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
<dom4j.version>2.1.4</dom4j.version>
<jaxen.version>2.0.0</jaxen.version>
<java.grok>0.1.9</java.grok>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -158,6 +159,11 @@
<artifactId>jaxen</artifactId>
<version>${jaxen.version}</version>
</dependency>
<dependency>
<groupId>io.krakens</groupId>
<artifactId>java-grok</artifactId>
<version>${java.grok}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

import java.util.List;
import java.util.Map;

public class BaseSourceConfigOptions {
public static final Option<FileFormat> FILE_FORMAT_TYPE =
Expand All @@ -40,6 +41,24 @@ public class BaseSourceConfigOptions {
.noDefaultValue()
.withDescription("The file path of source files");

public static final Option<FilePathRule> FILE_PATH_RULE =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add SinkFactory#optionRule()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have created a draft PR; these issues will be fixed later.

Options.key("file_path_rule")
.objectType(FilePathRule.class)
.defaultValue(FilePathRule.NONE)
.withDescription("The file path rule of source files");

public static final Option<Map<String, String>> GROK_PATTERN =
Options.key("grok_pattern")
.mapType()
.noDefaultValue()
.withDescription("The grok pattern of source files");

public static final Option<FilePathRule.GrokRule> GROK_RULE =
Options.key("grok_rule")
.objectType(FilePathRule.GrokRule.class)
.noDefaultValue()
.withDescription("The grok rule of source files");

public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.config;

import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * Selects how the configured source file path is interpreted.
 *
 * <p>The nested classes are plain Lombok {@code @Data} holders describing the extra filters that
 * can be applied to values captured from a file path.
 */
public enum FilePathRule {
    /** No special path handling; this is the default of the {@code file_path_rule} option. */
    NONE,
    /** The file path is compiled as a Grok expression and matched against each file path. */
    GROK;

    /**
     * Lower and upper bound for a captured value. Both bounds are kept as strings here; the
     * consumer decides how to parse them.
     */
    @Data
    public static class TimeScope {
        private String start;
        private String end;
    }

    /** Filters keyed by capture name: regular expressions, time scopes and allowed values. */
    @Data
    public static class GrokRule {
        private Map<String, String> patterns;
        private Map<String, TimeScope> timeScopes;
        private Map<String, List<String>> enumRules;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FilePathRule;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

Expand All @@ -37,6 +39,9 @@
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.hadoop.fs.FileStatus;

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -70,6 +75,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0];
protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0];
protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0];
private static final String STATIC_PATH_PATTERN = "(/[^%]+)";

protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
Expand All @@ -86,11 +92,38 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.defaultValue();

protected Pattern pattern;
protected final GrokCompiler grokCompiler = GrokCompiler.newInstance();
protected Grok grok;

/**
 * Stores the Hadoop configuration, creates the file system proxy and, when the plugin config
 * selects the {@code GROK} file path rule, registers the configured Grok patterns and compiles
 * the configured file path into {@link #grok}.
 *
 * @param conf Hadoop configuration used to create the file system proxy
 */
@Override
public void init(HadoopConf conf) {
    this.hadoopConf = conf;
    this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);

    // Nothing else to prepare unless a file path rule was configured explicitly.
    final String ruleKey = BaseSourceConfigOptions.FILE_PATH_RULE.key();
    if (pluginConfig == null || !pluginConfig.hasPath(ruleKey)) {
        return;
    }
    if (pluginConfig.getEnum(FilePathRule.class, ruleKey) != FilePathRule.GROK) {
        return;
    }

    // Copy the user supplied pattern definitions into a string map, keeping their order.
    Map<String, String> customPatterns = new LinkedHashMap<>();
    pluginConfig
            .getObject(BaseSourceConfigOptions.GROK_PATTERN.key())
            .unwrapped()
            .forEach((name, definition) -> customPatterns.put(name, definition.toString()));
    this.grokCompiler.register(customPatterns);

    // The configured file path itself is the Grok expression to match files against.
    String pathExpression = pluginConfig.getString(BaseSourceConfigOptions.FILE_PATH.key());
    grok = grokCompiler.compile(pathExpression);
}

@Override
Expand All @@ -106,7 +139,18 @@ boolean checkFileType(String path) {

@Override
public List<String> getFileNamesByPath(String path) throws IOException {
FilePathRule filePathRule = FilePathRule.NONE;
if (pluginConfig != null
&& pluginConfig.hasPath(BaseSourceConfigOptions.FILE_PATH_RULE.key())) {
filePathRule =
pluginConfig.getEnum(
FilePathRule.class, BaseSourceConfigOptions.FILE_PATH_RULE.key());
}
ArrayList<String> fileNames = new ArrayList<>();
if (filePathRule == FilePathRule.GROK) {

path = extractStaticPath(path);
}
FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
for (FileStatus fileStatus : stats) {
if (fileStatus.isDirectory()) {
Expand All @@ -118,17 +162,27 @@ public List<String> getFileNamesByPath(String path) throws IOException {
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
String filePath = fileStatus.getPath().toString();
if (!readPartitions.isEmpty()) {
for (String readPartition : readPartitions) {
if (filePath.contains(readPartition)) {
fileNames.add(filePath);
this.fileNames.add(filePath);
break;

switch (filePathRule) {
case GROK:
Match match = grok.match(filePath);
Map<String, Object> captureMap = match.capture();
Map<String, Object> grokRuleMap =
pluginConfig
.getObject(BaseSourceConfigOptions.GROK_RULE.key())
.unwrapped();
FilePathRule.GrokRule grokRule =
JsonUtils.parseObject(
JsonUtils.toJsonString(grokRuleMap),
FilePathRule.GrokRule.class);
if (isValidCapture(captureMap, grokRule)) {
addFileNameIfMatches(filePath, fileNames);
}
}
} else {
fileNames.add(filePath);
this.fileNames.add(filePath);
break;
case NONE:
default:
addFileNameIfMatches(filePath, fileNames);
break;
}
}
}
Expand All @@ -137,6 +191,86 @@ public List<String> getFileNamesByPath(String path) throws IOException {
return fileNames;
}

/**
 * Records a file path in both the supplied list and this strategy's own {@code fileNames}
 * collection, provided that either no read partitions are configured or the path contains at
 * least one of them. A path is added at most once per call.
 *
 * @param filePath path of the candidate file
 * @param fileNames result list of the current listing call
 */
private void addFileNameIfMatches(String filePath, List<String> fileNames) {
    // Without configured partitions every file is accepted.
    boolean accepted = readPartitions.isEmpty();
    if (!accepted) {
        for (String partition : readPartitions) {
            if (filePath.contains(partition)) {
                accepted = true;
                break;
            }
        }
    }
    if (!accepted) {
        return;
    }
    fileNames.add(filePath);
    this.fileNames.add(filePath);
}

/**
 * Checks the values captured from a file path against every constraint of the Grok rule.
 *
 * <ul>
 *   <li>{@code patterns}: the captured value must fully match the regular expression.
 *   <li>{@code timeScopes}: the captured value, parsed as a {@code long}, must lie within the
 *       inclusive range {@code [start, end]}; a captured value or bound that cannot be parsed
 *       as a {@code long} (including a missing bound) counts as a failed check.
 *   <li>{@code enumRules}: the captured value must be one of the listed values.
 * </ul>
 *
 * <p>Captured values are compared by their string form, so values that Grok has already
 * converted to a non-string type (for example a number) are handled instead of failing with a
 * {@link ClassCastException}.
 *
 * @param captureMap values captured by Grok, keyed by capture name
 * @param grokRule constraints to apply; each of its maps may be {@code null}
 * @return {@code true} only if every configured constraint has a capture that satisfies it
 */
private boolean isValidCapture(Map<String, Object> captureMap, FilePathRule.GrokRule grokRule) {
    if (grokRule.getPatterns() != null) {
        for (Map.Entry<String, String> entry : grokRule.getPatterns().entrySet()) {
            String actualValue = captureAsString(captureMap, entry.getKey());
            if (actualValue == null) {
                return false;
            }
            // Full match required, not a partial find.
            if (!Pattern.compile(entry.getValue()).matcher(actualValue).matches()) {
                return false;
            }
        }
    }

    if (grokRule.getTimeScopes() != null) {
        for (Map.Entry<String, FilePathRule.TimeScope> entry :
                grokRule.getTimeScopes().entrySet()) {
            FilePathRule.TimeScope timeScope = entry.getValue();
            String actualValue = captureAsString(captureMap, entry.getKey());
            if (actualValue == null) {
                return false;
            }
            try {
                long actualTime = Long.parseLong(actualValue);
                long startTime = Long.parseLong(timeScope.getStart());
                long endTime = Long.parseLong(timeScope.getEnd());
                if (actualTime < startTime || actualTime > endTime) {
                    return false;
                }
            } catch (NumberFormatException e) {
                // Non-numeric capture or bound: the file cannot be placed in the range.
                return false;
            }
        }
    }

    if (grokRule.getEnumRules() != null) {
        for (Map.Entry<String, List<String>> entry : grokRule.getEnumRules().entrySet()) {
            List<String> expectedValues = entry.getValue();
            String actualValue = captureAsString(captureMap, entry.getKey());
            if (actualValue == null
                    || expectedValues == null
                    || !expectedValues.contains(actualValue)) {
                return false;
            }
        }
    }

    return true;
}

/** Returns the string form of the capture stored under {@code key}, or {@code null} if absent. */
private static String captureAsString(Map<String, Object> captureMap, String key) {
    Object captured = captureMap.get(key);
    return captured == null ? null : captured.toString();
}

/**
 * Returns the directory prefix of a Grok path expression that precedes the first Grok
 * reference ({@code %{...}}), i.e. the part of the path that can be listed as-is.
 *
 * <p>Examples: {@code /data/%{YEAR:y}/logs/%{NAME:n}.log} yields {@code /data/}, and {@code
 * /data/app_%{DATE:d}/x} also yields {@code /data/} because {@code app_} is only part of a
 * directory name. Static segments that follow a Grok reference are never included, since they
 * do not belong to a common parent directory.
 *
 * @param inputPath path expression, possibly containing Grok references
 * @return {@code inputPath} unchanged when it contains no {@code %}; otherwise everything up
 *     to and including the last {@code /} before the first {@code %}, or an empty string when
 *     no {@code /} precedes it
 */
private static String extractStaticPath(String inputPath) {
    int firstDynamic = inputPath.indexOf('%');
    if (firstDynamic < 0) {
        // Fully static path (e.g. a sub-directory found while listing): use it verbatim.
        return inputPath;
    }
    // Cut back to the enclosing directory so a half-static name is not treated as a directory.
    int lastSeparator = inputPath.lastIndexOf('/', firstDynamic);
    if (lastSeparator < 0) {
        return "";
    }
    return inputPath.substring(0, lastSeparator + 1);
}

@Override
public void setPluginConfig(Config pluginConfig) {
this.pluginConfig = pluginConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.writer;
package org.apache.seatunnel.connectors.seatunnel.file.reader;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.writer;
package org.apache.seatunnel.connectors.seatunnel.file.reader;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.writer;
package org.apache.seatunnel.connectors.seatunnel.file.reader;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testParquetReadUseSystemDefaultTimeZone() throws Exception {
@Test
public void testParquetReadProjection1() throws Exception {
URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");
URL conf = OrcReadStrategyTest.class.getResource("/test_read_parquet.conf");
URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet.conf");
Assertions.assertNotNull(resource);
Assertions.assertNotNull(conf);
String path = Paths.get(resource.toURI()).toString();
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testParquetReadProjection1() throws Exception {
@Test
public void testParquetReadProjection2() throws Exception {
URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
URL conf = OrcReadStrategyTest.class.getResource("/test_read_parquet2.conf");
URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet2.conf");
Assertions.assertNotNull(resource);
Assertions.assertNotNull(conf);
String path = Paths.get(resource.toURI()).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.writer;
package org.apache.seatunnel.connectors.seatunnel.file.reader;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
Expand Down
Loading
Loading