-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathdefrag.c
More file actions
1357 lines (1178 loc) · 53.7 KB
/
defrag.c
File metadata and controls
1357 lines (1178 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Active memory defragmentation
* Try to find key / value allocations that need to be re-allocated in order
* to reduce external fragmentation.
* We do that by scanning the keyspace and for each pointer we have, we can try to
* ask the allocator if moving it to a new address will help reduce fragmentation.
*
* Copyright (c) 2020, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Copyright (c) Valkey Contributors
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "server.h"
#include "hashtable.h"
#include "eval.h"
#include "script.h"
#include "module.h"
#include <stdbool.h>
#include <stddef.h>
#ifdef HAVE_DEFRAG

/* Completion status returned by the incremental defrag stage/scan functions:
 * DEFRAG_NOT_DONE means the caller must invoke the function again to finish. */
typedef enum { DEFRAG_NOT_DONE = 0,
               DEFRAG_DONE = 1 } doneStatus;
/*
 * Defragmentation is performed in stages. Each stage is serviced by a stage function
 * (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that
 * target are unique to the particular stage - and may even be NULL for some stage functions. The
 * same stage function can be used multiple times (for different stages) each having a different
 * target.
 *
 * The stage function is required to maintain an internal static state. This allows the stage
 * function to continue when invoked in an iterative manner. When invoked with a 0 endtime, the
 * stage function is required to clear it's internal state and prepare to begin a new stage. It
 * should return false (more work to do) as it should NOT perform any real "work" during init.
 *
 * Parameters:
 *  endtime     - This is the monotonic time that the function should end and return. This ensures
 *                a bounded latency due to defrag. When endtime is 0, the internal state should be
 *                cleared, preparing to begin the stage with a new target.
 *  target      - This is the "thing" that should be defragged. It's type is dependent on the
 *                type of the stage function. This might be a dict, a kvstore, a DB, or other.
 *  privdata    - A pointer to arbitrary private data which is unique to the stage function.
 *
 * Returns:
 *  - DEFRAG_DONE if the stage is complete
 *  - DEFRAG_NOT_DONE if there is more work to do
 */
typedef doneStatus (*defragStageFn)(monotime endtime, void *target, void *privdata);

/* One entry in the list of pending defrag stages (see DefragContext.remaining_stages). */
typedef struct {
    defragStageFn stage_fn; // The function to be invoked for the stage
    void *target;           // The target that the function will defrag
    void *privdata;         // Private data, unique to the stage function
} StageDescriptor;
/* Globals needed for the main defrag processing logic.
 * Doesn't include variables specific to a stage or type of data. */
struct DefragContext {
    monotime start_cycle;           // Time of beginning of defrag cycle
    long long start_defrag_hits;    // server.stat_active_defrag_hits captured at beginning of cycle
    list *remaining_stages;         // List of stages which remain to be processed
    StageDescriptor *current_stage; // The stage that's currently being processed
    long long timeproc_id;          // Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID)
    monotime timeproc_end_time;     // Ending time of previous timerproc execution
    long timeproc_overage_us;       // A correction value if over target CPU percent
};
/* Single global instance: active defrag runs on the main thread. */
static struct DefragContext defrag;
/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
 * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
 * uses these definitions.
 */
/* State of the kvstore helper. The private data (privdata) passed to the kvstore helper MUST BEGIN
 * with a kvstoreIterState (or be passed as NULL). */
/* Sentinel slot values: defragging the kvstore's lookup table itself, and
 * "no slot assigned yet" respectively. */
#define KVS_SLOT_DEFRAG_LUT -2
#define KVS_SLOT_UNASSIGNED -1
typedef struct {
    kvstore *kvs;         // The kvstore currently being iterated
    int slot;             // Current slot, or one of the KVS_SLOT_* sentinels
    unsigned long cursor; // Scan cursor within the current slot's hashtable
} kvstoreIterState;
/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the
 * main hash table, large items are set aside and processed by this function before continuing with
 * iteration over the kvstore.
 *  endtime     - This is the monotonic time that the function should end and return.
 *  privdata    - Private data for functions invoked by the helper. If provided in the call to
 *                `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning)
 *                will be updated with the current kvstore iteration status.
 *
 * Returns:
 *  - DEFRAG_DONE if the pre-continue work is complete
 *  - DEFRAG_NOT_DONE if there is more work to do
 */
typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdata);
// Private data for main dictionary keys
typedef struct {
    kvstoreIterState kvstate; // MUST be first (see defragStageKvstoreHelper contract)
    int dbid;                 // Database index being defragged
} defragKeysCtx;
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

// Private data for pubsub kvstores
/* Accessor returning a client's channel hashtable (pubsub or shard-pubsub). */
typedef hashtable *(*getClientChannelsFn)(client *);
typedef struct {
    getClientChannelsFn fn;
} getClientChannelsFnWrapper;

typedef struct {
    kvstoreIterState kvstate; // MUST be first (see defragStageKvstoreHelper contract)
    getClientChannelsFn getPubSubChannels;
} defragPubSubCtx;
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
/* When scanning a main kvstore, large elements are queued for later handling rather than
 * causing a large latency spike while processing a hash table bucket. This list is only used
 * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
 * defragged.
 * Note that this is a list of key names. It's possible that the key may be deleted or modified
 * before "later" and we will search by key name to find the entry when we defrag the item later.
 */
static list *defrag_later;                 // Lazily created; owns sds copies of key names
static unsigned long defrag_later_cursor;  // Resume cursor within the current deferred item
/* Defrag function which allocates and copies memory if needed, but DOESN'T free the old block.
 * It is the responsibility of the caller to free the old block if a non-NULL value (new block)
 * is returned. (Returns NULL if no relocation was needed.)
 * When a relocation happens, *allocation_size (if non-NULL) receives the size of
 * the old block, which the caller needs in order to free it. */
static void *activeDefragAllocWithoutFree(void *ptr, size_t *allocation_size) {
    if (!allocatorShouldDefrag(ptr)) {
        server.stat_active_defrag_misses++;
        return NULL;
    }
    /* Relocate the allocation. The defrag allocation path avoids the thread
     * cache so we aren't handed back the very pointers we are about to free. */
    size_t old_size = zmalloc_size(ptr);
    void *moved = allocatorDefragAlloc(old_size);
    memcpy(moved, ptr, old_size);
    if (allocation_size) *allocation_size = old_size;
    server.stat_active_defrag_hits++;
    return moved;
}
/* Defrag helper for generic allocations.
 *
 * Returns NULL in case the allocation wasn't moved.
 * When it returns a non-null value, the old pointer was already released
 * and should NOT be accessed. */
void *activeDefragAlloc(void *ptr) {
    size_t moved_size;
    void *moved = activeDefragAllocWithoutFree(ptr, &moved_size);
    if (moved == NULL) return NULL;
    allocatorDefragFree(ptr, moved_size);
    return moved;
}
/* Defrag helper for sds strings
 *
 * Returns NULL in case the allocation wasn't moved.
 * When it returns a non-null value, the old pointer was already released
 * and should NOT be accessed. */
sds activeDefragSds(sds sdsptr) {
    void *alloc = sdsAllocPtr(sdsptr);
    void *moved = activeDefragAlloc(alloc);
    if (moved == NULL) return NULL;
    /* The sds pointer points inside its allocation (past the header);
     * preserve that offset in the relocated block. */
    return (char *)moved + (sdsptr - (char *)alloc);
}
/* Performs defrag on a string-type (or generic) robj, but does not free the old robj. This is the
 * caller's responsibility. This is necessary for string objects with multiple references. In this
 * case the caller can fix the references before freeing the original object.
 */
static robj *activeDefragStringObWithoutFree(robj *ob, size_t *allocation_size) {
    /* For raw-encoded strings, defrag the linked sds first, independently of
     * whether the robj itself ends up being relocated. */
    if (ob->type == OBJ_STRING && ob->encoding == OBJ_ENCODING_RAW) {
        sds moved_sds = activeDefragSds((sds)objectGetVal(ob));
        if (moved_sds) objectSetVal(ob, moved_sds);
    }
    return activeDefragAllocWithoutFree(ob, allocation_size);
}
/* Defrag helper for robj and/or string objects
 *
 * Returns NULL in case the allocation wasn't moved.
 * When it returns a non-null value, the old pointer was already released
 * and should NOT be accessed. */
robj *activeDefragStringOb(robj *ob) {
    /* With more than one reference, other holders would be left pointing at
     * the freed object, so only singly-referenced objects are relocated. */
    if (ob->refcount != 1) return NULL;
    size_t moved_size;
    robj *moved = activeDefragStringObWithoutFree(ob, &moved_size);
    if (moved) allocatorDefragFree(ob, moved_size);
    return moved;
}
/* Internal function used by zslDefrag.
 * Rewires all skiplist pointers that referenced 'oldnode' to point at
 * 'newnode' after a relocation. 'update' holds, per level, the node whose
 * forward pointer may reference oldnode (as collected by the caller's
 * search). The header must never be the relocated node. */
static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
    int i;
    /* Fix forward pointers on every level that referenced the old node. */
    for (i = 0; i < zslGetHeight(zsl); i++) {
        if (update[i]->level[i].forward == oldnode) update[i]->level[i].forward = newnode;
    }
    serverAssert(zslGetHeader(zsl) != oldnode);
    if (newnode->level[0].forward) {
        /* Fix the backward pointer of the successor. */
        serverAssert(newnode->level[0].forward->backward == oldnode);
        newnode->level[0].forward->backward = newnode;
    } else {
        /* No successor: the relocated node was the tail. */
        serverAssert(zslGetTail(zsl) == oldnode);
        zslSetTail(zsl, newnode);
    }
}
/* Hashtable scan callback for sorted set. It defragments a single skiplist
 * node, updates skiplist pointers, and updates the hashtable pointer to the
 * node.
 * privdata  - the zset's skiplist (zskiplist *).
 * entry_ref - reference to the hashtable's stored pointer to the node. */
static void activeDefragZsetNode(void *privdata, void *entry_ref) {
    zskiplist *zsl = privdata;
    zskiplistNode **node_ref = (zskiplistNode **)entry_ref;
    zskiplistNode *node = *node_ref;
    size_t allocation_size;
    /* Copy-without-free: the old node must stay readable while we search the
     * skiplist for the pointers that reference it. */
    zskiplistNode *newnode = activeDefragAllocWithoutFree(node, &allocation_size);
    if (newnode == NULL) return;
    const double score = node->score;
    /* find skiplist pointers that need to be updated if we end up moving the
     * skiplist node. */
    sds ele = zslGetNodeElement(node);
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *x = zslGetHeader(zsl);
    for (int i = zslGetHeight(zsl) - 1; i >= 0; i--) {
        /* stop when we've reached the end of this level or the next node comes
         * after our target in sorted order. Even though defrag replacements does not impact the skip list order,
         * when scores are equal, we MUST compare elements lexicographically to maintain correct skip list ordering.
         * Otherwise we might miss locating the entry. */
        zskiplistNode *next = x->level[i].forward;
        while (next &&
               (next->score < score ||
                (next->score == score && sdscmp(zslGetNodeElement(next), ele) < 0))) {
            x = next;
            next = x->level[i].forward;
        }
        update[i] = x;
    }
    /* should have arrived at intended node */
    serverAssert(x->level[0].forward == node);
    /* Rewire the skiplist, then the hashtable, then release the old node. */
    zslUpdateNode(zsl, node, newnode, update);
    *node_ref = newnode; /* update hashtable pointer */
    allocatorDefragFree(node, allocation_size);
}
/* Value-type selectors for activeDefragSdsDict(): how the dict's value
 * should be defragged (or not at all for NO_VAL). */
#define DEFRAG_SDS_DICT_NO_VAL 0
#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4

/* Per-pointer defrag function: returns the new pointer if relocated, NULL otherwise. */
typedef void *(dictDefragAllocFunction)(void *ptr);
typedef struct {
    dictDefragAllocFunction *defragKey; // Called on each entry's key (may be NULL)
    dictDefragAllocFunction *defragVal; // Called on each entry's value (may be NULL)
} dictDefragFunctions;
static void activeDefragDictCallback(void *privdata, void *entry_ref) {
dictDefragFunctions *defragfns = privdata;
dictEntry **de_ref = (dictEntry **)entry_ref;
dictEntry *de = *de_ref;
/* Defrag the entry itself */
dictEntry *newentry = activeDefragAlloc(de);
if (newentry) {
de = newentry;
*de_ref = newentry;
}
/* Defrag the key */
if (defragfns->defragKey) {
void *newkey = defragfns->defragKey(de->key);
if (newkey) de->key = newkey;
}
/* Defrag the value */
if (defragfns->defragVal) {
void *newval = defragfns->defragVal(de->v.val);
if (newval) de->v.val = newval;
}
}
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string).
 * Runs the scan to completion (no time bound), so it is only suitable for
 * dicts that are expected to be reasonably small.
 * NOTE(review): the parameter is declared `dict *` but is passed to
 * hashtableScanDefrag — presumably these are interchangeable here; confirm
 * against the project's dict/hashtable declarations. */
static void activeDefragSdsDict(dict *d, int val_type) {
    unsigned long cursor = 0;
    dictDefragFunctions defragfns = {
        .defragKey = (dictDefragAllocFunction *)activeDefragSds,
        /* Pick the value defrag routine matching the declared value type;
         * NULL (e.g. DEFRAG_SDS_DICT_NO_VAL) leaves values untouched. */
        .defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS         ? (dictDefragAllocFunction *)activeDefragSds
                      : val_type == DEFRAG_SDS_DICT_VAL_IS_STROB     ? (dictDefragAllocFunction *)activeDefragStringOb
                      : val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR     ? (dictDefragAllocFunction *)activeDefragAlloc
                      : val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT   ? (dictDefragAllocFunction *)evalActiveDefragScript
                                                                     : NULL)};
    do {
        cursor = hashtableScanDefrag(d, cursor, activeDefragDictCallback,
                                     &defragfns, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
    } while (cursor != 0);
}
static void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
UNUSED(privdata);
sds *sds_ref = (sds *)entry_ref;
sds new_sds = activeDefragSds(*sds_ref);
if (new_sds != NULL) *sds_ref = new_sds;
}
/* Defrag a single quicklist node and its listpack payload, fixing the
 * neighbor links (and the quicklist's head/tail) if the node moves.
 * On relocation, *node_ref is updated to the new node. */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
    quicklistNode *newnode, *node = *node_ref;
    unsigned char *newzl;
    if ((newnode = activeDefragAlloc(node))) {
        /* Rewire the doubly-linked list around the relocated node. */
        if (newnode->prev)
            newnode->prev->next = newnode;
        else
            ql->head = newnode;
        if (newnode->next)
            newnode->next->prev = newnode;
        else
            ql->tail = newnode;
        *node_ref = node = newnode;
    }
    /* Defrag the node's entry buffer regardless of whether the node moved. */
    if ((newzl = activeDefragAlloc(node->entry))) node->entry = newzl;
}
/* Defrag every node of a quicklist in one pass (unbounded; used for small lists). */
static void activeDefragQuickListNodes(quicklist *ql) {
    /* activeDefragQuickListNode updates 'node' in place if it relocates,
     * so advancing via node->next always follows the live node. */
    for (quicklistNode *node = ql->head; node != NULL; node = node->next) {
        activeDefragQuickListNode(ql, &node);
    }
}
/* when the value has lots of elements, we want to handle it later and not as
 * part of the main dictionary scan. this is needed in order to prevent latency
 * spikes when handling large items */
static void defragLater(robj *obj) {
    /* Lazily create the pending-keys list on first use; it owns duplicated
     * key names, freed via sdsfreeVoid. */
    if (defrag_later == NULL) {
        defrag_later = listCreate();
        listSetFreeMethod(defrag_later, sdsfreeVoid);
        defrag_later_cursor = 0;
    }
    listAddNodeTail(defrag_later, sdsdup(objectGetKey(obj)));
}
/* Incrementally defrag a large quicklist-encoded list, resuming across calls
 * via a quicklist bookmark named "_AD".
 * Returns 0 if no more work needs to be done, and 1 if time is up and more
 * work is needed. */
static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
    quicklist *ql = objectGetVal(ob);
    quicklistNode *node;
    serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);

    /* Find starting node */
    if (*cursor == 0) {
        /* Fresh scan: begin from the head. */
        node = ql->head;
    } else {
        /* Resume: continue after the node bookmarked on the previous call. */
        node = quicklistBookmarkFind(ql, "_AD");
        if (!node) {
            *cursor = 0;
            return 0; /* either bookmark failed to create or deleted, skip this one and continue with other quicklists*/
        }
        node = node->next;
    }

    /* Process nodes until time expires or list ends */
    while (node) {
        activeDefragQuickListNode(ql, &node);
        server.stat_active_defrag_scanned++;
        /* Check time limit after processing each node */
        if (getMonotonicUs() > endtime) {
            /* Bookmark the just-processed node so the next call resumes at
             * its successor. */
            if (quicklistBookmarkCreate(&ql, "_AD", node)) {
                objectSetVal(ob, ql); /* bookmark creation may have re-allocated the quicklist */
                (*cursor)++;
                return 1;
            }
            break; /* bookmark creation failed - skip this one and continue with other quicklists */
        }
        node = node->next;
    }

    /* Completed processing all nodes */
    quicklistBookmarkDelete(ql, "_AD");
    *cursor = 0;
    return 0;
}
/* Scan callback for deferred zset defrag: defrag one skiplist node and count it. */
static void scanLaterZsetCallback(void *privdata, void *element_ref) {
    activeDefragZsetNode(privdata, element_ref);
    server.stat_active_defrag_scanned++;
}
/* One incremental step of deferred defrag for a skiplist-encoded zset;
 * *cursor carries the hashtable scan position between calls (0 = done). */
static void scanLaterZset(robj *ob, unsigned long *cursor) {
    serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
    zset *zs = (zset *)objectGetVal(ob);
    *cursor = hashtableScanDefrag(zs->ht, *cursor, scanLaterZsetCallback, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}
/* Used as hashtable scan callback when all we need is to defrag the hashtable
 * internals (the allocated buckets) and not the elements. Only counts the
 * visited element for statistics. */
static void scanHashtableCallbackCountScanned(void *privdata, void *elemref) {
    UNUSED(privdata);
    UNUSED(elemref);
    server.stat_active_defrag_scanned++;
}
/* One incremental step of deferred defrag for a hashtable-encoded set;
 * *cursor carries the scan position between calls (0 = done). */
static void scanLaterSet(robj *ob, unsigned long *cursor) {
    serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHTABLE);
    hashtable *ht = objectGetVal(ob);
    *cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}
/* One incremental step of deferred defrag for a hashtable-encoded hash;
 * delegates field/value handling to hashTypeScanDefrag. */
static void scanLaterHash(robj *ob, unsigned long *cursor) {
    serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
    *cursor = hashTypeScanDefrag(ob, *cursor, activeDefragAlloc);
}
/* Defrag a quicklist-encoded list: relocate the quicklist struct itself,
 * then either walk its nodes now (small lists) or defer them (large lists)
 * to avoid a latency spike. */
static void defragQuicklist(robj *ob) {
    serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
    quicklist *ql = objectGetVal(ob);
    quicklist *moved = activeDefragAlloc(ql);
    if (moved != NULL) {
        objectSetVal(ob, moved);
        ql = moved;
    }
    if (ql->len > server.active_defrag_max_scan_fields) {
        defragLater(ob);
    } else {
        activeDefragQuickListNodes(ql);
    }
}
/* Defrag a skiplist-encoded zset: the zset struct, the zskiplist struct,
 * the hashtable's internal tables, and (for small zsets) every skiplist
 * node. Large zsets defer per-node work via defragLater(). */
static void defragZsetSkiplist(robj *ob) {
    serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
    zset *zs = (zset *)objectGetVal(ob);

    zset *newzs;
    zskiplist *newzsl;
    if ((newzs = activeDefragAlloc(zs))) {
        objectSetVal(ob, newzs);
        zs = newzs;
    }
    if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl;

    /* Defrag the hashtable's bucket arrays (not the elements). */
    hashtable *newtable;
    if ((newtable = hashtableDefragTables(zs->ht, activeDefragAlloc))) zs->ht = newtable;

    if (hashtableSize(zs->ht) > server.active_defrag_max_scan_fields)
        defragLater(ob);
    else {
        /* Small zset: defrag every node now, to completion. */
        unsigned long cursor = 0;
        do {
            cursor = hashtableScanDefrag(zs->ht, cursor, activeDefragZsetNode, zs->zsl, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
        } while (cursor != 0);
    }
}
/* Defragment a hash object.
 *
 * Large hashtable-encoded hashes are deferred via `defrag_later`.
 * Smaller ones are defragmented immediately, possibly over multiple passes.
 * Listpack-encoded hashes are always handled in a single pass. */
static void defragHash(robj *ob) {
    hashtable *fields = objectGetVal(ob);
    /* Short-circuit keeps hashtableSize() from being called on a listpack. */
    bool too_large = ob->encoding == OBJ_ENCODING_HASHTABLE &&
                     hashtableSize(fields) > server.active_defrag_max_scan_fields;
    if (too_large) {
        /* Large hashtable-encoded hashes are deferred via `defrag_later`. */
        defragLater(ob);
        return;
    }
    /* Smaller hashes are defragged immediately; hashTypeScanDefrag handles
     * listpack-encoded hashes in a single pass (cursor returns 0). */
    unsigned long cursor = 0;
    do {
        cursor = hashTypeScanDefrag(ob, cursor, activeDefragAlloc);
    } while (cursor != 0);
}
/* Defrag a hashtable-encoded set: the sds elements (immediately for small
 * sets, deferred for large ones) plus the hashtable struct/bucket arrays. */
static void defragSet(robj *ob) {
    serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHTABLE);
    hashtable *ht = objectGetVal(ob);
    if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
        /* Too many elements to walk now; queue for incremental handling. */
        defragLater(ob);
    } else {
        unsigned long cursor = 0;
        do {
            cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
        } while (cursor != 0);
    }
    /* defrag the hashtable struct and tables */
    hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
    if (new_hashtable) objectSetVal(ob, new_hashtable);
}
/* Defrag callback for radix tree iterator, called for each node,
 * used in order to defrag the nodes allocations.
 * Returns 1 if the node was relocated (and *noderef updated), 0 otherwise. */
static int defragRaxNode(raxNode **noderef) {
    raxNode *moved = activeDefragAlloc(*noderef);
    if (moved == NULL) return 0;
    *noderef = moved;
    return 1;
}
/* Incrementally defrag a large stream's rax entries, resuming across calls
 * via the function-static 'last' key (the last stream ID processed).
 * Returns 0 if no more work needs to be done, and 1 if time is up and more
 * work is needed. */
static int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
    /* Static: holds the resume position between invocations. Safe only
     * because one deferred item is processed at a time (*cursor != 0 only
     * for the item currently in progress). */
    static unsigned char last[sizeof(streamID)];
    raxIterator ri;
    long iterations = 0;
    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);

    stream *s = objectGetVal(ob);
    raxStart(&ri, s->rax);
    if (*cursor == 0) {
        /* if cursor is 0, we start new iteration */
        defragRaxNode(&s->rax->head);
        /* assign the iterator node callback before the seek, so that the
         * initial nodes that are processed till the first item are covered */
        ri.node_cb = defragRaxNode;
        raxSeek(&ri, "^", NULL, 0);
    } else {
        /* if cursor is non-zero, we seek to the static 'last' */
        if (!raxSeek(&ri, ">", last, sizeof(last))) {
            *cursor = 0;
            raxStop(&ri);
            return 0;
        }
        /* assign the iterator node callback after the seek, so that the
         * initial nodes that are processed till now aren't covered */
        ri.node_cb = defragRaxNode;
    }
    (*cursor)++;
    while (raxNext(&ri)) {
        /* Defrag the entry's listpack payload. */
        void *newdata = activeDefragAlloc(ri.data);
        if (newdata) raxSetData(ri.node, ri.data = newdata);
        server.stat_active_defrag_scanned++;
        /* Check the clock only every 128 entries to amortize its cost. */
        if (++iterations > 128) {
            if (getMonotonicUs() > endtime) {
                serverAssert(ri.key_len == sizeof(last));
                memcpy(last, ri.key, ri.key_len);
                raxStop(&ri);
                return 1;
            }
            iterations = 0;
        }
    }
    raxStop(&ri);
    *cursor = 0;
    return 0;
}
/* optional callback used defrag each rax element (not including the element pointer itself);
 * returns the new element pointer if it was relocated, NULL otherwise. */
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);

/* defrag radix tree including:
 * 1) rax struct
 * 2) rax nodes
 * 3) rax entry data (only if defrag_data is specified)
 * 4) call a callback per element, and allow the callback to return a new pointer for the element
 * Runs to completion (not time-bounded). */
static void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
    raxIterator ri;
    rax *rax;
    /* Relocate the rax struct itself, then iterate the (possibly moved) tree. */
    if ((rax = activeDefragAlloc(*raxref))) *raxref = rax;
    rax = *raxref;
    raxStart(&ri, rax);
    ri.node_cb = defragRaxNode; /* covers interior nodes during iteration */
    defragRaxNode(&rax->head);
    raxSeek(&ri, "^", NULL, 0);
    while (raxNext(&ri)) {
        void *newdata = NULL;
        if (element_cb) newdata = element_cb(&ri, element_cb_data);
        /* Only fall back to a generic realloc when the callback didn't
         * already relocate the element. */
        if (defrag_data && !newdata) newdata = activeDefragAlloc(ri.data);
        if (newdata) raxSetData(ri.node, ri.data = newdata);
    }
    raxStop(&ri);
}
/* Context for defragging a consumer's pending-entries list (PEL). */
typedef struct {
    streamCG *cg;      // The consumer group owning the global PEL
    streamConsumer *c; // The (possibly relocated) consumer
} PendingEntryContext;

/* raxDefragFunction for one NACK in a consumer's PEL: repoint the NACK at
 * the (possibly relocated) consumer, relocate the NACK itself, and update
 * the group's global PEL which shares the same NACK pointer. */
static void *defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
    PendingEntryContext *ctx = privdata;
    streamNACK *nack = ri->data, *newnack;
    nack->consumer = ctx->c; /* update nack pointer to consumer */
    newnack = activeDefragAlloc(nack);
    if (newnack) {
        /* update consumer group pointer to the nack */
        void *prev;
        raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
        serverAssert(prev == nack);
    }
    return newnack;
}
/* raxDefragFunction for one stream consumer: relocate the consumer struct,
 * its name, and recurse into its PEL (whose NACKs also live in the group's
 * global PEL — see defragStreamConsumerPendingEntry). */
static void *defragStreamConsumer(raxIterator *ri, void *privdata) {
    streamConsumer *c = ri->data;
    streamCG *cg = privdata;
    void *newc = activeDefragAlloc(c);
    if (newc) {
        /* Old consumer already freed; work on the relocated copy. */
        c = newc;
    }
    sds newsds = activeDefragSds(c->name);
    if (newsds) c->name = newsds;
    if (c->pel) {
        PendingEntryContext pel_ctx = {cg, c};
        /* defrag_data is 0: NACKs are relocated by the per-entry callback. */
        defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
    }
    return newc; /* returns NULL if c was not defragged */
}
/* raxDefragFunction for one consumer group: recurse into its consumers rax
 * and its global PEL. Element data is handled by the nested callbacks, so
 * defrag_data is 0 for both. Never relocates the group struct itself. */
static void *defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
    UNUSED(privdata);
    streamCG *group = ri->data;
    if (group->consumers) defragRadixTree(&group->consumers, 0, defragStreamConsumer, group);
    if (group->pel) defragRadixTree(&group->pel, 0, NULL, NULL);
    return NULL;
}
/* Defrag a stream object: the stream struct, the entries rax (now, or
 * deferred when large), and the consumer-groups rax. */
static void defragStream(robj *ob) {
    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
    stream *s = objectGetVal(ob), *news;

    /* handle the main struct */
    if ((news = activeDefragAlloc(s))) {
        objectSetVal(ob, news);
        s = news;
    }

    if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
        /* Too large to walk now: only relocate the rax struct itself and
         * defer the per-entry work. */
        rax *newrax = activeDefragAlloc(s->rax);
        if (newrax) s->rax = newrax;
        defragLater(ob);
    } else
        defragRadixTree(&s->rax, 1, NULL, NULL);

    if (s->cgroups) defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
}
/* Defrag a module key. This is either done immediately or, if the module
 * reports more work is needed, scheduled for later via defragLater(). */
static void defragModule(serverDb *db, robj *obj) {
    serverAssert(obj->type == OBJ_MODULE);
    /* Fun fact (and a bug since forever): The key is passed to
     * moduleDefragValue as an sds string, but the parameter is declared to be
     * an robj and it's passed as such to the module type defrag callbacks.
     * Nobody can ever have used this, i.e. accessed the key name in the defrag
     * or free_effort module type callbacks. */
    void *sds_key_passed_as_robj = objectGetKey(obj);
    if (!moduleDefragValue(sds_key_passed_as_robj, obj, db->id)) defragLater(obj);
}
/* for each key we scan in the main dict, this function will attempt to defrag
 * all the various pointers it has: the robj itself (with side-table fix-ups),
 * then the value's internals dispatched by object type/encoding. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
    serverDb *db = server.db[ctx->dbid];
    int slot = ctx->kvstate.slot;
    robj *newob, *ob;
    unsigned char *newzl;
    ob = *elemref;

    /* Try to defrag robj and/or string value. */
    if ((newob = activeDefragStringOb(ob))) {
        *elemref = newob;
        /* The old robj is already freed; auxiliary tables that stored the old
         * pointer must be fixed by pointer identity, not by dereferencing. */
        if (objectGetExpire(newob) >= 0) {
            /* Replace the pointer in the expire table without accessing the old
             * pointer. */
            hashtable *expires_ht = kvstoreGetHashtable(db->expires, slot);
            bool replaced = hashtableReplaceReallocatedEntry(expires_ht, ob, newob);
            serverAssert(replaced);
        }
        if (newob->type == OBJ_HASH && hashTypeHasVolatileFields(newob)) {
            /* Check if this is a hash object containing volatile fields.
             * and update keys_with_volatile_items after defrag. */
            hashtable *keys_with_volatile_items_ht = kvstoreGetHashtable(db->keys_with_volatile_items, slot);
            bool replaced = hashtableReplaceReallocatedEntry(keys_with_volatile_items_ht, ob, newob);
            serverAssert(replaced);
        }
        ob = newob;
    }

    /* Dispatch on value type/encoding to defrag the value's internals. */
    if (ob->type == OBJ_STRING) {
        /* Already handled in activeDefragStringOb. */
    } else if (ob->type == OBJ_LIST) {
        if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
            defragQuicklist(ob);
        } else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            if ((newzl = activeDefragAlloc(objectGetVal(ob)))) objectSetVal(ob, newzl);
        } else {
            serverPanic("Unknown list encoding");
        }
    } else if (ob->type == OBJ_SET) {
        if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
            defragSet(ob);
        } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) {
            /* Flat-buffer encodings: a single allocation holds the value. */
            void *newptr, *ptr = objectGetVal(ob);
            if ((newptr = activeDefragAlloc(ptr))) objectSetVal(ob, newptr);
        } else {
            serverPanic("Unknown set encoding");
        }
    } else if (ob->type == OBJ_ZSET) {
        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
            if ((newzl = activeDefragAlloc(objectGetVal(ob)))) objectSetVal(ob, newzl);
        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
            defragZsetSkiplist(ob);
        } else {
            serverPanic("Unknown sorted set encoding");
        }
    } else if (ob->type == OBJ_HASH) {
        defragHash(ob);
    } else if (ob->type == OBJ_STREAM) {
        defragStream(ob);
    } else if (ob->type == OBJ_MODULE) {
        defragModule(db, ob);
    } else {
        serverPanic("Unknown object type");
    }
}
/* Defrag scan callback for the main db dictionary. */
static void dbKeysScanCallback(void *privdata, void *elemref) {
    long long hits_at_entry = server.stat_active_defrag_hits;
    defragKey((defragKeysCtx *)privdata, (robj **)elemref);
    /* A key counts as a "key hit" if any of its allocations moved. */
    if (server.stat_active_defrag_hits == hits_at_entry)
        server.stat_active_defrag_key_misses++;
    else
        server.stat_active_defrag_key_hits++;
    server.stat_active_defrag_scanned++;
}
/* Defrag scan callback for a pubsub channels hashtable. Each entry is a
 * hashtable of subscribed clients, with the channel-name robj stored in the
 * table's metadata. */
static void defragPubsubScanCallback(void *privdata, void *elemref) {
    defragPubSubCtx *ctx = privdata;
    void **clients_ref = (void **)elemref;
    hashtable *newclients, *clients = *clients_ref;
    robj *newchannel, *channel = *(robj **)hashtableMetadata(clients);

    /* Try to defrag the channel name. */
    /* One reference per subscribed client plus the server's own. */
    serverAssert(channel->refcount == (int)hashtableSize(clients) + 1);
    newchannel = activeDefragStringOb(channel);
    if (newchannel) {
        *(robj **)hashtableMetadata(clients) = newchannel;

        /* The channel name is shared by the client's pubsub(shard) and server's
         * pubsub(shard), after defraging the channel name, we need to update
         * the reference in the clients' dictionary. */
        hashtableIterator iter;
        hashtableInitIterator(&iter, clients, 0);
        void *c;
        while (hashtableNext(&iter, &c)) {
            hashtable *client_channels = ctx->getPubSubChannels(c);
            /* Fix by pointer identity: the old channel robj is already freed. */
            bool replaced = hashtableReplaceReallocatedEntry(client_channels, channel, newchannel);
            serverAssert(replaced);
        }
        hashtableCleanupIterator(&iter);
    }

    /* Try to defrag the dictionary of clients that is stored as the value part. */
    if ((newclients = hashtableDefragTables(clients, activeDefragAlloc)))
        *clients_ref = newclients;

    server.stat_active_defrag_scanned++;
}
/* Incrementally defrags one object that was queued for later processing.
 * Returns 0 when more work may or may not be needed (a non-zero *cursor means
 * the caller should come back), and 1 if time is up and more work is needed. */
static int defragLaterItem(robj *ob, unsigned long *cursor, monotime endtime, int dbid) {
    if (ob == NULL) {
        *cursor = 0; /* object may have been deleted already */
        return 0;
    }
    switch (ob->type) {
    case OBJ_LIST:
        if (ob->encoding == OBJ_ENCODING_QUICKLIST)
            return scanLaterList(ob, cursor, endtime);
        break;
    case OBJ_SET:
        if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
            scanLaterSet(ob, cursor);
            return 0;
        }
        break;
    case OBJ_ZSET:
        if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
            scanLaterZset(ob, cursor);
            return 0;
        }
        break;
    case OBJ_HASH:
        if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
            scanLaterHash(ob, cursor);
            return 0;
        }
        break;
    case OBJ_STREAM:
        if (ob->encoding == OBJ_ENCODING_STREAM)
            return scanLaterStreamListpacks(ob, cursor, endtime);
        break;
    case OBJ_MODULE: {
        /* Fun fact (and a bug since forever): The key is passed to
         * moduleLateDefrag as an sds string, but the parameter is declared
         * to be an robj and it's passed as such to the module type defrag
         * callbacks. Nobody can ever have used this, i.e. accessed the key
         * name in the defrag module type callback. */
        void *sds_key_passed_as_robj = objectGetKey(ob);
        return moduleLateDefrag(sds_key_passed_as_robj, ob, cursor, endtime, dbid);
    }
    default:
        break;
    }
    /* Object type/encoding may have changed since we scheduled it for later. */
    *cursor = 0;
    return 0;
}
/* A kvstoreHelperPreContinueFn: incrementally works through the keys queued on
 * the global 'defrag_later' list, resuming the current key from the global
 * 'defrag_later_cursor'. Returns DEFRAG_DONE when the list is drained and
 * DEFRAG_NOT_DONE when the time budget expired with work remaining. */
static doneStatus defragLaterStep(monotime endtime, void *privdata) {
    defragKeysCtx *ctx = privdata;
    unsigned int iterations = 0;
    long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    while (defrag_later && listLength(defrag_later) > 0) {
        listNode *head = listFirst(defrag_later);
        sds key = head->value;
        /* Re-resolve the key; it may have been deleted since it was queued
         * (defragLaterItem tolerates a NULL object). */
        void *found = NULL;
        kvstoreHashtableFind(ctx->kvstate.kvs, ctx->kvstate.slot, key, &found);
        robj *ob = found;
        long long key_defragged = server.stat_active_defrag_hits;
        bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1);
        /* Attribute a key-level hit/miss depending on whether anything inside
         * the object actually moved. */
        if (key_defragged != server.stat_active_defrag_hits) {
            server.stat_active_defrag_key_hits++;
        } else {
            server.stat_active_defrag_key_misses++;
        }
        if (timeout) break;
        if (defrag_later_cursor == 0) {
            /* The item is finished, move on to the next queued key. */
            listDelNode(defrag_later, head);
        }
        /* Re-check the clock periodically: every 16 iterations, or sooner if
         * we defragged something or scanned a sizable batch since last check. */
        if (++iterations > 16 || server.stat_active_defrag_hits > prev_defragged ||
            server.stat_active_defrag_scanned - prev_scanned > 64) {
            if (getMonotonicUs() > endtime) break;
            iterations = 0;
            prev_defragged = server.stat_active_defrag_hits;
            prev_scanned = server.stat_active_defrag_scanned;
        }
    }
    return (!defrag_later || listLength(defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE;
}
/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if
 * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this
 * function during the iteration.
 *
 * Calling convention: a call with endtime==0 (re)initializes the static
 * iteration state for a new stage and returns immediately; subsequent calls
 * resume from that state until DEFRAG_DONE is returned or time runs out. */
static doneStatus defragStageKvstoreHelper(monotime endtime,
                                           kvstore *kvs,
                                           hashtableScanFunction scan_fn,
                                           kvstoreHelperPreContinueFn precontinue_fn,
                                           void *privdata) {
    static kvstoreIterState state; // STATIC - this persists
    if (endtime == 0) {
        // Starting the stage, set up the state information for this stage
        state.kvs = kvs;
        state.slot = KVS_SLOT_DEFRAG_LUT;
        state.cursor = 0;
        return DEFRAG_NOT_DONE;
    }
    if (kvs != state.kvs) {
        // There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage.
        return DEFRAG_DONE;
    }
    unsigned int iterations = 0;
    long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    if (state.slot == KVS_SLOT_DEFRAG_LUT) {
        // Before we start scanning the kvstore, handle the main structures
        // (the kvstore's own hashtable tables), resumable via state.cursor.
        do {
            state.cursor = kvstoreHashtableDefragTables(kvs, state.cursor, activeDefragAlloc);
            if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE;
        } while (state.cursor != 0);
        state.slot = KVS_SLOT_UNASSIGNED;
    }
    while (true) {
        // Time/budget check: every 16 iterations, or sooner if we defragged
        // something or scanned a sizable batch since the last check.
        if (++iterations > 16 || server.stat_active_defrag_hits > prev_defragged || server.stat_active_defrag_scanned - prev_scanned > 64) {
            if (getMonotonicUs() >= endtime) break;
            iterations = 0;
            prev_defragged = server.stat_active_defrag_hits;
            prev_scanned = server.stat_active_defrag_scanned;
        }
        if (precontinue_fn) {
            // Give the caller-supplied hook (e.g. defragLaterStep) a chance to
            // finish pending work before we advance the scan.
            if (privdata) *(kvstoreIterState *)privdata = state;
            if (precontinue_fn(endtime, privdata) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE;
        }
        if (!state.cursor) {
            // If there's no cursor, we're ready to begin a new kvstore slot.
            if (state.slot == KVS_SLOT_UNASSIGNED) {
                state.slot = kvstoreGetFirstNonEmptyHashtableIndex(kvs);
            } else {
                state.slot = kvstoreGetNextNonEmptyHashtableIndex(kvs, state.slot);
            }
            if (state.slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE;
        }
        // Whatever privdata's actual type, this function requires that it begins with kvstoreIterState.
        if (privdata) *(kvstoreIterState *)privdata = state;
        state.cursor = kvstoreHashtableScanDefrag(kvs, state.slot, state.cursor,
                                                  scan_fn, privdata, activeDefragAlloc,
                                                  HASHTABLE_SCAN_EMIT_REF);
    }
    return DEFRAG_NOT_DONE;
}
/* Stage function for the main keyspace of one database. 'target' encodes the
 * DBID. Keeps a static defragKeysCtx that both the kvstore helper and the
 * defragLaterStep pre-continue hook read through 'privdata'. */
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
    UNUSED(privdata);
    static defragKeysCtx ctx; // STATIC - this persists across calls
    int db_index = (uintptr_t)target;
    if (endtime == 0) {
        /* Stage start: bind the persistent context to this database. Don't
         * return yet — the helper below must also see endtime==0 so it can
         * initialize its own state. */
        ctx.dbid = db_index;
    }
    serverAssert(ctx.dbid == db_index);
    serverDb *db = server.db[db_index];
    return defragStageKvstoreHelper(endtime, db->keys,
                                    dbKeysScanCallback, defragLaterStep, &ctx);
}
/* Stage function for one database's expires kvstore. 'target' encodes the
 * DBID. Entries themselves only need to be counted; the kvstore's tables are
 * defragged by the helper. */
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
    UNUSED(privdata);
    serverDb *db = server.db[(uintptr_t)target];
    return defragStageKvstoreHelper(endtime, db->expires,
                                    scanHashtableCallbackCountScanned, NULL, NULL);
}
/* Stage function for one database's keys_with_volatile_items kvstore.
 * 'target' encodes the DBID. Entries only need to be counted; the kvstore's
 * tables are defragged by the helper. */
static doneStatus defragStageKeysWithvolaItemsKvstore(monotime endtime, void *target, void *privdata) {
    UNUSED(privdata);
    serverDb *db = server.db[(uintptr_t)target];
    return defragStageKvstoreHelper(endtime, db->keys_with_volatile_items,
                                    scanHashtableCallbackCountScanned, NULL, NULL);
}
static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) {
// target is server.pubsub_channels or server.pubsubshard_channels
getClientChannelsFnWrapper *fnWrapper = privdata;
defragPubSubCtx ctx;
ctx.getPubSubChannels = fnWrapper->fn;
return defragStageKvstoreHelper(endtime, (kvstore *)target,