Skip to content

Commit 91bc773

Browse files
committed
Add support for branching in engine
1 parent 398853e commit 91bc773

21 files changed

+1274
-38
lines changed

Diff for: core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java

+8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.inject.Singleton;
2121
import io.trino.SystemSessionPropertiesProvider;
2222
import io.trino.metadata.AnalyzePropertyManager;
23+
import io.trino.metadata.BranchPropertyManager;
2324
import io.trino.metadata.CatalogProcedures;
2425
import io.trino.metadata.CatalogTableFunctions;
2526
import io.trino.metadata.CatalogTableProcedures;
@@ -156,6 +157,13 @@ public static AnalyzePropertyManager createAnalyzePropertyManager(ConnectorServi
156157
return new AnalyzePropertyManager(new ConnectorCatalogServiceProvider<>("analyze properties", connectorServicesProvider, ConnectorServices::getAnalyzeProperties));
157158
}
158159

160+
@Provides
161+
@Singleton
162+
public static BranchPropertyManager createBranchPropertyManager(ConnectorServicesProvider connectorServicesProvider)
163+
{
164+
return new BranchPropertyManager(new ConnectorCatalogServiceProvider<>("branch properties", connectorServicesProvider, ConnectorServices::getAnalyzeProperties));
165+
}
166+
159167
@Provides
160168
@Singleton
161169
public static TableProceduresPropertyManager createTableProceduresPropertyManager(ConnectorServicesProvider connectorServicesProvider)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.execution;
15+
16+
import com.google.common.util.concurrent.ListenableFuture;
17+
import com.google.inject.Inject;
18+
import io.trino.Session;
19+
import io.trino.execution.warnings.WarningCollector;
20+
import io.trino.metadata.BranchPropertyManager;
21+
import io.trino.metadata.Metadata;
22+
import io.trino.metadata.QualifiedObjectName;
23+
import io.trino.metadata.TableHandle;
24+
import io.trino.security.AccessControl;
25+
import io.trino.spi.connector.CatalogHandle;
26+
import io.trino.sql.PlannerContext;
27+
import io.trino.sql.tree.CreateBranch;
28+
import io.trino.sql.tree.Expression;
29+
import io.trino.sql.tree.NodeRef;
30+
import io.trino.sql.tree.Parameter;
31+
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Optional;
35+
36+
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
37+
import static io.trino.execution.ParameterExtractor.bindParameters;
38+
import static io.trino.metadata.MetadataUtil.createQualifiedObjectName;
39+
import static io.trino.metadata.MetadataUtil.getRequiredCatalogHandle;
40+
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
41+
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
42+
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
43+
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
44+
import static io.trino.sql.tree.SaveMode.FAIL;
45+
import static io.trino.sql.tree.SaveMode.REPLACE;
46+
import static java.util.Objects.requireNonNull;
47+
48+
public class CreateBranchTask
49+
implements DataDefinitionTask<CreateBranch>
50+
{
51+
private final PlannerContext plannerContext;
52+
private final Metadata metadata;
53+
private final AccessControl accessControl;
54+
private final BranchPropertyManager branchPropertyManager;
55+
56+
@Inject
57+
public CreateBranchTask(PlannerContext plannerContext, Metadata metadata, AccessControl accessControl, BranchPropertyManager branchPropertyManager)
58+
{
59+
this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
60+
this.metadata = requireNonNull(metadata, "metadata is null");
61+
this.accessControl = requireNonNull(accessControl, "accessControl is null");
62+
this.branchPropertyManager = requireNonNull(branchPropertyManager, "branchPropertyManager is null");
63+
}
64+
65+
@Override
66+
public String getName()
67+
{
68+
return "CREATE BRANCH";
69+
}
70+
71+
@Override
72+
public ListenableFuture<Void> execute(CreateBranch statement, QueryStateMachine stateMachine, List<Expression> parameters, WarningCollector warningCollector)
73+
{
74+
Session session = stateMachine.getSession();
75+
76+
QualifiedObjectName table = createQualifiedObjectName(session, statement, statement.getTableName());
77+
String branch = statement.getBranchName().getValue();
78+
79+
if (metadata.isMaterializedView(session, table)) {
80+
throw semanticException(GENERIC_USER_ERROR, statement, "Dropping branch from materialized view is not supported");
81+
}
82+
if (metadata.isView(session, table)) {
83+
throw semanticException(GENERIC_USER_ERROR, statement, "Dropping branch from view is not supported");
84+
}
85+
Optional<TableHandle> tableHandle = metadata.getRedirectionAwareTableHandle(session, table).tableHandle();
86+
if (tableHandle.isEmpty()) {
87+
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", table);
88+
}
89+
90+
if (metadata.branchExists(session, table, branch) && statement.getSaveMode() != REPLACE) {
91+
if (statement.getSaveMode() == FAIL) {
92+
throw semanticException(NOT_SUPPORTED, statement, "Branch '%s' already exists", branch);
93+
}
94+
return immediateVoidFuture();
95+
}
96+
97+
Map<NodeRef<Parameter>, Expression> parameterLookup = bindParameters(statement, parameters);
98+
CatalogHandle catalogHandle = getRequiredCatalogHandle(metadata, session, statement, table.catalogName());
99+
Map<String, Object> properties = branchPropertyManager.getProperties(
100+
table.catalogName(),
101+
catalogHandle,
102+
statement.getProperties(),
103+
session,
104+
plannerContext,
105+
accessControl,
106+
parameterLookup,
107+
true);
108+
109+
accessControl.checkCanCreateBranch(session.toSecurityContext(), table, branch);
110+
metadata.createBranch(session, tableHandle.get(), branch, properties);
111+
112+
return immediateVoidFuture();
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.execution;
15+
16+
import com.google.common.util.concurrent.ListenableFuture;
17+
import com.google.inject.Inject;
18+
import io.trino.Session;
19+
import io.trino.execution.warnings.WarningCollector;
20+
import io.trino.metadata.Metadata;
21+
import io.trino.metadata.QualifiedObjectName;
22+
import io.trino.metadata.TableHandle;
23+
import io.trino.security.AccessControl;
24+
import io.trino.sql.tree.DropBranch;
25+
import io.trino.sql.tree.Expression;
26+
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.Optional;
30+
31+
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
32+
import static io.trino.metadata.MetadataUtil.createQualifiedObjectName;
33+
import static io.trino.spi.StandardErrorCode.BRANCH_NOT_FOUND;
34+
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
35+
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
36+
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
37+
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
38+
import static java.util.Objects.requireNonNull;
39+
40+
public class DropBranchTask
41+
implements DataDefinitionTask<DropBranch>
42+
{
43+
private final Metadata metadata;
44+
private final AccessControl accessControl;
45+
46+
@Inject
47+
public DropBranchTask(Metadata metadata, AccessControl accessControl)
48+
{
49+
this.metadata = requireNonNull(metadata, "metadata is null");
50+
this.accessControl = requireNonNull(accessControl, "accessControl is null");
51+
}
52+
53+
@Override
54+
public String getName()
55+
{
56+
return "DROP BRANCH";
57+
}
58+
59+
@Override
60+
public ListenableFuture<Void> execute(DropBranch statement, QueryStateMachine stateMachine, List<Expression> parameters, WarningCollector warningCollector)
61+
{
62+
Session session = stateMachine.getSession();
63+
64+
QualifiedObjectName table = createQualifiedObjectName(session, statement, statement.getTableName());
65+
String branch = statement.getBranchName().getValue();
66+
67+
if (metadata.isMaterializedView(session, table)) {
68+
throw semanticException(GENERIC_USER_ERROR, statement, "Dropping branch from materialized view is not supported");
69+
}
70+
if (metadata.isView(session, table)) {
71+
throw semanticException(GENERIC_USER_ERROR, statement, "Dropping branch from view is not supported");
72+
}
73+
Optional<TableHandle> tableHandle = metadata.getRedirectionAwareTableHandle(session, table).tableHandle();
74+
if (tableHandle.isEmpty()) {
75+
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", table);
76+
}
77+
78+
Collection<String> branches = metadata.listBranches(session, table);
79+
if (!branches.contains(branch)) {
80+
if (!statement.isExists()) {
81+
throw semanticException(BRANCH_NOT_FOUND, statement, "Branch '%s' does not exist", branch);
82+
}
83+
return immediateVoidFuture();
84+
}
85+
if (branches.size() == 1) {
86+
throw semanticException(NOT_SUPPORTED, statement, "Cannot drop the only branch in a table");
87+
}
88+
89+
accessControl.canCanDropBranch(session.toSecurityContext(), table, branch);
90+
metadata.dropBranch(session, tableHandle.get(), branch);
91+
92+
return immediateVoidFuture();
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.execution;
15+
16+
import com.google.common.util.concurrent.ListenableFuture;
17+
import com.google.inject.Inject;
18+
import io.trino.Session;
19+
import io.trino.execution.warnings.WarningCollector;
20+
import io.trino.metadata.Metadata;
21+
import io.trino.metadata.QualifiedObjectName;
22+
import io.trino.metadata.TableHandle;
23+
import io.trino.security.AccessControl;
24+
import io.trino.sql.tree.Expression;
25+
import io.trino.sql.tree.FastForwardBranch;
26+
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.Optional;
30+
31+
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
32+
import static io.trino.metadata.MetadataUtil.createQualifiedObjectName;
33+
import static io.trino.spi.StandardErrorCode.BRANCH_NOT_FOUND;
34+
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
35+
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
36+
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
37+
import static java.util.Objects.requireNonNull;
38+
39+
public class FastForwardBranchTask
40+
implements DataDefinitionTask<FastForwardBranch>
41+
{
42+
private final Metadata metadata;
43+
private final AccessControl accessControl;
44+
45+
@Inject
46+
public FastForwardBranchTask(Metadata metadata, AccessControl accessControl)
47+
{
48+
this.metadata = requireNonNull(metadata, "metadata is null");
49+
this.accessControl = requireNonNull(accessControl, "accessControl is null");
50+
}
51+
52+
@Override
53+
public String getName()
54+
{
55+
return "FAST FORWARD BRANCH";
56+
}
57+
58+
@Override
59+
public ListenableFuture<Void> execute(FastForwardBranch statement, QueryStateMachine stateMachine, List<Expression> parameters, WarningCollector warningCollector)
60+
{
61+
Session session = stateMachine.getSession();
62+
63+
QualifiedObjectName table = createQualifiedObjectName(session, statement, statement.geTableName());
64+
String sourceBranch = statement.getSourceBranchName().getValue();
65+
String targetBranch = statement.getTargetBranchName().getValue();
66+
67+
if (metadata.isMaterializedView(session, table)) {
68+
throw semanticException(GENERIC_USER_ERROR, statement, "Fast forwarding branch on materialized view is not supported");
69+
}
70+
if (metadata.isView(session, table)) {
71+
throw semanticException(GENERIC_USER_ERROR, statement, "Fast forwarding branch on view is not supported");
72+
}
73+
Optional<TableHandle> tableHandle = metadata.getRedirectionAwareTableHandle(session, table).tableHandle();
74+
if (tableHandle.isEmpty()) {
75+
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", table);
76+
}
77+
78+
if (sourceBranch.equals(targetBranch)) {
79+
throw semanticException(GENERIC_USER_ERROR, statement, "Fast forwarding branch between same branches is not supported");
80+
}
81+
Collection<String> branches = metadata.listBranches(session, table);
82+
if (!branches.contains(sourceBranch)) {
83+
throw semanticException(BRANCH_NOT_FOUND, statement, "Branch '%s' does not exist", sourceBranch);
84+
}
85+
if (!branches.contains(targetBranch)) {
86+
throw semanticException(BRANCH_NOT_FOUND, statement, "Branch '%s' does not exist", targetBranch);
87+
}
88+
89+
accessControl.canCanAlterBranch(session.toSecurityContext(), table, sourceBranch);
90+
metadata.fastForwardBranch(session, tableHandle.get(), sourceBranch, targetBranch);
91+
92+
return immediateVoidFuture();
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.metadata;
15+
16+
import io.trino.connector.CatalogServiceProvider;
17+
import io.trino.spi.session.PropertyMetadata;
18+
19+
import java.util.Map;
20+
21+
import static io.trino.spi.StandardErrorCode.INVALID_BRANCH_PROPERTY;
22+
23+
public class BranchPropertyManager
24+
extends AbstractCatalogPropertyManager
25+
{
26+
public BranchPropertyManager(CatalogServiceProvider<Map<String, PropertyMetadata<?>>> connectorProperties)
27+
{
28+
super("branch", INVALID_BRANCH_PROPERTY, connectorProperties);
29+
}
30+
}

Diff for: core/trino-main/src/main/java/io/trino/metadata/Metadata.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,16 @@ default ResolvedFunction getCoercion(Type fromType, Type toType)
758758

759759
void dropLanguageFunction(Session session, QualifiedObjectName name, String signatureToken);
760760

761+
void createBranch(Session session, TableHandle tableHandle, String branch, Map<String, Object> properties);
762+
763+
void dropBranch(Session session, TableHandle tableHandle, String branch);
764+
765+
void fastForwardBranch(Session session, TableHandle tableHandle, String sourceBranch, String targetBranch);
766+
767+
Collection<String> listBranches(Session session, QualifiedObjectName tableName);
768+
769+
boolean branchExists(Session session, QualifiedObjectName tableName, String branch);
770+
761771
/**
762772
* Creates the specified materialized view with the specified view definition.
763773
*/
@@ -835,12 +845,22 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
835845
/**
836846
* Get the target table handle after performing redirection with a table version.
837847
*/
838-
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);
848+
RedirectionAwareTableHandle getRedirectionAwareTableHandle(
849+
Session session,
850+
QualifiedObjectName tableName,
851+
Optional<TableVersion> startVersion,
852+
Optional<TableVersion> endVersion,
853+
Optional<String> branch);
839854

840855
/**
841856
* Returns a table handle for the specified table name with a specified version
842857
*/
843-
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName, Optional<TableVersion> startVersion, Optional<TableVersion> endVersion);
858+
Optional<TableHandle> getTableHandle(
859+
Session session,
860+
QualifiedObjectName tableName,
861+
Optional<TableVersion> startVersion,
862+
Optional<TableVersion> endVersion,
863+
Optional<String> branch);
844864

845865
/**
846866
* Returns maximum number of tasks that can be created while writing data to specific connector.

0 commit comments

Comments
 (0)