@@ -694,8 +694,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
694694 options.write_buffer_size = 1024 ;
695695 options.create_if_missing = true ;
696696 options.compression = kNoCompression ;
697+ options.statistics = CreateDBStatistics ();
697698 options.env = env.get ();
698- if (std::get<0 >(GetParam ())) {
699+ bool use_direct_io = std::get<0 >(GetParam ());
700+ if (use_direct_io) {
699701 options.use_direct_reads = true ;
700702 options.use_direct_io_for_flush_and_compaction = true ;
701703 }
@@ -708,8 +710,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
708710 options.table_factory .reset (NewBlockBasedTableFactory (table_options));
709711
710712 Status s = TryReopen (options);
711- if (std::get<0 >(GetParam ()) &&
712- (s.IsNotSupported () || s.IsInvalidArgument ())) {
713+ if (use_direct_io && (s.IsNotSupported () || s.IsInvalidArgument ())) {
713714 // If direct IO is not supported, skip the test
714715 return ;
715716 } else {
@@ -766,22 +767,34 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
766767 // TODO akanksha: Remove after adding new units.
767768 ro.async_io = true ;
768769 }
770+
771+ ASSERT_OK (options.statistics ->Reset ());
769772 auto iter = std::unique_ptr<Iterator>(db_->NewIterator (ro));
770773 int num_keys = 0 ;
771774 for (iter->SeekToFirst (); iter->Valid (); iter->Next ()) {
772775 ASSERT_OK (iter->status ());
773776 num_keys++;
774777 }
775778 ASSERT_EQ (num_keys, total_keys);
776-
777779 ASSERT_GT (buff_prefetch_count, 0 );
778- buff_prefetch_count = 0 ;
779780 // For index and data blocks.
780781 if (is_adaptive_readahead) {
781782 ASSERT_EQ (readahead_carry_over_count, 2 * (num_sst_files - 1 ));
782783 } else {
783784 ASSERT_EQ (readahead_carry_over_count, 0 );
784785 }
786+
787+ // Check stats to make sure async prefetch is done.
788+ {
789+ HistogramData async_read_bytes;
790+ options.statistics ->histogramData (ASYNC_READ_BYTES, &async_read_bytes);
791+ if (ro.async_io && !use_direct_io) {
792+ ASSERT_GT (async_read_bytes.count , 0 );
793+ } else {
794+ ASSERT_EQ (async_read_bytes.count , 0 );
795+ }
796+ }
797+
785798 SyncPoint::GetInstance ()->DisableProcessing ();
786799 SyncPoint::GetInstance ()->ClearAllCallBacks ();
787800 }
@@ -902,6 +915,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
902915 options.use_direct_reads = true ;
903916 options.use_direct_io_for_flush_and_compaction = true ;
904917 }
918+
919+ options.statistics = CreateDBStatistics ();
905920 BlockBasedTableOptions table_options;
906921 std::shared_ptr<Cache> cache = NewLRUCache (4 * 1024 * 1024 , 2 ); // 8MB
907922 table_options.block_cache = cache;
@@ -948,7 +963,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
948963 SyncPoint::GetInstance ()->EnableProcessing ();
949964 ReadOptions ro;
950965 ro.adaptive_readahead = true ;
951- // TODO akanksha: Remove after adding new units.
952966 ro.async_io = true ;
953967 {
954968 /*
@@ -964,7 +978,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
964978 iter->Seek (BuildKey (1019 ));
965979 buff_prefetch_count = 0 ;
966980 }
981+
967982 {
983+ ASSERT_OK (options.statistics ->Reset ());
968984 // After caching, blocks will be read from cache (Sequential blocks)
969985 auto iter = std::unique_ptr<Iterator>(db_->NewIterator (ro));
970986 iter->Seek (BuildKey (0 ));
@@ -1008,6 +1024,18 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
10081024 ASSERT_TRUE (iter->Valid ());
10091025 ASSERT_EQ (current_readahead_size, expected_current_readahead_size);
10101026 ASSERT_EQ (buff_prefetch_count, 2 );
1027+
1028+ // Check stats to make sure async prefetch is done.
1029+ {
1030+ HistogramData async_read_bytes;
1031+ options.statistics ->histogramData (ASYNC_READ_BYTES, &async_read_bytes);
1032+ if (GetParam ()) {
1033+ ASSERT_EQ (async_read_bytes.count , 0 );
1034+ } else {
1035+ ASSERT_GT (async_read_bytes.count , 0 );
1036+ }
1037+ }
1038+
10111039 buff_prefetch_count = 0 ;
10121040 }
10131041 Close ();
@@ -1033,6 +1061,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
10331061 options.create_if_missing = true ;
10341062 options.compression = kNoCompression ;
10351063 options.env = env.get ();
1064+ options.statistics = CreateDBStatistics ();
10361065 if (use_direct_io) {
10371066 options.use_direct_reads = true ;
10381067 options.use_direct_io_for_flush_and_compaction = true ;
@@ -1080,6 +1109,7 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
10801109 ro.adaptive_readahead = true ;
10811110 ro.async_io = true ;
10821111
1112+ ASSERT_OK (options.statistics ->Reset ());
10831113 auto iter = std::unique_ptr<Iterator>(db_->NewIterator (ro));
10841114 int num_keys = 0 ;
10851115 for (iter->SeekToFirst (); iter->Valid (); iter->Next ()) {
@@ -1088,7 +1118,19 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
10881118 }
10891119 ASSERT_EQ (num_keys, total_keys);
10901120 ASSERT_GT (buff_prefetch_count, 0 );
1121+
1122+ // Check stats to make sure async prefetch is done.
1123+ {
1124+ HistogramData async_read_bytes;
1125+ options.statistics ->histogramData (ASYNC_READ_BYTES, &async_read_bytes);
1126+ #if defined(ROCKSDB_IOURING_PRESENT)
1127+ ASSERT_GT (async_read_bytes.count , 0 );
1128+ #else
1129+ ASSERT_EQ (async_read_bytes.count , 0 );
1130+ #endif
1131+ }
10911132 }
1133+
10921134 SyncPoint::GetInstance ()->DisableProcessing ();
10931135 SyncPoint::GetInstance ()->ClearAllCallBacks ();
10941136
0 commit comments