|
27 | 27 | import io.trino.dispatcher.DispatchManager;
|
28 | 28 | import io.trino.execution.QueryInfo;
|
29 | 29 | import io.trino.execution.QueryManager;
|
| 30 | +import io.trino.execution.QueryStats; |
30 | 31 | import io.trino.metadata.FunctionManager;
|
31 | 32 | import io.trino.metadata.Metadata;
|
32 | 33 | import io.trino.metadata.QualifiedObjectName;
|
33 | 34 | import io.trino.server.BasicQueryInfo;
|
| 35 | +import io.trino.spi.QueryId; |
34 | 36 | import io.trino.spi.connector.ConnectorSession;
|
35 | 37 | import io.trino.spi.connector.MaterializedViewFreshness;
|
| 38 | +import io.trino.spi.eventlistener.QueryCompletedEvent; |
36 | 39 | import io.trino.spi.security.Identity;
|
37 | 40 | import io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
|
38 | 41 | import io.trino.sql.planner.Plan;
|
|
45 | 48 | import io.trino.sql.query.QueryAssertions.QueryAssert;
|
46 | 49 | import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
|
47 | 50 | import io.trino.testing.assertions.TrinoExceptionAssert;
|
| 51 | +import io.trino.testing.eventlistener.EventsAwaitingQueries; |
| 52 | +import io.trino.testing.eventlistener.EventsCollector; |
| 53 | +import io.trino.testing.eventlistener.NamedClosable; |
| 54 | +import io.trino.testing.eventlistener.QueryEvents; |
| 55 | +import io.trino.testing.eventlistener.TestingEventListenerPlugin; |
48 | 56 | import io.trino.testing.sql.TestTable;
|
49 | 57 | import io.trino.testing.sql.TestView;
|
50 | 58 | import io.trino.tpch.TpchTable;
|
|
68 | 76 | import java.util.concurrent.CyclicBarrier;
|
69 | 77 | import java.util.concurrent.ExecutorService;
|
70 | 78 | import java.util.concurrent.Future;
|
| 79 | +import java.util.concurrent.TimeoutException; |
71 | 80 | import java.util.concurrent.atomic.AtomicReference;
|
72 | 81 | import java.util.function.Consumer;
|
73 | 82 | import java.util.function.Function;
|
@@ -186,7 +195,7 @@ public abstract class BaseConnectorTest
|
186 | 195 | {
|
187 | 196 | private static final Logger log = Logger.get(BaseConnectorTest.class);
|
188 | 197 |
|
189 |
| - protected static final List<TpchTable<?>> REQUIRED_TPCH_TABLES = ImmutableSet.<TpchTable<?>>builder() |
| 198 | + public static final List<TpchTable<?>> REQUIRED_TPCH_TABLES = ImmutableSet.<TpchTable<?>>builder() |
190 | 199 | .addAll(AbstractTestQueries.REQUIRED_TPCH_TABLES)
|
191 | 200 | .add(CUSTOMER)
|
192 | 201 | .build().asList();
|
@@ -5030,6 +5039,58 @@ public void testUpdateMultipleCondition()
|
5030 | 5039 | }
|
5031 | 5040 | }
|
5032 | 5041 |
|
| 5042 | + @Test |
| 5043 | + public void testUpdateStatsWithRaisedEvents() |
| 5044 | + { |
| 5045 | + if (!hasBehavior(SUPPORTS_UPDATE)) { |
| 5046 | + // Note this change is a no-op, if actually run |
| 5047 | + assertQueryFails("UPDATE nation SET nationkey = nationkey + regionkey WHERE regionkey < 1", MODIFYING_ROWS_MESSAGE); |
| 5048 | + return; |
| 5049 | + } |
| 5050 | + Supplier<NamedClosable> supplier = () -> { |
| 5051 | + TestTable table = newTrinoTable("test_row_update", "AS SELECT * FROM nation"); |
| 5052 | + return new NamedClosable(table.getName(), table); |
| 5053 | + }; |
| 5054 | + runUpdateDeleteStatsWithRaisedEvents(supplier, |
| 5055 | + table -> "UPDATE " + table + " SET nationkey = 100 WHERE regionkey = 2"); |
| 5056 | + } |
| 5057 | + |
| 5058 | + @Test |
| 5059 | + public void testDeleteStatsWithRaisedEvents() |
| 5060 | + { |
| 5061 | + skipTestUnless(hasBehavior(SUPPORTS_DELETE)); |
| 5062 | + // delete successive parts of the table |
| 5063 | + Supplier<NamedClosable> supplier = () -> { |
| 5064 | + TestTable table = newTrinoTable("test_delete_", "AS SELECT * FROM orders"); |
| 5065 | + return new NamedClosable(table.getName(), table); |
| 5066 | + }; |
| 5067 | + runUpdateDeleteStatsWithRaisedEvents(supplier, |
| 5068 | + table -> "DELETE FROM " + table + " WHERE custkey <= 100"); |
| 5069 | + } |
| 5070 | + |
| 5071 | + protected void runUpdateDeleteStatsWithRaisedEvents(Supplier<NamedClosable> namedClosable, |
| 5072 | + Function<String, String> querySupplier) { |
| 5073 | + EventsCollector generatedEvents = new EventsCollector(); |
| 5074 | + EventsAwaitingQueries queries = new EventsAwaitingQueries(generatedEvents, getQueryRunner()); |
| 5075 | + getQueryRunner().installPlugin(new TestingEventListenerPlugin(generatedEvents)); |
| 5076 | + try(NamedClosable table = namedClosable.get()) { |
| 5077 | + EventsAwaitingQueries.MaterializedResultWithEvents result = queries.runQueryAndWaitForEvents(querySupplier.apply(table.getName()), getSession()); |
| 5078 | + QueryEvents queryEvents = result.getQueryEvents(); |
| 5079 | + try { |
| 5080 | + queryEvents.waitForQueryCompletion(new Duration(300, SECONDS)); |
| 5081 | + SECONDS.sleep(1); |
| 5082 | + QueryCompletedEvent c = queryEvents.getQueryCompletedEvent(); |
| 5083 | + QueryStats queryStats = getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(new QueryId(c.getMetadata().getQueryId())).getQueryStats(); |
| 5084 | + assertThat(result.getMaterializedResult().getUpdateCount().orElse(0)).isEqualTo(queryStats.getUpdatedPositions()); |
| 5085 | + } catch (InterruptedException | TimeoutException e) { |
| 5086 | + throw new RuntimeException(e); |
| 5087 | + } |
| 5088 | + } catch (Exception e) { |
| 5089 | + throw new RuntimeException(e); |
| 5090 | + } |
| 5091 | + } |
| 5092 | + |
| 5093 | + |
5033 | 5094 | @Test
|
5034 | 5095 | public void testUpdateWithNullValues()
|
5035 | 5096 | {
|
|
0 commit comments