Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package eu.stratosphere.nephele.streaming.taskmanager.qosreporter;

import eu.stratosphere.nephele.streaming.message.qosreport.EdgeLatency;
import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.CountingGateReporter;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.ReportTimer;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CopyOnWriteArrayList;

import eu.stratosphere.nephele.streaming.message.qosreport.EdgeLatency;
import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID;

/**
* A instance of this class keeps track of and reports on the latencies of an
* input gate's input channels.
Expand All @@ -22,7 +24,7 @@
* @author Bjoern Lohrmann
*
*/
public class InputGateReporterManager {
public class InputGateReporterManager extends CountingGateReporter {

/**
* No need for a thread-safe set because it is only accessed in synchronized
Expand All @@ -39,20 +41,20 @@ public class InputGateReporterManager {

private QosReportForwarderThread reportForwarder;

private ReportTimer reportTimer;

private class EdgeLatencyReporter {

public QosReporterID.Edge reporterID;

long timeOfNextReport;

long accumulatedLatency;

int tagsReceived;

public void sendReportIfDue(long now) {
if (this.reportIsDue(now)) {
public void sendReportIfDue() {
if (this.reportIsDue()) {
this.sendReport();
this.reset(now);
this.reset();
}
}

Expand All @@ -63,14 +65,11 @@ private void sendReport() {
.addToNextReport(channelLatency);
}

public boolean reportIsDue(long now) {
return this.tagsReceived > 0 && now >= this.timeOfNextReport;
public boolean reportIsDue() {
return this.tagsReceived > 0;
}

public void reset(long now) {
this.timeOfNextReport = now
+ InputGateReporterManager.this.reportForwarder
.getConfigCenter().getAggregationInterval();
public void reset() {
this.accumulatedLatency = 0;
this.tagsReceived = 0;
}
Expand All @@ -83,13 +82,21 @@ public void update(TimestampTag tag, long now) {
}
}

public InputGateReporterManager() {
}

public InputGateReporterManager(QosReportForwarderThread qosReporter,
int noOfInputChannels) {
initReporter(qosReporter, noOfInputChannels);
}

public void initReporter(QosReportForwarderThread qosReporter, int noOfInputChannels) {
this.reportForwarder = qosReporter;
this.reportersByChannelIndexInRuntimeGate = new CopyOnWriteArrayList<EdgeLatencyReporter>();
this.fillChannelLatenciesWithNulls(noOfInputChannels);
this.reporters = new HashSet<QosReporterID>();
this.reportTimer = new ReportTimer(this.reportForwarder.getConfigCenter().getAggregationInterval());
setReporter(true);
}

private void fillChannelLatenciesWithNulls(int noOfInputChannels) {
Expand All @@ -100,21 +107,34 @@ private void fillChannelLatenciesWithNulls(int noOfInputChannels) {
public void reportLatencyIfNecessary(int channelIndex,
TimestampTag timestampTag) {

long now = System.currentTimeMillis();

EdgeLatencyReporter info = this.reportersByChannelIndexInRuntimeGate
.get(channelIndex);

if (info != null) {
long now = System.currentTimeMillis();
info.update(timestampTag, now);
info.sendReportIfDue(now);
}

sendReportsIfDue(now);
}

private void sendReportsIfDue(long now) {
if (reportTimer.reportIsDue()) {
for (EdgeLatencyReporter reporter : reportersByChannelIndexInRuntimeGate) {
if (reporter != null) {
reporter.sendReportIfDue();
}
}
reportTimer.reset(now);
}
}

public synchronized boolean containsReporter(QosReporterID.Edge reporterID) {
return this.reporters.contains(reporterID);
}

public synchronized void addEdgeQosReporterConfig(
public void addEdgeQosReporterConfig(
int channelIndexInRuntimeGate, QosReporterID.Edge reporterID) {

if (this.reporters.contains(reporterID)) {
Expand All @@ -123,7 +143,6 @@ public synchronized void addEdgeQosReporterConfig(

EdgeLatencyReporter info = new EdgeLatencyReporter();
info.reporterID = reporterID;
info.timeOfNextReport = System.currentTimeMillis();
info.accumulatedLatency = 0;
info.tagsReceived = 0;
this.reportersByChannelIndexInRuntimeGate.set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import eu.stratosphere.nephele.streaming.taskmanager.qosmodel.QosReporterID;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.edge.OutputBufferLifetimeSampler;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.sampling.BernoulliSampleDesign;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.CountingGateReporter;
import eu.stratosphere.nephele.streaming.taskmanager.qosreporter.vertex.ReportTimer;
import eu.stratosphere.nephele.types.AbstractTaggableRecord;

import java.util.Collections;
Expand All @@ -25,7 +27,7 @@
* @author Bjoern Lohrmann
*
*/
public class OutputGateReporterManager {
public class OutputGateReporterManager extends CountingGateReporter {

/**
* No need for a thread-safe set because it is only accessed in synchronized
Expand All @@ -43,6 +45,8 @@ public class OutputGateReporterManager {

private QosReportForwarderThread reportForwarder;

private ReportTimer reportTimer;

public class OutputChannelChannelStatisticsReporter {

private QosReporterID.Edge reporterID;
Expand Down Expand Up @@ -157,16 +161,14 @@ public int getRecordsSinceLastTag() {
}

public void sendReportIfDue(long now) {
if (this.reportIsDue(now)) {
if (this.reportIsDue()) {
this.sendReport(now);
this.reset(now);
}
}

private boolean reportIsDue(long now) {
return now - this.timeOfLastReport > OutputGateReporterManager.this.reportForwarder
.getConfigCenter().getAggregationInterval()
&& this.recordsEmittedSinceLastReport > 0
private boolean reportIsDue() {
return this.recordsEmittedSinceLastReport > 0
&& this.outputBuffersSentSinceLastReport > 0
&& this.outputBufferLifetimeSampler.hasSample();
}
Expand Down Expand Up @@ -223,22 +225,41 @@ public void outputBufferSent(long currentAmountTransmitted) {
this.outputBuffersSentSinceLastReport++;
this.currentAmountTransmitted = currentAmountTransmitted;
this.outputBufferLifetimeSampler.outputBufferSent();
sendReportIfDue(System.currentTimeMillis());
sendReportsIfDue(System.currentTimeMillis());
}

public void outputBufferAllocated() {
this.outputBufferLifetimeSampler.outputBufferAllocated();
}
}

public OutputGateReporterManager() {
}

public OutputGateReporterManager(QosReportForwarderThread qosReporter,
int noOfOutputChannels) {
initReporter(qosReporter, noOfOutputChannels);
}

public void initReporter(QosReportForwarderThread qosReporter, int noOfOutputChannels) {
this.reportForwarder = qosReporter;
this.reporters = new HashSet<QosReporterID>();
this.reportersByChannelIndexInRuntimeGate = new CopyOnWriteArrayList<OutputChannelChannelStatisticsReporter>();
Collections.addAll(this.reportersByChannelIndexInRuntimeGate,
new OutputChannelChannelStatisticsReporter[noOfOutputChannels]);
this.reportTimer = new ReportTimer(this.reportForwarder.getConfigCenter().getAggregationInterval());
setReporter(true);
}

public void sendReportsIfDue(long now) {
if (reportTimer.reportIsDue()) {
for (OutputChannelChannelStatisticsReporter reporter : reportersByChannelIndexInRuntimeGate) {
if (reporter != null) {
reporter.sendReportIfDue(now);
}
}
reportTimer.reset(now);
}
}

public void recordEmitted(int channelIndex, AbstractTaggableRecord record) {
Expand Down
Loading