From 0c5e5eb16fc532295ca4638a03a8ca2a6c7088cb Mon Sep 17 00:00:00 2001 From: zymap Date: Tue, 6 Aug 2024 19:02:58 +0800 Subject: [PATCH] [Fix] Allow the content be empty when deserialize in MetadataCache --- ### Motivation When using admin to create topic, it will set the content to byte[0] then put it into the value. Sometimes we need to update the value then we will received the deserailization error. So we need to check the value type to make it as optional empty when the value is null. --- .../cache/impl/MetadataCacheImpl.java | 9 +++- .../pulsar/metadata/MetadataCacheTest.java | 52 +++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b9051a7dc7df4..23b62c1c0189f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -108,6 +108,9 @@ private CompletableFuture>> readValueFromStore(String try { GetResult res = optRes.get(); + if (res.getValue().length == 0) { + return FutureUtils.value(Optional.of(new CacheGetResult<>(null, res.getStat()))); + } T obj = serde.deserialize(path, res.getValue(), res.getStat()); return FutureUtils .value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); @@ -152,10 +155,14 @@ public CompletableFuture readModifyUpdateOrCreate(String path, Function urlSupplier) throws Exception { + @Cleanup + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String path = "/testReadModifyUpdateOrCreateWithEmptyValue"; + store.put(path, new byte[0], Optional.of(-1L)).get(); + + MetadataCache objCache = store.getMetadataCache(Policies.class); + + Optional policies = objCache.get(path).get(); + assertFalse(policies.isPresent()); + Policies policies1 = objCache.readModifyUpdateOrCreate(path, (rp) -> { + Policies p = rp.orElse(new Policies()); + p.max_unacked_messages_per_consumer = 100; + return p; + }).get(); + assertEquals(policies1.max_unacked_messages_per_consumer.intValue(), 100); + } + @Test(dataProvider = "impl") public void readModifyUpdate(String provider, Supplier urlSupplier) throws Exception { @Cleanup @@ -475,6 +497,36 @@ public void readModifyUpdate(String provider, Supplier urlSupplier) thro } } + @Test(dataProvider = "impl") + public void testReadModifyUpdateWithEmptyValue(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String path = "/testReadModifyUpdateWithEmptyValue"; + store.put(path, new byte[0], Optional.of(-1L)).get(); + + MetadataCache objCache = store.getMetadataCache(Policies.class); + + Optional policies = objCache.get(path).get(); + assertFalse(policies.isPresent()); + Policies policies1 = objCache.readModifyUpdate(path, (rp) -> { + if (rp != null) { + rp.max_unacked_messages_per_consumer = 100; + } + return rp; + }).get(); + assertNull(policies1); + + Policies policies2 = objCache.readModifyUpdate(path, (rp) -> { + if (rp == null) { + rp = new Policies(); + } + rp.max_unacked_messages_per_consumer = 100; + return rp; + }).get(); + assertEquals(policies2.max_unacked_messages_per_consumer.intValue(), 100); + } + /** * This test validates that metadata-cache can handle BadVersion failure if other cache/metadata-source updates the * data with different version.