Björn Lohrmann edited this page Jul 10, 2014 · 2 revisions

Dynamic task chaining is an optimization that Stratosphere Streaming applies at runtime. The goal is to eliminate the overhead and delay of handing records over between task threads running within the same task manager. This is achieved by executing the user code of these tasks sequentially within one thread (instead of one thread per task). For this to be possible, the user code needs to be wrapped in a method that is invoked once per input record. Such methods are called "chainable". The IocTask class extends the AbstractTask class and lets the user specify chainable methods.
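
To make the idea concrete, here is a minimal sketch of chaining in plain Java. It is not the Stratosphere Streaming implementation: ChainableStage and ChainSketch are hypothetical names, and records are plain strings. It only shows how per-record methods can be linked so that one thread runs them back to back, with no queue or thread handover in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for a chainable method: called once per input record,
// it hands its results to the downstream consumer.
interface ChainableStage extends BiConsumer<String, Consumer<String>> {}

final class ChainSketch {
  // Links the stages back to front so that each stage's output is passed to
  // the next stage as a direct method call on the caller's thread.
  static Consumer<String> chain(List<ChainableStage> stages, Consumer<String> sink) {
    Consumer<String> next = sink;
    for (int i = stages.size() - 1; i >= 0; i--) {
      ChainableStage stage = stages.get(i);
      Consumer<String> downstream = next;
      next = record -> stage.accept(record, downstream);
    }
    return next;
  }

  // Chains two stages and pushes two records through them.
  static List<String> runExample() {
    List<String> results = new ArrayList<>();
    ChainableStage upper = (record, out) -> out.accept(record.toUpperCase());
    ChainableStage tag = (record, out) -> out.accept("SIOCT: " + record);
    Consumer<String> head = chain(List.of(upper, tag), results::add);
    head.accept("a");
    head.accept("b");
    return results;
  }
}
```

Calling ChainSketch.runExample() returns the list ["SIOCT: A", "SIOCT: B"]: every record passes through both stages before the next record is read.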

The following example will demonstrate how to use IocTask (and some annotations) to create a simple task.

Creating a simple Task

First, create a new class that inherits from the IocTask class and overrides the method setup().

public class SimpleIocTask extends IocTask {
  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }
}

The method setup() should be used to initialize your readers and writers with the methods initReader(int, Class) and initWriter(int, Class), respectively.

The initialization methods take an index and a class as input and initialize a RecordReader (or RecordWriter) with the given index and of the given type.

It is important to note that readers and writers must be initialized in index order, starting at 0.

This, for example, is not allowed and will throw an IllegalConfigurationException:

protected void setup() {
  initReader(1, StringRecord.class);
  initReader(0, StringRecord.class);
  initWriter(0, StringRecord.class);
}
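
The ordering rule can be pictured with a small sketch. OrderedRegistry is a hypothetical class, not part of IocTask, and it throws IllegalStateException as a stand-in for IllegalConfigurationException. It assumes the check amounts to "the next index must equal the number of entries registered so far", which is one plausible way to enforce the rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry that only illustrates the "indices in order,
// starting at 0" rule; it is not the IocTask implementation.
final class OrderedRegistry {
  private final List<Class<?>> types = new ArrayList<>();

  // Registers a type under the given index, rejecting gaps and reordering.
  void init(int index, Class<?> type) {
    // The only valid index is the number of entries registered so far.
    if (index != types.size()) {
      throw new IllegalStateException(
          "expected index " + types.size() + " but got " + index);
    }
    types.add(type);
  }

  int size() {
    return types.size();
  }
}
```

With this sketch, init(0, ...) followed by init(1, ...) succeeds, while calling init(1, ...) first throws.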

Defining user methods

So far we created a class which can't do anything (how useful). So now let's look at how to add some user methods.

User methods can have arbitrary names (except names already defined in the superclass). Each takes a Record as the first argument (matching the type of the initialized reader), followed by any number of Collector<Record> arguments (matching the types of the initialized writers).

Let's extend our SimpleIocTask:

public class SimpleIocTask extends IocTask {
  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  public void execute(StringRecord record, Collector<StringRecord> out) {
    out.collect(new StringRecord("SIOCT: " + record.toString()));
  }
}

This user method applies a simple transformation to the incoming record and sends the result on through the collector.

The problem is that this method won't be called as our SimpleIocTask hasn't defined how to do so (duh!). To express a mapping between a reader, writers and a user method there exists the following annotation:

@ReadFromWriteTo(int readerIndex, int[] writerIndices)

Now let's use this annotation:

public class SimpleIocTask extends IocTask {
  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndices = 0)
  public void execute(StringRecord record, Collector<StringRecord> out) {
    out.collect(new StringRecord("SIOCT: " + record.toString()));
  }
}

Note that readerIndex and writerIndices are the indices of your initialized readers and writers, respectively.
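
How an annotation like this can drive method calls is easiest to see in a reflection sketch. Everything here is an assumption for illustration: Mapped is a hypothetical stand-in for @ReadFromWriteTo, records are plain strings, and the dispatcher wires a single collector, so writerIndices is declared but not used for routing. The real IocTask may work differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for @ReadFromWriteTo. RUNTIME retention is required
// so that reflection can see the annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Mapped {
  int readerIndex();
  int[] writerIndices();
}

final class PrefixTask {
  @Mapped(readerIndex = 0, writerIndices = 0)
  public void execute(String record, Consumer<String> out) {
    out.accept("SIOCT: " + record);
  }
}

final class AnnotationDispatch {
  // Finds the methods mapped to the given reader index and invokes each of
  // them once per input record, collecting whatever they emit.
  static List<String> run(Object task, int readerIndex, List<String> input) {
    List<String> output = new ArrayList<>();
    Consumer<String> collector = output::add;
    for (Method method : task.getClass().getDeclaredMethods()) {
      Mapped mapping = method.getAnnotation(Mapped.class);
      if (mapping == null || mapping.readerIndex() != readerIndex) {
        continue; // not a user method for this reader
      }
      method.setAccessible(true);
      for (String record : input) {
        try {
          method.invoke(task, record, collector);
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException("could not invoke " + method.getName(), e);
        }
      }
    }
    return output;
  }
}
```

Running AnnotationDispatch.run(new PrefixTask(), 0, List.of("a", "b")) yields ["SIOCT: a", "SIOCT: b"], while reader index 1 yields an empty list because no method is mapped to it.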

If you need to map to more than one writer, you can pass an array to writerIndices. The user method would then look like this:

public class SimpleIocTask extends IocTask {
  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
    initWriter(1, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndices = {0, 1})
  public void execute(StringRecord record, Collector<StringRecord> out1, Collector<StringRecord> out2) {
    out1.collect(new StringRecord("SIOCT1: " + record.toString()));
    out2.collect(new StringRecord("SIOCT2: " + record.toString()));
  }
}

Defining finish methods

One problem that arises with this inversion-of-control approach is that you won't notice when you receive the last record. For this purpose there exists one more annotation:

@LastRecordReadFromWriteTo(int readerIndex, int[] writerIndices)

With this annotation you can mark a method to be called once the reader with the specified readerIndex has no more input. This user method takes only Collector<Record> arguments, one for each writer listed in writerIndices.

Now let's write a method which counts the number of records received and then emits the count as a final record:

public class SimpleIocTask extends IocTask {
  private final AtomicInteger counter = new AtomicInteger(0);

  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndices = 0)
  public void execute(StringRecord record, Collector<StringRecord> out) {
    counter.incrementAndGet();
    out.collect(new StringRecord("SIOCT: " + record.toString()));
  }

  @LastRecordReadFromWriteTo(readerIndex = 0, writerIndices = 0)
  public void finish(Collector<StringRecord> out) {
    out.collect(new StringRecord("Received " + counter.get() + " records from reader 0!"));
  }
}

Last but not least, you can override the shutdown() method, which is called after the whole task is done (all readers have finished).

public class SimpleIocTask extends IocTask {
  private final AtomicInteger counter = new AtomicInteger(0);

  @Override
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndices = 0)
  public void execute(StringRecord record, Collector<StringRecord> out) {
    counter.incrementAndGet();
    out.collect(new StringRecord("SIOCT: " + record.toString()));
  }

  @LastRecordReadFromWriteTo(readerIndex = 0, writerIndices = 0)
  public void finish(Collector<StringRecord> out) {
    out.collect(new StringRecord("Received " + counter.get() + " records from reader 0!"));
  }

  @Override
  protected void shutdown() {
    System.out.println("SIOCT: I'm done!");
  }
}
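
The call order described on this page (setup, one call per record, the finish method, then shutdown) can be summarized in a miniature driver. LifecycleSketch and CountingTask are hypothetical names, the sketch handles a single reader, and it uses abstract methods instead of annotations, so treat it as an illustration of the ordering only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the call order described above; not the IocTask source.
abstract class LifecycleSketch {
  protected final List<String> log = new ArrayList<>();

  protected abstract void setup();
  protected abstract void onRecord(String record); // stands in for a @ReadFromWriteTo method
  protected abstract void onLastRecord();          // stands in for a @LastRecordReadFromWriteTo method
  protected void shutdown() {}                     // optional hook, like shutdown() above

  // Drives a single reader: setup, one call per record, finish, then shutdown.
  final List<String> run(List<String> input) {
    setup();
    for (String record : input) {
      onRecord(record);
    }
    onLastRecord();
    shutdown();
    return log;
  }
}

final class CountingTask extends LifecycleSketch {
  private int counter = 0;

  @Override protected void setup() { log.add("setup"); }
  @Override protected void onRecord(String record) { counter++; log.add("record " + record); }
  @Override protected void onLastRecord() { log.add("finish: " + counter + " records"); }
  @Override protected void shutdown() { log.add("shutdown"); }
}
```

For the input ["a", "b"], new CountingTask().run(...) returns the log ["setup", "record a", "record b", "finish: 2 records", "shutdown"].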
