
[Proposal][Transform&CatalogTable] Make Transform Support CatalogTable And CatalogTable Evolution #4402

Open
@EricJoy2048


Background

The SaveMode feature is already on our roadmap. To support automatically creating the table schema, the sink connector needs to know the CatalogTable sent by its upstream. Therefore, Transform needs to support CatalogTable and CatalogTable evolution.

Design

Add createAndPrepareTransform to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java

public static SeaTunnelTransform<?> createAndPrepareTransform(
        CatalogTable catalogTable,
        ReadonlyConfig options,
        ClassLoader classLoader,
        String factoryIdentifier) {
    // Find the TableTransformFactory registered under factoryIdentifier
    final TableTransformFactory factory =
            discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
    // Hand the upstream CatalogTable and the transform options to the factory
    TableFactoryContext context =
            new TableFactoryContext(
                    Collections.singletonList(catalogTable), options, classLoader);
    // The factory returns a TableTransform; createTransform() on it builds the transform
    return factory.createTransform(context).createTransform();
}
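To illustrate the flow of createAndPrepareTransform, here is a minimal, self-contained Java sketch. CatalogTable, TableFactoryContext, TableTransform and the other types below are simplified stand-ins rather than the real SeaTunnel classes, and a plain list of candidate factories replaces the ClassLoader-based discovery that discoverFactory performs. IdentityTransformFactory and FactoryUtilSketch are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the SeaTunnel types named in the proposal.
final class CatalogTable {
    final String tableName;
    final List<String> columns;

    CatalogTable(String tableName, List<String> columns) {
        this.tableName = tableName;
        this.columns = columns;
    }
}

final class TableFactoryContext {
    final List<CatalogTable> catalogTables;
    final Map<String, String> options;

    TableFactoryContext(List<CatalogTable> catalogTables, Map<String, String> options) {
        this.catalogTables = catalogTables;
        this.options = options;
    }
}

interface SeaTunnelTransform {
    CatalogTable getProducedCatalogTable();
}

interface TableTransform {
    SeaTunnelTransform createTransform();
}

interface TableTransformFactory {
    String factoryIdentifier();

    TableTransform createTransform(TableFactoryContext context);
}

// Hypothetical factory whose transform passes the input table through unchanged.
final class IdentityTransformFactory implements TableTransformFactory {
    @Override
    public String factoryIdentifier() {
        return "Identity";
    }

    @Override
    public TableTransform createTransform(TableFactoryContext context) {
        CatalogTable input = context.catalogTables.get(0);
        // Outer lambda is the TableTransform, inner lambda is the SeaTunnelTransform.
        return () -> () -> input;
    }
}

final class FactoryUtilSketch {
    // Stand-in for ClassLoader-based discovery: pick the factory with a matching identifier.
    static TableTransformFactory discoverFactory(
            List<TableTransformFactory> candidates, String factoryIdentifier) {
        for (TableTransformFactory factory : candidates) {
            if (factory.factoryIdentifier().equals(factoryIdentifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No transform factory for: " + factoryIdentifier);
    }

    static SeaTunnelTransform createAndPrepareTransform(
            CatalogTable catalogTable,
            Map<String, String> options,
            List<TableTransformFactory> candidates,
            String factoryIdentifier) {
        TableTransformFactory factory = discoverFactory(candidates, factoryIdentifier);
        // Wrap the single upstream table, as the proposal does with Collections.singletonList.
        TableFactoryContext context =
                new TableFactoryContext(Collections.singletonList(catalogTable), options);
        // Two steps: the factory returns a TableTransform, which then builds the transform.
        return factory.createTransform(context).createTransform();
    }
}
```

In this sketch an unknown identifier fails fast with an IllegalArgumentException, so a misconfigured job is rejected while it is being parsed rather than at runtime.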

SeaTunnelTransform API

Because we need to know the CatalogTable output by a Transform and use it as the input CatalogTable of the downstream node, we need to add a method like this:

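/**
 * Get the catalog table output by this transform.
 *
 * @return the CatalogTable produced by this transform, which becomes the input
 *     CatalogTable of the downstream node
 */
CatalogTable getProducedCatalogTable();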
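The effect of getProducedCatalogTable on a pipeline can be sketched as follows: each transform is built from the table produced by the node before it, so the sink ends up with the schema after every transform has been applied. This is a simplified sketch under stated assumptions: CatalogTable here is only a table name plus column names, and AppendColumnTransform and ChainDemo are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for CatalogTable: a table name plus ordered column names.
final class CatalogTable {
    final String tableName;
    final List<String> columns;

    CatalogTable(String tableName, List<String> columns) {
        this.tableName = tableName;
        this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    }
}

interface SeaTunnelTransform {
    /** Get the catalog table output by this transform. */
    CatalogTable getProducedCatalogTable();
}

// Hypothetical transform that appends one column to its input table.
final class AppendColumnTransform implements SeaTunnelTransform {
    private final CatalogTable input;
    private final String newColumn;

    AppendColumnTransform(CatalogTable input, String newColumn) {
        this.input = input;
        this.newColumn = newColumn;
    }

    @Override
    public CatalogTable getProducedCatalogTable() {
        List<String> columns = new ArrayList<>(input.columns);
        columns.add(newColumn);
        return new CatalogTable(input.tableName, columns);
    }
}

final class ChainDemo {
    // Each transform is built from the table produced by the previous node,
    // so the returned table is what the sink would receive.
    static CatalogTable sinkInput(CatalogTable source, List<String> columnsToAppend) {
        CatalogTable current = source;
        for (String column : columnsToAppend) {
            SeaTunnelTransform transform = new AppendColumnTransform(current, column);
            current = transform.getProducedCatalogTable();
        }
        return current;
    }

    public static void main(String[] args) {
        CatalogTable source = new CatalogTable("fake", Arrays.asList("id", "name"));
        CatalogTable sinkTable = sinkInput(source, Arrays.asList("first_name", "last_name"));
        System.out.println(sinkTable.columns); // prints [id, name, first_name, last_name]
    }
}
```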

AbstractCatalogSupportTransform

AbstractCatalogSupportTransform is an abstract class with methods like the following:

public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform {
    protected CatalogTable inputCatalogTable;

    protected volatile CatalogTable outputCatalogTable;

    public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
        this.inputCatalogTable = inputCatalogTable;
    }

    // Lazily derive the output CatalogTable once (double-checked locking on a volatile field)
    @Override
    public CatalogTable getProducedCatalogTable() {
        if (outputCatalogTable == null) {
            synchronized (this) {
                if (outputCatalogTable == null) {
                    outputCatalogTable = transformCatalogTable();
                }
            }
        }

        return outputCatalogTable;
    }

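    // Build the output table from the subclass-provided identifier and schema; options,
    // partition keys, and comment are carried over from the input table
    private CatalogTable transformCatalogTable() {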
        TableIdentifier tableIdentifier = transformTableIdentifier();
        TableSchema tableSchema = transformTableSchema();
        CatalogTable catalogTable =
                CatalogTable.of(
                        tableIdentifier,
                        tableSchema,
                        inputCatalogTable.getOptions(),
                        inputCatalogTable.getPartitionKeys(),
                        inputCatalogTable.getComment());
        return catalogTable;
    }

    protected abstract TableSchema transformTableSchema();

    protected abstract TableIdentifier transformTableIdentifier();

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
    }
}
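A concrete subclass only has to describe how the identifier and the schema change. The sketch below assumes simplified stand-ins (a TableSchema is a list of column names, a TableIdentifier is reduced to a plain string) and a hypothetical KeepColumnsTransform. It also shows that the output table is computed only once, and that a getProducedType-style accessor (here the hypothetical getProducedFieldNames) can be derived from it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins: a TableSchema is an ordered list of column names and a
// TableIdentifier is reduced to a plain "database.table" string.
final class TableSchema {
    final List<String> columns;

    TableSchema(List<String> columns) {
        this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    }
}

final class CatalogTable {
    final String tableIdentifier;
    final TableSchema tableSchema;
    final String comment;

    CatalogTable(String tableIdentifier, TableSchema tableSchema, String comment) {
        this.tableIdentifier = tableIdentifier;
        this.tableSchema = tableSchema;
        this.comment = comment;
    }
}

abstract class AbstractCatalogSupportTransform {
    protected final CatalogTable inputCatalogTable;
    private volatile CatalogTable outputCatalogTable;

    AbstractCatalogSupportTransform(CatalogTable inputCatalogTable) {
        this.inputCatalogTable = inputCatalogTable;
    }

    // Double-checked locking: the output table is derived at most once per instance.
    public CatalogTable getProducedCatalogTable() {
        CatalogTable result = outputCatalogTable;
        if (result == null) {
            synchronized (this) {
                result = outputCatalogTable;
                if (result == null) {
                    // Carry the comment over; subclasses supply identifier and schema.
                    result = new CatalogTable(
                            transformTableIdentifier(), transformTableSchema(), inputCatalogTable.comment);
                    outputCatalogTable = result;
                }
            }
        }
        return result;
    }

    // Stand-in for getProducedType(): always derived from the produced CatalogTable.
    public List<String> getProducedFieldNames() {
        return getProducedCatalogTable().tableSchema.columns;
    }

    protected abstract TableSchema transformTableSchema();

    protected abstract String transformTableIdentifier();
}

// Hypothetical transform that keeps only the listed columns, in input order.
final class KeepColumnsTransform extends AbstractCatalogSupportTransform {
    private final List<String> keep;
    int schemaComputations = 0; // counts derivations to show the result is cached

    KeepColumnsTransform(CatalogTable input, List<String> keep) {
        super(input);
        this.keep = keep;
    }

    @Override
    protected TableSchema transformTableSchema() {
        schemaComputations++;
        List<String> columns = new ArrayList<>();
        for (String column : inputCatalogTable.tableSchema.columns) {
            if (keep.contains(column)) {
                columns.add(column);
            }
        }
        return new TableSchema(columns);
    }

    @Override
    protected String transformTableIdentifier() {
        return inputCatalogTable.tableIdentifier; // the identifier is unchanged
    }
}
```

Deriving the produced row type from the cached CatalogTable keeps a single source of truth: a starter that still calls the type accessor and one that reads the CatalogTable see the same columns.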

Transform instances are created in every starter module (Flink, Spark, Zeta), and we cannot update all of these modules in a short time. Therefore AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform, which ensures that a Transform connector works with both the old starters and the updated ones.

TransformFactory

A Transform that wants to support CatalogTable needs to implement public TableTransform createTransform(@NonNull TableFactoryContext context) in its TableTransformFactory.

For example:

public class SplitTransformFactory implements TableTransformFactory {

    @Override
    public TableTransform createTransform(@NonNull TableFactoryContext context) {
        SplitTransformConfig splitTransformConfig = SplitTransformConfig.of(context.getOptions());
        CatalogTable catalogTable = context.getCatalogTable();
        // Defer building SplitTransform until the starter calls createTransform()
        return () -> new SplitTransform(splitTransformConfig, catalogTable);
    }

    // Other required Factory methods are omitted for brevity
}
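Because TableTransform has a single method, the factory can return a lambda that captures the parsed config and the upstream CatalogTable and defers building the transform. The sketch below uses simplified stand-ins rather than the real SeaTunnel classes; the option keys split_field and output_fields, the validation, and the instance counter are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the SeaTunnel types; not the real API.
final class CatalogTable {
    final List<String> columns;

    CatalogTable(List<String> columns) {
        this.columns = columns;
    }
}

final class TableFactoryContext {
    private final CatalogTable catalogTable;
    private final Map<String, String> options;

    TableFactoryContext(CatalogTable catalogTable, Map<String, String> options) {
        this.catalogTable = catalogTable;
        this.options = options;
    }

    CatalogTable getCatalogTable() {
        return catalogTable;
    }

    Map<String, String> getOptions() {
        return options;
    }
}

interface SeaTunnelTransform {
    CatalogTable getProducedCatalogTable();
}

@FunctionalInterface
interface TableTransform {
    SeaTunnelTransform createTransform();
}

interface TableTransformFactory {
    TableTransform createTransform(TableFactoryContext context);
}

// Assumed option keys: "split_field" and a comma-separated "output_fields".
final class SplitTransformConfig {
    final String splitField;
    final List<String> outputFields;

    private SplitTransformConfig(String splitField, List<String> outputFields) {
        this.splitField = splitField;
        this.outputFields = outputFields;
    }

    static SplitTransformConfig of(Map<String, String> options) {
        return new SplitTransformConfig(
                options.get("split_field"), Arrays.asList(options.get("output_fields").split(",")));
    }
}

final class SplitTransform implements SeaTunnelTransform {
    static int instances = 0; // counts constructions to show they are deferred
    private final SplitTransformConfig config;
    private final CatalogTable input;

    SplitTransform(SplitTransformConfig config, CatalogTable input) {
        if (!input.columns.contains(config.splitField)) {
            throw new IllegalArgumentException("Unknown split field: " + config.splitField);
        }
        this.config = config;
        this.input = input;
        instances++;
    }

    @Override
    public CatalogTable getProducedCatalogTable() {
        // Output = input columns followed by the new fields produced by the split.
        List<String> columns = new ArrayList<>(input.columns);
        columns.addAll(config.outputFields);
        return new CatalogTable(columns);
    }
}

final class SplitTransformFactory implements TableTransformFactory {
    @Override
    public TableTransform createTransform(TableFactoryContext context) {
        SplitTransformConfig config = SplitTransformConfig.of(context.getOptions());
        CatalogTable catalogTable = context.getCatalogTable();
        // The lambda captures config and table; SplitTransform is built only when invoked.
        return () -> new SplitTransform(config, catalogTable);
    }
}
```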

Starter

Each starter module needs to be updated to use FactoryUtil.createAndPrepareTransform to create Transform instances.

Zeta

Update the parseTransform method in seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java by adding the following code:

// Use the CatalogTable of the first upstream input
CatalogTable catalogTable = inputs.get(0)._1();
SeaTunnelTransform<?> transform =
        FactoryUtil.createAndPrepareTransform(
                catalogTable, readonlyConfig, classLoader, factoryId);
long id = idGenerator.getNextId();
String actionName =
        JobConfigParser.createTransformActionName(
                0, factoryId, JobConfigParser.getTableName(config));

TransformAction transformAction =
        new TransformAction(
                id, actionName, new ArrayList<>(inputActions), transform, factoryUrls);
// Register the produced CatalogTable so downstream nodes use it as their input
tableWithActionMap.put(
        tableId,
        Collections.singletonList(
                new Tuple2<>(transform.getProducedCatalogTable(), transformAction)));
return;
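The bookkeeping in parseTransform can be modeled in isolation: each result table id maps to the CatalogTable it carries and the action that produces it, and a transform reads the CatalogTable of its first input. This is a simplified, hypothetical model: ParserSketch, Action, the id counter, and the action name format are assumptions, Map.Entry stands in for Tuple2, and a UnaryOperator stands in for the SeaTunnelTransform.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of the parser bookkeeping; not the real Zeta classes.
final class CatalogTable {
    final List<String> columns;

    CatalogTable(List<String> columns) {
        this.columns = columns;
    }
}

final class Action {
    final long id;
    final String name;
    final List<Action> upstream;

    Action(long id, String name, List<Action> upstream) {
        this.id = id;
        this.name = name;
        this.upstream = upstream;
    }
}

final class ParserSketch {
    // Result table id -> (CatalogTable carried by that table, action that produces it).
    final Map<String, List<Map.Entry<CatalogTable, Action>>> tableWithActionMap = new HashMap<>();
    private long nextId = 1; // stand-in for idGenerator

    void registerSource(String tableId, CatalogTable table) {
        Action source = new Action(nextId++, "Source-" + tableId, Collections.<Action>emptyList());
        Map.Entry<CatalogTable, Action> entry = new SimpleImmutableEntry<>(table, source);
        tableWithActionMap.put(tableId, Collections.singletonList(entry));
    }

    // 'transform' maps the input CatalogTable to the produced one.
    void parseTransform(
            String inputTableId, String tableId, String factoryId, UnaryOperator<CatalogTable> transform) {
        List<Map.Entry<CatalogTable, Action>> inputs = tableWithActionMap.get(inputTableId);
        if (inputs == null) {
            throw new IllegalStateException("Unknown input table: " + inputTableId);
        }
        // As in the proposal, only the first input's CatalogTable is passed to the transform.
        CatalogTable catalogTable = inputs.get(0).getKey();
        List<Action> inputActions = new ArrayList<>();
        for (Map.Entry<CatalogTable, Action> input : inputs) {
            inputActions.add(input.getValue());
        }
        CatalogTable produced = transform.apply(catalogTable);
        Action transformAction = new Action(nextId++, "Transform-" + factoryId, inputActions);
        // Register the produced table so that downstream nodes read it as their input.
        Map.Entry<CatalogTable, Action> entry = new SimpleImmutableEntry<>(produced, transformAction);
        tableWithActionMap.put(tableId, Collections.singletonList(entry));
    }
}
```

Keying the map by result table id means a later transform or sink only needs the id of its input table to find both the upstream action and the CatalogTable it should expect.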

Todo list
