@@ -525,59 +525,6 @@ let rec next ~__context =
525
525
else
526
526
rpc_of_events relevant
527
527
528
- type time = Xapi_database.Db_cache_types.Time .t
529
-
530
- type entry = {table : string ; obj : string ; time : time }
531
-
532
- type acc = {
533
- creates : entry list
534
- ; mods : entry list
535
- ; deletes : entry list
536
- ; last : time
537
- }
538
-
539
- let collect_events (subs , tables , last_generation ) acc table =
540
- let open Xapi_database in
541
- let open Db_cache_types in
542
- let table_value = TableSet. find table tables in
543
- let prepend_recent obj stat _ ({creates; mods; last; _} as entries ) =
544
- let Stat. {created; modified; deleted} = stat in
545
- if Subscription. object_matches subs table obj then
546
- let last = max last (max modified deleted) in
547
- let creates =
548
- if created > last_generation then
549
- {table; obj; time= created} :: creates
550
- else
551
- creates
552
- in
553
- let mods =
554
- if modified > last_generation && not (created > last_generation) then
555
- {table; obj; time= modified} :: mods
556
- else
557
- mods
558
- in
559
- {entries with creates; mods; last}
560
- else
561
- entries
562
- in
563
- let prepend_deleted obj stat ({deletes; last; _} as entries ) =
564
- let Stat. {created; modified; deleted} = stat in
565
- if Subscription. object_matches subs table obj then
566
- let last = max last (max modified deleted) in
567
- let deletes =
568
- if created < = last_generation then
569
- {table; obj; time= deleted} :: deletes
570
- else
571
- deletes
572
- in
573
- {entries with deletes; last}
574
- else
575
- entries
576
- in
577
- acc
578
- |> Table. fold_over_recent last_generation prepend_recent table_value
579
- |> Table. fold_over_deleted last_generation prepend_deleted table_value
580
-
581
528
let from_inner __context session subs from from_t timer batching =
582
529
let open Xapi_database in
583
530
let open From in
@@ -594,102 +541,159 @@ let from_inner __context session subs from from_t timer batching =
594
541
in
595
542
List. filter (fun table -> Subscription. table_matches subs table) all
596
543
in
544
+ let last_generation = ref from in
597
545
let last_msg_gen = ref from_t in
598
- let grab_range ~ since t =
546
+ let grab_range t =
599
547
let tableset = Db_cache_types.Database. tableset (Db_ref. get_database t) in
600
548
let msg_gen, messages =
601
549
if Subscription. table_matches subs " message" then
602
550
! Message. get_since_for_events ~__context ! last_msg_gen
603
551
else
604
552
(0L , [] )
605
553
in
606
- let events =
607
- let initial = {creates= [] ; mods= [] ; deletes= [] ; last= since} in
608
- let folder = collect_events (subs, tableset, since) in
609
- List. fold_left folder initial tables
610
- in
611
- (msg_gen, messages, tableset, events)
554
+ ( msg_gen
555
+ , messages
556
+ , tableset
557
+ , List. fold_left
558
+ (fun acc table ->
559
+ (* Fold over the live objects *)
560
+ let acc =
561
+ Db_cache_types.Table. fold_over_recent ! last_generation
562
+ (fun objref {Db_cache_types.Stat. created; modified; deleted} _
563
+ (creates , mods , deletes , last ) ->
564
+ if Subscription. object_matches subs table objref then
565
+ let last = max last (max modified deleted) in
566
+ (* mtime guaranteed to always be larger than ctime *)
567
+ ( ( if created > ! last_generation then
568
+ (table, objref, created) :: creates
569
+ else
570
+ creates
571
+ )
572
+ , ( if
573
+ modified > ! last_generation
574
+ && not (created > ! last_generation)
575
+ then
576
+ (table, objref, modified) :: mods
577
+ else
578
+ mods
579
+ )
580
+ , (* Only have a mod event if we don't have a created event *)
581
+ deletes
582
+ , last
583
+ )
584
+ else
585
+ (creates, mods, deletes, last)
586
+ )
587
+ (Db_cache_types.TableSet. find table tableset)
588
+ acc
589
+ in
590
+ (* Fold over the deleted objects *)
591
+ Db_cache_types.Table. fold_over_deleted ! last_generation
592
+ (fun objref {Db_cache_types.Stat. created; modified; deleted}
593
+ (creates , mods , deletes , last ) ->
594
+ if Subscription. object_matches subs table objref then
595
+ let last = max last (max modified deleted) in
596
+ (* mtime guaranteed to always be larger than ctime *)
597
+ if created > ! last_generation then
598
+ (creates, mods, deletes, last)
599
+ (* It was created and destroyed since the last update *)
600
+ else
601
+ (creates, mods, (table, objref, deleted) :: deletes, last)
602
+ (* It might have been modified, but we can't tell now *)
603
+ else
604
+ (creates, mods, deletes, last)
605
+ )
606
+ (Db_cache_types.TableSet. find table tableset)
607
+ acc
608
+ )
609
+ ([] , [] , [] , ! last_generation)
610
+ tables
611
+ )
612
612
in
613
613
(* Each event.from should have an independent subscription record *)
614
- let msg_gen, messages, tableset, events =
614
+ let msg_gen, messages, tableset, (creates, mods, deletes, last) =
615
615
with_call session subs (fun sub ->
616
616
let grab_nonempty_range =
617
- Throttle.Batching. with_recursive_loop batching @@ fun self since ->
618
- let result =
619
- Db_lock. with_lock (fun () -> grab_range ~since (Db_backend. make () ))
617
+ Throttle.Batching. with_recursive_loop batching @@ fun self arg ->
618
+ let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
619
+ as result
620
+ ) =
621
+ Db_lock. with_lock (fun () -> grab_range (Db_backend. make () ))
620
622
in
621
- let msg_gen, messages, _tables, events = result in
622
- let {creates; mods; deletes; last} = events in
623
623
if
624
624
creates = []
625
625
&& mods = []
626
626
&& deletes = []
627
627
&& messages = []
628
628
&& not (Clock.Timer. has_expired timer)
629
629
then (
630
- (* cur_id was bumped, but nothing relevent fell out of the database.
631
- Therefore the last ID the client got is equivalent to the current one. *)
630
+ last_generation := last ;
631
+ (* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
632
632
sub.cur_id < - last ;
633
+ (* last id the client got is equivalent to the current one *)
633
634
last_msg_gen := msg_gen ;
634
635
wait2 sub last timer ;
635
- (* The next iteration will fold over events starting after
636
- the last database event that matched a subscription. *)
637
- let next = last in
638
- (self [@ tailcall]) next
636
+ (self [@ tailcall]) arg
639
637
) else
640
638
result
641
639
in
642
- grab_nonempty_range from
640
+ grab_nonempty_range ()
643
641
)
644
642
in
645
- let {creates; mods; deletes; last} = events in
646
- let event_of op ?snapshot { table; obj; time} =
643
+ last_generation := last ;
644
+ let event_of op ?snapshot ( table , objref , time ) =
647
645
{
648
646
id= Int64. to_string time
649
647
; ts= " 0.0"
650
648
; ty= String. lowercase_ascii table
651
649
; op
652
- ; reference= obj
650
+ ; reference= objref
653
651
; snapshot
654
652
}
655
653
in
656
- let events_of ~kind ?(with_snapshot = true ) entries acc =
657
- let rec go events ({table; obj; time = _ } as entry ) =
658
- let snapshot =
654
+ let events =
655
+ List. fold_left
656
+ (fun acc x ->
657
+ let ev = event_of `del x in
658
+ if Subscription. event_matches subs ev then ev :: acc else acc
659
+ )
660
+ [] deletes
661
+ in
662
+ let events =
663
+ List. fold_left
664
+ (fun acc (table , objref , mtime ) ->
659
665
let serialiser = Eventgen. find_get_record table in
660
- if with_snapshot then
661
- serialiser ~__context ~self: obj ()
662
- else
663
- None
664
- in
665
- let event = event_of kind ?snapshot entry in
666
- if Subscription. event_matches subs event then
667
- event :: events
668
- else
669
- events
670
- in
671
- List. fold_left go acc entries
666
+ try
667
+ let xml = serialiser ~__context ~self: objref () in
668
+ let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in
669
+ if Subscription. event_matches subs ev then ev :: acc else acc
670
+ with _ -> acc
671
+ )
672
+ events mods
672
673
in
673
674
let events =
674
- [] (* Accumulate the events for objects stored in the database. *)
675
- |> events_of ~kind: `del ~with_snapshot: false deletes
676
- |> events_of ~kind: `_mod mods
677
- |> events_of ~kind: `add creates
675
+ List. fold_left
676
+ (fun acc (table , objref , ctime ) ->
677
+ let serialiser = Eventgen. find_get_record table in
678
+ try
679
+ let xml = serialiser ~__context ~self: objref () in
680
+ let ev = event_of `add ?snapshot:xml (table, objref, ctime) in
681
+ if Subscription. event_matches subs ev then ev :: acc else acc
682
+ with _ -> acc
683
+ )
684
+ events creates
678
685
in
679
686
let events =
680
- (* Messages require a special casing as their contents are not
681
- stored in the database. *)
682
687
List. fold_left
683
688
(fun acc mev ->
684
689
let event =
685
- let table = " message" in
686
690
match mev with
687
691
| Message. Create (_ref , message ) ->
688
692
event_of `add
689
693
?snapshot:(Some (API. rpc_of_message_t message))
690
- {table; obj = Ref. string_of _ref; time = 0L }
694
+ ( " message " , Ref. string_of _ref, 0L )
691
695
| Message. Del _ref ->
692
- event_of `del {table; obj = Ref. string_of _ref; time = 0L }
696
+ event_of `del ( " message " , Ref. string_of _ref, 0L )
693
697
in
694
698
event :: acc
695
699
)
0 commit comments