Skip to content

Commit

Permalink
refactor(core): Optimize Catalog storage queries using JOIN for JDBC …
Browse files Browse the repository at this point in the history
…backend
  • Loading branch information
zhengkezhou1 committed Mar 3, 2025
1 parent a5169f1 commit 581134d
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@SelectProvider(
type = CatalogMetaSQLProviderFactory.class,
method = "listCatalogPOsByMetalakeName")
List<CatalogPO> listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "listCatalogPOsByMetalakeId")
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

Expand All @@ -50,12 +55,18 @@ public interface CatalogMetaMapper {
Long selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogIdByName")
Long selectCatalogIdByName(@Param("catalogName") String name);

@SelectProvider(
type = CatalogMetaSQLProviderFactory.class,
method = "selectCatalogMetaByMetalakeIdAndName")
CatalogPO selectCatalogMetaByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaByName")
CatalogPO selectCatalogMetaByName(@Param("catalogName") String catalogName);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaById")
CatalogPO selectCatalogMetaById(@Param("catalogId") Long catalogId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ static class CatalogMetaMySQLProvider extends CatalogMetaBaseSQLProvider {}

static class CatalogMetaH2Provider extends CatalogMetaBaseSQLProvider {}

public static String listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName) {
return getProvider().listCatalogPOsByMetalakeName(metalakeName);
}

public static String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) {
return getProvider().listCatalogPOsByMetalakeId(metalakeId);
}
Expand All @@ -61,6 +65,10 @@ public static String listCatalogPOsByCatalogIds(@Param("catalogIds") List<Long>
return getProvider().listCatalogPOsByCatalogIds(catalogIds);
}

public static String selectCatalogIdByName(@Param("catalogName") String name) {
return getProvider().selectCatalogIdByName(name);
}

public static String selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name) {
return getProvider().selectCatalogIdByMetalakeIdAndName(metalakeId, name);
Expand All @@ -71,6 +79,10 @@ public static String selectCatalogMetaByMetalakeIdAndName(
return getProvider().selectCatalogMetaByMetalakeIdAndName(metalakeId, name);
}

public static String selectCatalogMetaByName(@Param("catalogName") String name) {
return getProvider().selectCatalogMetaByName(name);
}

public static String selectCatalogMetaById(@Param("catalogId") Long catalogId) {
return getProvider().selectCatalogMetaById(catalogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,26 @@
import static org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME;

import java.util.List;
import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.ibatis.annotations.Param;

public class CatalogMetaBaseSQLProvider {
public String listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName) {
return "SELECT cm.catalog_id as catalogId, cm.catalog_name as catalogName,"
+ " cm.metalake_id as metalakeId, cm.type, cm.provider,"
+ " cm.catalog_comment as catalogComment, cm.properties, cm.audit_info as auditInfo,"
+ " cm.current_version as currentVersion, cm.last_version as lastVersion,"
+ " cm.deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName}"
+ " AND mm.deleted_at = 0 AND cm.deleted_at = 0";
}

public String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) {
return "SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
Expand Down Expand Up @@ -55,6 +71,15 @@ public String listCatalogPOsByCatalogIds(@Param("catalogIds") List<Long> catalog
+ "</script>";
}

public String selectCatalogIdByName(@Param("catalogName") String name) {
return "SELECT cm.catalog_id as catalogId FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE catalog_name = #{catalogName} AND cm.deleted_at = 0";
}

public String selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name) {
return "SELECT catalog_id as catalogId FROM "
Expand All @@ -74,6 +99,20 @@ public String selectCatalogMetaByMetalakeIdAndName(
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0";
}

public String selectCatalogMetaByName(@Param("catalogName") String name) {
return "SELECT cm.catalog_id as catalogId, cm.catalog_name as catalogName,"
+ " cm.metalake_id as metalakeId, cm.type, cm.provider,"
+ " cm.catalog_comment as catalogComment, cm.properties, cm.audit_info as auditInfo,"
+ " cm.current_version as currentVersion, cm.last_version as lastVersion,"
+ " cm.deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE cm.catalog_name = #{catalogName} AND cm.deleted_at = 0";
}

public String selectCatalogMetaById(@Param("catalogId") Long catalogId) {
return "SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ public static CatalogMetaService getInstance() {

private CatalogMetaService() {}

public CatalogPO getCatalogPOByMetalakeIdAndName(Long metalakeId, String catalogName) {
public CatalogPO getCatalogPOByName(String catalogName) {
CatalogPO catalogPO =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName));
CatalogMetaMapper.class, mapper -> mapper.selectCatalogMetaByName(catalogName));

if (catalogPO == null) {
throw new NoSuchEntityException(
Expand Down Expand Up @@ -105,26 +104,35 @@ public Long getCatalogIdByMetalakeIdAndName(Long metalakeId, String catalogName)
return catalogId;
}

public Long getCatalogIdByName(String catalogName) {
Long catalogId =
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class, mapper -> mapper.selectCatalogIdByName(catalogName));

if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
catalogName);
}
return catalogId;
}

public CatalogEntity getCatalogByIdentifier(NameIdentifier identifier) {
NameIdentifierUtil.checkCatalog(identifier);
String catalogName = identifier.name();

Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

CatalogPO catalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName);
CatalogPO catalogPO = getCatalogPOByName(catalogName);

return POConverters.fromCatalogPO(catalogPO, identifier.namespace());
}

