This repository was archived by the owner on Jan 31, 2025. It is now read-only.

NullPointerException in WriteBufferedP.init() when using ProcessorWrapper #2159

Open
@TomaszGaweda

Hi,

When using ProcessorWrapper with, e.g., a list sink, the following exception is thrown:

Caused by: java.lang.NullPointerException
	at com.hazelcast.jet.impl.connector.WriteBufferedP.init(WriteBufferedP.java:67)
	at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
	at com.hazelcast.jet.impl.processor.ProcessorWrapper.init(ProcessorWrapper.java:80)
	at com.hazelcast.jet.impl.execution.ProcessorTasklet.init(ProcessorTasklet.java:228)
	at com.hazelcast.jet.impl.execution.TaskletExecutionService$BlockingWorker.run(TaskletExecutionService.java:289)
	... 5 more

Reproduction code:

class ReproTest {

    static JetInstance jet = Jet.newJetInstance();
    static IMap<String, Integer> inputMap;
    static IList<Integer> outputList;

    @BeforeAll
    static void setUp() {
        inputMap = jet.getMap("testInputMap");
        outputList = jet.getList("outputList");
    }
    
    @Test
    void reproduction() {
        // given
        for (int i = 0; i < 10; i++) {
            inputMap.put("" + i, i);
        }
        var pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map(inputMap))
                .map(e -> e.getValue()).setLocalParallelism(4)
                .writeTo(Sinks.list(outputList));
        DAG dag = pipeline.toDag();
        for (Vertex vertex : dag) {
            vertex.updateMetaSupplier(ps -> new WrappingProcessorMetaSupplier(ps,
                    proc -> new TestWrapper(proc)));
        }

        // when
        Job job = jet.newJob(dag);
        job.join();

        // then
        var list = List.copyOf(outputList);
        assertThat(list).hasSize(inputMap.size());
    }

    static class TestWrapper extends ProcessorWrapper {
        TestWrapper(Processor proc) {
            super(proc);
        }
        @Override
        public void process(int ordinal, Inbox inbox) {
            getWrapped().process(ordinal, inbox);
        }
    }

}
