Skip to content

[Feature] Support connection config, and use conn_id to replace conn info in job config #8945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

thirsd
Copy link

@thirsd thirsd commented Mar 9, 2025

this commit is for #8941

Purpose of this pull request

Does this PR introduce any user-facing change?

user can add -conn or --connect to set Connect Config file.

connections.conf

connection {
  # 定义 MySQL 连接配置
  mysql_prod {
    url = "jdbc:mysql://192.168.1.19:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "111111"
    fetch_size = 10000
  }

  # 定义 Kafka 连接配置
  kafka_test {
    bootstrap.servers = "kafka-test:9092"
    topic = "test_topic"
  }
  
  sqlite_test {
    url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db"
    driver = "org.sqlite.JDBC"
  }
}

sqlite_to_console.conf:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  jdbc {
    url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db"
    driver = "org.sqlite.JDBC"
	table_path = "department"
  }
}

transform {
}

sink {
  console {
    plugin_input="fake"
  }
}

How was this patch tested?

no conn_id example

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  jdbc {
    url = "jdbc:sqlite:D:/MyWorld/05Projects/java/Seatunnel/SeaTunnel/db/test.db"
    driver = "org.sqlite.JDBC"
	table_path = "department"
  }
}

sqlite example

source {
  Jdbc {
    conn_id = "sqlite_test"
    table_path = "department"
  }
}

mysql example

source {
  Jdbc {
    conn_id = "mysql_prod"
    query="select * from department"
  }
}

transform {
}

sink {
  console {
  }
}

Check list

@github-actions github-actions bot added core SeaTunnel core module Zeta api labels Mar 9, 2025
@hailin0
Copy link
Member

hailin0 commented Mar 10, 2025

You can initiate a discussion on the developer mailing list, which requires majority consensus.

[email protected]

@liugddx
Copy link
Member

liugddx commented Mar 10, 2025

We should have a separate module to maintain configuration items, which can be a third-party tool, such as apache gravitino.Or of course you can maintain it yourself, as you are doing now. First of all, you need a design plan.

@thirsd
Copy link
Author

thirsd commented Mar 11, 2025

We should have a separate module to maintain configuration items, which can be a third-party tool, such as apache gravitino.Or of course you can maintain it yourself, as you are doing now. First of all, you need a design plan.

已经修复问题。

目前采用方式为增加--connect参数,通过指定connection对应的file文件,从而在job的config中若存在conn_id的信息,则会使用connection中的配置进行代替。

@davidzollo davidzollo added the First-time contributor First-time contributor label Mar 11, 2025
@liugddx
Copy link
Member

liugddx commented Mar 12, 2025

We can add a separate module to support it.It can support major catalogs, such as apache gravitino or seatunnel's own catalog management tool. In terms of usage, we can not add --conn but process it through env environment variables. env { metalake= "seatunnelCatalog" conn_id = "mysql" file_path = "s3://tmp/seatunnel/conf" }

@hailin0
Copy link
Member

hailin0 commented Mar 13, 2025

Any suggestions? @Hisoka-X @liugddx

common.conf


my_user_db {
    a = "xxxxx"
    b = "xxxx"
    c = "xxxx"
}

my_order_db {
    d = "xxxxx"
    e = "xxxx"
    f = "xxxx"
}

my_trans {
    x = "xxxxx"
    y = "xxxx"
    z = "xxxx"
}

job_example.conf


env {
    __st_config_ref_path__ = "/your/path/common.conf"
    ......
}

source {
    xxx {
        __st_config_ref_key__ = "my_user_db"
        .......
    }
}

transform {
    xxx {
        __st_config_ref_key__ = "my_trans"
        .......
    }
}

sink {
    xxx {
        __st_config_ref_key__ = "my_order_db"
        .......
    }
}

parsed config result

env {
    ......
}

source {
    xxx {
        a = "xxxxx"
        b = "xxxx"
        c = "xxxx"
        .......
    }
}

transform {
    xxx {
        x = "xxxxx"
        y = "xxxx"
        z = "xxxx"
        .......
    }
}

sink {
    xxx {
        d = "xxxxx"
        e = "xxxx"
        f = "xxxx"
        .......
    }
}

@liugddx
Copy link
Member

liugddx commented Mar 13, 2025

+1.

@thirsd
Copy link
Author

thirsd commented Mar 16, 2025

@hailin0 重新提交pull request。
#8984

烦请审核,若有需要调整,请随时反馈

@hailin0
Copy link
Member

hailin0 commented Mar 27, 2025

Why do we need to resubmit the PR? The context of the review will be lost.

@davidzollo davidzollo changed the title support connection config, and use conn_id to replace conn info in jo… [Feature] Support connection config, and use conn_id to replace conn info in job config Mar 27, 2025
@hailin0 hailin0 requested a review from Copilot March 31, 2025 05:19
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces support for connection configuration by adding a new connection config file that allows users to replace connection details in job configurations with a connection ID. It updates parsers, execution environment interfaces, client commands, and constants to integrate the new functionality.

  • Add a new connection configuration field and merging logic in the job config parser.
  • Update various client and command classes to pass and handle a connection config file path.
  • Extend configuration utilities and constants to support the connection config.

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java Added connection config field and merge logic for replacing conn_id with connection details.
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java Added new constructor parameter and field for connection file path.
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java Added interface methods to support passing a connection config path.
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java Extended client methods to include connection config path in execution context creation.
seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java Added logic to process and verify the connection config file path parameter from command-line arguments.
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java Updated configuration processing to handle connection configuration options.
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java Introduced a new parameter for connection configuration file.
seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java Introduced a constant for connection configuration.
Comments suppressed due to low confidence (1)

seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java:141

  • [nitpick] Consider renaming the variable 'str_conn_path' to 'connFilePath' or 'connFileAbsolutePath' for improved consistency and readability with other naming conventions.
String str_conn_path = null;

Comment on lines +181 to +185
Map<String, Object> conn_dit = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dit.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dit;
Copy link
Preview

Copilot AI Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable name 'conn_dit' appears to be a typo; consider renaming it to 'conn_dict' for clarity.

Suggested change
Map<String, Object> conn_dit = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dit.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dit;
Map<String, Object> conn_dict = new HashMap<>(connect.size());
for (String sensitiveOption : sensitiveOptions) {
conn_dict.computeIfPresent(sensitiveOption, processFunction);
}
connect = conn_dict;

Copilot uses AI. Check for mistakes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api core SeaTunnel core module First-time contributor First-time contributor Zeta
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants