Björn Lohrmann edited this page May 8, 2014 · 2 revisions

The IoCTask class extends the AbstractTask class and invokes user-defined methods on demand, thereby providing inversion of control.

The following example will demonstrate how to use IoCTask (and some annotations) to create a simple task.

Creating a simple Task

First, create a new class that inherits from the IoCTask class and overrides the setup() method.

public class SimpleIoCTask extends IoCTask {
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }
}

The method setup() should be used to initialize your readers and writers with the methods initReader(int, Class) and initWriter(int, Class), respectively.

The initialization methods take an index and a class as input and initialize a RecordReader (or RecordWriter) with the given index and of the given type.

It is important to note that readers and writers must be initialized in ascending index order, starting at 0.

This, for example, is not allowed and will throw an IllegalConfigurationException:

protected void setup() {
  initReader(1, StringRecord.class);
  initReader(0, StringRecord.class);
  initWriter(0, StringRecord.class);
}
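
To illustrate why the order matters, here is a minimal sketch of how such an in-order check could work. It is not the actual IoCTask implementation: ReaderRegistry and ReaderRegistryDemo are hypothetical names, String.class stands in for a record type, and IllegalStateException stands in for the framework's IllegalConfigurationException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the reader bookkeeping inside IoCTask; not the real implementation.
class ReaderRegistry {
    private final List<Class<?>> readerTypes = new ArrayList<>();

    // Accepts an index only if all lower indices have already been registered.
    void initReader(int index, Class<?> recordType) {
        if (index != readerTypes.size()) {
            // The framework throws IllegalConfigurationException; this sketch uses a JDK exception.
            throw new IllegalStateException(
                "Expected reader index " + readerTypes.size() + " but got " + index);
        }
        readerTypes.add(recordType);
    }
}

class ReaderRegistryDemo {
    // Returns true if registering the given indices in the given order is accepted.
    static boolean accepts(int... indices) {
        ReaderRegistry registry = new ReaderRegistry();
        try {
            for (int index : indices) {
                registry.initReader(index, String.class);
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(0, 1)); // indices in order
        System.out.println(accepts(1, 0)); // index 1 before index 0
    }
}
```

In this sketch the second call is rejected at the first initReader(1, ...) call, because index 0 has not been registered yet.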

Defining user methods

So far we have created a class that can't do anything (how useful). So now let's look at how to add some user methods.

User methods can have arbitrary names (except names already defined in the superclass). They take a Record as the first argument (matching the type of the initialized reader), followed by any number of Collector<Record> arguments (matching the types of the initialized writers).

Let's extend our SimpleIoCTask:

public class SimpleIoCTask extends IoCTask {
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  public void execute(StringRecord record, Collector<StringRecord> out) {
    out.emit(new StringRecord("SIOCT: " + record.toString()));
  }
}

This user method applies a simple transformation to each incoming record and sends the result on through the collector.

The problem is that this method won't be called, as our SimpleIoCTask hasn't defined how to do so (duh!). To express a mapping between a reader, one or more writers and a user method, the following two annotations exist:

@ReadFromWriteTo(int readerIndex, int writerIndex);
@ReadFromWriteToMultiple(int readerIndex, int[] writerIndices);

where @ReadFromWriteTo is just a special case of @ReadFromWriteToMultiple mapping to only one writer.

Now let's use these annotations:

public class SimpleIoCTask extends IoCTask {
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndex = 0)
  public void execute(StringRecord record, Collector<StringRecord> out) {
    out.emit(new StringRecord("SIOCT: " + record.toString()));
  }
}

Note that readerIndex and writerIndex are the indices of your initialized readers and writers, respectively.
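
For readers curious how such a mapping can be resolved at runtime, the following is a rough sketch using Java reflection. It is not the framework's code: the annotation is re-declared here only so the example is self-contained, and ListCollector, PrefixTask and AnnotationDispatchSketch are hypothetical names, with String standing in for a record type.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical re-declaration for this sketch; the real annotation comes with the framework.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ReadFromWriteTo {
    int readerIndex();
    int writerIndex();
}

// Hypothetical collector that buffers emitted values instead of writing to a channel.
class ListCollector<T> {
    final List<T> emitted = new ArrayList<>();

    void emit(T value) {
        emitted.add(value);
    }
}

// Plain class standing in for an IoCTask subclass.
class PrefixTask {
    @ReadFromWriteTo(readerIndex = 0, writerIndex = 0)
    public void execute(String record, ListCollector<String> out) {
        out.emit("SIOCT: " + record);
    }
}

class AnnotationDispatchSketch {
    // Feeds each record of the given reader to every method mapped to that reader.
    static void dispatch(Object task, int readerIndex, List<String> records,
                         List<ListCollector<String>> writers) {
        for (Method method : task.getClass().getDeclaredMethods()) {
            ReadFromWriteTo mapping = method.getAnnotation(ReadFromWriteTo.class);
            if (mapping == null || mapping.readerIndex() != readerIndex) {
                continue; // not a user method for this reader
            }
            method.setAccessible(true);
            ListCollector<String> out = writers.get(mapping.writerIndex());
            try {
                for (String record : records) {
                    method.invoke(task, record, out);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not invoke " + method.getName(), e);
            }
        }
    }

    // Runs the sketch with one reader and one writer and returns what writer 0 received.
    static List<String> demo(String... records) {
        List<ListCollector<String>> writers = new ArrayList<>();
        writers.add(new ListCollector<String>());
        dispatch(new PrefixTask(), 0, Arrays.asList(records), writers);
        return writers.get(0).emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo("a", "b")); // prints [SIOCT: a, SIOCT: b]
    }
}
```

The point of the sketch is that the task class never calls execute itself; the surrounding code finds the annotated method and calls it once per record, which is the inversion of control described above.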

If you need to map to more than one writer you can use @ReadFromWriteToMultiple, of course. The user method would then look like this:

public class SimpleIoCTask extends IoCTask {
  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
    initWriter(1, StringRecord.class);
  }

  @ReadFromWriteToMultiple(readerIndex = 0, writerIndices = {0, 1})
  public void execute(StringRecord record, Collector<StringRecord> out1, Collector<StringRecord> out2) {
    out1.emit(new StringRecord("SIOCT1: " + record.toString()));
    out2.emit(new StringRecord("SIOCT2: " + record.toString()));
  }
}

Defining finish methods

One problem that arises with this inversion-of-control approach is that you won't notice when you have received the last record. For this purpose, two more annotations exist:

@LastRecordReadFromWriteTo(int readerIndex, int writerIndex);
@LastRecordReadFromWriteToMultiple(int readerIndex, int[] writerIndices);

With these annotations you can mark a method that will be called once the reader with the specified readerIndex has no more input. This user method takes only Collector<Record> arguments, one per writer specified by writerIndex or writerIndices.

Now let's write a method which will count the number of records received and then print it out:

import java.util.concurrent.atomic.AtomicInteger;

public class SimpleIoCTask extends IoCTask {
  private final AtomicInteger counter = new AtomicInteger(0);

  protected void setup() {
    initReader(0, StringRecord.class);
    initWriter(0, StringRecord.class);
  }

  @ReadFromWriteTo(readerIndex = 0, writerIndex = 0)
  public void execute(StringRecord record, Collector<StringRecord> out) {
    counter.incrementAndGet();
    out.emit(new StringRecord("SIOCT: " + record.toString()));
  }

  @LastRecordReadFromWriteTo(readerIndex = 0, writerIndex = 0)
  public void finish(Collector<StringRecord> out) {
    System.out.println("Received " + counter.get() + " records!");
    out.emit(new StringRecord("SIOCT: I'm done!"));
  }
}
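
The call order can be pictured with a small stand-alone simulation. This is only a sketch under assumptions: CountingTask and LastRecordSketch are hypothetical names, a plain List<String> stands in for the collector, and the loop in run() merely imitates what the framework presumably does when it drives a reader to the end of its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Plain class standing in for the annotated task; a List<String> plays the collector.
class CountingTask {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Counterpart of the per-record user method.
    void execute(String record, List<String> out) {
        counter.incrementAndGet();
        out.add("SIOCT: " + record);
    }

    // Counterpart of the finish method, called after the last record.
    void finish(List<String> out) {
        out.add("Received " + counter.get() + " records!");
    }
}

class LastRecordSketch {
    // Imitates the driver: one execute() call per record, then exactly one finish() call.
    static List<String> run(Iterator<String> input) {
        CountingTask task = new CountingTask();
        List<String> out = new ArrayList<>();
        while (input.hasNext()) {
            task.execute(input.next(), out);
        }
        task.finish(out); // the reader has no more input
        return out;
    }

    public static void main(String[] args) {
        // prints [SIOCT: a, SIOCT: b, Received 2 records!]
        System.out.println(run(Arrays.asList("a", "b").iterator()));
    }
}
```

Note that in this sketch finish() still runs when the input is empty, so the output then contains only the summary line.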
