Skip to content

Cannot merge already-split MultiPipes #23

@jtc1246

Description

@jtc1246

Hello, I want to ask whether is it able to merge some already-split MultiPipes. Currently if I try to merge 2 split MultiPipes, it will show "WindFlow Error: empty MultiPipe cannot be merged" in prepareMergeSet function. But if I add a map operator before the merge of 2 pipes, it will show "WindFlow Error: the requested merge operation is not supported" in execute_Merge in pipegraph.hpp.

We want to know whether there is any method to resolve this problem and merge the pipes after splitting. This is very helpful to us. Thank you.

Following is the code I used:

#include "WindFlow/wf/windflow.hpp"
#include <iostream>
#include <string>
#include <vector>

using namespace std;
using namespace wf;

int data_count = 20;

struct TestData {
    int key;
    int data;
    int source_id;
};

vector<TestData> result_1;

class TestDataSource1 {
   public:
    void operator()(Source_Shipper<TestData>& shipper) {
        for (int i = 0; i < data_count; i++) {
            TestData data;
            data.key = i % 5;
            data.data = i * 2;
            data.source_id = 1;
            shipper.pushWithTimestamp(data, i * 1000000L);
        }
        cout << "data scource 1 finished" << endl;
    }
};

class TestDataSource2 {
   public:
    void operator()(Source_Shipper<TestData>& shipper) {
        for (int i = 0; i < data_count; i++) {
            TestData data;
            data.key = i % 5;
            data.data = i * 3;
            data.source_id = 2;
            shipper.pushWithTimestamp(data, i * 1000000L);
        }
        cout << "data scource 2 finished" << endl;
    }
};

void build_graph(PipeGraph& graph) {
    MultiPipe& pipe1 = graph.add_source(Source_Builder(TestDataSource1()).withParallelism(1).withOutputBatchSize(4).build());
    MultiPipe& pipe2 = graph.add_source(Source_Builder(TestDataSource2()).withParallelism(1).withOutputBatchSize(4).build());

    auto split_func = [](const TestData& data) -> int { return (data.key >= 2 ? 1 : 0); };
    auto sink_to_vector = [](optional<reference_wrapper<TestData>> out) {
        if (out) {
            result_1.push_back(out->get());
        } else {
            cout << "Sink 1 finished, received " << result_1.size() << " items." << endl;
        }
    };
    auto sink_to_stdout = [](optional<reference_wrapper<TestData>> out) {
        if (out) {
            cout << "Sink 2 received data: key " << out->get().key << ", data " << out->get().data << ", source_id " << out->get().source_id << endl;
        } else {
            cout << "Sink 2 finished." << endl;
        }
    };

    pipe1.split(split_func, 2);
    pipe2.split(split_func, 2);
    pipe1.select(0).add_sink(Sink_Builder(sink_to_stdout).withParallelism(1).build());
    pipe2.select(0).add_sink(Sink_Builder(sink_to_stdout).withParallelism(1).build());

    MultiPipe& pipe1_new = pipe1.select(1);
    MultiPipe& pipe2_new = pipe2.select(1);
    // pipe1_new.add(Map_Builder([](const TestData& in) -> TestData {
    //     return in;
    // }).withParallelism(1).withOutputBatchSize(4).build());
    // pipe2_new.add(Map_Builder([](const TestData& in) -> TestData {
    //     return in;
    // }).withParallelism(1).withOutputBatchSize(4).build());
    MultiPipe& merged = pipe1_new.merge(pipe2_new);
    merged.add_sink(Sink_Builder(sink_to_vector).withParallelism(1).build());
}

int main() {
    // cout << "test" << endl;
    PipeGraph graph("test_graph", wf::Execution_Mode_t::DEFAULT, wf::Time_Policy_t::EVENT_TIME);
    build_graph(graph);
    int result = graph.run();
    cout << "Graph run finished, result: " << result << endl;
    for (TestData& data : result_1) {
        cout << "Result data: key " << data.key << ", data " << data.data << ", source_id " << data.source_id << endl;
    }
    return result;
}

// g++ test.cpp -I ./WindFlow/wf -I ./fastflow -o test -DNO_DEFAULT_MAPPING

The output is:

jtc@jiaotianchengs-MacBook-Pro:~/Desktop/windflow$ g++ test.cpp -I ./WindFlow/wf -I ./fastflow -o test -DNO_DEFAULT_MAPPING
jtc@jiaotianchengs-MacBook-Pro:~/Desktop/windflow$ ./test 
WindFlow Error: empty MultiPipe cannot be merged
jtc@jiaotianchengs-MacBook-Pro:~/Desktop/windflow$ g++ test.cpp -I ./WindFlow/wf -I ./fastflow -o test -DNO_DEFAULT_MAPPING # uncomment the code of adding map
jtc@jiaotianchengs-MacBook-Pro:~/Desktop/windflow$ ./test                                                                  
WindFlow Error: the requested merge operation is not supported
jtc@jiaotianchengs-MacBook-Pro:~/Desktop/windflow$ 

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions