Skip to content

[Feature][Connector-V2][Juicefsfile] Support Juicefsfile source & sink #7957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,4 @@ connector-sls
connector-qdrant
connector-typesense
connector-cdc-opengauss
connector-file-juicefs
529 changes: 529 additions & 0 deletions docs/en/connector-v2/sink/JuicefsFile.md

Large diffs are not rendered by default.

570 changes: 570 additions & 0 deletions docs/en/connector-v2/source/JuicefsFile.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ seatunnel.sink.Sls = connector-sls
seatunnel.source.Typesense = connector-typesense
seatunnel.sink.Typesense = connector-typesense
seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
seatunnel.source.JuicefsFile = connector-file-juicefs
seatunnel.sink.JuicefsFile = connector-file-juicefs

seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public enum FileSystemType implements Serializable {
FTP("FtpFile"),
SFTP("SftpFile"),
S3("S3File"),
OBS("ObsFile");
OBS("ObsFile"),
JUICEFS("JuicefsFile");

private final String fileSystemPluginName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<!-- Maven module for the JuiceFS file connector; it is a child of the connector-file parent module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file</artifactId>
<version>${revision}</version>
</parent>

<artifactId>connector-file-juicefs</artifactId>
<name>SeaTunnel : Connectors V2 : File : Juice FS</name>

<properties>
<!-- Version of the io.juicefs:juicefs-hadoop dependency declared below. -->
<hadoop-juicefs.version>1.2.1</hadoop-juicefs.version>
</properties>

<dependencies>

<!-- Shared file-connector code, versioned together with this module. -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Hadoop uber jar in "provided" scope: needed to compile, expected to be supplied by the runtime. -->
<!-- NOTE(review): Avro is excluded here; the reason is not visible in this file (presumably a version conflict) - confirm. -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- JuiceFS Hadoop SDK, also "provided": the jar is not packaged by this module's dependency declaration. -->
<dependency>
<groupId>io.juicefs</groupId>
<artifactId>juicefs-hadoop</artifactId>
<version>${hadoop-juicefs.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.juicefs.catalog;

import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

/**
 * File catalog for the JuicefsFile connector.
 *
 * <p>This class adds no behavior of its own: every constructor argument is handed straight to
 * {@link AbstractFileCatalog}.
 */
public class JuicefsFileCatalog extends AbstractFileCatalog {

    /**
     * Creates the catalog by forwarding all arguments to the parent constructor.
     *
     * @param hadoopFileSystemProxy file system proxy forwarded to the parent catalog
     * @param filePath path forwarded to the parent catalog
     * @param catalogName catalog name forwarded to the parent catalog
     */
    public JuicefsFileCatalog(
            final HadoopFileSystemProxy hadoopFileSystemProxy,
            final String filePath,
            final String catalogName) {
        super(hadoopFileSystemProxy, filePath, catalogName);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.juicefs.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.juicefs.config.JuicefsHadoopConf;

import com.google.auto.service.AutoService;

/**
 * {@link CatalogFactory} that produces {@link JuicefsFileCatalog} instances.
 *
 * <p>The factory is registered under the plugin name of {@link FileSystemType#JUICEFS}.
 */
@AutoService(Factory.class)
public class JuicefsFileCatalogFactory implements CatalogFactory {

    /** Returns the plugin name of {@link FileSystemType#JUICEFS} as the factory identifier. */
    @Override
    public String factoryIdentifier() {
        return FileSystemType.JUICEFS.getFileSystemPluginName();
    }

    /** Returns an empty rule: this factory declares no required or optional options. */
    @Override
    public OptionRule optionRule() {
        return OptionRule.builder().build();
    }

    /**
     * Builds a JuiceFS-backed file catalog from the connector options.
     *
     * <p>The {@code catalogName} argument is not used; the created catalog is always named after
     * the JuicefsFile plugin.
     *
     * @param catalogName ignored
     * @param options connector options; the Hadoop settings and the file path are read from it
     * @return a new {@link JuicefsFileCatalog}
     */
    @Override
    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
        // Same evaluation order as before: Hadoop conf, proxy, path, plugin name.
        final HadoopConf juicefsConf = JuicefsHadoopConf.buildWithConfig(options);
        final HadoopFileSystemProxy proxy = new HadoopFileSystemProxy(juicefsConf);
        final String path = options.get(BaseSourceConfigOptions.FILE_PATH);
        final String pluginName = FileSystemType.JUICEFS.getFileSystemPluginName();
        return new JuicefsFileCatalog(proxy, path, pluginName);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.juicefs.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

import java.util.HashMap;
import java.util.Map;

/**
* Option definitions specific to the JuicefsFile connector, declared in addition to the options
* inherited from {@link BaseSourceConfigOptions}.
*/
public class JuicefsConfigOptions extends BaseSourceConfigOptions {

/**
* JuiceFS volume address under the key {@code jfs_name}, e.g. {@code jfs://seatunnel}. The option
* has no default value, so reading it yields {@code null} when the user did not set it.
*/
public static final Option<String> JFS_NAME =
Options.key("jfs_name")
.stringType()
.noDefaultValue()
.withDescription("Juicefs volume name. eg: jfs://seatunnel");

/** JuiceFS metadata URL under the key {@code meta_url}. The option has no default value. */
public static final Option<String> META_URL =
Options.key("meta_url")
.stringType()
.noDefaultValue()
.withDescription("Juicefs meta url");
/**
* Free-form JuiceFS Hadoop properties under the key {@code hadoop_properties}; defaults to an
* empty map. Use it for any JuiceFS setting that has no dedicated option in this class. The
* available keys are listed in the JuiceFS Hadoop Java SDK documentation:
*
* <p>https://juicefs.com/docs/community/hadoop_java_sdk/
*
* <p>Example entry: key = "juicefs.cache-size", value = "0"
*/
public static final Option<Map<String, String>> HADOOP_PROPERTIES =
Options.key("hadoop_properties")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("Juicefs hadoop properties");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.juicefs.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import org.apache.commons.lang3.StringUtils;

import lombok.Setter;

import java.util.HashMap;

/**
 * {@link HadoopConf} specialization that routes file access through the JuiceFS Hadoop SDK.
 *
 * <p>The file system scheme defaults to {@code jfs} and can be overridden by the scheme prefix of
 * the configured {@code jfs_name} (see {@link #buildWithConfig(ReadonlyConfig)}).
 */
@Setter
public class JuicefsHadoopConf extends HadoopConf {

    private static final String HDFS_IMPL = "io.juicefs.JuiceFileSystem";
    private static final String HDFS_ABSTRACT_IMPL = "io.juicefs.JuiceFS";

    private static final String JUICEFS_META_KEY = "juicefs.meta";
    private static final String DEFAULT_SCHEMA = "jfs";
    private static final String SCHEMA_SEPARATOR = "://";

    private String schema = DEFAULT_SCHEMA;

    /**
     * Creates a configuration for the given file system name.
     *
     * @param hdfsNameKey file system name forwarded to {@link HadoopConf}
     */
    public JuicefsHadoopConf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    /** Returns the JuiceFS {@code FileSystem} implementation class name. */
    @Override
    public String getFsHdfsImpl() {
        return HDFS_IMPL;
    }

    /** Returns the URI scheme in use; {@code jfs} unless it was overridden. */
    @Override
    public String getSchema() {
        return this.schema;
    }

    /**
     * Builds the JuiceFS Hadoop configuration from connector options.
     *
     * <p>The resulting extra options contain, in this order: the {@code
     * fs.AbstractFileSystem.<scheme>.impl} binding, the {@code juicefs.meta} entry (only when
     * {@code meta_url} is set), and finally every entry of {@code hadoop_properties}, so user
     * supplied properties take precedence over the derived ones.
     *
     * @param config connector options; {@code jfs_name} is required
     * @return the populated configuration
     * @throws IllegalArgumentException if {@code jfs_name} is missing or blank
     */
    public static HadoopConf buildWithConfig(ReadonlyConfig config) {
        String configuredName = config.get(JuicefsConfigOptions.JFS_NAME);
        if (StringUtils.isBlank(configuredName)) {
            // jfs_name has no default; fail with a clear message instead of a bare NPE.
            throw new IllegalArgumentException(
                    "Option 'jfs_name' is required, e.g. jfs://seatunnel");
        }

        String jfsName = withScheme(configuredName);
        String schema = jfsName.substring(0, jfsName.indexOf(SCHEMA_SEPARATOR));

        JuicefsHadoopConf hadoopConf = new JuicefsHadoopConf(jfsName);
        hadoopConf.setSchema(schema);

        String remoteUser = config.get(BaseSourceConfigOptions.REMOTE_USER);
        if (StringUtils.isNotEmpty(remoteUser)) {
            hadoopConf.setRemoteUser(remoteUser);
        }

        HashMap<String, String> juicefsOptions = new HashMap<>();
        juicefsOptions.put(
                String.format("fs.AbstractFileSystem.%s.impl", schema), HDFS_ABSTRACT_IMPL);

        String metaUrl = config.get(JuicefsConfigOptions.META_URL);
        if (StringUtils.isNotBlank(metaUrl)) {
            // Never store a null/blank value: it is not a usable Hadoop property value, and the
            // meta URL may legitimately be supplied through hadoop_properties instead.
            juicefsOptions.put(JUICEFS_META_KEY, metaUrl);
        }

        // Merged last so that user supplied properties override the derived entries above.
        juicefsOptions.putAll(config.get(JuicefsConfigOptions.HADOOP_PROPERTIES));

        hadoopConf.setExtraOptions(juicefsOptions);

        return hadoopConf;
    }

    /**
     * Ensures the name carries a non-empty scheme, prepending the default {@code jfs} scheme to a
     * bare volume name ({@code "vol"} becomes {@code "jfs://vol"}) or to a name whose scheme part
     * is empty ({@code "://vol"} becomes {@code "jfs://vol"}).
     */
    private static String withScheme(String name) {
        int separatorIndex = name.indexOf(SCHEMA_SEPARATOR);
        if (separatorIndex > 0) {
            return name;
        }
        if (separatorIndex == 0) {
            return DEFAULT_SCHEMA + name;
        }
        return DEFAULT_SCHEMA + SCHEMA_SEPARATOR + name;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.juicefs.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.juicefs.config.JuicefsHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

import java.util.Optional;

/**
 * Sink of the JuicefsFile connector.
 *
 * <p>All writing logic lives in {@link BaseMultipleTableFileSink}; this class only supplies the
 * JuiceFS Hadoop configuration, the plugin name, and the catalog table it was created with.
 */
public class JuicefsFileSink extends BaseMultipleTableFileSink {

    private final CatalogTable catalogTable;

    /**
     * Creates the sink, deriving the Hadoop configuration from the connector options.
     *
     * @param catalogTable table this sink writes; may be {@code null}
     * @param readonlyConfig connector options
     */
    public JuicefsFileSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
        super(JuicefsHadoopConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
        this.catalogTable = catalogTable;
    }

    /** Returns the plugin name of {@link FileSystemType#JUICEFS}. */
    @Override
    public String getPluginName() {
        return FileSystemType.JUICEFS.getFileSystemPluginName();
    }

    /** Returns the catalog table given at construction, or an empty Optional if it was null. */
    @Override
    public Optional<CatalogTable> getWriteCatalogTable() {
        if (catalogTable == null) {
            return Optional.empty();
        }
        return Optional.of(catalogTable);
    }
}
Loading
Loading