1717
1818package org .apache .nifi .cluster .coordination .flow ;
1919
20- import org .apache .commons .codec .digest .DigestUtils ;
2120import org .apache .nifi .cluster .protocol .DataFlow ;
2221import org .apache .nifi .cluster .protocol .NodeIdentifier ;
2322import org .apache .nifi .controller .serialization .VersionedFlowSynchronizer ;
23+ import org .apache .nifi .util .security .MessageDigestUtils ;
2424import org .slf4j .Logger ;
2525import org .slf4j .LoggerFactory ;
2626
2727import java .nio .charset .StandardCharsets ;
28+ import java .time .Duration ;
2829import java .util .Collections ;
2930import java .util .HashMap ;
3031import java .util .HashSet ;
32+ import java .util .HexFormat ;
3133import java .util .List ;
3234import java .util .Map ;
3335import java .util .Set ;
3436import java .util .concurrent .TimeUnit ;
3537import java .util .concurrent .atomic .AtomicInteger ;
36- import java .util .stream .Collectors ;
3738
3839/**
3940 * <p>
@@ -82,16 +83,16 @@ public synchronized boolean isElectionComplete() {
8283 final long nanosSinceStart = System .nanoTime () - startNanos ;
8384 if (nanosSinceStart > maxWaitNanos ) {
8485 final FlowCandidate elected = performElection ();
85- logger . info ( "Election is complete because the maximum allowed time has elapsed. "
86- + "The elected dataflow is held by the following nodes: {}" , elected . getNodes () );
86+ final Set < NodeIdentifier > nodes = elected == null ? Set . of () : elected . getNodes ();
87+ logger . info ( "Election completed after maximum duration [{} seconds] with Nodes {}" , Duration . ofNanos ( maxWaitNanos ). toSeconds (), nodes );
8788
8889 return true ;
8990 } else if (maxNodes != null ) {
9091 final int numVotes = getVoteCount ();
9192 if (numVotes >= maxNodes ) {
9293 final FlowCandidate elected = performElection ();
93- logger . info ( "Election is complete because the required number of nodes ({}) have voted. "
94- + "The elected dataflow is held by the following nodes: {}" , maxNodes , elected . getNodes () );
94+ final Set < NodeIdentifier > nodes = elected == null ? Set . of () : elected . getNodes ();
95+ logger . info ( "Election completed after maximum votes [{}] with Nodes {}" , maxNodes , nodes );
9596
9697 return true ;
9798 }
@@ -107,7 +108,7 @@ public boolean isVoteCounted(final NodeIdentifier nodeIdentifier) {
107108 }
108109
109110 private synchronized int getVoteCount () {
110- return candidateByFingerprint .values ().stream ().mapToInt (candidate -> candidate . getVotes () ).sum ();
111+ return candidateByFingerprint .values ().stream ().mapToInt (FlowCandidate :: getVotes ).sum ();
111112 }
112113
113114 @ Override
@@ -136,11 +137,11 @@ public synchronized DataFlow castVote(final DataFlow candidate, final NodeIdenti
136137 }
137138
138139 private String fingerprint (final DataFlow dataFlow ) {
139- final String flowFingerprint = DigestUtils .sha256Hex (dataFlow .getFlow ());
140+ final byte [] flowDigest = MessageDigestUtils .getDigest (dataFlow .getFlow ());
141+ final String flowFingerprint = HexFormat .of ().formatHex (flowDigest );
140142 final String authFingerprint = dataFlow .getAuthorizerFingerprint () == null ? "" : new String (dataFlow .getAuthorizerFingerprint (), StandardCharsets .UTF_8 );
141- final String candidateFingerprint = flowFingerprint + authFingerprint ;
142143
143- return candidateFingerprint ;
144+ return flowFingerprint + authFingerprint ;
144145 }
145146
146147 @ Override
@@ -155,7 +156,7 @@ private FlowCandidate performElection() {
155156
156157 final List <FlowCandidate > nonEmptyCandidates = candidateByFingerprint .values ().stream ()
157158 .filter (candidate -> !candidate .isFlowEmpty ())
158- .collect ( Collectors . toList () );
159+ .toList ();
159160
160161 if (nonEmptyCandidates .isEmpty ()) {
161162 // All flow candidates are empty flows. Just use one of them.
@@ -167,7 +168,7 @@ private FlowCandidate performElection() {
167168 final FlowCandidate elected ;
168169 if (nonEmptyCandidates .size () == 1 ) {
169170 // Only one flow is non-empty. Use that one.
170- elected = nonEmptyCandidates .iterator (). next ();
171+ elected = nonEmptyCandidates .getFirst ();
171172 } else {
172173 // Choose the non-empty flow that got the most votes.
173174 elected = nonEmptyCandidates .stream ()
@@ -196,7 +197,7 @@ public synchronized String getStatusDescription() {
196197 }
197198
198199 if (maxNodes != null ) {
199- final int votesNeeded = maxNodes . intValue () - getVoteCount ();
200+ final int votesNeeded = maxNodes - getVoteCount ();
200201 descriptionBuilder .append (" or after " ).append (votesNeeded ).append (" more vote" );
201202 descriptionBuilder .append (votesNeeded == 1 ? " is " : "s are " );
202203 descriptionBuilder .append ("cast, whichever occurs first." );
0 commit comments