31
31
import org .apache .gravitino .storage .relational .mapper .MetalakeMetaMapper ;
32
32
import org .apache .gravitino .storage .relational .mapper .ModelMetaMapper ;
33
33
import org .apache .gravitino .storage .relational .mapper .SchemaMetaMapper ;
34
+ import org .apache .gravitino .storage .relational .mapper .TableColumnMapper ;
34
35
import org .apache .gravitino .storage .relational .mapper .TableMetaMapper ;
36
+ import org .apache .gravitino .storage .relational .mapper .TopicMetaMapper ;
35
37
import org .apache .gravitino .storage .relational .po .CatalogPO ;
36
38
import org .apache .gravitino .storage .relational .po .ColumnPO ;
37
39
import org .apache .gravitino .storage .relational .po .FilesetPO ;
@@ -386,6 +388,103 @@ public static Map<Long, String> getTableObjectFullNames(List<Long> ids) {
386
388
return tableIdAndNameMap ;
387
389
}
388
390
391
+ public static Map <Long , String > getTopicObjectFullNames (List <Long > ids ) {
392
+ List <TopicPO > topicPOs =
393
+ SessionUtils .getWithoutCommit (
394
+ TopicMetaMapper .class , mapper -> mapper .listTopicPOsByTopicIds (ids ));
395
+
396
+ if (topicPOs == null || topicPOs .isEmpty ()) {
397
+ return new HashMap <>();
398
+ }
399
+
400
+ List <Long > catalogIds =
401
+ topicPOs .stream ().map (TopicPO ::getCatalogId ).collect (Collectors .toList ());
402
+ List <Long > schemaIds = topicPOs .stream ().map (TopicPO ::getSchemaId ).collect (Collectors .toList ());
403
+
404
+ Map <Long , String > catalogIdAndNameMap = getCatalogIdAndNameMap (catalogIds );
405
+ Map <Long , String > schemaIdAndNameMap = getSchemaIdAndNameMap (schemaIds );
406
+
407
+ HashMap <Long , String > topicIdAndNameMap = new HashMap <>();
408
+
409
+ topicPOs .forEach (
410
+ tablePO -> {
411
+ // since the catalog or schema can be deleted, we need to check the null value,
412
+ // and when catalog or schema is deleted, we will set fullName of tablePO to null
413
+ String catalogName = catalogIdAndNameMap .getOrDefault (tablePO .getCatalogId (), null );
414
+ if (catalogName == null ) {
415
+ LOG .warn ("The catalog of topic {} may be deleted" , tablePO .getTopicId ());
416
+ topicIdAndNameMap .put (tablePO .getTopicId (), null );
417
+ return ;
418
+ }
419
+
420
+ String schemaName = schemaIdAndNameMap .getOrDefault (tablePO .getSchemaId (), null );
421
+ if (schemaName == null ) {
422
+ LOG .warn ("The schema of topic {} may be deleted" , tablePO .getTopicId ());
423
+ topicIdAndNameMap .put (tablePO .getTopicId (), null );
424
+ return ;
425
+ }
426
+
427
+ String fullName = DOT_JOINER .join (catalogName , schemaName , tablePO .getTopicName ());
428
+ topicIdAndNameMap .put (tablePO .getTopicId (), fullName );
429
+ });
430
+
431
+ return topicIdAndNameMap ;
432
+ }
433
+
434
+ public static Map <Long , String > getColumnObjectFullNames (List <Long > ids ) {
435
+ List <ColumnPO > columnPOs =
436
+ SessionUtils .getWithoutCommit (
437
+ TableColumnMapper .class , mapper -> mapper .listColumnPOsByColumnIds (ids ));
438
+
439
+ if (columnPOs == null || columnPOs .isEmpty ()) {
440
+ return new HashMap <>();
441
+ }
442
+
443
+ List <Long > catalogIds =
444
+ columnPOs .stream ().map (ColumnPO ::getCatalogId ).collect (Collectors .toList ());
445
+ List <Long > schemaIds =
446
+ columnPOs .stream ().map (ColumnPO ::getSchemaId ).collect (Collectors .toList ());
447
+ List <Long > tableIds = columnPOs .stream ().map (ColumnPO ::getTableId ).collect (Collectors .toList ());
448
+
449
+ Map <Long , String > catalogIdAndNameMap = getCatalogIdAndNameMap (catalogIds );
450
+ Map <Long , String > schemaIdAndNameMap = getSchemaIdAndNameMap (schemaIds );
451
+ Map <Long , String > tableIdAndNameMap = getTableIdAndNameMap (tableIds );
452
+
453
+ HashMap <Long , String > columnIdAndNameMap = new HashMap <>();
454
+
455
+ columnPOs .forEach (
456
+ columnPO -> {
457
+ // since the catalog or schema can be deleted, we need to check the null value,
458
+ // and when catalog or schema is deleted, we will set fullName of filesetPO to null
459
+ String catalogName = catalogIdAndNameMap .getOrDefault (columnPO .getCatalogId (), null );
460
+ if (catalogName == null ) {
461
+ LOG .warn ("The catalog of column {} may be deleted" , columnPO .getColumnId ());
462
+ columnIdAndNameMap .put (columnPO .getColumnId (), null );
463
+ return ;
464
+ }
465
+
466
+ String schemaName = schemaIdAndNameMap .getOrDefault (columnPO .getSchemaId (), null );
467
+ if (schemaName == null ) {
468
+ LOG .warn ("The schema of column {} may be deleted" , columnPO .getColumnId ());
469
+ columnIdAndNameMap .put (columnPO .getColumnId (), null );
470
+ return ;
471
+ }
472
+
473
+ String tableName = tableIdAndNameMap .getOrDefault (columnPO .getTableId (), null );
474
+ if (tableName == null ) {
475
+ LOG .warn ("The table of column {} may be deleted" , columnPO .getColumnId ());
476
+ columnIdAndNameMap .put (columnPO .getColumnId (), null );
477
+ return ;
478
+ }
479
+
480
+ String fullName =
481
+ DOT_JOINER .join (catalogName , schemaName , tableName , columnPO .getColumnName ());
482
+ columnIdAndNameMap .put (columnPO .getColumnId (), fullName );
483
+ });
484
+
485
+ return columnIdAndNameMap ;
486
+ }
487
+
389
488
public static Map <Long , String > getCatalogObjectFullNames (List <Long > ids ) {
390
489
List <CatalogPO > catalogPOs =
391
490
SessionUtils .getWithoutCommit (
@@ -409,6 +508,14 @@ public static Map<Long, String> getCatalogObjectFullNames(List<Long> ids) {
409
508
return catalogIdAndNameMap ;
410
509
}
411
510
511
+ public static Map <Long , String > getCatalogIdAndNameMap (List <Long > catalogIds ) {
512
+ List <CatalogPO > catalogPOs =
513
+ SessionUtils .getWithoutCommit (
514
+ CatalogMetaMapper .class , mapper -> mapper .listCatalogPOsByCatalogIds (catalogIds ));
515
+ return catalogPOs .stream ()
516
+ .collect (Collectors .toMap (CatalogPO ::getCatalogId , CatalogPO ::getCatalogName ));
517
+ }
518
+
412
519
public static Map <Long , String > getSchemaIdAndNameMap (List <Long > schemaIds ) {
413
520
List <SchemaPO > schemaPOS =
414
521
SessionUtils .getWithoutCommit (
@@ -417,11 +524,10 @@ public static Map<Long, String> getSchemaIdAndNameMap(List<Long> schemaIds) {
417
524
.collect (Collectors .toMap (SchemaPO ::getSchemaId , SchemaPO ::getSchemaName ));
418
525
}
419
526
420
- public static Map <Long , String > getCatalogIdAndNameMap (List <Long > catalogIds ) {
421
- List <CatalogPO > catalogPOs =
527
+ public static Map <Long , String > getTableIdAndNameMap (List <Long > tableIds ) {
528
+ List <TablePO > tablePOS =
422
529
SessionUtils .getWithoutCommit (
423
- CatalogMetaMapper .class , mapper -> mapper .listCatalogPOsByCatalogIds (catalogIds ));
424
- return catalogPOs .stream ()
425
- .collect (Collectors .toMap (CatalogPO ::getCatalogId , CatalogPO ::getCatalogName ));
530
+ TableMetaMapper .class , mapper -> mapper .listTablePOsByTableIds (tableIds ));
531
+ return tablePOS .stream ().collect (Collectors .toMap (TablePO ::getTableId , TablePO ::getTableName ));
426
532
}
427
533
}
0 commit comments