/*
Copyright 2015 Bloomberg Finance L.P.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <unistd.h>
#include <poll.h>
#include "schemachange.h"
#include "sc_records.h"
#include "sc_global.h"
#include "sc_schema.h"
#include "sc_callbacks.h"
#include <bdb_fetch.h>
#include "dbinc/db_swap.h"
#include "llog_auto.h"
#include "llog_ext.h"
#include "bdb_osqllog.h"
#include "bdb_osql_log_rec.h"
#include "comdb2_atomic.h"
#include "epochlib.h"
#include "reqlog.h"
#include "logmsg.h"
#include "debug_switches.h"
int gbl_logical_live_sc = 0;
extern __thread snap_uid_t *osql_snap_info; /* contains cnonce */
extern int gbl_partial_indexes;
extern int gbl_debug_omit_zap_on_rebuild;
// Increase max threads to do SC -- called when no contention is detected
// A simple atomic add suffices here since this function is called from one
// place at any given time, currently from lkcounter_check() once per sec
static inline void increase_max_threads(uint32_t *maxthreads, int sc_threads)
{
if (*maxthreads >= sc_threads) return;
ATOMIC_ADD32_PTR(maxthreads, 1);
}
// Decrease max threads to do SC -- called when add_record gets an abort
// Used to back off SC by having fewer threads running, decreasing contention
// We use atomic add here, since it may be called from multiple threads at once
// We also make certain that maxthreads does not go less than 1
static inline void decrease_max_threads(uint32_t *maxthreads)
{
if (*maxthreads <= 1) return;
/* ADDING -1 */
if (ATOMIC_ADD32_PTR(maxthreads, -1) < 1) XCHANGE32((*maxthreads), 1);
}
// increment number of rebuild threads in use
// if we are at capacity, then return 1 for failure
// if we were successful we return 0
static inline int use_rebuild_thr(uint32_t *thrcount, uint32_t *maxthreads)
{
if (*thrcount >= *maxthreads) return 1;
ATOMIC_ADD32_PTR(thrcount, 1);
return 0;
}
// decrement number of rebuild threads in use
static inline void release_rebuild_thr(uint32_t *thrcount)
{
assert(*thrcount >= 1);
/* ADDING -1 */
ATOMIC_ADD32_PTR(thrcount, -1);
}
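/* Illustrative usage sketch (not part of the build): use_rebuild_thr() and
 * release_rebuild_thr() form an acquire/release pair around per-stripe work.
 * A converter thread would pair them roughly like this (do_stripe_work() is
 * a hypothetical stand-in for the actual conversion loop):
 *
 *   if (use_rebuild_thr(&data->cmembers->thrcount,
 *                       &data->cmembers->maxthreads) != 0)
 *       return; // at capacity -- retry later
 *   do_stripe_work(data);
 *   release_rebuild_thr(&data->cmembers->thrcount);
 */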
/* Return true if there were writes to table undergoing SC (data->from)
* Note that data is local to this thread so this accounting happens
* for each thread performing SC.
*/
static inline int tbl_had_writes(struct convert_record_data *data)
{
int64_t oldcount = data->write_count;
data->write_count = data->from->write_count[RECORD_WRITE_INS] +
data->from->write_count[RECORD_WRITE_UPD] +
data->from->write_count[RECORD_WRITE_DEL];
return (data->write_count - oldcount) != 0;
}
static inline void print_final_sc_stat(struct convert_record_data *data)
{
sc_printf(
data->s,
"[%s] TOTAL converted %lld sc_adds %d sc_updates %d sc_deletess %d\n",
data->from->tablename,
data->from->sc_nrecs - (data->from->sc_adds + data->from->sc_updates +
data->from->sc_deletes),
data->from->sc_adds, data->from->sc_updates, data->from->sc_deletes);
}
/* prints global stats if not printed in the last sc_report_freq,
* returns 1 if successful
*/
static inline int print_aggregate_sc_stat(struct convert_record_data *data,
int now, int sc_report_freq)
{
uint32_t copy_total_lasttime = data->cmembers->total_lasttime;
/* Do work without locking */
if (now < copy_total_lasttime + sc_report_freq) return 0;
/* If it is time to print, atomically set total_lasttime;
* if this thread succeeds in setting it, it can continue
* to print. If it failed, another thread is doing that work.
*/
int res = CAS32(data->cmembers->total_lasttime, copy_total_lasttime, now);
if (!res) return 0;
/* number of adds after schema cursor (by definition, all adds)
* number of updates before cursor
* number of deletes before cursor
* number of genids added since sc began (adds + updates)
*/
if (data->live)
sc_printf(data->s,
"[%s] >> adds %u upds %d dels %u extra genids "
"%u\n",
data->from->tablename, data->from->sc_adds,
data->from->sc_updates, data->from->sc_deletes,
data->from->sc_adds + data->from->sc_updates);
/* totals across all threads */
if (data->scanmode != SCAN_PARALLEL) return 1;
long long total_nrecs_diff =
data->from->sc_nrecs - data->from->sc_prev_nrecs;
data->from->sc_prev_nrecs = data->from->sc_nrecs;
sc_printf(data->s,
"[%s] progress TOTAL %lld +%lld actual "
"progress total %lld rate %lld r/s\n",
data->from->tablename, data->from->sc_nrecs, total_nrecs_diff,
data->from->sc_nrecs -
(data->from->sc_adds + data->from->sc_updates),
total_nrecs_diff / sc_report_freq);
return 1;
}
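/* Illustrative sketch (not part of the build): the CAS32 call above is the
 * generic "elect one thread" idiom, also used by lkcounter_check() below.
 * Every thread reads the shared timestamp, but only the one that wins the
 * compare-and-swap performs the once-per-interval work:
 *
 *   uint32_t seen = *lasttime_slot;            // hypothetical shared slot
 *   if (now >= seen + freq && CAS32(*lasttime_slot, seen, now))
 *       do_periodic_work();                    // exactly one thread runs this
 */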
static inline void lkcounter_check(struct convert_record_data *data, int now)
{
uint32_t copy_lasttime = data->cmembers->lkcountercheck_lasttime;
int lkcounter_freq = bdb_attr_get(data->from->dbenv->bdb_attr,
BDB_ATTR_SC_CHECK_LOCKWAITS_SEC);
/* Do work without locking */
if (now < copy_lasttime + lkcounter_freq) return;
/* If it is time to do work, atomically set lkcountercheck_lasttime;
* if this thread succeeds in setting it, it can continue
* to adjust the number of threads. If it failed, another thread is doing that work.
*/
int res = CAS32(data->cmembers->lkcountercheck_lasttime, copy_lasttime, now);
if (!res) return;
/* check lock waits -- there is no way to tell which lock waits are caused
* by our writes, except that if there were writes in the last n
* seconds we may have been slowing them down. */
int64_t ndeadlocks = 0, nlockwaits = 0;
bdb_get_lock_counters(thedb->bdb_env, &ndeadlocks, NULL, &nlockwaits, NULL);
int64_t diff_deadlocks = ndeadlocks - data->cmembers->ndeadlocks;
int64_t diff_lockwaits = nlockwaits - data->cmembers->nlockwaits;
data->cmembers->ndeadlocks = ndeadlocks;
data->cmembers->nlockwaits = nlockwaits;
logmsg(LOGMSG_DEBUG,
"%s: diff_deadlocks=%" PRId64 ", diff_lockwaits=%" PRId64
", maxthr=%d, currthr=%d\n",
__func__, diff_deadlocks, diff_lockwaits, data->cmembers->maxthreads,
data->cmembers->thrcount);
increase_max_threads(
&data->cmembers->maxthreads,
bdb_attr_get(data->from->dbenv->bdb_attr, BDB_ATTR_SC_USE_NUM_THREADS));
}
/* If the schema change is resuming, this sets sc_genids to the last genid for
* each stripe.
* If the schema change is not resuming, it sets them all to zero.
* Returns 0 on success, <0 on failure. */
int init_sc_genids(struct dbtable *db, struct schema_change_type *s)
{
void *rec;
int orglen, bdberr, stripe;
unsigned long long *sc_genids;
if (db->sc_genids == NULL) {
db->sc_genids = malloc(sizeof(unsigned long long) * MAXDTASTRIPE);
if (db->sc_genids == NULL) {
logmsg(LOGMSG_ERROR,
"init_sc_genids: failed to allocate sc_genids\n");
return -1;
}
}
sc_genids = db->sc_genids;
/* if we aren't resuming simply zero the genids */
if (!s->resume) {
/* if we may have to resume this schema change, clear the progress in
* llmeta */
if (bdb_clear_high_genid(NULL /*input_trans*/, db->tablename,
db->dtastripe, &bdberr) ||
bdberr != BDBERR_NOERROR) {
logmsg(LOGMSG_ERROR, "init_sc_genids: failed to clear high "
"genids\n");
return -1;
}
bzero(sc_genids, sizeof(unsigned long long) * MAXDTASTRIPE);
return 0;
}
/* prepare for the largest possible data */
orglen = MAXLRL;
rec = malloc(orglen);
/* get max genid for each stripe */
for (stripe = 0; stripe < db->dtastripe; ++stripe) {
int rc;
uint8_t ver;
int dtalen = orglen;
/* get this stripe's newest genid and store it in sc_genids,
* if we have been rebuilding the data files we can grab the genids
* straight from there, otherwise we look in the llmeta table */
if (is_dta_being_rebuilt(db->plan)) {
rc = bdb_find_newest_genid(db->handle, NULL, stripe, rec, &dtalen,
dtalen, &sc_genids[stripe], &ver,
&bdberr);
if (rc == 1)
sc_genids[stripe] = 0ULL;
} else
rc = bdb_get_high_genid(db->tablename, stripe, &sc_genids[stripe],
&bdberr);
if (rc < 0 || bdberr != BDBERR_NOERROR) {
sc_errf(s, "init_sc_genids: failed to find newest genid for "
"stripe: %d\n",
stripe);
free(rec);
return -1;
}
sc_printf(s, "[%s] resuming stripe %2d from 0x%016llx\n", db->tablename,
stripe, sc_genids[stripe]);
}
free(rec);
return 0;
}
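/* Illustrative usage sketch (not part of the build): a schema-change driver
 * would call init_sc_genids() once, before spawning per-stripe converter
 * threads, honoring the 0 / <0 contract documented above:
 *
 *   if (init_sc_genids(db, s) != 0) {
 *       sc_errf(s, "failed to initialize stripe resume genids\n");
 *       return -1;
 *   }
 *   // db->sc_genids[stripe] now holds the per-stripe resume point
 */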
// This is only good for converting an old-schema record to a new-schema full
// record, because we only have one map from the old schema to the new schema
// (i.e. no index mapping -- that could speed up insertion into indices too)
static inline int convert_server_record_cachedmap(
struct dbtable *db, const char *table, int tagmap[], const void *inbufp, char *outbuf,
struct schema_change_type *s, struct schema *from, struct schema *to,
blob_buffer_t *blobs, int maxblobs)
{
char err[1024];
struct convert_failure reason;
int rc =
stag_to_stag_buf_cachedmap(db, tagmap, from, to, (char *)inbufp, outbuf,
0 /*flags*/, &reason, blobs, maxblobs);
if (rc) {
convert_failure_reason_str(&reason, table, from->tag, to->tag, err,
sizeof(err));
sc_client_error(s, "cannot convert data %s", err);
return rc;
}
return 0;
}
static int convert_server_record_blobs(const void *inbufp, const char *from_tag,
struct dbrecord *db,
struct schema_change_type *s,
blob_buffer_t *blobs, int maxblobs)
{
char *inbuf = (char *)inbufp;
struct convert_failure reason;
char err[1024];
if (from_tag == NULL) from_tag = ".ONDISK";
int rc = stag_to_stag_buf_blobs(get_dbtable_by_name(db->table), from_tag, inbuf,
db->tag, db->recbuf, &reason, blobs, maxblobs, 1);
if (rc) {
convert_failure_reason_str(&reason, db->table, from_tag, db->tag, err,
sizeof(err));
sc_client_error(s, "cannot convert data %s", err);
return 1;
}
return 0;
}
/* free/cleanup all resources associated with convert_record_data */
void convert_record_data_cleanup(struct convert_record_data *data)
{
if (data->trans) {
trans_abort(&data->iq, data->trans);
data->trans = NULL;
}
if (data->dmp) {
bdb_dtadump_done(data->from->handle, data->dmp);
data->dmp = NULL;
}
free_blob_status_data(&data->blb);
free_blob_status_data(&data->blbcopy);
free_blob_buffers(data->freeblb,
sizeof(data->freeblb) / sizeof(data->freeblb[0]));
if (data->dta_buf) {
free(data->dta_buf);
data->dta_buf = NULL;
}
if (data->old_dta_buf) {
free(data->old_dta_buf);
data->old_dta_buf = NULL;
}
if (data->unpack_dta_buf) {
free(data->unpack_dta_buf);
data->unpack_dta_buf = NULL;
}
if (data->unpack_old_dta_buf) {
free(data->unpack_old_dta_buf);
data->unpack_old_dta_buf = NULL;
}
if (data->blb_buf) {
free(data->blb_buf);
data->blb_buf = NULL;
}
if (data->old_blb_buf) {
free(data->old_blb_buf);
data->old_blb_buf = NULL;
}
if (data->rec) {
free_db_record(data->rec);
data->rec = NULL;
}
}
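/* Usage note (illustrative, not part of the build): the cleanup above sets
 * each heap pointer back to NULL after freeing it, so -- assuming the struct
 * was zero-initialized -- it is safe to call on both success and error paths:
 *
 *   struct convert_record_data data = {0}; // zero-init makes cleanup safe
 *   // ... conversion work ...
 *   convert_record_data_cleanup(&data);    // fine even if partially set up
 */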
static inline int convert_server_record(const void *inbufp,
const char *from_tag,
struct dbrecord *db,
struct schema_change_type *s)
{
return convert_server_record_blobs(inbufp, from_tag, db, s, NULL /*blobs*/,
0 /*maxblobs*/);
}
static void delay_sc_if_needed(struct convert_record_data *data,
db_seqnum_type *ss)
{
const int mult = 100;
static int inco_delay = 0; /* all stripes will see this */
int rc;
/* wait for replication on what we just committed */
if ((data->nrecs % data->num_records_per_trans) == 0) {
if ((rc = trans_wait_for_seqnum(&data->iq, gbl_myhostname, ss)) != 0) {
sc_errf(data->s, "delay_sc_if_needed: error waiting for replication rcode %d\n", rc);
} else if (gbl_sc_inco_chk) { /* committed successfully */
int num;
if ((num = bdb_get_num_notcoherent(thedb->bdb_env)) != 0) {
if (num > inco_delay) { /* only goes up, or resets to 0 */
inco_delay = num;
sc_printf(data->s, "%d incoherent nodes - throttle sc %dms\n",
num, inco_delay * mult);
}
} else if (inco_delay != 0) {
inco_delay = 0;
sc_printf(data->s, "0 incoherent nodes - pedal to the metal\n");
}
} else { /* no incoherent chk */
inco_delay = 0;
}
}
if (inco_delay)
poll(NULL, 0, inco_delay * mult);
/* if we're in commitdelay mode, magnify the delay by 5 here */
int delay = bdb_attr_get(data->from->dbenv->bdb_attr, BDB_ATTR_COMMITDELAY);
if (delay != 0)
poll(NULL, 0, delay * 5);
else if (BDB_ATTR_GET(thedb->bdb_attr, SC_FORCE_DELAY))
usleep(gbl_sc_usleep);
/* if sanc list is not ok, snooze for 100 ms */
if (!net_sanctioned_list_ok(data->from->dbenv->handle_sibling))
poll(NULL, 0, 100);
}
static int report_sc_progress(struct convert_record_data *data, int now)
{
int copy_sc_report_freq = gbl_sc_report_freq;
if (copy_sc_report_freq > 0 &&
now >= data->lasttime + copy_sc_report_freq) {
/* report progress to interested parties */
long long diff_nrecs = data->nrecs - data->prev_nrecs;
data->lasttime = now;
data->prev_nrecs = data->nrecs;
/* print thread specific stats */
sc_printf(data->s,
"[%s] progress stripe %d changed genids %u progress %lld"
" recs +%lld (%lld r/s)\n",
data->from->tablename, data->stripe, data->n_genids_changed,
data->nrecs, diff_nrecs, diff_nrecs / copy_sc_report_freq);
/* now do global sc data */
int res = print_aggregate_sc_stat(data, now, copy_sc_report_freq);
/* check headroom only if this thread printed the global stats */
if (res && check_sc_headroom(data->s, data->from, data->to)) {
if (data->s->force) {
sc_printf(data->s, "Proceeding despite low disk headroom\n");
} else {
return -1;
}
}
}
return 0;
}
static int prepare_and_verify_newdb_record(struct convert_record_data *data,
void *dta, int dtalen,
unsigned long long *dirty_keys,
int leakcheck)
{
int rc = 0;
int dta_needs_conversion = 1;
if ((!data->to->plan || !data->to->plan->dta_plan) &&
data->s->rebuild_index)
dta_needs_conversion = 0;
if (dta_needs_conversion) {
if (!data->s->force_rebuild &&
!data->s->use_old_blobs_on_rebuild) /* We have correct blob data in
this case. */
bzero(data->wrblb, sizeof(data->wrblb));
/* convert current. this converts blob fields, but we need to make sure
* we add the right blobs separately. */
rc = convert_server_record_cachedmap(data->to, data->to->tablename, data->tagmap,
dta, data->rec->recbuf, data->s, data->from->schema,
data->to->schema, data->wrblb,
sizeof(data->wrblb) / sizeof(data->wrblb[0]));
if (rc) {
logmsg(LOGMSG_ERROR, "%s:%d failed to convert record rc=%d\n",
__func__, __LINE__, rc);
return -2;
}
/* TODO do the blobs returned by convert_server_record_blobs() need to
* be converted to client blobs? */
/* we are responsible for freeing any blob data that
* convert_server_record_blobs() returns to us with free_blob_buffers().
* if the plan calls for a full blob rebuild, data retrieved by
* bdb_fetch_blobs_by_rrn_and_genid() may be added into wrblb in the
* loop below, this blob data MUST be freed with free_blob_status_data()
* so we need to make a copy of what we have right now so we can free it
* separately */
free_blob_buffers(data->freeblb,
sizeof(data->freeblb) / sizeof(data->freeblb[0]));
memcpy(data->freeblb, data->wrblb, sizeof(data->freeblb));
}
/* map old blobs to new blobs */
if (!data->s->force_rebuild && !data->s->use_old_blobs_on_rebuild &&
((gbl_partial_indexes && data->to->ix_partial) || data->to->ix_expr ||
!gbl_use_plan || !data->to->plan || !data->to->plan->plan_blobs)) {
if (!leakcheck)
bzero(data->wrblb, sizeof(data->wrblb));
for (int ii = 0; ii < data->to->numblobs; ii++) {
int fromblobix = data->toblobs2fromblobs[ii];
if (fromblobix >= 0 && data->blb.blobptrs[fromblobix] != NULL) {
if (data->wrblb[ii].data) {
/* this shouldn't happen because only bcstr to vutf8
* conversions should return any blob data from
* convert_server_record_blobs() and if we're creating a
* new vutf8 blob it should not have a fromblobix */
sc_errf(data->s,
"convert_record: attempted to "
"overwrite blob data retrieved from "
"convert_server_record_blobs() with data from "
"bdb_fetch_blobs_by_rrn_and_genid(). This would "
"leak memory and shouldn't ever happen. to blob %d "
"from blob %d\n",
ii, fromblobix);
return -2;
}
data->wrblb[ii].exists = 1;
data->wrblb[ii].data =
((char *)data->blb.blobptrs[fromblobix]) +
data->blb.bloboffs[fromblobix];
data->wrblb[ii].length = data->blb.bloblens[fromblobix];
data->wrblb[ii].collected = data->wrblb[ii].length;
}
}
}
/* Write record to destination table */
data->iq.usedb = data->to;
int rebuild = (data->to->plan && data->to->plan->dta_plan) ||
data->s->schema_change == SC_CONSTRAINT_CHANGE;
uint8_t *p_buf_data = data->rec->recbuf;
if (!dta_needs_conversion) {
p_buf_data = dta;
}
*dirty_keys = -1ULL;
if (gbl_partial_indexes && data->to->ix_partial) {
*dirty_keys =
verify_indexes(data->to, p_buf_data, data->wrblb, MAXBLOBS, 1);
if (*dirty_keys == -1ULL) {
rc = ERR_VERIFY_PI;
return rc;
}
}
assert(data->trans != NULL);
rc = verify_check_constraints(data->iq.usedb, p_buf_data, data->wrblb,
MAXBLOBS, 1);
if (rc < 0) {
logmsg(LOGMSG_DEBUG, "%s:%d internal error during CHECK constraint\n",
__func__, __LINE__);
return ERR_CONSTR;
} else if (rc > 0) {
logmsg(LOGMSG_DEBUG, "%s:%d CHECK constraint failed for '%s'\n",
__func__, __LINE__,
data->iq.usedb->check_constraints[rc - 1].consname);
return ERR_CONSTR;
}
rc = verify_record_constraint(&data->iq, data->to, data->trans, p_buf_data,
*dirty_keys, data->wrblb, MAXBLOBS,
".NEW..ONDISK", rebuild, 0);
if (rc)
return rc;
if (gbl_partial_indexes && data->to->ix_partial) {
rc = verify_partial_rev_constraint(data->from, data->to, data->trans,
p_buf_data, *dirty_keys,
".NEW..ONDISK");
if (rc)
return rc;
}
return 0;
}
int gbl_sc_logbytes_per_second = 4 * (40 * 1024 * 1024); // 4 logs/sec
static pthread_mutex_t sc_bps_lk = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sc_bps_cd = PTHREAD_COND_INITIALIZER;
static int64_t sc_bytes_this_second;
static int sc_current_millisecond;
static void throttle_sc_logbytes(int estimate)
{
if (gbl_sc_logbytes_per_second == 0)
return;
Pthread_mutex_lock(&sc_bps_lk);
do {
int now = comdb2_time_epochms();
if (sc_current_millisecond < now - 1000) {
sc_current_millisecond = now;
sc_bytes_this_second = 0;
}
if (gbl_sc_logbytes_per_second > 0 && sc_bytes_this_second > gbl_sc_logbytes_per_second) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
pthread_cond_timedwait(&sc_bps_cd, &sc_bps_lk, &ts);
}
} while ((gbl_sc_logbytes_per_second > 0) && (sc_bytes_this_second > gbl_sc_logbytes_per_second));
sc_bytes_this_second += estimate;
Pthread_mutex_unlock(&sc_bps_lk);
}
static void increment_sc_logbytes(int64_t bytes)
{
Pthread_mutex_lock(&sc_bps_lk);
int now = comdb2_time_epochms();
if (sc_current_millisecond < now - 1000) {
sc_current_millisecond = now;
sc_bytes_this_second = 0;
}
sc_bytes_this_second += bytes;
Pthread_mutex_unlock(&sc_bps_lk);
}
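/* Illustrative sketch (not part of the build): the two helpers above form a
 * rough per-second budget of schema-change log bytes. A writer blocks in
 * throttle_sc_logbytes() while the budget is exhausted, then charges the
 * bytes it actually generated after the fact:
 *
 *   throttle_sc_logbytes(estimate);      // may sleep into the next second
 *   rc = do_sc_transaction(data);        // hypothetical write work
 *   increment_sc_logbytes(actual_bytes); // settle what was really written
 */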
/* converts a single record and prepares for the next one
* should be called from a while loop
* param data: pointer to all the state information
* ret code: 1 means there are more records to convert
* 0 means all work successfully done
* <0 means there was a failure (-2 skips some cleanup steps)
*/
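/* Illustrative caller sketch (not part of the build): per the return-code
 * contract above, a converter thread drives convert_record() in a loop:
 *
 *   int rc;
 *   while ((rc = convert_record(data)) > 0)
 *       ; // 1 == more records to convert
 *   if (rc < 0)
 *       handle_failure(data, rc); // hypothetical error handling
 */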
static int convert_record(struct convert_record_data *data)
{
int dtalen = 0, rc, rrn, opfailcode = 0, ixfailnum = 0;
unsigned long long genid, ngenid, check_genid;
int64_t logbytes = 0;
void *dta = NULL;
int no_wait_rowlock = 0;
int64_t estimate = 0;
printf("hello\n");
if (debug_switch_convert_record_sleep())
sleep(1000);
if (data->s->sc_thd_failed) {
if (!data->s->retry_bad_genids)
sc_errf(data->s,
"Stoping work on stripe %d because the thread for stripe %d failed\n",
data->stripe, data->s->sc_thd_failed - 1);
return -1;
}
if (gbl_sc_abort || data->from->sc_abort ||
(data->s->iq && data->s->iq->sc_should_abort)) {
sc_client_error(data->s, "Schema change aborted");
return -1;
}
if (tbl_had_writes(data)) {
/* NB: if we return here, writes could block SC forever, so let's not */
usleep(gbl_sc_usleep);
}
/* if master queue latency increased, slow down*/
if (gbl_altersc_latency && gbl_altersc_delay_usec > 0)
usleep(gbl_altersc_delay_usec);
if (data->trans == NULL) {
/* Schema-change writes are always page-lock, not rowlock */
throttle_sc_logbytes(0);
rc = trans_start_sc_lowpri(&data->iq, &data->trans);
if (rc) {
sc_errf(data->s, "Error %d starting transaction\n", rc);
return -2;
}
}
data->iq.debug = debug_this_request(gbl_debug_until);
Pthread_mutex_lock(&gbl_sc_lock);
if (gbl_who > 0) {
gbl_who--;
data->iq.debug = 1;
}
Pthread_mutex_unlock(&gbl_sc_lock);
/* Get record to convert. We support four scan modes:
* - SCAN_STRIPES - DEPRECATED AND REMOVED:
* read one record at a time from one of the
* stripe files, in order. This is primarily to support
* live schema change.
* - SCAN_PARALLEL - start one thread for each stripe, the thread
* reads all the records in its stripe in order
* - SCAN_DUMP - bulk dump the data file(s). Fastest possible
* scan mode.
* - SCAN_INDEX - use regular ix_ routines to scan the primary
* key. Dog slow because it causes the data file scan to be
* in essentially random order so you get lots and lots of
* cache misses. However this is a good way to uncorrupt
* databases that were hit by the "oops, dtastripe didn't delete
* records" bug in the early days.
*/
data->iq.usedb = data->from;
data->iq.timeoutms = gbl_sc_timeoutms;
/* Make sure that we do not insert random bytes (data->dta_buf is malloc'd)
for an inline field whose payload is stored out of line (e.g., vutf8[100] when the payload is longer than 100).
It is technically fine to insert random bytes, since the inline portion is ignored
for payload longer than the inline size. However inserting uniform bytes here
is going to help us achieve a better compression ratio and lower disk use. */
if (!gbl_debug_omit_zap_on_rebuild) { // test that field is zeroed out regardless (by default = 0)
memset(data->dta_buf, 0, data->from->lrl);
}
/* Make sure that we do not inherit inline data from previous row
(e.g., previous row has inline data, current row does not). See the comment above. */
if (!gbl_debug_omit_zap_on_rebuild) { // test that field is zeroed out regardless (by default = 0)
memset(data->rec->recbuf, 0, data->rec->bufsize);
}
if (data->scanmode == SCAN_PARALLEL || data->scanmode == SCAN_PAGEORDER) {
if (data->scanmode == SCAN_PARALLEL) {
rc = dtas_next(&data->iq, data->sc_genids, &genid, &data->stripe, 1,
data->dta_buf, data->trans, data->from->lrl, &dtalen,
NULL);
} else {
rc = dtas_next_pageorder(
&data->iq, data->sc_genids, &genid, &data->stripe, 1,
data->dta_buf, data->trans, data->from->lrl, &dtalen, NULL);
}
#ifdef LOGICAL_LIVESC_DEBUG
logmsg(LOGMSG_DEBUG, "(%u) %s rc=%d genid %llx (%llu)\n", (unsigned int)pthread_self(), __func__, rc, genid,
genid);
#endif
if (rc == 0) {
dta = data->dta_buf;
check_genid = bdb_normalise_genid(data->to->handle, genid);
/* Whatever the case, leave the lock */
if (check_genid != genid && !data->s->retry_bad_genids) {
logmsg(LOGMSG_ERROR,
"Have old-style genids in table, disabling plan\n");
data->s->retry_bad_genids = 1;
return -1;
}
if (gbl_rowlocks) {
rc = bdb_trylock_row_write(data->from->handle, data->trans,
genid);
if (rc) {
rc = RC_INTERNAL_RETRY;
no_wait_rowlock = 1;
goto err;
}
}
} else if (rc == 1) {
/* we have finished all the records in our stripe
* set pointer to -1 so all insert/update/deletes will be
* to the left of the SC pointer. This works because we now hold
* a lock to the last page of the stripe.
*/
if (data->s->logical_livesc) {
data->s->sc_convert_done[data->stripe] = 1;
sc_printf(
data->s,
"[%s] finished converting stripe %d, last genid %llx\n",
data->from->tablename, data->stripe,
data->sc_genids[data->stripe]);
return 0;
}
// AZ: determine what locks we hold at this time
// bdb_dump_active_locks(data->to->handle, stdout);
data->sc_genids[data->stripe] = -1ULL;
if (debug_switch_scconvert_finish_delay()) {
logmsg(LOGMSG_WARN, "scgenid reset. sleeping 10 sec.\n");
sleep(10);
}
int usellmeta = 0;
if (!data->to->plan) {
usellmeta = 1; /* new dta does not have old genids */
} else if (data->to->plan->dta_plan) {
usellmeta = 0; /* the genid is in new dta */
} else {
usellmeta = 1; /* dta is not being built */
}
rc = 0;
if (usellmeta && !is_dta_being_rebuilt(data->to->plan)) {
int bdberr;
rc = bdb_set_high_genid_stripe(NULL, data->to->tablename,
data->stripe, -1ULL, &bdberr);
if (rc != 0) rc = -1; // convert_record expects -1
}
sc_printf(data->s,
"[%s] finished stripe %d, setting genid %llx, rc %d\n",
data->from->tablename, data->stripe,
data->sc_genids[data->stripe], rc);
return rc;
} else if (rc == RC_INTERNAL_RETRY) {
trans_abort(&data->iq, data->trans);
data->trans = NULL;
data->totnretries++;
if (data->cmembers->is_decrease_thrds)
decrease_max_threads(&data->cmembers->maxthreads);
else
poll(0, 0, (rand() % 500 + 10));
return 1;
} else if (rc != 0) {
sc_errf(data->s, "error %d reading database records\n", rc);
return -2;
}
rrn = 2;
} else if (data->scanmode == SCAN_DUMP) {
int bdberr;
uint8_t ver;
if (data->dmp == NULL) {
data->dmp = bdb_dtadump_start(data->from->handle, &bdberr, 0, 0);
if (data->dmp == NULL) {
sc_errf(data->s, "bdb_dtadump_start rc %d\n", bdberr);
return -1;
}
}
rc = bdb_dtadump_next(data->from->handle, data->dmp, &dta, &dtalen,
&rrn, &genid, &ver, &bdberr);
vtag_to_ondisk(data->iq.usedb, dta, &dtalen, ver, genid);
if (rc == 1) {
/* no more records - success! */
return 0;
} else if (rc != 0) {
sc_errf(data->s, "bdb error %d reading database records\n", bdberr);
return -2;
}
check_genid = bdb_normalise_genid(data->to->handle, genid);
if (check_genid != genid && !data->s->retry_bad_genids) {
logmsg(LOGMSG_ERROR,
"Have old-style genids in table, disabling plan\n");
data->s->retry_bad_genids = 1;
return -1;
}
} else if (data->scanmode == SCAN_INDEX) {
if (data->nrecs == 0) {
bzero(data->lastkey, MAXKEYLEN);
rc = ix_find(&data->iq, 0 /*ixnum*/, data->lastkey, 0 /*keylen*/,
data->curkey, &rrn, &genid, data->dta_buf, &dtalen,
data->from->lrl);
} else {
char *tmp = data->curkey;
data->curkey = data->lastkey;
data->lastkey = tmp;
rc = ix_next(&data->iq, 0 /*ixnum*/, data->lastkey, 0 /*keylen*/,
data->lastkey, data->lastrrn, data->lastgenid,
data->curkey, &rrn, &genid, data->dta_buf, &dtalen,
data->from->lrl, 0 /*context - 0 means don't care*/);
}
if (rc == IX_FND || rc == IX_FNDMORE) {
/* record found */
data->lastrrn = rrn;
data->lastgenid = genid;
dta = data->dta_buf;
check_genid = bdb_normalise_genid(data->to->handle, genid);
if (check_genid != genid && !data->s->retry_bad_genids) {
logmsg(LOGMSG_ERROR,
"Have old-style genids in table, disabling plan\n");
data->s->retry_bad_genids = 1;
return -1;
}
} else if (rc == IX_NOTFND || rc == IX_PASTEOF || rc == IX_EMPTY) {
/* no more records - success! */
return 0;
} else {
sc_errf(data->s, "ix_find/ix_next error rcode %d\n", rc);
return -2;
}
} else {
sc_errf(data->s, "internal error - bad scan mode!\n");
return -2;
}
/* Report wrongly sized records */
if (dtalen != data->from->lrl) {
sc_errf(data->s, "invalid record size for rrn %d genid 0x%llx (%d bytes"
" but expected %d)\n",
rrn, genid, dtalen, data->from->lrl);
return -2;
}
/* Read associated blobs. We usually don't need to do this in a
* planned schema change (since blobs aren't convertible, the btrees
* don't change... unless we're changing compression options). */
if (data->from->numblobs != 0 &&
((gbl_partial_indexes && data->to->ix_partial) || data->to->ix_expr ||
!gbl_use_plan || !data->to->plan || !data->to->plan->plan_blobs ||
data->s->force_rebuild || data->s->use_old_blobs_on_rebuild)) {
int bdberr;
free_blob_status_data(&data->blb);
bdb_fetch_args_t args = {0};
bzero(data->wrblb, sizeof(data->wrblb));
int blobrc;
memcpy(&data->blb, &data->blbcopy, sizeof(data->blb));
blobrc = bdb_fetch_blobs_by_rrn_and_genid_tran(
data->from->handle, data->trans, rrn, genid, data->from->numblobs,
data->blobix, data->blb.bloblens, data->blb.bloboffs,
(void **)data->blb.blobptrs, &args, &bdberr);
if (blobrc != 0 && bdberr == BDBERR_DEADLOCK) {
trans_abort(&data->iq, data->trans);
data->trans = NULL;
data->totnretries++;
if (data->cmembers->is_decrease_thrds)
decrease_max_threads(&data->cmembers->maxthreads);
else
poll(0, 0, (rand() % 500 + 10));
return 1;
}
if (blobrc != 0) {
sc_errf(data->s, "convert_record: "
"bdb_fetch_blobs_by_rrn_and_genid bdberr %d\n",
bdberr);
return -2;
}
rc = check_and_repair_blob_consistency(
&data->iq, data->iq.usedb, ".ONDISK", &data->blb, dta);
if (data->s->force_rebuild || data->s->use_old_blobs_on_rebuild) {
for (int ii = 0; ii < data->from->numblobs; ii++) {
if (data->blb.blobptrs[ii] != NULL) {
data->wrblb[ii].exists = 1;
data->wrblb[ii].data = malloc(data->blb.bloblens[ii]);
memcpy(data->wrblb[ii].data,
((char *)data->blb.blobptrs[ii]) +
data->blb.bloboffs[ii],
data->blb.bloblens[ii]);
data->wrblb[ii].length = data->blb.bloblens[ii];
data->wrblb[ii].collected = data->wrblb[ii].length;
}
}
}
if (rc != 0) {
sc_errf(data->s,
"unexpected blob inconsistency rc %d, rrn %d, genid "
"0x%llx\n",
rc, rrn, genid);
free_blob_status_data(&data->blb);
return -2;
}
}
int usellmeta = 0;
if (!data->to->plan) {
usellmeta = 1; /* new dta does not have old genids */
} else if (data->to->plan->dta_plan) {
usellmeta = 0; /* the genid is in new dta */
} else {
usellmeta = 1; /* dta is not being built */
}
int dta_needs_conversion = 1;
if (usellmeta && data->s->rebuild_index)
dta_needs_conversion = 0;
/* Write record to destination table */
data->iq.usedb = data->to;
unsigned long long dirty_keys = -1ULL;
if (data->s->use_new_genids) {
assert(!gbl_use_plan);
ngenid = get_genid(thedb->bdb_env, get_dtafile_from_genid(check_genid));
} else {
ngenid = check_genid;
}
if (ngenid != genid)
data->n_genids_changed++;
rc = prepare_and_verify_newdb_record(data, dta, dtalen, &dirty_keys, 1);
if (rc) {
sc_errf(data->s,
"failed to prepare and verify newdb record rc %d, rrn %d, genid 0x%llx\n",
rc, rrn, genid);
if (rc == -2)
return -2; /* conversion failure */
goto err;
}
int addflags = RECFLAGS_NO_TRIGGERS | RECFLAGS_NO_CONSTRAINTS |
RECFLAGS_NEW_SCHEMA | RECFLAGS_KEEP_GENID;
if (data->to->plan && gbl_use_plan) addflags |= RECFLAGS_NO_BLOBS;
char *tagname = ".NEW..ONDISK";
uint8_t *p_tagname_buf = (uint8_t *)tagname;
uint8_t *p_tagname_buf_end = p_tagname_buf + 12;
uint8_t *p_buf_data = data->rec->recbuf;
uint8_t *p_buf_data_end = p_buf_data + data->rec->bufsize;
estimate = data->rec->bufsize;
if (!dta_needs_conversion) {
p_buf_data = dta;
p_buf_data_end = p_buf_data + dtalen;
}