Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-security</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@

package org.apache.nifi.cluster.coordination.flow;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.serialization.VersionedFlowSynchronizer;
import org.apache.nifi.util.security.MessageDigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* <p>
Expand Down Expand Up @@ -82,16 +83,16 @@ public synchronized boolean isElectionComplete() {
final long nanosSinceStart = System.nanoTime() - startNanos;
if (nanosSinceStart > maxWaitNanos) {
final FlowCandidate elected = performElection();
logger.info("Election is complete because the maximum allowed time has elapsed. "
+ "The elected dataflow is held by the following nodes: {}", elected.getNodes());
final Set<NodeIdentifier> nodes = elected == null ? Set.of() : elected.getNodes();
logger.info("Election completed after maximum duration [{} seconds] with Nodes {}", Duration.ofNanos(maxWaitNanos).toSeconds(), nodes);

return true;
} else if (maxNodes != null) {
final int numVotes = getVoteCount();
if (numVotes >= maxNodes) {
final FlowCandidate elected = performElection();
logger.info("Election is complete because the required number of nodes ({}) have voted. "
+ "The elected dataflow is held by the following nodes: {}", maxNodes, elected.getNodes());
final Set<NodeIdentifier> nodes = elected == null ? Set.of() : elected.getNodes();
logger.info("Election completed after maximum votes [{}] with Nodes {}", maxNodes, nodes);

return true;
}
Expand All @@ -107,7 +108,7 @@ public boolean isVoteCounted(final NodeIdentifier nodeIdentifier) {
}

private synchronized int getVoteCount() {
return candidateByFingerprint.values().stream().mapToInt(candidate -> candidate.getVotes()).sum();
return candidateByFingerprint.values().stream().mapToInt(FlowCandidate::getVotes).sum();
}

@Override
Expand Down Expand Up @@ -136,11 +137,11 @@ public synchronized DataFlow castVote(final DataFlow candidate, final NodeIdenti
}

private String fingerprint(final DataFlow dataFlow) {
final String flowFingerprint = DigestUtils.sha256Hex(dataFlow.getFlow());
final byte[] flowDigest = MessageDigestUtils.getDigest(dataFlow.getFlow());
final String flowFingerprint = HexFormat.of().formatHex(flowDigest);
final String authFingerprint = dataFlow.getAuthorizerFingerprint() == null ? "" : new String(dataFlow.getAuthorizerFingerprint(), StandardCharsets.UTF_8);
final String candidateFingerprint = flowFingerprint + authFingerprint;

return candidateFingerprint;
return flowFingerprint + authFingerprint;
}

@Override
Expand All @@ -155,7 +156,7 @@ private FlowCandidate performElection() {

final List<FlowCandidate> nonEmptyCandidates = candidateByFingerprint.values().stream()
.filter(candidate -> !candidate.isFlowEmpty())
.collect(Collectors.toList());
.toList();

if (nonEmptyCandidates.isEmpty()) {
// All flow candidates are empty flows. Just use one of them.
Expand All @@ -167,7 +168,7 @@ private FlowCandidate performElection() {
final FlowCandidate elected;
if (nonEmptyCandidates.size() == 1) {
// Only one flow is non-empty. Use that one.
elected = nonEmptyCandidates.iterator().next();
elected = nonEmptyCandidates.getFirst();
} else {
// Choose the non-empty flow that got the most votes.
elected = nonEmptyCandidates.stream()
Expand Down Expand Up @@ -196,7 +197,7 @@ public synchronized String getStatusDescription() {
}

if (maxNodes != null) {
final int votesNeeded = maxNodes.intValue() - getVoteCount();
final int votesNeeded = maxNodes - getVoteCount();
descriptionBuilder.append(" or after ").append(votesNeeded).append(" more vote");
descriptionBuilder.append(votesNeeded == 1 ? " is " : "s are ");
descriptionBuilder.append("cast, whichever occurs first.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,6 @@
<artifactId>spring-security-oauth2-jose</artifactId>
<scope>provided</scope> <!-- expected to be provided by parent classloader -->
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>provided</scope> <!-- expected to be provided by parent classloader -->
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.nifi.web.api.dto;

import jakarta.ws.rs.WebApplicationException;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
Expand Down Expand Up @@ -178,6 +177,7 @@
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.security.MessageDigestUtils;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.ResourceClaimDetailsDTO;
Expand Down Expand Up @@ -260,6 +260,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.Collator;
import java.text.NumberFormat;
import java.util.ArrayList;
Expand All @@ -270,6 +271,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -4117,7 +4119,7 @@ private JVMDiagnosticsSnapshotDTO createJvmDiagnosticsSnapshotDto(final FlowCont
final RepositoryUsageDTO usageDto = new RepositoryUsageDTO();
usageDto.setName(repoName);

usageDto.setFileStoreHash(DigestUtils.sha256Hex(flowController.getContentRepoFileStoreName(repoName)));
usageDto.setFileStoreHash(getDigest(flowController.getContentRepoFileStoreName(repoName)));
usageDto.setFreeSpace(FormatUtils.formatDataSize(usage.getFreeSpace()));
usageDto.setFreeSpaceBytes(usage.getFreeSpace());
usageDto.setTotalSpace(FormatUtils.formatDataSize(usage.getTotalSpace()));
Expand All @@ -4137,7 +4139,7 @@ private JVMDiagnosticsSnapshotDTO createJvmDiagnosticsSnapshotDto(final FlowCont
final RepositoryUsageDTO usageDto = new RepositoryUsageDTO();
usageDto.setName(repoName);

usageDto.setFileStoreHash(DigestUtils.sha256Hex(flowController.getProvenanceRepoFileStoreName(repoName)));
usageDto.setFileStoreHash(getDigest(flowController.getProvenanceRepoFileStoreName(repoName)));
usageDto.setFreeSpace(FormatUtils.formatDataSize(usage.getFreeSpace()));
usageDto.setFreeSpaceBytes(usage.getFreeSpace());
usageDto.setTotalSpace(FormatUtils.formatDataSize(usage.getTotalSpace()));
Expand All @@ -4156,7 +4158,7 @@ private JVMDiagnosticsSnapshotDTO createJvmDiagnosticsSnapshotDto(final FlowCont

flowFileRepoUsage.setName(repoName);

flowFileRepoUsage.setFileStoreHash(DigestUtils.sha256Hex(flowController.getFlowRepoFileStoreName()));
flowFileRepoUsage.setFileStoreHash(getDigest(flowController.getFlowRepoFileStoreName()));
flowFileRepoUsage.setFreeSpace(FormatUtils.formatDataSize(usage.getFreeSpace()));
flowFileRepoUsage.setFreeSpaceBytes(usage.getFreeSpace());
flowFileRepoUsage.setTotalSpace(FormatUtils.formatDataSize(usage.getTotalSpace()));
Expand Down Expand Up @@ -5209,4 +5211,10 @@ private ProcessingPerformanceStatusDTO createProcessingPerformanceStatusDTO(fina

return performanceStatusDTO;
}

private String getDigest(final String name) {
final byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
final byte[] digest = MessageDigestUtils.getDigest(bytes);
return HexFormat.of().formatHex(digest);
}
}
Loading