@@ -45,51 +45,51 @@ class ReviveManager {
4545 private final int batchSize ;
4646 ShuffleClientImpl shuffleClient ;
4747 private ScheduledExecutorService batchReviveRequestScheduler =
48- ThreadUtils .newDaemonSingleThreadScheduledExecutor ("batch-revive-scheduler" );
48+ ThreadUtils .newDaemonSingleThreadScheduledExecutor ("batch-revive-scheduler" );
4949 private ThreadPoolExecutor batchReviveRequestHandler =
50- ThreadUtils .newDaemonFixedThreadPool (2 , "batch-revive-handler" );
50+ ThreadUtils .newDaemonFixedThreadPool (2 , "batch-revive-handler" );
5151 private ThreadPoolExecutor batchReportRequestHandler =
52- ThreadUtils .newDaemonFixedThreadPool (2 , "batch-report-handler" );
52+ ThreadUtils .newDaemonFixedThreadPool (2 , "batch-report-handler" );
5353
5454 public ReviveManager (ShuffleClientImpl shuffleClient , CelebornConf conf ) {
5555 this .shuffleClient = shuffleClient ;
5656 this .interval = conf .clientPushReviveInterval ();
5757 this .batchSize = conf .clientPushReviveBatchSize ();
5858
5959 batchReviveRequestScheduler .scheduleWithFixedDelay (
60- () -> {
61- try {
62- Map <Integer , Set <ReviveRequest >> urgentMap = new HashMap <>();
63- Map <Integer , Set <ReviveRequest >> nonUrgentMap = new HashMap <>();
64- do {
65- ArrayList <ReviveRequest > batchRequests = new ArrayList <>();
66- requestQueue .drainTo (batchRequests , batchSize );
67- for (ReviveRequest req : batchRequests ) {
68- Set <ReviveRequest > set = null ;
69- if (req .urgent ) {
70- set = urgentMap .computeIfAbsent (req .shuffleId , id -> new HashSet <>());
71- } else {
72- set = nonUrgentMap .computeIfAbsent (req .shuffleId , id -> new HashSet <>());
73- }
74- set .add (req );
75- }
76- if (!urgentMap .isEmpty ()) {
77- reviveInternal (urgentMap , true );
78- }
79- if (!nonUrgentMap .isEmpty ()) {
80- reviveInternal (nonUrgentMap , false );
81- }
82- // break the loop if remaining requests is less than half of
83- // `celeborn.client.push.revive.batchSize`
84- } while (requestQueue .size () > batchSize / 2 );
85- } catch (Throwable e ) {
86- logger .error ("Exception when batchRevive: " , e );
87- throw e ;
60+ () -> {
61+ try {
62+ Map <Integer , Set <ReviveRequest >> urgentMap = new HashMap <>();
63+ Map <Integer , Set <ReviveRequest >> nonUrgentMap = new HashMap <>();
64+ do {
65+ ArrayList <ReviveRequest > batchRequests = new ArrayList <>();
66+ requestQueue .drainTo (batchRequests , batchSize );
67+ for (ReviveRequest req : batchRequests ) {
68+ Set <ReviveRequest > set = null ;
69+ if (req .urgent ) {
70+ set = urgentMap .computeIfAbsent (req .shuffleId , id -> new HashSet <>());
71+ } else {
72+ set = nonUrgentMap .computeIfAbsent (req .shuffleId , id -> new HashSet <>());
73+ }
74+ set .add (req );
75+ }
76+ if (!urgentMap .isEmpty ()) {
77+ reviveInternal (urgentMap , true );
78+ }
79+ if (!nonUrgentMap .isEmpty ()) {
80+ reviveInternal (nonUrgentMap , false );
8881 }
89- },
90- interval ,
91- interval ,
92- TimeUnit .MILLISECONDS );
82+ // break the loop if remaining requests is less than half of
83+ // `celeborn.client.push.revive.batchSize`
84+ } while (requestQueue .size () > batchSize / 2 );
85+ } catch (Throwable e ) {
86+ logger .error ("Exception when batchRevive: " , e );
87+ throw e ;
88+ }
89+ },
90+ interval ,
91+ interval ,
92+ TimeUnit .MILLISECONDS );
9393 }
9494
9595 public void reviveInternal (Map <Integer , Set <ReviveRequest >> shuffleMap , boolean urgent ) {
@@ -111,48 +111,51 @@ public void processRequests(int shuffleId, Collection<ReviveRequest> requests, b
111111 Iterator <ReviveRequest > iter = requests .iterator ();
112112 while (iter .hasNext ()) {
113113 ReviveRequest req = iter .next ();
114- if ((urgent && shuffleClient .newerPartitionLocationExists (shuffleId , req .partitionId , req .clientMaxEpoch ))
115- || shuffleClient .mapperEnded (shuffleId , req .mapId )) {
114+ if ((urgent
115+ && shuffleClient .newerPartitionLocationExists (
116+ shuffleId , req .partitionId , req .clientMaxEpoch ))
117+ || shuffleClient .mapperEnded (shuffleId , req .mapId )) {
116118 req .reviveStatus = StatusCode .SUCCESS .getValue ();
117119 } else {
118120 filteredRequests .add (req );
119121 mapIds .add (req .mapId );
120122 if (!requestsToSend .containsKey (req .partitionId )
121- || requestsToSend .get (req .partitionId ).clientMaxEpoch < req .clientMaxEpoch ) {
123+ || requestsToSend .get (req .partitionId ).clientMaxEpoch < req .clientMaxEpoch ) {
122124 requestsToSend .put (req .partitionId , req );
123125 }
124126 }
125127 }
126128
127129 ThreadPoolExecutor handler = urgent ? batchReviveRequestHandler : batchReportRequestHandler ;
128130 if (!requestsToSend .isEmpty ()) {
129- handler .submit (() -> {
130- try {
131- // Call reviveBatch. Return null means Exception caught or
132- // SHUFFLE_NOT_REGISTERED
133- //Do not use WriterTracerHere because traceInfo is set afterward
134- long reviveStartTime = System .nanoTime ();
135- Map <Integer , Integer > results =
131+ handler .submit (
132+ () -> {
133+ try {
134+ // Call reviveBatch. Return null means Exception caught or
135+ // SHUFFLE_NOT_REGISTERED
136+ // Do not use WriterTracerHere because traceInfo is set afterward
137+ long reviveStartTime = System .nanoTime ();
138+ Map <Integer , Integer > results =
136139 shuffleClient .reviveBatch (shuffleId , mapIds , requestsToSend .values (), urgent );
137- long reviveCostTime = System .nanoTime () - reviveStartTime ;
138- if (results == null ) {
139- for (ReviveRequest req : filteredRequests ) {
140- req .reviveStatus = StatusCode .REVIVE_FAILED .getValue ();
141- }
142- } else {
143- for (ReviveRequest req : filteredRequests ) {
144- if (shuffleClient .mapperEnded (shuffleId , req .mapId )) {
145- req .reviveStatus = StatusCode .SUCCESS .getValue ();
140+ long reviveCostTime = System .nanoTime () - reviveStartTime ;
141+ if (results == null ) {
142+ for (ReviveRequest req : filteredRequests ) {
143+ req .reviveStatus = StatusCode .REVIVE_FAILED .getValue ();
144+ }
146145 } else {
147- req .reviveStatus = results .get (req .partitionId );
146+ for (ReviveRequest req : filteredRequests ) {
147+ if (shuffleClient .mapperEnded (shuffleId , req .mapId )) {
148+ req .reviveStatus = StatusCode .SUCCESS .getValue ();
149+ } else {
150+ req .reviveStatus = results .get (req .partitionId );
151+ }
152+ }
148153 }
154+ } catch (Throwable e ) {
155+ logger .error ("Exception when processRequests: " , e );
156+ throw e ;
149157 }
150- }
151- } catch (Throwable e ) {
152- logger .error ("Exception when processRequests: " , e );
153- throw e ;
154- }
155- });
158+ });
156159 }
157160 }
158161
0 commit comments