diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b9051a7dc7df4..23b62c1c0189f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -108,6 +108,9 @@ private CompletableFuture>> readValueFromStore(String try { GetResult res = optRes.get(); + if (res.getValue().length == 0) { + return FutureUtils.value(Optional.of(new CacheGetResult<>(null, res.getStat()))); + } T obj = serde.deserialize(path, res.getValue(), res.getStat()); return FutureUtils .value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); @@ -152,10 +155,14 @@ public CompletableFuture readModifyUpdateOrCreate(String path, Function urlSupplier) throws Exception { + @Cleanup + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String path = "/testReadModifyUpdateOrCreateWithEmptyValue"; + store.put(path, new byte[0], Optional.of(-1L)).get(); + + MetadataCache objCache = store.getMetadataCache(Policies.class); + + Optional policies = objCache.get(path).get(); + assertFalse(policies.isPresent()); + Policies policies1 = objCache.readModifyUpdateOrCreate(path, (rp) -> { + Policies p = rp.orElse(new Policies()); + p.max_unacked_messages_per_consumer = 100; + return p; + }).get(); + assertEquals(policies1.max_unacked_messages_per_consumer.intValue(), 100); + } + @Test(dataProvider = "impl") public void readModifyUpdate(String provider, Supplier urlSupplier) throws Exception { @Cleanup @@ -475,6 +497,36 @@ public void readModifyUpdate(String provider, Supplier urlSupplier) thro } } + @Test(dataProvider = "impl") + public void testReadModifyUpdateWithEmptyValue(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String path = "/testReadModifyUpdateWithEmptyValue"; + store.put(path, new byte[0], Optional.of(-1L)).get(); + + MetadataCache objCache = store.getMetadataCache(Policies.class); + + Optional policies = objCache.get(path).get(); + assertFalse(policies.isPresent()); + Policies policies1 = objCache.readModifyUpdate(path, (rp) -> { + if (rp != null) { + rp.max_unacked_messages_per_consumer = 100; + } + return rp; + }).get(); + assertNull(policies1); + + Policies policies2 = objCache.readModifyUpdate(path, (rp) -> { + if (rp == null) { + rp = new Policies(); + } + rp.max_unacked_messages_per_consumer = 100; + return rp; + }).get(); + assertEquals(policies2.max_unacked_messages_per_consumer.intValue(), 100); + } + /** * This test validates that metadata-cache can handle BadVersion failure if other cache/metadata-source updates the * data with different version.