Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4024]Refactor: Reduce unnecessary queries in catalog JDBC implementation #6540

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@SelectProvider(
type = CatalogMetaSQLProviderFactory.class,
method = "listCatalogPOsByMetalakeName")
List<CatalogPO> listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "listCatalogPOsByMetalakeId")
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

Expand All @@ -50,12 +55,20 @@ public interface CatalogMetaMapper {
Long selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogIdByName")
Long selectCatalogIdByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String name);

@SelectProvider(
type = CatalogMetaSQLProviderFactory.class,
method = "selectCatalogMetaByMetalakeIdAndName")
CatalogPO selectCatalogMetaByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaByName")
CatalogPO selectCatalogMetaByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String catalogName);

@SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaById")
CatalogPO selectCatalogMetaById(@Param("catalogId") Long catalogId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ static class CatalogMetaMySQLProvider extends CatalogMetaBaseSQLProvider {}

static class CatalogMetaH2Provider extends CatalogMetaBaseSQLProvider {}

public static String listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName) {
return getProvider().listCatalogPOsByMetalakeName(metalakeName);
}

public static String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) {
return getProvider().listCatalogPOsByMetalakeId(metalakeId);
}
Expand All @@ -61,6 +65,11 @@ public static String listCatalogPOsByCatalogIds(@Param("catalogIds") List<Long>
return getProvider().listCatalogPOsByCatalogIds(catalogIds);
}

public static String selectCatalogIdByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String catalogName) {
return getProvider().selectCatalogIdByName(metalakeName, catalogName);
}

public static String selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name) {
return getProvider().selectCatalogIdByMetalakeIdAndName(metalakeId, name);
Expand All @@ -71,6 +80,11 @@ public static String selectCatalogMetaByMetalakeIdAndName(
return getProvider().selectCatalogMetaByMetalakeIdAndName(metalakeId, name);
}

public static String selectCatalogMetaByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String catalogName) {
return getProvider().selectCatalogMetaByName(metalakeName, catalogName);
}

public static String selectCatalogMetaById(@Param("catalogId") Long catalogId) {
return getProvider().selectCatalogMetaById(catalogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,26 @@
import static org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME;

import java.util.List;
import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.ibatis.annotations.Param;

public class CatalogMetaBaseSQLProvider {
public String listCatalogPOsByMetalakeName(@Param("metalakeName") String metalakeName) {
return "SELECT cm.catalog_id as catalogId, cm.catalog_name as catalogName,"
+ " cm.metalake_id as metalakeId, cm.type, cm.provider,"
+ " cm.catalog_comment as catalogComment, cm.properties, cm.audit_info as auditInfo,"
+ " cm.current_version as currentVersion, cm.last_version as lastVersion,"
+ " cm.deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName}"
+ " AND mm.deleted_at = 0 AND cm.deleted_at = 0";
}

public String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) {
return "SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
Expand Down Expand Up @@ -55,6 +71,17 @@ public String listCatalogPOsByCatalogIds(@Param("catalogIds") List<Long> catalog
+ "</script>";
}

public String selectCatalogIdByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String catalogName) {
return "SELECT cm.catalog_id as catalogId FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE catalog_name = #{catalogName} AND mm.metalake_name = #{metalakeName}"
+ " AND cm.deleted_at = 0 AND mm.deleted_at = 0";
}

public String selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name) {
return "SELECT catalog_id as catalogId FROM "
Expand All @@ -74,6 +101,22 @@ public String selectCatalogMetaByMetalakeIdAndName(
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0";
}

public String selectCatalogMetaByName(
@Param("metalakeName") String metalakeName, @Param("catalogName") String catalogName) {
return "SELECT cm.catalog_id as catalogId, cm.catalog_name as catalogName,"
+ " cm.metalake_id as metalakeId, cm.type, cm.provider,"
+ " cm.catalog_comment as catalogComment, cm.properties, cm.audit_info as auditInfo,"
+ " cm.current_version as currentVersion, cm.last_version as lastVersion,"
+ " cm.deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " cm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm ON cm.metalake_id = mm.metalake_id"
+ " WHERE cm.catalog_name = #{catalogName} AND mm.metalake_name = #{metalakeName}"
+ " AND cm.deleted_at = 0 AND mm.deleted_at = 0";
}

public String selectCatalogMetaById(@Param("catalogId") Long catalogId) {
return "SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ public static CatalogMetaService getInstance() {

private CatalogMetaService() {}

public CatalogPO getCatalogPOByMetalakeIdAndName(Long metalakeId, String catalogName) {
public CatalogPO getCatalogPOByName(String metalakeName, String catalogName) {
CatalogPO catalogPO =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName));
mapper -> mapper.selectCatalogMetaByName(metalakeName, catalogName));

if (catalogPO == null) {
throw new NoSuchEntityException(
Expand Down Expand Up @@ -105,26 +105,36 @@ public Long getCatalogIdByMetalakeIdAndName(Long metalakeId, String catalogName)
return catalogId;
}

public Long getCatalogIdByName(String metalakeName, String catalogName) {
Long catalogId =
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByName(metalakeName, catalogName));

if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
catalogName);
}
return catalogId;
}

