Skip to content

Commit

Permalink
WIP: Add version to each cache object
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Mueller committed Sep 6, 2024
1 parent 0a82821 commit d11d594
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 49 deletions.
42 changes: 25 additions & 17 deletions FastMmap.xs
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,17 @@ fc_is_locked(obj)


void
fc_read(obj, hash_slot, key)
fc_read(obj, hash_slot, key, no_val)
SV * obj;
unsigned long hash_slot;
SV * key;
unsigned long no_val;
INIT:
int key_len, val_len, found;
void * key_ptr, * val_ptr;
MU64 expire_on = 0;
MU64 flags = 0;
MU64 version = 0;
STRLEN pl_key_len;
SV * val;

Expand All @@ -177,15 +179,15 @@ fc_read(obj, hash_slot, key)
key_len = (int)pl_key_len;

/* Get value data pointer */
found = mmc_read(cache, (MU64)hash_slot, key_ptr, key_len, &val_ptr, &val_len, &expire_on, &flags);
found = mmc_read(cache, (MU64)hash_slot, key_ptr, key_len, &val_ptr, &val_len, &expire_on, &version, &flags);

/* If not found, use undef */
if (found == -1) {
val = &PL_sv_undef;
} else {

/* Cached an undef value? */
if (flags & FC_UNDEF) {
/* Cached an undef value or don't want the actual value? */
if (flags & FC_UNDEF || no_val) {
val = &PL_sv_undef;

} else {
Expand All @@ -206,15 +208,17 @@ fc_read(obj, hash_slot, key)
XPUSHs(sv_2mortal(newSViv((IV)flags)));
XPUSHs(sv_2mortal(newSViv((IV)!found)));
XPUSHs(sv_2mortal(newSViv((IV)expire_on)));
XPUSHs(sv_2mortal(newSViv((IV)version)));


int
fc_write(obj, hash_slot, key, val, expire_on, in_flags)
fc_write(obj, hash_slot, key, val, expire_on, version, in_flags)
SV * obj;
unsigned long hash_slot;
SV * key;
SV * val;
unsigned long expire_on;
unsigned long version;
unsigned long in_flags;
INIT:
int key_len, val_len;
Expand Down Expand Up @@ -252,7 +256,7 @@ fc_write(obj, hash_slot, key, val, expire_on, in_flags)
}

/* Write value to cache */
RETVAL = mmc_write(cache, (MU64)hash_slot, key_ptr, key_len, val_ptr, val_len, (MU64)expire_on, (MU64)in_flags);
RETVAL = mmc_write(cache, (MU64)hash_slot, key_ptr, key_len, val_ptr, val_len, (MU64)expire_on, (MU64)version, (MU64)in_flags);

OUTPUT:
RETVAL
Expand All @@ -263,7 +267,7 @@ fc_delete(obj, hash_slot, key)
unsigned long hash_slot;
SV * key;
INIT:
MU64 out_flags;
MU64 flags, version;
int key_len, did_delete;
void * key_ptr;
STRLEN pl_key_len;
Expand All @@ -277,10 +281,11 @@ fc_delete(obj, hash_slot, key)
key_len = (int)pl_key_len;

/* Write value to cache */
did_delete = mmc_delete(cache, (MU64)hash_slot, key_ptr, key_len, &out_flags);
did_delete = mmc_delete(cache, (MU64)hash_slot, key_ptr, key_len, &flags, &version);

XPUSHs(sv_2mortal(newSViv((IV)did_delete)));
XPUSHs(sv_2mortal(newSViv((IV)out_flags)));
XPUSHs(sv_2mortal(newSViv((IV)flags)));
XPUSHs(sv_2mortal(newSViv((IV)version)));


void
Expand Down Expand Up @@ -321,7 +326,7 @@ fc_expunge(obj, mode, wb, len)

void * key_ptr, * val_ptr;
int key_len, val_len;
MU64 last_access, expire_on, flags;
MU64 last_access, expire_on, version, flags;

FC_ENTRY

Expand All @@ -336,7 +341,7 @@ fc_expunge(obj, mode, wb, len)
for (item = 0; item < num_expunge; item++) {
mmc_get_details(cache, to_expunge[item],
&key_ptr, &key_len, &val_ptr, &val_len,
&last_access, &expire_on, &flags);
&last_access, &expire_on, &version, &flags);

{
HV * ih = (HV *)sv_2mortal((SV *)newHV());
Expand Down Expand Up @@ -365,6 +370,7 @@ fc_expunge(obj, mode, wb, len)
hv_store(ih, "value", 5, val, 0);
hv_store(ih, "last_access", 11, newSViv((IV)last_access), 0);
hv_store(ih, "expire_on", 9, newSViv((IV)expire_on), 0);
hv_store(ih, "version", 7, newSViv((IV)version), 0);
hv_store(ih, "flags", 5, newSViv((IV)flags), 0);

/* Create reference to hash */
Expand All @@ -388,7 +394,7 @@ fc_get_keys(obj, mode)
MU64 * entry_ptr;
void * key_ptr, * val_ptr;
int key_len, val_len;
MU64 last_access, expire_on, flags;
MU64 last_access, expire_on, version, flags;

FC_ENTRY

Expand All @@ -401,7 +407,7 @@ fc_get_keys(obj, mode)
SV * key;
mmc_get_details(cache, entry_ptr,
&key_ptr, &key_len, &val_ptr, &val_len,
&last_access, &expire_on, &flags);
&last_access, &expire_on, &version, &flags);

/* Create key SV, and set UTF8'ness if needed */
key = newSVpvn((const char *)key_ptr, key_len);
Expand All @@ -422,6 +428,7 @@ fc_get_keys(obj, mode)
hv_store(ih, "key", 3, key, 0);
hv_store(ih, "last_access", 11, newSViv((IV)last_access), 0);
hv_store(ih, "expire_on", 9, newSViv((IV)expire_on), 0);
hv_store(ih, "version", 7, newSViv((IV)version), 0);
hv_store(ih, "flags", 5, newSViv((IV)flags), 0);

/* Add value to hash-ref if mode 2 */
Expand Down Expand Up @@ -458,7 +465,7 @@ fc_get(obj, key)
INIT:
int key_len, val_len, found;
void * key_ptr, * val_ptr;
MU64 hash_page, hash_slot, expire_on, flags;
MU64 hash_page, hash_slot, expire_on, version, flags;
STRLEN pl_key_len;
SV * val;

Expand All @@ -477,7 +484,7 @@ fc_get(obj, key)
mmc_lock(cache, hash_page);

/* Get value data pointer */
found = mmc_read(cache, hash_slot, key_ptr, key_len, &val_ptr, &val_len, &expire_on, &flags);
found = mmc_read(cache, hash_slot, key_ptr, key_len, &val_ptr, &val_len, &expire_on, &version, &flags);

/* If not found, use undef */
if (found == -1) {
Expand All @@ -495,10 +502,11 @@ fc_get(obj, key)


void
fc_set(obj, key, val)
fc_set(obj, key, val, version)
SV * obj;
SV * key;
SV * val;
unsigned long version;
INIT:
int key_len, val_len;
void * key_ptr, * val_ptr;
Expand All @@ -524,7 +532,7 @@ fc_set(obj, key, val)
mmc_lock(cache, hash_page);

/* Get value data pointer */
mmc_write(cache, hash_slot, key_ptr, key_len, val_ptr, val_len, -1, flags);
mmc_write(cache, hash_slot, key_ptr, key_len, val_ptr, val_len, -1, version, flags);

mmc_unlock(cache);

Expand Down
42 changes: 31 additions & 11 deletions lib/Cache/FastMmap.pm
Original file line number Diff line number Diff line change
Expand Up @@ -787,15 +787,15 @@ sub get {
# Hash value, lock page, read result
my ($HashPage, $HashSlot) = fc_hash($Cache, $_[1]);
my $Unlock = $Self->_lock_page($HashPage);
my ($Val, $Flags, $Found, $ExpireOn) = fc_read($Cache, $HashSlot, $_[1]);
my ($Val, $Flags, $Found, $ExpireOn, $Version) = fc_read($Cache, $HashSlot, $_[1], 0);

# Value not found, check underlying data store
if (!$Found && (my $read_cb = $Self->{read_cb})) {

# Callback to read from underlying data store
# (unlock page first if we allow recursive calls
$Unlock = undef if $Self->{allow_recursive};
$Val = eval { $read_cb->($Self->{context}, $_[1]); };
($Val, my $Opts) = eval { $read_cb->($Self->{context}, $_[1]); };
my $Err = $@;
$Unlock = $Self->_lock_page($HashPage) if $Self->{allow_recursive};

Expand All @@ -805,9 +805,6 @@ sub get {
# If we found it, or want to cache not-found, store back into our cache
if (defined $Val || $Self->{cache_not_found}) {

# Are we doing writeback's? If so, need to mark as dirty in cache
my $write_back = $Self->{write_back};

$Val = $Self->{serialize}(\$Val) if $Self->{serialize};
$Val = $Self->{compress}($Val) if $Self->{compress};

Expand All @@ -816,7 +813,8 @@ sub get {
my $KVLen = length($_[1]) + (defined($Val) ? length($Val) : 0);
$Self->_expunge_page(2, 1, $KVLen);

fc_write($Cache, $HashSlot, $_[1], $Val, -1, 0);
my $version = $Opts && defined $Opts->{version} ? int $Opts->{version} : 0;
fc_write($Cache, $HashSlot, $_[1], $Val, -1, $version, 0);
}
}

Expand All @@ -830,7 +828,9 @@ sub get {
$Val = ${$Self->{deserialize}($Val)} if defined($Val) && $Self->{deserialize};

# If explicitly asked to skip unlocking, we return the reference to the unlocker
return ($Val, $Unlock, { $Found ? (expire_on => $ExpireOn) : () }) if $SkipUnlock;
if ($SkipUnlock) {
return ($Val, $Unlock, { $Found ? (expire_on => $ExpireOn, version => $Version) : () });
}

return $Val;
}
Expand Down Expand Up @@ -870,6 +870,7 @@ sub set {
defined $Opts->{expire_on} ? $Opts->{expire_on} :
(defined $Opts->{expire_time} ? parse_expire_time($Opts->{expire_time}, _time()): -1)
) : -1;
my $version = defined $Opts->{version} ? int $Opts->{version} : 0;

# Hash value, lock page
my ($HashPage, $HashSlot) = fc_hash($Cache, $_[1]);
Expand All @@ -893,7 +894,7 @@ sub set {
$Self->_expunge_page(2, 1, $KVLen);

# Now store into cache
my $DidStore = fc_write($Cache, $HashSlot, $_[1], $Val, $expire_on, $write_back ? FC_ISDIRTY : 0);
my $DidStore = fc_write($Cache, $HashSlot, $_[1], $Val, $expire_on, $version, $write_back ? FC_ISDIRTY : 0);

# Unlock page
$Unlock = undef;
Expand All @@ -907,6 +908,25 @@ sub set {
return $DidStore;
}

=item I<exists($Key, [ \%Options ])>
Search cache for given Key. Returns undef if not found. Does not
check any I<read_cb>, only checks for in the actual cache right now
=cut
sub exists {
my ($Self, $Cache) = ($_[0], $_[0]->{Cache});

# Hash value, lock page, read result
my ($HashPage, $HashSlot) = fc_hash($Cache, $_[1]);
my $Unlock = $Self->_lock_page($HashPage);
my (undef, $Flags, $Found, $ExpireOn, $Version) = fc_read($Cache, $HashSlot, $_[1], 1);

$Unlock = undef;

return $Found ? { expire_on => $ExpireOn, version => $Version } : undef;
}

=item I<get_and_set($Key, $AtomicSub)>
Atomically retrieve and set the value of a Key.
Expand Down Expand Up @@ -1070,7 +1090,7 @@ sub expire {
# Hash value, lock page, read result
my ($HashPage, $HashSlot) = fc_hash($Cache, $_[1]);
my $Unlock = $Self->_lock_page($HashPage);
my ($Val, $Flags, $Found) = fc_read($Cache, $HashSlot, $_[1]);
my ($Val, $Flags, $Found, $Version) = fc_read($Cache, $HashSlot, $_[1], 0);

# If we found it, remove it
if ($Found) {
Expand Down Expand Up @@ -1250,7 +1270,7 @@ sub multi_get {
# Hash key to get slot in this page and read
my $FinalKey = "$_[1]-$_";
(undef, $HashSlot) = fc_hash($Cache, $FinalKey);
my ($Val, $Flags, $Found, $ExpireOn) = fc_read($Cache, $HashSlot, $FinalKey);
my ($Val, $Flags, $Found, $ExpireOn) = fc_read($Cache, $HashSlot, $FinalKey, 0);
next unless $Found;

# If not using raw values, use thaw() to turn data back into object
Expand Down Expand Up @@ -1302,7 +1322,7 @@ sub multi_set {

# Now hash key and store into page
(undef, $HashSlot) = fc_hash($Cache, $FinalKey);
my $DidStore = fc_write($Cache, $HashSlot, $FinalKey, $Val, $expire_on, 0);
my $DidStore = fc_write($Cache, $HashSlot, $FinalKey, $Val, $expire_on, 0, 0);
}

# Unlock page
Expand Down
26 changes: 15 additions & 11 deletions mmap_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ int mmc_hash(
* cache_mmap * cache, MU64 hash_slot,
* void *key_ptr, int key_len,
* void **val_ptr, int *val_len,
* MU64 *expire_on, MU64 *flags
* MU64 *expire_on_p, MU64 * version_p, MU64 *flags_p
* )
*
* Read key from current page
Expand All @@ -416,7 +416,7 @@ int mmc_read(
mmap_cache *cache, MU64 hash_slot,
void *key_ptr, int key_len,
void **val_ptr, int *val_len,
MU64 *expire_on_p, MU64 *flags_p
MU64 *expire_on_p, MU64 *version_p, MU64 *flags_p
) {
MU64 * slot_ptr;

Expand Down Expand Up @@ -458,8 +458,9 @@ int mmc_read(
S_LastAccess(base_det) = now;

/* Copy values to pointers */
*flags_p = S_Flags(base_det);
*expire_on_p = expire_on;
*version_p = S_Version(base_det);
*flags_p = S_Flags(base_det);
*val_len = S_ValLen(base_det);
*val_ptr = S_ValPtr(base_det);

Expand All @@ -478,7 +479,7 @@ int mmc_read(
* cache_mmap * cache, MU64 hash_slot,
* void *key_ptr, int key_len,
* void *val_ptr, int val_len,
* MU64 expire_on, MU64 flags
* MU64 expire_on, MU64 version, MU64 flags
* )
*
* Write key to current page
Expand All @@ -488,7 +489,7 @@ int mmc_write(
mmap_cache *cache, MU64 hash_slot,
void *key_ptr, int key_len,
void *val_ptr, int val_len,
MU64 expire_on, MU64 flags
MU64 expire_on, MU64 version, MU64 flags
) {
int did_store = 0;
MU64 kvlen = KV_SlotLen(key_len, val_len);
Expand Down Expand Up @@ -525,6 +526,7 @@ int mmc_write(
S_LastAccess(base_det) = now;
S_ExpireOn(base_det) = expire_on;
S_SlotHash(base_det) = hash_slot;
S_Version(base_det) = version;
S_Flags(base_det) = flags;
S_KeyLen(base_det) = (MU64)key_len;
S_ValLen(base_det) = (MU64)val_len;
Expand Down Expand Up @@ -565,7 +567,7 @@ int mmc_write(
int mmc_delete(
mmap_cache *cache, MU64 hash_slot,
void *key_ptr, int key_len,
MU64 * flags
MU64 * flags_p, MU64 * version_p
) {
/* Search slots for key */
MU64 * slot_ptr = _mmc_find_slot(cache, hash_slot, key_ptr, key_len, 2);
Expand All @@ -581,7 +583,8 @@ int mmc_delete(

/* Store flags in output pointer */
MU64 * base_det = S_Ptr(cache->p_base, *slot_ptr);
*flags = S_Flags(base_det);
*flags_p = S_Flags(base_det);
*version_p = S_Version(base_det);

_mmc_delete_slot(cache, slot_ptr);
return 1;
Expand Down Expand Up @@ -985,7 +988,7 @@ void mmc_get_details(
MU64 * base_det,
void ** key_ptr, int * key_len,
void ** val_ptr, int * val_len,
MU64 * last_access, MU64 * expire_on, MU64 * flags
MU64 * last_access_p, MU64 * expire_on_p, MU64 * version_p, MU64 * flags_p
) {
cache = cache;

Expand All @@ -995,9 +998,10 @@ void mmc_get_details(
*val_ptr = S_ValPtr(base_det);
*val_len = S_ValLen(base_det);

*last_access = S_LastAccess(base_det);
*expire_on = S_ExpireOn(base_det);
*flags = S_Flags(base_det);
*last_access_p = S_LastAccess(base_det);
*expire_on_p = S_ExpireOn(base_det);
*version_p = S_Version(base_det);
*flags_p = S_Flags(base_det);
}


Expand Down
Loading

0 comments on commit d11d594

Please sign in to comment.