@@ -22,7 +22,9 @@
 #define ZIM_CONCURRENT_CACHE_H
 
 #include "lrucache.h"
+#include "log.h"
 
+#include <chrono>
 #include <cstddef>
 #include <future>
 #include <mutex>
@@ -39,65 +41,119 @@ namespace zim
    safe, and, in case of a cache miss, will block until that element becomes
    available.
  */
-template<typename Key, typename Value>
+template<typename Key, typename Value, typename CostEstimation>
 class ConcurrentCache
 {
   private: // types
     typedef std::shared_future<Value> ValuePlaceholder;
-    typedef lru_cache<Key, ValuePlaceholder> Impl;
 
-  public: // types
-    explicit ConcurrentCache(size_t maxEntries)
-      : impl_(maxEntries)
+    struct CacheEntry
+    {
+      size_t cost = 0;
+      ValuePlaceholder value;
+
+      bool ready() const {
+        const auto zeroNs = std::chrono::nanoseconds::zero();
+        return value.wait_for(zeroNs) == std::future_status::ready;
+      }
+    };
+
+    struct GetCacheEntryCost
+    {
+      static size_t cost(const CacheEntry& x) { return x.cost; }
+    };
+
+    typedef lru_cache<Key, CacheEntry, GetCacheEntryCost> Impl;
+
+  public: // functions
+    explicit ConcurrentCache(size_t maxCost)
+      : impl_(maxCost)
     {}
 
     // Gets the entry corresponding to the given key. If the entry is not in the
     // cache, it is obtained by calling f() (without any arguments) and the
     // result is put into the cache.
     //
     // The cache as a whole is locked only for the duration of accessing
-    // the respective slot. If, in the case of the a cache miss, the generation
+    // the respective slot. If, in the case of a cache miss, the generation
     // of the missing element takes a long time, only attempts to access that
     // element will block - the rest of the cache remains open to concurrent
     // access.
     template<class F>
     Value getOrPut(const Key& key, F f)
     {
+      log_debug_func_call("ConcurrentCache::getOrPut", key);
+
       std::promise<Value> valuePromise;
-      std::unique_lock<std::mutex> l(lock_);
-      const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
-      l.unlock();
+      const auto x = getCacheSlot(key, valuePromise.get_future().share());
+      CacheEntry cacheEntry(x.value());
+      log_debug("Obtained the cache slot");
       if ( x.miss() ) {
+        log_debug("It was a cache miss. Going to obtain the value...");
         try {
-          valuePromise.set_value(f());
+          cacheEntry.cost = materializeValue(valuePromise, f);
+          finalizeCacheMiss(key, cacheEntry);
+          log_debug("Done. Cache cost is at " << getCurrentCost());
         } catch (std::exception& e) {
+          log_debug("Evaluation failed. Releasing the cache slot...");
           drop(key);
           throw;
         }
       }
 
-      return x.value().get();
+      log_debug((!cacheEntry.ready() ? "Waiting for result..." : "Returning immediately..."));
+      return log_debug_return_value(cacheEntry.value.get());
     }
 
     bool drop(const Key& key)
     {
-      std::unique_lock<std::mutex> l(lock_);
+      log_debug_func_call("ConcurrentCache::drop", key);
+      log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
       return impl_.drop(key);
     }
 
-    size_t getMaxSize() const {
+    size_t getMaxCost() const {
       std::unique_lock<std::mutex> l(lock_);
-      return impl_.getMaxSize();
+      return impl_.getMaxCost();
     }
 
-    size_t getCurrentSize() const {
+    size_t getCurrentCost() const {
       std::unique_lock<std::mutex> l(lock_);
-      return impl_.size();
+      return impl_.cost();
     }
 
-    void setMaxSize(size_t newSize) {
-      std::unique_lock<std::mutex> l(lock_);
-      return impl_.setMaxSize(newSize);
+    void setMaxCost(size_t newSize) {
+      log_debug_func_call("ConcurrentCache::setMaxCost", newSize);
+      log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
+      return impl_.setMaxCost(newSize);
+    }
+
+  private: // functions
+    typename Impl::AccessResult getCacheSlot(const Key& key, const ValuePlaceholder& v)
+    {
+      log_debug_func_call("ConcurrentCache::getCacheSlot", key);
+      log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
+      return impl_.getOrPut(key, CacheEntry{0, v});
+    }
+
+    template<class F>
+    static size_t materializeValue(std::promise<Value>& valuePromise, F f)
+    {
+      const auto materializedValue = f();
+      log_debug("Value was successfully obtained.");
+      valuePromise.set_value(materializedValue);
+      log_debug("Made the value available for concurrent access.");
+      log_debug("Computing the cost of the new entry...");
+      auto cost = CostEstimation::cost(materializedValue);
+      log_debug("cost=" << cost);
+      return cost;
+    }
+
+    void finalizeCacheMiss(const Key& key, const CacheEntry& cacheEntry)
+    {
+      log_debug_func_call("ConcurrentCache::finalizeCacheMiss", key);
+      log_debug_raii_sync_statement(std::unique_lock<std::mutex> l(lock_));
+      impl_.put(key, cacheEntry);
     }
 
   private: // data
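
For context, a minimal usage sketch of the revised template follows. Everything in it (StringCostEstimation, loadArticle, getArticle, the header name) is hypothetical illustration, not part of this commit; it only assumes what the diff shows, namely that CostEstimation must expose a static cost() taking the cached value:

#include "concurrentcache.h" // assumed name, per the ZIM_CONCURRENT_CACHE_H guard
#include <cstddef>
#include <string>

// Hypothetical cost policy: charges each cached string its size in bytes.
struct StringCostEstimation
{
  static size_t cost(const std::string& s) { return s.size(); }
};

using TextCache = zim::ConcurrentCache<int, std::string, StringCostEstimation>;

std::string loadArticle(int id); // hypothetical expensive loader

std::string getArticle(TextCache& cache, int id)
{
  // On a miss the lambda runs exactly once; concurrent readers of the same
  // key block on the shared placeholder until the value becomes available.
  return cache.getOrPut(id, [id]() { return loadArticle(id); });
}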
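
The locking comment above getOrPut describes the idiom at the heart of this class: the cache-wide mutex is held only while a slot is claimed, and a placeholder shared_future makes late arrivals wait on the missing element rather than on the cache. Below is a self-contained sketch of that idiom, independent of this codebase; it omits eviction, cost accounting, and the error path (the real getOrPut additionally drops the slot and rethrows if f() throws):

#include <future>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>

std::mutex m;
std::map<int, std::shared_future<int>> slots;

int getOrCompute(int key)
{
  std::promise<int> p;
  std::shared_future<int> f;
  bool miss = false;
  {
    std::lock_guard<std::mutex> lock(m); // short critical section: claim the slot
    auto it = slots.find(key);
    if (it == slots.end()) {
      f = p.get_future().share();        // publish a placeholder future
      slots.emplace(key, f);
      miss = true;
    } else {
      f = it->second;                    // another thread owns (or owned) the slot
    }
  }
  if (miss) {
    p.set_value(key * key); // stand-in for a slow computation, done outside the lock
  }
  return f.get(); // producer: ready immediately; other threads: block here
}

int main()
{
  std::thread t1([] { std::cout << getOrCompute(7) << '\n'; });
  std::thread t2([] { std::cout << getOrCompute(7) << '\n'; });
  t1.join();
  t2.join();
}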