@@ -25,16 +25,21 @@ fcsm::fcsm(struct nfs_client *_client,
25
25
fcsm::fctgt::fctgt (struct fcsm *fcsm,
26
26
uint64_t _flush_seq,
27
27
uint64_t _commit_seq,
28
- struct rpc_task *_task) :
28
+ struct rpc_task *_task,
29
+ std::atomic<bool > *_cv) :
29
30
flush_seq (_flush_seq),
30
31
commit_seq (_commit_seq),
31
32
task (_task),
32
- fcsm (fcsm)
33
+ fcsm (fcsm),
34
+ conditional_variable (_cv)
33
35
{
34
36
assert (fcsm->magic == FCSM_MAGIC);
35
37
// At least one of flush/commit goals must be set.
36
38
assert ((flush_seq != 0 ) || (commit_seq != 0 ));
37
39
40
+ // If conditional variable, it's initial value should be false.
41
+ assert (!conditional_variable || *conditional_variable == false );
42
+
38
43
if (task) {
39
44
// Only frontend write tasks must be specified.
40
45
assert (task->magic == RPC_TASK_MAGIC);
@@ -43,7 +48,7 @@ fcsm::fctgt::fctgt(struct fcsm *fcsm,
43
48
assert (task->rpc_api ->write_task .get_size () > 0 );
44
49
}
45
50
46
- AZLogInfo (" [{}] [FCSM] {} fctgt queued (F: {}, C: {}, T: {})" ,
51
+ AZLogDebug (" [{}] [FCSM] {} fctgt queued (F: {}, C: {}, T: {})" ,
47
52
fcsm->get_inode ()->get_fuse_ino (),
48
53
task ? " Blocking" : " Non-blocking" ,
49
54
flush_seq,
@@ -335,10 +340,13 @@ void fcsm::ctgtq_cleanup()
335
340
336
341
void fcsm::ensure_commit (uint64_t write_off,
337
342
uint64_t write_len,
338
- struct rpc_task *task)
343
+ struct rpc_task *task,
344
+ std::atomic<bool > *conditional_variable,
345
+ bool commit_full)
339
346
{
340
347
assert (inode->is_flushing );
341
348
assert (!inode->is_stable_write ());
349
+ assert (!commit_full || task == nullptr );
342
350
343
351
/*
344
352
* If any of the flush/commit targets are waiting completion, state machine
@@ -377,13 +385,23 @@ void fcsm::ensure_commit(uint64_t write_off,
377
385
uint64_t commit_bytes =
378
386
inode->get_filecache ()->get_bytes_to_commit ();
379
387
388
+ /*
389
+ * If commit_full flag is true, wait_for_ongoing_flush()
390
+ * committed the commit_pending bytes. Now we need to flush
391
+ * dirty bytes and commit them.
392
+ */
393
+ if (commit_full) {
394
+ assert (commit_bytes == 0 );
395
+ commit_bytes = inode->get_filecache ()->get_bytes_to_flush ();
396
+ }
397
+
380
398
if (commit_bytes == 0 ) {
381
399
/*
382
400
* TODO: Make sure this doesn't result in small-blocks being written.
383
401
*/
384
- const int64_t bytes =
385
- inode->get_filecache ()->get_bytes_to_flush () -
386
- inode-> get_filecache ()-> max_dirty_extent_bytes ();
402
+ const int64_t bytes = (inode-> get_filecache ()-> get_bytes_to_flush () -
403
+ inode->get_filecache ()->max_dirty_extent_bytes ());
404
+
387
405
commit_bytes = std::max (bytes, (int64_t ) 0 );
388
406
}
389
407
@@ -395,6 +413,9 @@ void fcsm::ensure_commit(uint64_t write_off,
395
413
if (task) {
396
414
task->reply_write (task->rpc_api ->write_task .get_size ());
397
415
}
416
+ if (conditional_variable) {
417
+ *conditional_variable = true ;
418
+ }
398
419
return ;
399
420
}
400
421
@@ -427,7 +448,8 @@ void fcsm::ensure_commit(uint64_t write_off,
427
448
ctgtq.emplace (this ,
428
449
0 /* target flush_seq */ ,
429
450
target_committed_seq_num /* target commit_seq */ ,
430
- task);
451
+ task,
452
+ conditional_variable);
431
453
return ;
432
454
}
433
455
@@ -467,11 +489,21 @@ void fcsm::ensure_commit(uint64_t write_off,
467
489
ctgtq.emplace (this ,
468
490
0 /* target flush_seq */ ,
469
491
target_committed_seq_num /* target commit_seq */ ,
470
- task);
492
+ task,
493
+ conditional_variable);
471
494
} else {
472
495
if (task) {
473
496
task->reply_write (task->rpc_api ->write_task .get_size ());
474
497
}
498
+
499
+ /*
500
+ * Flush_cache_and_wait() waiting for dirty_bytes to flushed,
501
+ * can't complete until all dirty bytes flushed. so add the
502
+ * flush target.
503
+ */
504
+ if (commit_full) {
505
+ ensure_flush (0 , 0 , nullptr , conditional_variable);
506
+ }
475
507
}
476
508
477
509
return ;
@@ -504,6 +536,17 @@ void fcsm::ensure_commit(uint64_t write_off,
504
536
505
537
assert (committing_seq_num == (prev_committing_seq_num + bytes));
506
538
assert (committing_seq_num > committed_seq_num);
539
+
540
+ /*
541
+ * Enqueue this target, on_commit_callback() completes
542
+ * this target. Otherwise task/conditional_variable
543
+ * waiting for this to complete never called.
544
+ */
545
+ ctgtq.emplace (this ,
546
+ 0 /* target flush_seq */ ,
547
+ target_committed_seq_num /* target commit_seq */ ,
548
+ task,
549
+ conditional_variable);
507
550
}
508
551
}
509
552
@@ -512,7 +555,8 @@ void fcsm::ensure_commit(uint64_t write_off,
512
555
*/
513
556
void fcsm::ensure_flush (uint64_t write_off,
514
557
uint64_t write_len,
515
- struct rpc_task *task)
558
+ struct rpc_task *task,
559
+ std::atomic<bool > *conditional_variable)
516
560
{
517
561
assert (inode->is_flushing );
518
562
/*
@@ -595,14 +639,16 @@ void fcsm::ensure_flush(uint64_t write_off,
595
639
/*
596
640
* If no task and no new flush target, don't add a dup target.
597
641
*/
598
- if (!task && (target_flushed_seq_num == last_flush_seq)) {
642
+ if (!task && conditional_variable &&
643
+ (target_flushed_seq_num == last_flush_seq)) {
599
644
return ;
600
645
}
601
646
602
647
ftgtq.emplace (this ,
603
648
target_flushed_seq_num /* target flush_seq */ ,
604
649
0 /* commit_seq */ ,
605
- task);
650
+ task,
651
+ conditional_variable);
606
652
return ;
607
653
}
608
654
@@ -617,6 +663,10 @@ void fcsm::ensure_flush(uint64_t write_off,
617
663
if (task) {
618
664
task->reply_write (task->rpc_api ->write_task .get_size ());
619
665
}
666
+
667
+ if (conditional_variable) {
668
+ *conditional_variable = true ;
669
+ }
620
670
return ;
621
671
}
622
672
@@ -653,12 +703,26 @@ void fcsm::ensure_flush(uint64_t write_off,
653
703
const uint64_t flushing_seq_num_before = flushing_seq_num;
654
704
assert (flushed_seq_num <= flushing_seq_num);
655
705
656
- // sync_membufs() will update flushing_seq_num() and mark fcsm running.
657
- inode->sync_membufs (bc_vec, false /* is_flush */ , task);
706
+ /*
707
+ * sync_membufs() will update flushing_seq_num() and mark fcsm running.
708
+ * Task is not passed to sync_membufs, but enqueued to ftgtq.
709
+ */
710
+ inode->sync_membufs (bc_vec, false /* is_flush */ , nullptr );
658
711
659
712
assert (is_running ());
660
713
assert (flushing_seq_num == (flushing_seq_num_before + bytes));
661
714
assert (flushed_seq_num <= flushing_seq_num);
715
+
716
+ /*
717
+ * Enqueue this target, on_flush_complete() completes
718
+ * this target. Otherwise task/conditional_variable
719
+ * waiting for this to complete never called.
720
+ */
721
+ ftgtq.emplace (this ,
722
+ target_flushed_seq_num /* target flush_seq */ ,
723
+ 0 /* commit_seq */ ,
724
+ task,
725
+ conditional_variable);
662
726
}
663
727
664
728
/* *
@@ -767,6 +831,9 @@ void fcsm::on_commit_complete(uint64_t commit_bytes)
767
831
768
832
tgt.task ->reply_write (
769
833
tgt.task ->rpc_api ->write_task .get_size ());
834
+ } else if (tgt.conditional_variable ) {
835
+ assert (*tgt.conditional_variable == false );
836
+ *tgt.conditional_variable = true ;
770
837
} else {
771
838
AZLogDebug (" [{}] [FCSM] completing non-blocking commit target: {}, "
772
839
" committed_seq_num: {}" ,
@@ -983,6 +1050,9 @@ void fcsm::on_flush_complete(uint64_t flush_bytes)
983
1050
984
1051
tgt.task ->reply_write (
985
1052
tgt.task ->rpc_api ->write_task .get_size ());
1053
+ } else if (tgt.conditional_variable ) {
1054
+ assert (*tgt.conditional_variable == false );
1055
+ *tgt.conditional_variable = true ;
986
1056
} else {
987
1057
AZLogDebug (" [{}] [FCSM] completing non-blocking flush target: {}, "
988
1058
" flushed_seq_num: {}" ,
0 commit comments