Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
<dom4j.version>2.1.4</dom4j.version>
<jaxen.version>2.0.0</jaxen.version>
<java.grok.version>0.1.9</java.grok.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -158,6 +159,11 @@
<artifactId>jaxen</artifactId>
<version>${jaxen.version}</version>
</dependency>
<dependency>
<groupId>io.krakens</groupId>
<artifactId>java-grok</artifactId>
<version>${java.grok.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

import java.util.List;
import java.util.Map;

public class BaseSourceConfigOptions {

/**
 * Classpath resource name of the YAML file that holds the built-in grok pattern templates.
 * AbstractReadStrategy resolves it through the class loader and flattens its nested entries
 * into a name-to-pattern map.
 */
public static final String GROK_PATTEN_TEMPLATES_PATH = "grok_templates.yaml";

public static final Option<FileFormat> FILE_FORMAT_TYPE =
Options.key("file_format_type")
.objectType(FileFormat.class)
Expand All @@ -40,6 +44,24 @@ public class BaseSourceConfigOptions {
.noDefaultValue()
.withDescription("The file path of source files");

public static final Option<FilePathRule> FILE_PATH_RULE =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add SinkFactory#optionRule()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have created a draft PR; these issues will be fixed later.

Options.key("file_path_rule")
.objectType(FilePathRule.class)
.defaultValue(FilePathRule.NONE)
.withDescription("The file path rule of source files");

/**
 * Map-typed option {@code grok_pattern} with no default value. In AbstractReadStrategy its
 * entries are merged over the templates loaded from the bundled YAML file (same-named entries
 * replace the bundled ones) and registered with the grok compiler.
 */
public static final Option<Map<String, String>> GROK_PATTERN =
Options.key("grok_pattern")
.mapType()
.noDefaultValue()
.withDescription("The grok pattern of source files");

/**
 * Object-typed option {@code grok_rule} with no default value. AbstractReadStrategy converts
 * the configured object into a {@link FilePathRule.GrokRule} and uses it to decide whether the
 * values captured from a file path are acceptable.
 */
public static final Option<FilePathRule.GrokRule> GROK_RULE =
Options.key("grok_rule")
.objectType(FilePathRule.GrokRule.class)
.noDefaultValue()
.withDescription("The grok rule of source files");

public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.config;

import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * Selects how the configured source file path is interpreted.
 *
 * <p>The nested classes describe the optional constraints that are applied to the values a
 * grok expression captures from a file path.
 */
public enum FilePathRule {
// Default of the file_path_rule option: no grok handling of the path.
NONE,
// The configured path is compiled as a grok expression and matched against each file path.
GROK;

/**
 * Constraints on grok captures, keyed by capture name. A group left {@code null} is not
 * checked by the reader.
 */
@Data
public static class GrokRule {
// capture name -> regular expression the captured value must match completely
private Map<String, String> patterns;
// capture name -> numeric range the captured value must fall into
private Map<String, TimeScope> timeScopes;
// capture name -> list of values the captured value must be contained in
private Map<String, List<String>> enumRules;
}

/**
 * Range bounds kept as strings; the reader parses both with {@code Long.parseLong} and treats
 * the range as inclusive on both ends.
 */
@Data
public static class TimeScope {
// lower bound of the range
private String start;
// upper bound of the range
private String end;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.Collector;
Expand All @@ -25,9 +27,11 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FilePathRule;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

Expand All @@ -37,17 +41,24 @@
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.hadoop.fs.FileStatus;

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.URL;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -57,6 +68,8 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.GROK_PATTEN_TEMPLATES_PATH;

@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
protected static final String[] TYPE_ARRAY_STRING = new String[0];
Expand All @@ -70,6 +83,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected static final BigDecimal[] TYPE_ARRAY_BIG_DECIMAL = new BigDecimal[0];
protected static final LocalDate[] TYPE_ARRAY_LOCAL_DATE = new LocalDate[0];
protected static final LocalDateTime[] TYPE_ARRAY_LOCAL_DATETIME = new LocalDateTime[0];
private static final String STATIC_PATH_PATTERN = "^(.*?)[/\\\\]%\\{.*?}";

protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
Expand All @@ -86,11 +100,71 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.defaultValue();

protected Pattern pattern;
// Compiler that receives the grok pattern definitions registered in init(HadoopConf).
protected final GrokCompiler grokCompiler = GrokCompiler.newInstance();
// Compiled file-path expression; assigned in init(HadoopConf) only for the GROK file path rule.
protected Grok grok;

/**
 * Stores the Hadoop configuration, creates the file system proxy and, when the
 * {@code file_path_rule} option is set to {@code GROK}, prepares the grok expression that is
 * later matched against file paths.
 *
 * @param conf Hadoop configuration used to create the file system proxy
 */
@Override
public void init(HadoopConf conf) {
    this.hadoopConf = conf;
    this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
    if (pluginConfig == null
            || !pluginConfig.hasPath(BaseSourceConfigOptions.FILE_PATH_RULE.key())) {
        return;
    }
    FilePathRule filePathRule =
            pluginConfig.getEnum(
                    FilePathRule.class, BaseSourceConfigOptions.FILE_PATH_RULE.key());
    if (filePathRule == FilePathRule.GROK) {
        initGrok();
    }
}

/**
 * Registers the bundled grok templates plus the user supplied {@code grok_pattern} entries and
 * compiles the configured file path as a grok expression.
 */
private void initGrok() {
    Map<String, String> grokPatterns = new HashMap<>();
    URL resource =
            AbstractReadStrategy.class
                    .getClassLoader()
                    .getResource(GROK_PATTEN_TEMPLATES_PATH);
    if (resource == null) {
        log.error("grok pattern file not found!");
    } else {
        grokPatterns.putAll(loadFlattenedGrokTemplates(resource));
    }
    // grok_pattern has no default value, so it may be absent; getObject would throw then.
    if (pluginConfig.hasPath(BaseSourceConfigOptions.GROK_PATTERN.key())) {
        Map<String, Object> customPatterns =
                pluginConfig.getObject(BaseSourceConfigOptions.GROK_PATTERN.key()).unwrapped();
        for (Map.Entry<String, Object> entry : customPatterns.entrySet()) {
            // A null entry carries no pattern text; skip it instead of failing on toString().
            if (entry.getValue() != null) {
                // User supplied patterns override bundled ones with the same name.
                grokPatterns.put(entry.getKey(), entry.getValue().toString());
            }
        }
    }
    this.grokCompiler.register(grokPatterns);
    grok = grokCompiler.compile(pluginConfig.getString(BaseSourceConfigOptions.FILE_PATH.key()));
}

/**
 * Reads the YAML template resource and flattens it into a pattern-name to pattern map.
 *
 * <p>The content is read from the URL stream rather than through {@code java.io.File}, because
 * a classpath resource packaged inside a JAR has no file system path.
 *
 * @param resource location of the YAML template resource
 * @return flattened map; empty when the document has no content
 */
@SneakyThrows
private static Map<String, String> loadFlattenedGrokTemplates(URL resource) {
    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
    Map<String, String> flattenedMap = new HashMap<>();
    try (InputStream in = resource.openStream()) {
        Map<?, ?> yamlData = mapper.readValue(in, Map.class);
        if (yamlData != null) {
            flattenToBottomLevel(yamlData, flattenedMap);
        }
    }
    return flattenedMap;
}

/**
 * Recursively copies every string-valued leaf of {@code source} into {@code target}, keyed by
 * the leaf's own key. Nested maps are descended into; values of any other type are ignored.
 *
 * @param source possibly nested map to walk
 * @param target receives the leaf entries; later duplicates overwrite earlier ones
 */
private static void flattenToBottomLevel(Map<?, ?> source, Map<String, String> target) {
    for (Map.Entry<?, ?> entry : source.entrySet()) {
        Object value = entry.getValue();
        if (value instanceof Map) {
            flattenToBottomLevel((Map<?, ?>) value, target);
        } else if (value instanceof String) {
            target.put(String.valueOf(entry.getKey()), (String) value);
        }
    }
}

@Override
Expand All @@ -106,7 +180,17 @@ boolean checkFileType(String path) {

@Override
public List<String> getFileNamesByPath(String path) throws IOException {
FilePathRule filePathRule = FilePathRule.NONE;
if (pluginConfig != null
&& pluginConfig.hasPath(BaseSourceConfigOptions.FILE_PATH_RULE.key())) {
filePathRule =
pluginConfig.getEnum(
FilePathRule.class, BaseSourceConfigOptions.FILE_PATH_RULE.key());
}
ArrayList<String> fileNames = new ArrayList<>();
if (filePathRule == FilePathRule.GROK) {
path = extractStaticPath(path);
}
FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
for (FileStatus fileStatus : stats) {
if (fileStatus.isDirectory()) {
Expand All @@ -115,20 +199,30 @@ public List<String> getFileNamesByPath(String path) throws IOException {
}
if (fileStatus.isFile() && filterFileByPattern(fileStatus) && fileStatus.getLen() > 0) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
if (!"_SUCCESS".equals(fileStatus.getPath().getName())
&& !fileStatus.getPath().getName().startsWith(".")) {
String filePath = fileStatus.getPath().toString();
if (!readPartitions.isEmpty()) {
for (String readPartition : readPartitions) {
if (filePath.contains(readPartition)) {
fileNames.add(filePath);
this.fileNames.add(filePath);
break;

switch (filePathRule) {
case GROK:
Match match = grok.match(filePath);
Map<String, Object> captureMap = match.capture();
Map<String, Object> grokRuleMap =
pluginConfig
.getObject(BaseSourceConfigOptions.GROK_RULE.key())
.unwrapped();
FilePathRule.GrokRule grokRule =
JsonUtils.parseObject(
JsonUtils.toJsonString(grokRuleMap),
FilePathRule.GrokRule.class);
if (isValidCapture(captureMap, grokRule)) {
addFileNameIfMatches(filePath, fileNames);
}
}
} else {
fileNames.add(filePath);
this.fileNames.add(filePath);
break;
case NONE:
default:
addFileNameIfMatches(filePath, fileNames);
break;
}
}
}
Expand All @@ -137,6 +231,89 @@ public List<String> getFileNamesByPath(String path) throws IOException {
return fileNames;
}

/**
 * Adds {@code filePath} to the given list and to this reader's own file name list when it
 * passes the partition filter.
 *
 * <p>With no read partitions configured every path passes; otherwise the path must contain at
 * least one of the configured partition strings. A passing path is added exactly once.
 *
 * @param filePath candidate file path
 * @param fileNames result list of the current listing call
 */
private void addFileNameIfMatches(String filePath, List<String> fileNames) {
    boolean accepted = readPartitions.isEmpty();
    if (!accepted) {
        for (String partition : readPartitions) {
            if (filePath.contains(partition)) {
                accepted = true;
                break;
            }
        }
    }
    if (accepted) {
        fileNames.add(filePath);
        this.fileNames.add(filePath);
    }
}

/**
 * Checks the values captured from a file path against the configured grok rule.
 *
 * <p>Every configured rule group must be satisfied: regular expression patterns, numeric time
 * scopes and enumerations. A capture that a rule refers to but that is absent from
 * {@code captureMap} fails the check.
 *
 * @param captureMap values captured by the grok match, keyed by capture name
 * @param grokRule rule to apply; {@code null} accepts every capture
 * @return {@code true} when all configured rules accept the captured values
 */
private boolean isValidCapture(Map<String, Object> captureMap, FilePathRule.GrokRule grokRule) {
    if (grokRule == null) {
        return true;
    }
    return satisfiesPatternRules(captureMap, grokRule.getPatterns())
            && satisfiesTimeScopes(captureMap, grokRule.getTimeScopes())
            && satisfiesEnumRules(captureMap, grokRule.getEnumRules());
}

/**
 * Returns the captured value as text, or {@code null} when the capture is absent. Captures are
 * not guaranteed to be strings, so the value is converted instead of cast.
 */
private static String capturedText(Map<String, Object> captureMap, String key) {
    Object value = captureMap.get(key);
    return value == null ? null : value.toString();
}

/** Requires each listed capture to match its regular expression completely. */
private static boolean satisfiesPatternRules(
        Map<String, Object> captureMap, Map<String, String> patterns) {
    if (patterns == null) {
        return true;
    }
    for (Map.Entry<String, String> entry : patterns.entrySet()) {
        String actualValue = capturedText(captureMap, entry.getKey());
        if (actualValue == null
                || !Pattern.compile(entry.getValue()).matcher(actualValue).matches()) {
            return false;
        }
    }
    return true;
}

/**
 * Requires each listed capture to be a number inside its inclusive range. A missing start or
 * end leaves that side of the range open; a non-numeric capture or bound fails the check.
 */
private static boolean satisfiesTimeScopes(
        Map<String, Object> captureMap, Map<String, FilePathRule.TimeScope> timeScopes) {
    if (timeScopes == null) {
        return true;
    }
    for (Map.Entry<String, FilePathRule.TimeScope> entry : timeScopes.entrySet()) {
        String actualValue = capturedText(captureMap, entry.getKey());
        if (actualValue == null) {
            return false;
        }
        FilePathRule.TimeScope timeScope = entry.getValue();
        try {
            long actualTime = Long.parseLong(actualValue);
            String start = timeScope.getStart();
            String end = timeScope.getEnd();
            if (start != null && actualTime < Long.parseLong(start)) {
                return false;
            }
            if (end != null && actualTime > Long.parseLong(end)) {
                return false;
            }
        } catch (NumberFormatException ignored) {
            // A value that is not a whole number cannot be placed inside the range.
            return false;
        }
    }
    return true;
}

/** Requires each listed capture to be one of its allowed values. */
private static boolean satisfiesEnumRules(
        Map<String, Object> captureMap, Map<String, List<String>> enumRules) {
    if (enumRules == null) {
        return true;
    }
    for (Map.Entry<String, List<String>> entry : enumRules.entrySet()) {
        String actualValue = capturedText(captureMap, entry.getKey());
        if (actualValue == null || !entry.getValue().contains(actualValue)) {
            return false;
        }
    }
    return true;
}

/**
 * Returns the directory part of {@code inputPath} that precedes its first grok token
 * ({@code %{...}}), i.e. the deepest directory that contains no token.
 *
 * <ul>
 *   <li>A path without any token is returned unchanged.
 *   <li>The result ends before the last {@code /} or {@code \} in front of the first token, so
 *       a token embedded in a directory name (for example {@code /data/log_%{DAY}}) yields the
 *       parent directory.
 *   <li>When that separator is the first character, the separator itself is returned so the
 *       root is kept.
 *   <li>When no separator precedes the first token, an empty string is returned.
 * </ul>
 *
 * @param inputPath configured file path, possibly containing grok tokens
 * @return static directory prefix as described above
 */
private static String extractStaticPath(String inputPath) {
    int tokenStart = inputPath.indexOf("%{");
    if (tokenStart < 0) {
        return inputPath;
    }
    int separator =
            Math.max(
                    inputPath.lastIndexOf('/', tokenStart),
                    inputPath.lastIndexOf('\\', tokenStart));
    if (separator < 0) {
        return "";
    }
    if (separator == 0) {
        return inputPath.substring(0, 1);
    }
    return inputPath.substring(0, separator);
}

@Override
public void setPluginConfig(Config pluginConfig) {
this.pluginConfig = pluginConfig;
Expand Down
Loading
Loading