1616
1717package io .aiven .kafka .tieredstorage .manifest ;
1818
19+ import javax .management .ObjectName ;
20+
1921import java .io .ByteArrayInputStream ;
2022import java .io .IOException ;
2123import java .io .InputStream ;
24+ import java .lang .management .ManagementFactory ;
2225import java .time .Duration ;
2326import java .util .Optional ;
2427import java .util .concurrent .ForkJoinPool ;
4043import static org .assertj .core .api .Assertions .assertThatThrownBy ;
4144import static org .mockito .ArgumentMatchers .anyString ;
4245import static org .mockito .Mockito .doAnswer ;
46+ import static org .mockito .Mockito .times ;
4347import static org .mockito .Mockito .verify ;
4448import static org .mockito .Mockito .verifyNoMoreInteractions ;
4549import static org .mockito .Mockito .when ;
@@ -68,23 +72,23 @@ class SegmentManifestProviderTest {
6872 void setup () {
6973 provider = new SegmentManifestProvider (
7074 Optional .of (1000L ), Optional .empty (), storage , MAPPER ,
71- ForkJoinPool .commonPool ());
75+ ForkJoinPool .commonPool (), false );
7276 }
7377
7478 @ Test
7579 void unboundedShouldBeCreated () {
7680 assertThatNoException ()
7781 .isThrownBy (() -> new SegmentManifestProvider (
7882 Optional .empty (), Optional .of (Duration .ofMillis (1 )), storage , MAPPER ,
79- ForkJoinPool .commonPool ()));
83+ ForkJoinPool .commonPool (), false ));
8084 }
8185
8286 @ Test
8387 void withoutRetentionLimitsShouldBeCreated () {
8488 assertThatNoException ()
8589 .isThrownBy (() -> new SegmentManifestProvider (
8690 Optional .of (1L ), Optional .empty (), storage , MAPPER ,
87- ForkJoinPool .commonPool ()));
91+ ForkJoinPool .commonPool (), false ));
8892 }
8993
9094 @ Test
@@ -102,6 +106,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
102106 verifyNoMoreInteractions (storage );
103107 }
104108
109+ @ Test
110+ void invalidateCache_jmx () throws Exception {
111+ provider = new SegmentManifestProvider (
112+ OBJECT_KEY , Optional .of (1000L ), Optional .empty (), storage , MAPPER ,
113+ ForkJoinPool .commonPool (), true );
114+
115+ final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest" ;
116+ final SegmentManifestV1 expectedManifest = new SegmentManifestV1 (
117+ new FixedSizeChunkIndex (100 , 1000 , 110 , 110 ),
118+ false , null
119+ );
120+ when (storage .fetch (key ))
121+ .thenReturn (new ByteArrayInputStream (MANIFEST .getBytes ()));
122+ assertThat (provider .get (REMOTE_LOG_METADATA )).isEqualTo (expectedManifest );
123+ verify (storage ).fetch (key );
124+
125+ final var mbeanName = new ObjectName (SegmentManifestCacheManager .MBEAN_NAME );
126+ final var mbeanServer = ManagementFactory .getPlatformMBeanServer ();
127+ assertThat (mbeanServer .isRegistered (mbeanName )).isTrue ();
128+
129+ final var sizeBefore = provider .cache ().estimatedSize ();
130+ assertThat (sizeBefore ).isEqualTo (1L );
131+
132+ mbeanServer .invoke (mbeanName , "clean" , new Object []{}, new String []{});
133+
134+ final var sizeAfter = provider .cache ().estimatedSize ();
135+ assertThat (sizeAfter ).isEqualTo (0L );
136+
137+ when (storage .fetch (key ))
138+ .thenReturn (new ByteArrayInputStream (MANIFEST .getBytes ()));
139+ assertThat (provider .get (REMOTE_LOG_METADATA )).isEqualTo (expectedManifest );
140+ verify (storage , times (2 )).fetch (key );
141+ }
142+
105143 @ Test
106144 void shouldPropagateStorageBackendException () throws StorageBackendException {
107145 when (storage .fetch (anyString ()))
0 commit comments