From 401e5a5a124dfa2dbf6257e0d7f30cb9976809b6 Mon Sep 17 00:00:00 2001 From: Ilya Verbitskiy Date: Fri, 5 Jun 2015 09:16:49 +0200 Subject: [PATCH] vertex statistics manager now handles all callbacks --- .../qosreporter/InputGateReporterManager.java | 57 +++-- .../OutputGateReporterManager.java | 35 ++- .../qosreporter/StreamTaskQosCoordinator.java | 189 +++++++------- .../listener/QosReportingListenerHelper.java | 242 ------------------ .../sampling/BernoulliSampler.java | 2 +- .../InputGateInterArrivalTimeSampler.java | 8 +- .../InputGateInterReadTimeSampler.java | 12 +- .../qosreporter/sampling/Sampler.java | 7 + .../vertex/AbstractVertexQosReporter.java | 21 +- .../vertex/CountingGateReporter.java | 29 +++ .../vertex/InputGateReceiveCounter.java | 20 -- .../vertex/OutputGateEmitStatistics.java | 17 -- .../qosreporter/vertex/ReadReadReporter.java | 12 +- .../ReadReadVertexQosReporterGroup.java | 21 +- .../qosreporter/vertex/ReadWriteReporter.java | 4 +- .../vertex/VertexConsumptionReporter.java | 3 +- .../vertex/VertexEmissionReporter.java | 2 +- .../vertex/VertexStatisticsReportManager.java | 104 ++++++-- 18 files changed, 315 insertions(+), 470 deletions(-) delete mode 100644 nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/listener/QosReportingListenerHelper.java rename nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/{vertex => sampling}/InputGateInterArrivalTimeSampler.java (88%) rename nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/{vertex => sampling}/InputGateInterReadTimeSampler.java (75%) create mode 100644 nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/Sampler.java create mode 100644 nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/CountingGateReporter.java delete mode 100644 nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateReceiveCounter.java delete mode 100644 nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/OutputGateEmitStatistics.java diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/InputGateReporterManager.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/InputGateReporterManager.java index 8117ec641..8a1f37df3 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/InputGateReporterManager.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/InputGateReporterManager.java @@ -1,12 +1,14 @@ package eu.stratosphere.nephele.streaming.taskmanager.qosreporter; +import eu.stratosphere.nephele.streaming.message.qosreport.EdgeLatency; +import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.CountingGateReporter; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.ReportTimer; + import java.util.Collections; import java.util.HashSet; import java.util.concurrent.CopyOnWriteArrayList; -import eu.stratosphere.nephele.streaming.message.qosreport.EdgeLatency; -import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; - /** * A instance of this class keeps track of and reports on the latencies of an * input gate's input channels. @@ -22,7 +24,7 @@ * @author Bjoern Lohrmann * */ -public class InputGateReporterManager { +public class InputGateReporterManager extends CountingGateReporter { /** * No need for a thread-safe set because it is only accessed in synchronized @@ -39,20 +41,20 @@ public class InputGateReporterManager { private QosReportForwarderThread reportForwarder; + private ReportTimer reportTimer; + private class EdgeLatencyReporter { public QosReporterID.Edge reporterID; - long timeOfNextReport; - long accumulatedLatency; int tagsReceived; - public void sendReportIfDue(long now) { - if (this.reportIsDue(now)) { + public void sendReportIfDue() { + if (this.reportIsDue()) { this.sendReport(); - this.reset(now); + this.reset(); } } @@ -63,14 +65,11 @@ private void sendReport() { .addToNextReport(channelLatency); } - public boolean reportIsDue(long now) { - return this.tagsReceived > 0 && now >= this.timeOfNextReport; + public boolean reportIsDue() { + return this.tagsReceived > 0; } - public void reset(long now) { - this.timeOfNextReport = now - + InputGateReporterManager.this.reportForwarder - .getConfigCenter().getAggregationInterval(); + public void reset() { this.accumulatedLatency = 0; this.tagsReceived = 0; } @@ -83,13 +82,21 @@ public void update(TimestampTag tag, long now) { } } + public InputGateReporterManager() { + } + public InputGateReporterManager(QosReportForwarderThread qosReporter, int noOfInputChannels) { + initReporter(qosReporter, noOfInputChannels); + } + public void initReporter(QosReportForwarderThread qosReporter, int noOfInputChannels) { this.reportForwarder = qosReporter; this.reportersByChannelIndexInRuntimeGate = new CopyOnWriteArrayList(); this.fillChannelLatenciesWithNulls(noOfInputChannels); this.reporters = new HashSet(); + this.reportTimer = new ReportTimer(this.reportForwarder.getConfigCenter().getAggregationInterval()); + setReporter(true); } private void fillChannelLatenciesWithNulls(int noOfInputChannels) { @@ -100,13 +107,26 @@ private void fillChannelLatenciesWithNulls(int noOfInputChannels) { public void reportLatencyIfNecessary(int channelIndex, TimestampTag timestampTag) { + long now = System.currentTimeMillis(); + EdgeLatencyReporter info = this.reportersByChannelIndexInRuntimeGate .get(channelIndex); if (info != null) { - long now = System.currentTimeMillis(); info.update(timestampTag, now); - info.sendReportIfDue(now); + } + + sendReportsIfDue(now); + } + + private void sendReportsIfDue(long now) { + if (reportTimer.reportIsDue()) { + for (EdgeLatencyReporter reporter : reportersByChannelIndexInRuntimeGate) { + if (reporter != null) { + reporter.sendReportIfDue(); + } + } + reportTimer.reset(now); } } @@ -114,7 +134,7 @@ public synchronized boolean containsReporter(QosReporterID.Edge reporterID) { return this.reporters.contains(reporterID); } - public synchronized void addEdgeQosReporterConfig( + public void addEdgeQosReporterConfig( int channelIndexInRuntimeGate, QosReporterID.Edge reporterID) { if (this.reporters.contains(reporterID)) { @@ -123,7 +143,6 @@ public synchronized void addEdgeQosReporterConfig( EdgeLatencyReporter info = new EdgeLatencyReporter(); info.reporterID = reporterID; - info.timeOfNextReport = System.currentTimeMillis(); info.accumulatedLatency = 0; info.tagsReceived = 0; this.reportersByChannelIndexInRuntimeGate.set( diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/OutputGateReporterManager.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/OutputGateReporterManager.java index 108d6e049..b0d9bc545 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/OutputGateReporterManager.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/OutputGateReporterManager.java @@ -4,6 +4,8 @@ import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.edge.OutputBufferLifetimeSampler; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.BernoulliSampleDesign; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.CountingGateReporter; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.ReportTimer; import eu.stratosphere.nephele.types.AbstractTaggableRecord; import java.util.Collections; @@ -25,7 +27,7 @@ * @author Bjoern Lohrmann * */ -public class OutputGateReporterManager { +public class OutputGateReporterManager extends CountingGateReporter { /** * No need for a thread-safe set because it is only accessed in synchronized @@ -43,6 +45,8 @@ public class OutputGateReporterManager { private QosReportForwarderThread reportForwarder; + private ReportTimer reportTimer; + public class OutputChannelChannelStatisticsReporter { private QosReporterID.Edge reporterID; @@ -157,16 +161,14 @@ public int getRecordsSinceLastTag() { } public void sendReportIfDue(long now) { - if (this.reportIsDue(now)) { + if (this.reportIsDue()) { this.sendReport(now); this.reset(now); } } - private boolean reportIsDue(long now) { - return now - this.timeOfLastReport > OutputGateReporterManager.this.reportForwarder - .getConfigCenter().getAggregationInterval() - && this.recordsEmittedSinceLastReport > 0 + private boolean reportIsDue() { + return this.recordsEmittedSinceLastReport > 0 && this.outputBuffersSentSinceLastReport > 0 && this.outputBufferLifetimeSampler.hasSample(); } @@ -223,7 +225,7 @@ public void outputBufferSent(long currentAmountTransmitted) { this.outputBuffersSentSinceLastReport++; this.currentAmountTransmitted = currentAmountTransmitted; this.outputBufferLifetimeSampler.outputBufferSent(); - sendReportIfDue(System.currentTimeMillis()); + sendReportsIfDue(System.currentTimeMillis()); } public void outputBufferAllocated() { @@ -231,14 +233,33 @@ public void outputBufferAllocated() { } } + public OutputGateReporterManager() { + } + public OutputGateReporterManager(QosReportForwarderThread qosReporter, int noOfOutputChannels) { + initReporter(qosReporter, noOfOutputChannels); + } + public void initReporter(QosReportForwarderThread qosReporter, int noOfOutputChannels) { this.reportForwarder = qosReporter; this.reporters = new HashSet(); this.reportersByChannelIndexInRuntimeGate = new CopyOnWriteArrayList(); Collections.addAll(this.reportersByChannelIndexInRuntimeGate, new OutputChannelChannelStatisticsReporter[noOfOutputChannels]); + this.reportTimer = new ReportTimer(this.reportForwarder.getConfigCenter().getAggregationInterval()); + setReporter(true); + } + + public void sendReportsIfDue(long now) { + if (reportTimer.reportIsDue()) { + for (OutputChannelChannelStatisticsReporter reporter : reportersByChannelIndexInRuntimeGate) { + if (reporter != null) { + reporter.sendReportIfDue(now); + } + } + reportTimer.reset(now); + } } public void recordEmitted(int channelIndex, AbstractTaggableRecord record) { diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/StreamTaskQosCoordinator.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/StreamTaskQosCoordinator.java index 25e831716..6a08bd9af 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/StreamTaskQosCoordinator.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/StreamTaskQosCoordinator.java @@ -20,28 +20,29 @@ import eu.stratosphere.nephele.streaming.message.action.SetOutputBufferLifetimeTargetAction; import eu.stratosphere.nephele.streaming.message.action.VertexQosReporterConfig; import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.listener.QosReportingListenerHelper; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.listener.InputGateQosReportingListener; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.listener.OutputGateQosReportingListener; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.VertexStatisticsReportManager; import eu.stratosphere.nephele.streaming.taskmanager.runtime.StreamTaskEnvironment; import eu.stratosphere.nephele.streaming.taskmanager.runtime.io.StreamInputGate; import eu.stratosphere.nephele.streaming.taskmanager.runtime.io.StreamOutputGate; import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask; +import eu.stratosphere.nephele.types.AbstractTaggableRecord; import eu.stratosphere.nephele.types.Record; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.ArrayList; import java.util.Set; /** * An instance of this class implements Qos data reporting for a specific vertex * and its ingoing/outgoing edges on a task manager while the vertex actually * runs. - * + * * This class is thread-safe. - * + * * @author Bjoern Lohrmann - * + * */ public class StreamTaskQosCoordinator implements QosReporterConfigListener { @@ -53,28 +54,8 @@ public class StreamTaskQosCoordinator implements QosReporterConfigListener { private final StreamTaskEnvironment taskEnvironment; private final QosReportForwarderThread reporterThread; - - private final QosReporterConfigCenter reporterConfigCenter; - - /** - * For each input gate of the task for whose channels latency reporting is - * required, this list contains a InputGateReporterManager. A - * InputGateReporterManager keeps track of and reports on the latencies for - * all of the input gate's channels. This is a sparse list (may contain - * nulls), indexed by the runtime gate's own indices. - */ - private ArrayList inputGateReporters; - /** - * For each output gate of the task for whose output channels QoS statistics - * are required (throughput, output buffer lifetime, ...), this list - * contains a OutputGateReporterManager. Each OutputGateReporterManager - * keeps track of and reports on Qos statistics all of the output gate's - * channels and also attaches tags to records sent via its channels. This is - * a sparse list (may contain nulls), indexed by the runtime gate's own - * indices. - */ - private ArrayList outputGateReporters; + private final QosReporterConfigCenter reporterConfigCenter; /** * For each input/output gate combination for which Qos reports are @@ -97,8 +78,6 @@ public StreamTaskQosCoordinator(RuntimeTask task, this.reporterThread, this.taskEnvironment.getNumberOfInputGates(), this.taskEnvironment.getNumberOfOutputGates()); - this.inputGateReporters = new ArrayList(); - this.outputGateReporters = new ArrayList(); this.isShutdown = false; this.prepareQosReporting(); @@ -137,22 +116,14 @@ private void installVertexStatisticsReporter( if (reporterConfig.getInputGateID() != null) { StreamInputGate inputGate = this.taskEnvironment .getInputGate(reporterConfig.getInputGateID()); - - QosReportingListenerHelper.listenToVertexStatisticsOnInputGate( - inputGate, this.vertexStatisticsManager); - - inputGateIndex = inputGate.getIndex(); + ensureInputGateListener(inputGate); } int outputGateIndex = -1; if (reporterConfig.getOutputGateID() != null) { StreamOutputGate outputGate = this.taskEnvironment .getOutputGate(reporterConfig.getOutputGateID()); - - QosReportingListenerHelper.listenToVertexStatisticsOnOutputGate( - outputGate, this.vertexStatisticsManager); - - outputGateIndex = outputGate.getIndex(); + ensureOutputGateListener(outputGate); } this.vertexStatisticsManager.addReporter(inputGateIndex, @@ -172,16 +143,11 @@ private void installInputGateListeners() { .getConnectedChannelID()) != null; if (!mustReportQosForGate) { - this.inputGateReporters.add(null); this.reporterConfigCenter.setQosReporterConfigListener( inputGate.getGateID(), this); break; } - InputGateReporterManager reporter = new InputGateReporterManager( - this.reporterThread, inputGate.getNumberOfInputChannels()); - this.inputGateReporters.add(reporter); - for (int j = 0; j < inputGate.getNumberOfInputChannels(); j++) { int runtimeChannelIndex = inputGate.getInputChannel(j) .getChannelIndex(); @@ -192,12 +158,11 @@ private void installInputGateListeners() { .getEdgeQosReporter(sourceChannelID); QosReporterID.Edge reporterID = (QosReporterID.Edge) edgeReporter .getReporterID(); - reporter.addEdgeQosReporterConfig(runtimeChannelIndex, - reporterID); + vertexStatisticsManager.addInputGateReporter(inputGate.getIndex(), runtimeChannelIndex, + inputGate.getNumberOfInputChannels(), reporterID); } - QosReportingListenerHelper.listenToChannelLatenciesOnInputGate( - inputGate, reporter); + ensureInputGateListener(inputGate); } } @@ -212,17 +177,11 @@ private void installOutputGateListeners() { .getEdgeQosReporter(outputGate.getOutputChannel(0).getID()) != null; if (!mustReportQosForGate) { - this.outputGateReporters.add(null); this.reporterConfigCenter.setQosReporterConfigListener( outputGate.getGateID(), this); break; } - OutputGateReporterManager gateReporterManager = new OutputGateReporterManager( - this.reporterThread, outputGate.getNumberOfOutputChannels()); - - this.outputGateReporters.add(gateReporterManager); - for (int j = 0; j < outputGate.getNumberOfOutputChannels(); j++) { int runtimeChannelIndex = outputGate.getOutputChannel(j) .getChannelIndex(); @@ -233,13 +192,11 @@ private void installOutputGateListeners() { .getEdgeQosReporter(sourceChannelID); QosReporterID.Edge reporterID = (QosReporterID.Edge) edgeReporter .getReporterID(); - gateReporterManager.addEdgeQosReporterConfig( - runtimeChannelIndex, reporterID); + vertexStatisticsManager.addOutputGateReporter(outputGate.getIndex(), runtimeChannelIndex, + outputGate.getNumberOfOutputChannels(), reporterID); } - QosReportingListenerHelper - .listenToOutputChannelStatisticsOnOutputGate(outputGate, - gateReporterManager); + ensureOutputGateListener(outputGate); } } @@ -268,7 +225,7 @@ public synchronized void handleLimitBufferSizeAction( } } } - + public void handleSetOutputLatencyTargetAction( SetOutputBufferLifetimeTargetAction action) { @@ -293,8 +250,6 @@ public void handleSetOutputLatencyTargetAction( outputGate.enqueueQosAction(action); } } - - } @@ -337,59 +292,44 @@ public synchronized void newEdgeQosReporter( StreamInputGate inputGate = this.taskEnvironment .getInputGate(edgeReporter.getInputGateID()); + StreamOutputGate outputGate = this.taskEnvironment + .getOutputGate(edgeReporter.getOutputGateID()); + if (inputGate != null) { int runtimeGateIndex = inputGate.getIndex(); int runtimeChannelIndex = inputGate.getInputChannel( edgeReporter.getTargetChannelID()).getChannelIndex(); - if (this.inputGateReporters.get(runtimeGateIndex) == null) { - this.createAndRegisterGateReporterManager(inputGate); - } - - this.inputGateReporters.get(runtimeGateIndex) - .addEdgeQosReporterConfig(runtimeChannelIndex, reporterID); - } else { - StreamOutputGate outputGate = this.taskEnvironment - .getOutputGate(edgeReporter.getOutputGateID()); + vertexStatisticsManager.addInputGateReporter(runtimeGateIndex, + runtimeChannelIndex, inputGate.getNumberOfInputChannels(), reporterID); + ensureInputGateListener(inputGate); + } else if (outputGate != null) { int runtimeGateIndex = outputGate.getIndex(); int runtimeChannelIndex = outputGate.getOutputChannel( edgeReporter.getSourceChannelID()).getChannelIndex(); - if (this.outputGateReporters.get(runtimeGateIndex) == null) { - this.createAndRegisterGateReporterManager(outputGate); - } + vertexStatisticsManager.addOutputGateReporter(runtimeGateIndex, + runtimeChannelIndex, outputGate.getNumberOfOutputChannels(), reporterID); - this.outputGateReporters.get(runtimeGateIndex) - .addEdgeQosReporterConfig(runtimeChannelIndex, reporterID); + ensureOutputGateListener(outputGate); } } - private void createAndRegisterGateReporterManager( - StreamInputGate inputGate) { - - int runtimeGateIndex = inputGate.getIndex(); - InputGateReporterManager gateReporterManager = new InputGateReporterManager( - this.reporterThread, inputGate.getNumberOfInputChannels()); - - this.inputGateReporters.set(runtimeGateIndex, gateReporterManager); - - QosReportingListenerHelper.listenToChannelLatenciesOnInputGate( - inputGate, gateReporterManager); + private void ensureInputGateListener(StreamInputGate inputGate) { + InputGateQosReportingListener listener = inputGate.getQosReportingListener(); + if (listener == null) { + listener = new InputGateListener(inputGate.getIndex()); + inputGate.setQosReportingListener(listener); + } } - private void createAndRegisterGateReporterManager( - StreamOutputGate outputGate) { - - int runtimeGateIndex = outputGate.getIndex(); - - OutputGateReporterManager gateReporterManager = new OutputGateReporterManager( - this.reporterThread, outputGate.getNumberOfOutputChannels()); - - this.outputGateReporters.set(runtimeGateIndex, gateReporterManager); - - QosReportingListenerHelper.listenToOutputChannelStatisticsOnOutputGate( - outputGate, gateReporterManager); + private void ensureOutputGateListener(StreamOutputGate outputGate) { + OutputGateQosReportingListener listener = outputGate.getQosReportingListener(); + if (listener == null) { + listener = new OutputGateListener(outputGate.getIndex()); + outputGate.setQosReportingListener(listener); + } } public synchronized void shutdownReporting() { @@ -407,7 +347,6 @@ private void shutdownOutputGateReporters() { outputGate.setQosReportingListener(null); this.reporterConfigCenter.unsetQosReporterConfigListener(outputGate.getGateID()); } - this.outputGateReporters.clear(); } private void shutdownInputGateReporters() { @@ -417,6 +356,56 @@ private void shutdownInputGateReporters() { inputGate.setQosReportingListener(null); this.reporterConfigCenter.unsetQosReporterConfigListener(inputGate.getGateID()); } - this.inputGateReporters.clear(); + } + + private class OutputGateListener implements OutputGateQosReportingListener { + private final int gateIndex; + + public OutputGateListener(int gateIndex) { + this.gateIndex = gateIndex; + } + + @Override + public void outputBufferSent(int channelIndex, long currentAmountTransmitted) { + vertexStatisticsManager.outputBufferSent(gateIndex, channelIndex, currentAmountTransmitted); + } + + @Override + public void recordEmitted(int channelIndex, AbstractTaggableRecord record) { + vertexStatisticsManager.recordEmitted(gateIndex, channelIndex, record); + } + + @Override + public void outputBufferAllocated(int channelIndex) { + vertexStatisticsManager.outputBufferAllocated(gateIndex, channelIndex); + } + } + + private class InputGateListener implements InputGateQosReportingListener { + private final int gateIndex; + + public InputGateListener(int gateIndex) { + this.gateIndex = gateIndex; + } + + @Override + public void recordReceived(int channelIndex, AbstractTaggableRecord record) { + vertexStatisticsManager.recordReceived(gateIndex); + TimestampTag timestampTag = (TimestampTag) record.getTag(); + if (timestampTag != null) { + vertexStatisticsManager.reportLatenciesIfNecessary(gateIndex, channelIndex, timestampTag); + } + } + + @Override + public void tryingToReadRecord() { + vertexStatisticsManager.tryingToReadRecord(gateIndex); + } + + @Override + public void inputBufferConsumed(int channelIndex, long bufferInterarrivalTimeNanos, int recordsReadFromBuffer) { + vertexStatisticsManager.inputBufferConsumed(gateIndex, channelIndex, bufferInterarrivalTimeNanos, + recordsReadFromBuffer); + } } } diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/listener/QosReportingListenerHelper.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/listener/QosReportingListenerHelper.java deleted file mode 100644 index 18dfa9980..000000000 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/listener/QosReportingListenerHelper.java +++ /dev/null @@ -1,242 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ -package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.listener; - -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.InputGateReporterManager; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.OutputGateReporterManager; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.TimestampTag; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.VertexStatisticsReportManager; -import eu.stratosphere.nephele.streaming.taskmanager.runtime.io.StreamInputGate; -import eu.stratosphere.nephele.streaming.taskmanager.runtime.io.StreamOutputGate; -import eu.stratosphere.nephele.types.AbstractTaggableRecord; -import eu.stratosphere.nephele.types.Record; - -/** - * Utility class that creates {@link InputGateQosReportingListener} - * {@link OutputGateQosReportingListener} and adds them to the respective - * input/output gates. - * - * @author Bjoern Lohrmann - * - */ -public class QosReportingListenerHelper { - - public static void listenToVertexStatisticsOnInputGate( - StreamInputGate inputGate, - final VertexStatisticsReportManager vertexStatsManager) { - - final int gateIndex = inputGate.getIndex(); - - InputGateQosReportingListener listener = new InputGateQosReportingListener() { - @Override - public void recordReceived(int inputChannelIndex, - AbstractTaggableRecord record) { - vertexStatsManager.recordReceived(gateIndex); - } - - @Override - public void tryingToReadRecord() { - vertexStatsManager.tryingToReadRecord(gateIndex); - } - - @Override - public void inputBufferConsumed(int channelIndex, - long bufferInterarrivalTimeNanos, int recordsReadFromBuffer) { - vertexStatsManager.inputBufferConsumed(gateIndex, channelIndex, bufferInterarrivalTimeNanos, recordsReadFromBuffer); - } - }; - InputGateQosReportingListener oldListener = inputGate - .getQosReportingListener(); - if (oldListener != null) { - inputGate.setQosReportingListener(createChainedListener( - oldListener, listener)); - } else { - inputGate.setQosReportingListener(listener); - } - } - - public static void listenToVertexStatisticsOnOutputGate( - StreamOutputGate outputGate, - final VertexStatisticsReportManager vertexStatsManager) { - - final int gateIndex = outputGate.getIndex(); - - OutputGateQosReportingListener listener = new OutputGateQosReportingListener() { - @Override - public void recordEmitted(int outputChannelIndex, - AbstractTaggableRecord record) { - vertexStatsManager.recordEmitted(gateIndex); - } - - @Override - public void outputBufferAllocated(int channelIndex) { - // do nothing - } - - @Override - public void outputBufferSent(int outputChannelIndex, - long currentAmountTransmitted) { - // do nothing - } - }; - - OutputGateQosReportingListener oldListener = outputGate - .getQosReportingListener(); - - if (oldListener != null) { - outputGate.setQosReportingListener(createChainedListener(listener, - oldListener)); - } else { - outputGate.setQosReportingListener(listener); - } - } - - public static void listenToChannelLatenciesOnInputGate( - StreamInputGate inputGate, - final InputGateReporterManager reporter) { - - InputGateQosReportingListener listener = new InputGateQosReportingListener() { - @Override - public void recordReceived(int inputChannelIndexinRuntimeGate, - AbstractTaggableRecord record) { - - TimestampTag timestampTag = (TimestampTag) record.getTag(); - - if (timestampTag != null) { - reporter.reportLatencyIfNecessary( - inputChannelIndexinRuntimeGate, timestampTag); - } - } - - @Override - public void tryingToReadRecord() { - // nothing to do - } - - @Override - public void inputBufferConsumed(int channelIndex, - long bufferInterarrivalTimeNanos, int recordsReadFromBuffer) { - // nothing to do - } - }; - - InputGateQosReportingListener oldListener = inputGate - .getQosReportingListener(); - - if (oldListener != null) { - inputGate.setQosReportingListener(createChainedListener(listener, - oldListener)); - } else { - inputGate.setQosReportingListener(listener); - } - } - - public static void listenToOutputChannelStatisticsOnOutputGate( - StreamOutputGate outputGate, - final OutputGateReporterManager gateReporterManager) { - - OutputGateQosReportingListener listener = new OutputGateQosReportingListener() { - - @Override - public void outputBufferSent(int runtimeGateChannelIndex, - long currentAmountTransmitted) { - gateReporterManager.outputBufferSent(runtimeGateChannelIndex, - currentAmountTransmitted); - } - - @Override - public void recordEmitted(int runtimeGateChannelIndex, - AbstractTaggableRecord record) { - gateReporterManager.recordEmitted(runtimeGateChannelIndex, - record); - } - - @Override - public void outputBufferAllocated(int channelIndex) { - gateReporterManager.outputBufferAllocated(channelIndex); - } - }; - - OutputGateQosReportingListener oldListener = outputGate - .getQosReportingListener(); - - if (oldListener != null) { - outputGate.setQosReportingListener(createChainedListener( - oldListener, listener)); - } else { - outputGate.setQosReportingListener(listener); - } - } - - private static InputGateQosReportingListener createChainedListener( - final InputGateQosReportingListener first, - final InputGateQosReportingListener second) { - - return new InputGateQosReportingListener() { - @Override - public void recordReceived(int inputChannel, - AbstractTaggableRecord record) { - first.recordReceived(inputChannel, record); - second.recordReceived(inputChannel, record); - } - - @Override - public void tryingToReadRecord() { - first.tryingToReadRecord(); - second.tryingToReadRecord(); - } - - @Override - public void inputBufferConsumed(int channelIndex, - long bufferInterarrivalTimeNanos, int recordsReadFromBuffer) { - - first.inputBufferConsumed(channelIndex, bufferInterarrivalTimeNanos, - recordsReadFromBuffer); - second.inputBufferConsumed(channelIndex, - bufferInterarrivalTimeNanos, recordsReadFromBuffer); - } - }; - } - - private static OutputGateQosReportingListener createChainedListener( - final OutputGateQosReportingListener first, - final OutputGateQosReportingListener second) { - - return new OutputGateQosReportingListener() { - - @Override - public void outputBufferSent(int channelIndex, - long currentAmountTransmitted) { - first.outputBufferSent(channelIndex, currentAmountTransmitted); - second.outputBufferSent(channelIndex, currentAmountTransmitted); - } - - @Override - public void recordEmitted(int outputChannel, - AbstractTaggableRecord record) { - - first.recordEmitted(outputChannel, record); - second.recordEmitted(outputChannel, record); - } - - @Override - public void outputBufferAllocated(int channelIndex) { - first.outputBufferAllocated(channelIndex); - second.outputBufferAllocated(channelIndex); - } - }; - } - -} diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/BernoulliSampler.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/BernoulliSampler.java index e330a4e9f..14af0752a 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/BernoulliSampler.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/BernoulliSampler.java @@ -17,7 +17,7 @@ * @author Bjoern Lohrmann * */ -public class BernoulliSampler { +public class BernoulliSampler implements Sampler { private final BernoulliSampleDesign samplingDesign; diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterArrivalTimeSampler.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterArrivalTimeSampler.java similarity index 88% rename from nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterArrivalTimeSampler.java rename to nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterArrivalTimeSampler.java index d6e95e9e3..8ab498133 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterArrivalTimeSampler.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterArrivalTimeSampler.java @@ -1,10 +1,8 @@ -package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; +package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.BernoulliSampler; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.Sample; import eu.stratosphere.nephele.streaming.util.StreamUtil; -public class InputGateInterArrivalTimeSampler { +public class InputGateInterArrivalTimeSampler implements Sampler { /** * Samples records interarrival times in microseconds. These are computed @@ -63,7 +61,7 @@ private void finalizeSamplePoint(int channelIndex, private void startSampleIfNecessary(int channelIndex) { if (interarrivalTimeSampler.shouldTakeSamplePoint()) { - accBufferInterarrivalTimes[channelIndex] = Long.valueOf(0); + accBufferInterarrivalTimes[channelIndex] = 0L; } } diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterReadTimeSampler.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterReadTimeSampler.java similarity index 75% rename from nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterReadTimeSampler.java rename to nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterReadTimeSampler.java index a656b4bc3..c46e98ef7 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateInterReadTimeSampler.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/InputGateInterReadTimeSampler.java @@ -1,18 +1,14 @@ -package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; - -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.BernoulliSampler; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.Sample; +package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling; /** * Samples the elapsed time between a read on a specific input gate identified * and the beginning of the next attempt read on any other input gate. Elapsed time is sampled in * microseconds. */ -public class InputGateInterReadTimeSampler { - +public class InputGateInterReadTimeSampler implements Sampler { + /** - * Samples the elapsed time between a read on the input gate identified - * {@link #inputGateIndex} and the next read on any other input gate. + * Samples the elapsed time between a read on the input gate and the next read on any other input gate. * Elapsed time is sampled in microseconds. */ private final BernoulliSampler readReadTimeSampler; diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/Sampler.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/Sampler.java new file mode 100644 index 000000000..2573b0294 --- /dev/null +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/sampling/Sampler.java @@ -0,0 +1,7 @@ +package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling; + +public interface Sampler { + boolean hasSample(); + + Sample drawSampleAndReset(long now); +} diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/AbstractVertexQosReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/AbstractVertexQosReporter.java index 8129fb62d..98811808f 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/AbstractVertexQosReporter.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/AbstractVertexQosReporter.java @@ -3,6 +3,7 @@ import eu.stratosphere.nephele.streaming.message.qosreport.VertexStatistics; import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.QosReportForwarderThread; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.InputGateInterArrivalTimeSampler; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.Sample; public abstract class AbstractVertexQosReporter implements VertexQosReporter { @@ -14,11 +15,11 @@ public abstract class AbstractVertexQosReporter implements VertexQosReporter { private final ReportTimer reportTimer; private long igReceiveCounterAtLastReport; - private final InputGateReceiveCounter igReceiveCounter; + private final CountingGateReporter igReceiveCounter; private final InputGateInterArrivalTimeSampler igInterarrivalTimeSampler; private long ogEmitCounterAtLastReport; - private final OutputGateEmitStatistics ogEmitCounter; + private final CountingGateReporter ogEmitCounter; private final int runtimeInputGateIndex; @@ -27,8 +28,8 @@ public abstract class AbstractVertexQosReporter implements VertexQosReporter { public AbstractVertexQosReporter(QosReportForwarderThread reportForwarder, QosReporterID.Vertex reporterID, ReportTimer reportTimer, int runtimeInputGateIndex, - int runtimeOutputGateIndex, InputGateReceiveCounter igReceiveCounter, - OutputGateEmitStatistics emitCounter) { + int runtimeOutputGateIndex, CountingGateReporter igReceiveCounter, + CountingGateReporter emitCounter) { this.reportForwarder = reportForwarder; this.reporterID = reporterID; @@ -38,7 +39,7 @@ public AbstractVertexQosReporter(QosReportForwarderThread reportForwarder, this.runtimeOutputGateIndex = runtimeOutputGateIndex; if (reporterID.hasInputGateID()) { - this.igReceiveCounterAtLastReport = igReceiveCounter.getRecordsReceived(); + this.igReceiveCounterAtLastReport = igReceiveCounter.getRecordsCount(); this.igReceiveCounter = igReceiveCounter; this.igInterarrivalTimeSampler = new InputGateInterArrivalTimeSampler(reportForwarder.getConfigCenter().getSamplingProbability() / 100.0); } else { @@ -47,7 +48,7 @@ public AbstractVertexQosReporter(QosReportForwarderThread reportForwarder, } if (reporterID.hasOutputGateID()) { - this.ogEmitCounterAtLastReport = emitCounter.getEmitted(); + this.ogEmitCounterAtLastReport = emitCounter.getRecordsCount(); this.ogEmitCounter = emitCounter; } else { this.ogEmitCounter = null; @@ -102,9 +103,9 @@ public void sendReport(long now, private double getRecordsConsumedPerSec(double secsPassed) { double recordsConsumedPerSec = -1; if (igReceiveCounter != null) { - recordsConsumedPerSec = (igReceiveCounter.getRecordsReceived() - igReceiveCounterAtLastReport) + recordsConsumedPerSec = (igReceiveCounter.getRecordsCount() - igReceiveCounterAtLastReport) / secsPassed; - igReceiveCounterAtLastReport = igReceiveCounter.getRecordsReceived(); + igReceiveCounterAtLastReport = igReceiveCounter.getRecordsCount(); } return recordsConsumedPerSec; } @@ -112,9 +113,9 @@ private double getRecordsConsumedPerSec(double secsPassed) { private double getRecordsEmittedPerSec(double secsPassed) { double recordEmittedPerSec = -1; if (ogEmitCounter != null) { - recordEmittedPerSec = (ogEmitCounter.getEmitted() - ogEmitCounterAtLastReport) + recordEmittedPerSec = (ogEmitCounter.getRecordsCount() - ogEmitCounterAtLastReport) / secsPassed; - ogEmitCounterAtLastReport = ogEmitCounter.getEmitted(); + ogEmitCounterAtLastReport = ogEmitCounter.getRecordsCount(); } return recordEmittedPerSec; } diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/CountingGateReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/CountingGateReporter.java new file mode 100644 index 000000000..e0804ec8b --- /dev/null +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/CountingGateReporter.java @@ -0,0 +1,29 @@ +package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; + +/** + * A basic reporter providing (record) counting methods. + */ +public class CountingGateReporter { + private boolean reporter; + private long recordsCount; + + public boolean isReporter() { + return reporter; + } + + public void setReporter(boolean reporter) { + this.reporter = reporter; + } + + public long getRecordsCount() { + return recordsCount; + } + + public void countRecord() { + ++recordsCount; + } + + public void reset() { + recordsCount = 0; + } +} \ No newline at end of file diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateReceiveCounter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateReceiveCounter.java deleted file mode 100644 index 23a3aef60..000000000 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/InputGateReceiveCounter.java +++ /dev/null @@ -1,20 +0,0 @@ -package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; - - -public class InputGateReceiveCounter { - - private long recordsReceived; - - - public void recordReceived() { - recordsReceived++; - } - - public long getRecordsReceived() { - return recordsReceived; - } - - public void reset() { - recordsReceived = 0; - } -} diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/OutputGateEmitStatistics.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/OutputGateEmitStatistics.java deleted file mode 100644 index df67c28ee..000000000 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/OutputGateEmitStatistics.java +++ /dev/null @@ -1,17 +0,0 @@ -package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; - -public class OutputGateEmitStatistics { - private long emitted; - - public void emitted() { - emitted++; - } - - public long getEmitted() { - return emitted; - } - - public void reset() { - emitted = 0; - } -} diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadReporter.java index f620e9871..a84075265 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadReporter.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadReporter.java @@ -17,7 +17,7 @@ public class ReadReadReporter implements VertexQosReporter { private final ReportTimer reportTimer; private long emitCounterAtLastReport; - private final OutputGateEmitStatistics outputGateEmitCounter; + private final CountingGateReporter outputGateEmitCounter; private final int runtimeInputGateIndex; @@ -28,8 +28,8 @@ public ReadReadReporter(QosReportForwarderThread reportForwarder, ReportTimer reportTimer, int runtimeInputGateIndex, int runtimeOutputGateIndex, - InputGateReceiveCounter igReceiveCounter, - OutputGateEmitStatistics emitCounter) { + CountingGateReporter igReceiveCounter, + CountingGateReporter emitCounter) { this.reportForwarder = reportForwarder; this.reporterID = reporterID; @@ -38,7 +38,7 @@ public ReadReadReporter(QosReportForwarderThread reportForwarder, this.runtimeInputGateIndex = runtimeInputGateIndex; this.runtimeOutputGateIndex = runtimeOutputGateIndex; - emitCounterAtLastReport = emitCounter.getEmitted(); + emitCounterAtLastReport = emitCounter.getRecordsCount(); this.outputGateEmitCounter = emitCounter; } @@ -60,9 +60,9 @@ public void sendReport(long now, private double getRecordsEmittedPerSec(double secsPassed) { double recordEmittedPerSec = -1; if (outputGateEmitCounter != null) { - recordEmittedPerSec = (outputGateEmitCounter.getEmitted() - emitCounterAtLastReport) + recordEmittedPerSec = (outputGateEmitCounter.getRecordsCount() - emitCounterAtLastReport) / secsPassed; - emitCounterAtLastReport = outputGateEmitCounter.getEmitted(); + emitCounterAtLastReport = outputGateEmitCounter.getRecordsCount(); } return recordEmittedPerSec; } diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadVertexQosReporterGroup.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadVertexQosReporterGroup.java index 20ab0323d..234fbf0fe 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadVertexQosReporterGroup.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadReadVertexQosReporterGroup.java @@ -1,12 +1,13 @@ package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; -import java.util.ArrayList; -import java.util.List; - import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.QosReportForwarderThread; -import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.BernoulliSampler; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.InputGateInterArrivalTimeSampler; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.InputGateInterReadTimeSampler; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.Sample; +import java.util.ArrayList; +import java.util.List; + /** * Contains a group of {@link ReadReadReporter} to do read-read latency * measurements on a specific input gate. Read-read means, that the elapsed @@ -23,8 +24,8 @@ public class ReadReadVertexQosReporterGroup implements VertexQosReporter { private final List reporters = new ArrayList(); private final ReportTimer reportTimer; - - private final InputGateReceiveCounter igReceiveCounter; + + private final CountingGateReporter igReceiveCounter; private long igReceiveCounterAtLastReport; private final InputGateInterArrivalTimeSampler igInterArrivalTimeSampler; @@ -35,7 +36,7 @@ public class ReadReadVertexQosReporterGroup implements VertexQosReporter { public ReadReadVertexQosReporterGroup( QosReportForwarderThread reportForwarder, int inputGateIndex, - InputGateReceiveCounter igReceiveCounter) { + CountingGateReporter igReceiveCounter) { this.inputGateIndex = inputGateIndex; @@ -46,7 +47,7 @@ public ReadReadVertexQosReporterGroup( .getConfigCenter().getSamplingProbability() / 100.0); this.igReceiveCounter = igReceiveCounter; - this.igReceiveCounterAtLastReport = igReceiveCounter.getRecordsReceived(); + this.igReceiveCounterAtLastReport = igReceiveCounter.getRecordsCount(); this.reportTimer = new ReportTimer(reportForwarder.getConfigCenter() .getAggregationInterval()); @@ -80,9 +81,9 @@ protected void sendReportIfDue() { private double getRecordsConsumedPerSec(double secsPassed) { double recordsConsumedPerSec = -1; if (igReceiveCounter != null) { - recordsConsumedPerSec = (igReceiveCounter.getRecordsReceived() - igReceiveCounterAtLastReport) + recordsConsumedPerSec = (igReceiveCounter.getRecordsCount() - igReceiveCounterAtLastReport) / secsPassed; - igReceiveCounterAtLastReport = igReceiveCounter.getRecordsReceived(); + igReceiveCounterAtLastReport = igReceiveCounter.getRecordsCount(); } return recordsConsumedPerSec; } diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadWriteReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadWriteReporter.java index 593119a16..7fc7c1d08 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadWriteReporter.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/ReadWriteReporter.java @@ -31,8 +31,8 @@ public class ReadWriteReporter extends AbstractVertexQosReporter { public ReadWriteReporter(QosReportForwarderThread reportForwarder, QosReporterID.Vertex reporterID, int runtimeInputGateIndex, - int runtimeOutputGateIndex, InputGateReceiveCounter igReceiveCounter, - OutputGateEmitStatistics emitCounter) { + int runtimeOutputGateIndex, CountingGateReporter igReceiveCounter, + CountingGateReporter emitCounter) { super(reportForwarder, reporterID, new ReportTimer(reportForwarder.getConfigCenter().getAggregationInterval()), diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexConsumptionReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexConsumptionReporter.java index 0c707c7cf..ee8affaf4 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexConsumptionReporter.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexConsumptionReporter.java @@ -2,6 +2,7 @@ import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.QosReportForwarderThread; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.InputGateInterReadTimeSampler; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.Sample; public class VertexConsumptionReporter extends AbstractVertexQosReporter { @@ -10,7 +11,7 @@ public class VertexConsumptionReporter extends AbstractVertexQosReporter { public VertexConsumptionReporter(QosReportForwarderThread reportForwarder, QosReporterID.Vertex reporterID, int runtimeInputGateIndex, - InputGateReceiveCounter igReceiveCounter) { + CountingGateReporter igReceiveCounter) { super(reportForwarder, reporterID, new ReportTimer(reportForwarder .getConfigCenter().getAggregationInterval()), diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexEmissionReporter.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexEmissionReporter.java index 04d68955e..5f009686f 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexEmissionReporter.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexEmissionReporter.java @@ -7,7 +7,7 @@ public class VertexEmissionReporter extends AbstractVertexQosReporter { public VertexEmissionReporter(QosReportForwarderThread reportForwarder, QosReporterID.Vertex reporterID, int runtimeOutputGateIndex, - OutputGateEmitStatistics emitCounter) { + CountingGateReporter emitCounter) { super(reportForwarder, reporterID, new ReportTimer(reportForwarder .getConfigCenter().getAggregationInterval()), -1, diff --git a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexStatisticsReportManager.java b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexStatisticsReportManager.java index 9970ab4a8..bdc7a7e85 100644 --- a/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexStatisticsReportManager.java +++ b/nephele/nephele-streaming/src/main/java/eu/stratosphere/nephele/streaming/taskmanager/qosreporter/vertex/VertexStatisticsReportManager.java @@ -1,14 +1,18 @@ package eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReferenceArray; - import eu.stratosphere.nephele.streaming.SamplingStrategy; import eu.stratosphere.nephele.streaming.message.qosreport.VertexStatistics; import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.InputGateReporterManager; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.OutputGateReporterManager; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.QosReportForwarderThread; import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.QosReporterConfigCenter; +import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.TimestampTag; import eu.stratosphere.nephele.streaming.util.StreamUtil; +import eu.stratosphere.nephele.types.AbstractTaggableRecord; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceArray; /** * Handles the measurement and reporting of latencies and record @@ -31,12 +35,27 @@ */ public class VertexStatisticsReportManager { - // private static final Log LOG = - // LogFactory.getLog(TaskLatencyReporter.class); - private final QosReportForwarderThread reportForwarder; - private final AtomicReferenceArray inputGateReceiveCounter; - private final AtomicReferenceArray outputGateEmitStatistics; + + /** + * For each input gate of the task for whose channels latency reporting is + * required, this list contains a InputGateReporterManager. A + * InputGateReporterManager keeps track of and reports on the latencies for + * all of the input gate's channels. This is a sparse list (may contain + * nulls), indexed by the runtime gate's own indices. + */ + private AtomicReferenceArray inputGateReceiveCounter; + + /** + * For each output gate of the task for whose output channels QoS statistics + * are required (throughput, output buffer lifetime, ...), this list + * contains a OutputGateReporterManager. Each OutputGateReporterManager + * keeps track of and reports on Qos statistics all of the output gate's + * channels and also attaches tags to records sent via its channels. This is + * a sparse list (may contain nulls), indexed by the runtime gate's own + * indices. + */ + private AtomicReferenceArray outputGateEmitStatistics; private final ConcurrentHashMap reporters; private final AtomicReferenceArray reportersByInputGate; @@ -47,9 +66,9 @@ public VertexStatisticsReportManager(QosReportForwarderThread qosReporter, this.reportForwarder = qosReporter; - this.inputGateReceiveCounter = new AtomicReferenceArray( + this.inputGateReceiveCounter = new AtomicReferenceArray( noOfInputGates); - this.outputGateEmitStatistics = new AtomicReferenceArray( + this.outputGateEmitStatistics = new AtomicReferenceArray( noOfOutputGates); this.reportersByInputGate = StreamUtil @@ -63,11 +82,11 @@ public VertexStatisticsReportManager(QosReportForwarderThread qosReporter, } public void recordReceived(int runtimeInputGateIndex) { - InputGateReceiveCounter igCounter = inputGateReceiveCounter + InputGateReporterManager igCounter = inputGateReceiveCounter .get(runtimeInputGateIndex); if (igCounter != null) { - igCounter.recordReceived(); + igCounter.countRecord(); } for (VertexQosReporter reporter : this.reportersByInputGate @@ -83,11 +102,13 @@ public void tryingToReadRecord(int runtimeInputGateIndex) { } } - public void recordEmitted(int runtimeOutputGateIndex) { - OutputGateEmitStatistics ogStats = outputGateEmitStatistics - .get(runtimeOutputGateIndex); - if (ogStats != null) { - ogStats.emitted(); + public void recordEmitted(int runtimeOutputGateIndex, int outputChannel, AbstractTaggableRecord record) { + OutputGateReporterManager outputGateReporter = outputGateEmitStatistics.get(runtimeOutputGateIndex); + if (outputGateReporter != null) { + outputGateReporter.countRecord(); + if (outputGateReporter.isReporter()) { + outputGateReporter.recordEmitted(outputChannel, record); + } } for (VertexQosReporter reporter : this.reportersByOutputGate @@ -96,6 +117,27 @@ public void recordEmitted(int runtimeOutputGateIndex) { } } + public void outputBufferSent(int gateIndex, int channelIndex, long currentAmountTransmitted) { + OutputGateReporterManager outputGateReporter = outputGateEmitStatistics.get(gateIndex); + if (outputGateReporter != null && outputGateReporter.isReporter()) { + outputGateReporter.outputBufferSent(channelIndex, currentAmountTransmitted); + } + } + + public void outputBufferAllocated(int gateIndex, int channelIndex) { + OutputGateReporterManager outputGateReporter = outputGateEmitStatistics.get(gateIndex); + if (outputGateReporter != null) { + outputGateReporter.outputBufferAllocated(channelIndex); + } + } + + public void reportLatenciesIfNecessary(int gateIndex, int inputChannel, TimestampTag timestampTag) { + InputGateReporterManager inputGateReporter = inputGateReceiveCounter.get(gateIndex); + if (inputGateReporter != null) { + inputGateReporter.reportLatencyIfNecessary(inputChannel, timestampTag); + } + } + public boolean containsReporter(QosReporterID.Vertex reporterID) { return this.reporters.containsKey(reporterID); } @@ -110,9 +152,9 @@ public synchronized void addReporter(int runtimeInputGateIndex, if (!reporterID.isDummy()) { inputGateReceiveCounter.compareAndSet(runtimeInputGateIndex, null, - new InputGateReceiveCounter()); + new InputGateReporterManager()); outputGateEmitStatistics.compareAndSet(runtimeOutputGateIndex, null, - new OutputGateEmitStatistics()); + new OutputGateReporterManager()); switch (samplingStrategy) { case READ_WRITE: @@ -131,12 +173,12 @@ public synchronized void addReporter(int runtimeInputGateIndex, } else if (runtimeInputGateIndex != -1) { inputGateReceiveCounter.compareAndSet(runtimeInputGateIndex, null, - new InputGateReceiveCounter()); + new InputGateReporterManager()); addVertexConsumptionReporter(runtimeInputGateIndex, reporterID); } else if (runtimeOutputGateIndex != -1) { outputGateEmitStatistics.compareAndSet(runtimeOutputGateIndex, null, - new OutputGateEmitStatistics()); + new OutputGateReporterManager()); addVertexEmissionReporter(runtimeOutputGateIndex, reporterID); } } @@ -238,4 +280,24 @@ public void inputBufferConsumed(int inputGateIndex, int channelIndex, bufferInterarrivalTimeNanos, recordsReadFromBuffer); } } + + public InputGateReporterManager addInputGateReporter(int gateIndex, int channelIndex, int numberOfInputChannels, + QosReporterID.Edge reporterID) { + inputGateReceiveCounter.compareAndSet(gateIndex, null, new InputGateReporterManager()); + InputGateReporterManager inputGateReporter = inputGateReceiveCounter.get(gateIndex); + inputGateReporter.initReporter(reportForwarder, numberOfInputChannels); + inputGateReporter.addEdgeQosReporterConfig(channelIndex, reporterID); + + return inputGateReporter; + } + + public OutputGateReporterManager addOutputGateReporter(int gateIndex, int channelIndex, int numberOfOutputChannels, + QosReporterID.Edge reporterID) { + outputGateEmitStatistics.compareAndSet(gateIndex, null, new OutputGateReporterManager()); + OutputGateReporterManager outputGateReporter = outputGateEmitStatistics.get(gateIndex); + outputGateReporter.initReporter(reportForwarder, numberOfOutputChannels); + outputGateReporter.addEdgeQosReporterConfig(channelIndex, reporterID); + + return outputGateReporter; + } }