public List<CatalogEntity> listCatalogsByNamespace(Namespace namespace) {
NamespaceUtil.checkCatalog(namespace);

Long metalakeId = CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);

List<CatalogPO> catalogPOS =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class, mapper -> mapper.listCatalogPOsByMetalakeId(metalakeId));
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(namespace.level(0)));

return POConverters.fromCatalogPOs(catalogPOS, namespace);
}
Expand Down Expand Up @@ -158,10 +166,8 @@ public <E extends Entity & HasIdentifier> CatalogEntity updateCatalog(
NameIdentifierUtil.checkCatalog(identifier);

String catalogName = identifier.name();
Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

CatalogPO oldCatalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName);
CatalogPO oldCatalogPO = getCatalogPOByName(catalogName);

CatalogEntity oldCatalogEntity =
POConverters.fromCatalogPO(oldCatalogPO, identifier.namespace());
Expand All @@ -179,7 +185,8 @@ public <E extends Entity & HasIdentifier> CatalogEntity updateCatalog(
CatalogMetaMapper.class,
mapper ->
mapper.updateCatalogMeta(
POConverters.updateCatalogPOWithVersion(oldCatalogPO, newEntity, metalakeId),
POConverters.updateCatalogPOWithVersion(
oldCatalogPO, newEntity, oldCatalogPO.getMetalakeId()),
oldCatalogPO));
} catch (RuntimeException re) {
ExceptionUtils.checkSQLException(
Expand All @@ -198,10 +205,7 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) {
NameIdentifierUtil.checkCatalog(identifier);

String catalogName = identifier.name();
Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

Long catalogId = getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
long catalogId = getCatalogIdByName(catalogName);

if (cascade) {
SessionUtils.doMultipleWithCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.gravitino.Configs.ENTITY_STORE;
import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static org.apache.gravitino.SupportsRelationOperations.Type.OWNER_REL;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -79,8 +80,10 @@
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
import org.apache.gravitino.storage.relational.mapper.UserMetaMapper;
import org.apache.gravitino.storage.relational.service.CatalogMetaService;
import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
import org.apache.gravitino.storage.relational.service.RoleMetaService;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
Expand Down Expand Up @@ -759,6 +762,13 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
backend.list(catalog.namespace(), Entity.EntityType.CATALOG, true);
assertTrue(catalogs.contains(catalog));

assertEquals(
1,
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(metalake.name()))
.size());

List<SchemaEntity> schemas = backend.list(schema.namespace(), Entity.EntityType.SCHEMA, true);
assertTrue(schemas.contains(schema));

Expand All @@ -782,6 +792,11 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
assertEquals(1, RoleMetaService.getInstance().listRolesByUserId(user.id()).size());
assertEquals(1, RoleMetaService.getInstance().listRolesByGroupId(group.id()).size());

CatalogEntity catalogEntity = backend.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG);
assertEquals(catalog, catalogEntity);
assertNotNull(CatalogMetaService.getInstance().getCatalogPOByName(catalog.name()));
assertEquals(catalog.id(), CatalogMetaService.getInstance().getCatalogIdByName(catalog.name()));

UserEntity userEntity = backend.get(user.nameIdentifier(), Entity.EntityType.USER);
assertEquals(user, userEntity);
assertEquals(
Expand Down Expand Up @@ -856,6 +871,13 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
// meta data soft delete
backend.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, true);

assertEquals(
0,
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(metalake.name()))
.size());

// check existence after soft delete
assertFalse(backend.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE));
assertTrue(backend.exists(anotherMetaLake.nameIdentifier(), Entity.EntityType.METALAKE));
Expand Down

0 comments on commit 581134d

Please sign in to comment.