@@ -45,6 +45,7 @@ public class AttestationDutyLoader
45
45
scheduledDutiesFactory ;
46
46
private final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions ;
47
47
private final Spec spec ;
48
+ private final boolean useDvtEndpoint ;
48
49
49
50
public AttestationDutyLoader (
50
51
final ValidatorApiChannel validatorApiChannel ,
@@ -54,13 +55,15 @@ public AttestationDutyLoader(
54
55
final OwnedValidators validators ,
55
56
final ValidatorIndexProvider validatorIndexProvider ,
56
57
final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions ,
57
- final Spec spec ) {
58
+ final Spec spec ,
59
+ final boolean useDvtEndpoint ) {
58
60
super (validators , validatorIndexProvider );
59
61
this .validatorApiChannel = validatorApiChannel ;
60
62
this .forkProvider = forkProvider ;
61
63
this .scheduledDutiesFactory = scheduledDutiesFactory ;
62
64
this .beaconCommitteeSubscriptions = beaconCommitteeSubscriptions ;
63
65
this .spec = spec ;
66
+ this .useDvtEndpoint = useDvtEndpoint ;
64
67
}
65
68
66
69
@ Override
@@ -77,17 +80,27 @@ protected SafeFuture<Optional<AttesterDuties>> requestDuties(
77
80
final UInt64 epoch , final AttesterDuties duties ) {
78
81
final SlotBasedScheduledDuties <AttestationProductionDuty , AggregationDuty > scheduledDuties =
79
82
scheduledDutiesFactory .apply (duties .getDependentRoot ());
83
+
84
+ final Optional <DvtAttestationAggregations > dvtAttestationAggregationsForEpoch =
85
+ useDvtEndpoint
86
+ ? Optional .of (
87
+ new DvtAttestationAggregations (validatorApiChannel , duties .getDuties ().size ()))
88
+ : Optional .empty ();
89
+
80
90
return SafeFuture .allOf (
81
91
duties .getDuties ().stream ()
82
- .map (duty -> scheduleDuties (scheduledDuties , duty ))
92
+ .map (
93
+ duty ->
94
+ scheduleDuties (scheduledDuties , duty , dvtAttestationAggregationsForEpoch ))
83
95
.toArray (SafeFuture []::new ))
84
96
.<SlotBasedScheduledDuties <?, ?>>thenApply (__ -> scheduledDuties )
85
97
.alwaysRun (beaconCommitteeSubscriptions ::sendRequests );
86
98
}
87
99
88
100
private SafeFuture <Void > scheduleDuties (
89
101
final SlotBasedScheduledDuties <AttestationProductionDuty , AggregationDuty > scheduledDuties ,
90
- final AttesterDuty duty ) {
102
+ final AttesterDuty duty ,
103
+ final Optional <DvtAttestationAggregations > dvtAttestationAggregationLoader ) {
91
104
final Optional <Validator > maybeValidator = validators .getValidator (duty .getPublicKey ());
92
105
if (maybeValidator .isEmpty ()) {
93
106
return SafeFuture .COMPLETE ;
@@ -116,7 +129,8 @@ private SafeFuture<Void> scheduleDuties(
116
129
validator ,
117
130
duty .getSlot (),
118
131
aggregatorModulo ,
119
- unsignedAttestationFuture );
132
+ unsignedAttestationFuture ,
133
+ dvtAttestationAggregationLoader );
120
134
}
121
135
122
136
private SafeFuture <Optional <AttestationData >> scheduleAttestationProduction (
@@ -147,10 +161,19 @@ private SafeFuture<Void> scheduleAggregation(
147
161
final Validator validator ,
148
162
final UInt64 slot ,
149
163
final int aggregatorModulo ,
150
- final SafeFuture <Optional <AttestationData >> unsignedAttestationFuture ) {
164
+ final SafeFuture <Optional <AttestationData >> unsignedAttestationFuture ,
165
+ final Optional <DvtAttestationAggregations > dvtAttestationAggregation ) {
151
166
return forkProvider
152
167
.getForkInfo (slot )
153
168
.thenCompose (forkInfo -> validator .getSigner ().signAggregationSlot (slot , forkInfo ))
169
+ .thenCompose (
170
+ slotSignature ->
171
+ dvtAttestationAggregation
172
+ .map (
173
+ dvt ->
174
+ dvt .getCombinedSelectionProofFuture (
175
+ validatorIndex , slot , slotSignature ))
176
+ .orElse (SafeFuture .completedFuture (slotSignature )))
154
177
.thenAccept (
155
178
slotSignature -> {
156
179
final SpecVersion specVersion = spec .atSlot (slot );
0 commit comments