public CatalogEntity getCatalogByIdentifier(NameIdentifier identifier) {
NameIdentifierUtil.checkCatalog(identifier);
String catalogName = identifier.name();

Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

CatalogPO catalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName);
CatalogPO catalogPO = getCatalogPOByName(identifier.namespace().level(0), catalogName);

return POConverters.fromCatalogPO(catalogPO, identifier.namespace());
}

public List<CatalogEntity> listCatalogsByNamespace(Namespace namespace) {
NamespaceUtil.checkCatalog(namespace);

Long metalakeId = CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace);

List<CatalogPO> catalogPOS =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class, mapper -> mapper.listCatalogPOsByMetalakeId(metalakeId));
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(namespace.level(0)));

return POConverters.fromCatalogPOs(catalogPOS, namespace);
}
Expand Down Expand Up @@ -158,10 +168,8 @@ public <E extends Entity & HasIdentifier> CatalogEntity updateCatalog(
NameIdentifierUtil.checkCatalog(identifier);

String catalogName = identifier.name();
Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

CatalogPO oldCatalogPO = getCatalogPOByMetalakeIdAndName(metalakeId, catalogName);
CatalogPO oldCatalogPO = getCatalogPOByName(identifier.namespace().level(0), catalogName);

CatalogEntity oldCatalogEntity =
POConverters.fromCatalogPO(oldCatalogPO, identifier.namespace());
Expand All @@ -179,7 +187,8 @@ public <E extends Entity & HasIdentifier> CatalogEntity updateCatalog(
CatalogMetaMapper.class,
mapper ->
mapper.updateCatalogMeta(
POConverters.updateCatalogPOWithVersion(oldCatalogPO, newEntity, metalakeId),
POConverters.updateCatalogPOWithVersion(
oldCatalogPO, newEntity, oldCatalogPO.getMetalakeId()),
oldCatalogPO));
} catch (RuntimeException re) {
ExceptionUtils.checkSQLException(
Expand All @@ -197,11 +206,9 @@ public <E extends Entity & HasIdentifier> CatalogEntity updateCatalog(
public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) {
NameIdentifierUtil.checkCatalog(identifier);

String metalakeName = identifier.namespace().level(0);
String catalogName = identifier.name();
Long metalakeId =
CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace());

Long catalogId = getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
long catalogId = getCatalogIdByName(metalakeName, catalogName);

if (cascade) {
SessionUtils.doMultipleWithCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.gravitino.Configs.ENTITY_STORE;
import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static org.apache.gravitino.SupportsRelationOperations.Type.OWNER_REL;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -79,8 +80,10 @@
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
import org.apache.gravitino.storage.relational.mapper.UserMetaMapper;
import org.apache.gravitino.storage.relational.service.CatalogMetaService;
import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
import org.apache.gravitino.storage.relational.service.RoleMetaService;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
Expand Down Expand Up @@ -759,6 +762,13 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
backend.list(catalog.namespace(), Entity.EntityType.CATALOG, true);
assertTrue(catalogs.contains(catalog));

assertEquals(
1,
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(metalake.name()))
.size());

List<SchemaEntity> schemas = backend.list(schema.namespace(), Entity.EntityType.SCHEMA, true);
assertTrue(schemas.contains(schema));

Expand All @@ -782,6 +792,16 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
assertEquals(1, RoleMetaService.getInstance().listRolesByUserId(user.id()).size());
assertEquals(1, RoleMetaService.getInstance().listRolesByGroupId(group.id()).size());

CatalogEntity catalogEntity = backend.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG);
assertEquals(catalog, catalogEntity);
assertNotNull(
CatalogMetaService.getInstance()
.getCatalogPOByName(catalogEntity.namespace().level(0), catalog.name()));
assertEquals(
catalog.id(),
CatalogMetaService.getInstance()
.getCatalogIdByName(catalog.namespace().level(0), catalog.name()));

UserEntity userEntity = backend.get(user.nameIdentifier(), Entity.EntityType.USER);
assertEquals(user, userEntity);
assertEquals(
Expand Down Expand Up @@ -856,6 +876,13 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException {
// meta data soft delete
backend.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, true);

assertEquals(
0,
SessionUtils.doWithCommitAndFetchResult(
CatalogMetaMapper.class,
mapper -> mapper.listCatalogPOsByMetalakeName(metalake.name()))
.size());

// check existence after soft delete
assertFalse(backend.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE));
assertTrue(backend.exists(anotherMetaLake.nameIdentifier(), Entity.EntityType.METALAKE));
Expand Down