Commit 9133e26

[procedure] Merge migrate_file to migrate_table

1 parent 26ac4b1

File tree: 6 files changed, +72 −90 lines

docs/content/migration/migration-from-hive.md

Lines changed: 38 additions & 74 deletions
@@ -30,59 +30,41 @@ Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon.
 When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you
 still need the original table. The migrated table will be [append table]({{< ref "append-table/overview" >}}).
 
-Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon.
+Now, we can use paimon hive catalog with Migrate Table Procedure to totally migrate a table from hive to paimon.
 At the same time, you can use paimon hive catalog with Migrate Database Procedure to fully synchronize all tables in the database to paimon.
 
 * Migrate Table Procedure: Paimon table does not exist, use the procedure upgrade hive table to paimon table. Hive table will disappear after action done.
 * Migrate Database Procedure: Paimon table does not exist, use the procedure upgrade all hive tables in database to paimon table. All hive tables will disappear after action done.
-* Migrate File Procedure: Paimon table already exists, use the procedure to migrate files from hive table to paimon table. **Notice that, Hive table will also disappear after action done.**
 
 These three actions now support file format of hive "orc" and "parquet" and "avro".
 
 <span style="color: red; "> **We highly recommend to back up hive table data before migrating, because migrating action is not atomic. If been interrupted while migrating, you may lose your data.** </span>
 
-## Example for Migration
-
-**Migrate Hive Table**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_table</font>('hive', '<hive_database>.<hive_tablename>', '<paimon_tableconf>');***
-
-**Example**
+## Migrate Hive Table
 
+{{< tabs "migrate table" >}}
+{{< tab "Flink SQL" >}}
 ```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+    'type'='paimon',
+    'metastore' = 'hive',
+    'uri' = 'thrift://localhost:9083',
+    'warehouse'='/path/to/warehouse/');
 
 USE CATALOG PAIMON;
 
-CALL sys.migrate_table(connector => 'hive', source_table => 'default.hivetable', options => 'file.format=orc');
-```
-After invoke, "hivetable" will totally convert to paimon format. Writing and reading the table by old "hive way" will fail.
-We can add our table properties while importing by sys.migrate_table('<database>.<tablename>', '<tableproperties>').
-<tableproperties> here should be separated by ",". For example:
-
-```sql
 CALL sys.migrate_table(
-    connector => 'hive',
-    source_table => 'my_db.wait_to_upgrate',
-    options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+    connector => 'hive',
+    source_table => 'default.hivetable',
+    -- You can specify the target table, and if the target table already exists
+    -- the file will be migrated directly to it
+    -- target_table => 'default.paimontarget',
+    -- You can specify delete_origin is false, this won't delete hivetable
+    -- delete_origin => false,
+    options => 'file.format=orc');
 ```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
-```bash
-<FLINK_HOME>/bin/flink run \
-/path/to/paimon-flink-action-{{< version >}}.jar \
-migrate_table \
---warehouse <warehouse-path> \
---source_type hive \
---table <database.table-name> \
-[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
-[--options <paimon-table-conf [,paimon-table-conf ...]> ]
-```
-
-Example:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
 ```bash
 <FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
 migrate_table \
@@ -92,35 +74,31 @@ migrate_table \
 --source_type hive \
 --table default.hive_or_paimon
 ```
+{{< /tab >}}
+{{< /tabs >}}
 
-**Migrate Hive Database**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_database</font>('hive', '<hive_database>', '<paimon_tableconf>');***
+After invoke, "hivetable" will totally convert to paimon format. Writing and reading the table by old "hive way" will fail.
 
-**Example**
+## Migrate Hive Database
 
+{{< tabs "migrate database" >}}
+{{< tab "Flink SQL" >}}
 ```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+    'type'='paimon',
+    'metastore' = 'hive',
+    'uri' = 'thrift://localhost:9083',
+    'warehouse'='/path/to/warehouse/');
 
 USE CATALOG PAIMON;
 
-CALL sys.migrate_database(connector => 'hive', source_database => 'default', options => 'file.format=orc');
-```
-After invoke, all tables in "default" database will totally convert to paimon format. Writing and reading the table by old "hive way" will fail.
-We can add our table properties while importing by sys.migrate_database('<database>', '<tableproperties>').
-<tableproperties> here should be separated by ",". For example:
-
-```sql
 CALL sys.migrate_database(
-    connector => 'hive',
-    source_database => 'my_db',
-    options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+    connector => 'hive',
+    source_database => 'default',
+    options => 'file.format=orc');
 ```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
 ```bash
 <FLINK_HOME>/bin/flink run \
 /path/to/paimon-flink-action-{{< version >}}.jar \
@@ -141,21 +119,7 @@ Example:
 --source_type hive \
 --database default
 ```
+{{< /tab >}}
+{{< /tabs >}}
 
-**Migrate Hive File**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_file</font>('hive', '<hive_database>.<hive_table_name>', '<paimon_database>.<paimon_tablename>');***
-
-**Example**
-
-```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
-
-USE CATALOG PAIMON;
-
-CALL sys.migrate_file(connector => 'hive', source_table => 'default.hivetable', target_table => 'default.paimontable');
-```
-After invoke, "hivetable" will disappear. And all files will be moved and renamed to paimon directory. "paimontable" here must have the same
-partition keys with "hivetable", and "paimontable" should be in unaware-bucket mode.
+After invoke, all tables in "default" database will totally convert to paimon format. Writing and reading the table by old "hive way" will fail.
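
Editor's note (not part of the commit): the doc changes above fold migrate_file into migrate_table via the optional `target_table` argument. A minimal sketch of the resolution rule the procedure now follows, with a hypothetical suffix value (the real `PAIMON_SUFFIX` constant lives in the Flink `MigrateTableProcedure` and may differ):

```java
public class TargetResolver {
    // Hypothetical suffix; stands in for the procedure's PAIMON_SUFFIX constant.
    static final String PAIMON_SUFFIX = "_paimon_";

    // No target_table: migrate in place via a temporary "<source><suffix>" table,
    // which is later renamed back over the source (renameTable(false)).
    // With target_table: files go straight into it and no rename happens.
    static String resolveTarget(String sourceTable, String targetTable) {
        return targetTable == null ? sourceTable + PAIMON_SUFFIX : targetTable;
    }

    static boolean needsRename(String targetTable) {
        return targetTable == null;
    }
}
```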

paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java

Lines changed: 1 addition & 1 deletion
@@ -25,5 +25,5 @@ public interface Migrator {
 
     void renameTable(boolean ignoreIfNotExists) throws Exception;
 
-    public void deleteOriginTable(boolean delete) throws Exception;
+    void deleteOriginTable(boolean delete) throws Exception;
 }
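
Editor's note: this hunk only drops a redundant modifier — interface methods are implicitly `public abstract` in Java, so behavior is unchanged. A stand-in interface (not the real `Migrator`) demonstrates this via reflection:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Declared without `public`, exactly like the edited Migrator method.
interface DemoMigrator {
    void deleteOriginTable(boolean delete) throws Exception;
}

public class ModifierDemo {
    // Reflection reports the method as public even though the keyword is absent.
    public static boolean isImplicitlyPublic() throws NoSuchMethodException {
        Method m = DemoMigrator.class.getMethod("deleteOriginTable", boolean.class);
        return Modifier.isPublic(m.getModifiers());
    }
}
```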

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java

Lines changed: 3 additions & 1 deletion
@@ -53,7 +53,9 @@ public void run() throws Exception {
                 new DefaultProcedureContext(env),
                 connector,
                 hiveTableFullName,
+                null,
                 tableProperties,
-                parallelism);
+                parallelism,
+                null);
     }
 }

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java

Lines changed: 23 additions & 10 deletions
@@ -21,7 +21,6 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.migrate.Migrator;
-import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -30,6 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+
 /** Migrate procedure to migrate hive table to paimon table. */
 public class MigrateTableProcedure extends ProcedureBase {
 
@@ -46,25 +47,34 @@ public String identifier() {
             argument = {
                 @ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
                 @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "target_table", type = @DataTypeHint("STRING")),
                 @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
                 @ArgumentHint(
                         name = "parallelism",
                         type = @DataTypeHint("Integer"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "delete_origin",
+                        type = @DataTypeHint("BOOLEAN"),
                         isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
             String connector,
-            String sourceTablePath,
+            String sourceTable,
+            String targetTable,
             String properties,
-            Integer parallelism)
+            Integer parallelism,
+            Boolean deleteOrigin)
             throws Exception {
-        properties = notnull(properties);
-
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+        if (targetTable == null && !deleteOrigin) {
+            throw new IllegalArgumentException("delete_origin is false but targetTable is null.");
+        }
 
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+        Identifier sourceTableId = Identifier.fromString(sourceTable);
+        Identifier targetTableId =
+                Identifier.fromString(
+                        targetTable == null ? sourceTable + PAIMON_SUFFIX : targetTable);
 
         Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
 
7080

@@ -77,11 +87,14 @@ public String[] call(
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         p,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+                        parseCommaSeparatedKeyValues(notnull(properties)));
         LOG.info("create migrator success.");
+        migrator.deleteOriginTable(deleteOrigin);
         migrator.executeMigrate();
 
-        migrator.renameTable(false);
+        if (targetTable == null) {
+            migrator.renameTable(false);
+        }
         return new String[] {"Success"};
     }
 }

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java

Lines changed: 4 additions & 4 deletions
@@ -103,11 +103,11 @@ public void test(String format, boolean isNamedArgument) throws Exception {
                 "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
         if (isNamedArgument) {
             tEnv.executeSql(
-                            "CALL sys.migrate_file(connector => 'hive', source_table => 'default.hivetable', target_table => 'default.paimontable')")
+                            "CALL sys.migrate_table(connector => 'hive', source_table => 'default.hivetable', target_table => 'default.paimontable')")
                     .await();
         } else {
             tEnv.executeSql(
-                            "CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable')")
+                            "CALL sys.migrate_table('hive', 'default.hivetable', 'default.paimontable')")
                     .await();
         }
         List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM paimontable").collect());
@@ -149,11 +149,11 @@ public void testMigrateFileAction(String format, boolean isNamedArgument) throws
 
         if (isNamedArgument) {
             tEnv.executeSql(
-                            "CALL sys.migrate_file(connector => 'hive', source_table => 'default.hivetable01', target_table => 'default.paimontable01', delete_origin => false)")
+                            "CALL sys.migrate_table(connector => 'hive', source_table => 'default.hivetable01', target_table => 'default.paimontable01', delete_origin => false)")
                     .await();
         } else {
             tEnv.executeSql(
-                            "CALL sys.migrate_file('hive', 'default.hivetable01', 'default.paimontable01', false)")
+                            "CALL sys.migrate_table('hive', 'default.hivetable01', 'default.paimontable01', false)")
                     .await();
         }
 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java

Lines changed: 3 additions & 0 deletions
@@ -96,6 +96,9 @@ public InternalRow[] call(InternalRow args) {
         int parallelism =
                 args.isNullAt(6) ? Runtime.getRuntime().availableProcessors() : args.getInt(6);
 
+        if (targetTable == null && !deleteNeed) {
+            throw new IllegalArgumentException("delete_origin is false but targetTable is null.");
+        }
         Identifier sourceTableId = Identifier.fromString(sourceTable);
         Identifier tmpTableId =
                 StringUtils.isEmpty(targetTable)
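
Editor's note: both the Flink and Spark procedures now reject the combination "no target table, but delete_origin = false", since an in-place migration must remove the origin. A sketch of that guard with explicit null handling for the optional `Boolean` (a detail the diff leaves to the caller; the default of `true` when omitted is an assumption here):

```java
public class MigrateArgsValidator {
    // Hypothetical helper mirroring the guard added in both procedures.
    public static void validate(String targetTable, Boolean deleteOrigin) {
        // Assume delete_origin defaults to true when the argument is omitted.
        boolean delete = deleteOrigin == null || deleteOrigin;
        if (targetTable == null && !delete) {
            throw new IllegalArgumentException("delete_origin is false but targetTable is null.");
        }
    }
}
```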
