Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package nextflow.script
import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.DataflowReadChannel
import nextflow.Session
import nextflow.Nextflow
import nextflow.extension.CH
import nextflow.extension.DumpHelper
import nextflow.script.params.EnvInParam
import nextflow.script.params.FileInParam
Expand Down Expand Up @@ -88,8 +89,11 @@ class ProcessEntryHandler {
// Get input parameter values and execute the process
final inputArgs = getProcessArguments(processDef)
final output = meta.getProcess(processName).run(inputArgs as Object[]) as ChannelOut
// Create read channels now, before the dataflow network fires,
// so they are subscribed before values are emitted
final readChannels = createReadChannels(output)
session.addIgniter {
printOutput(processName, output)
printOutput(processName, output, readChannels)
}
return output
}
Expand All @@ -103,12 +107,18 @@ class ProcessEntryHandler {
}

/**
* Prints the process outputs.
*
* @param processName
* @param output
* Creates read channels for all output channels before the dataflow network fires.
* This ensures broadcast channels have subscribers before values are emitted.
*/
private void printOutput(String processName, ChannelOut output) {
protected static List<DataflowReadChannel> createReadChannels(ChannelOut output) {
final result = new ArrayList<DataflowReadChannel>(output.size())
for( final ch : output ) {
result.add(CH.getReadChannel(ch))
}
return result
}

private void printOutput(String processName, ChannelOut output, List<DataflowReadChannel> readChannels) {
if( output.isEmpty() ) {
log.debug("Process ${processName} does not declare any outputs")
return
Expand All @@ -118,7 +128,7 @@ class ProcessEntryHandler {

// print process output directly if it is a single expression
if( output.size() == 1 && (output.getNames().isEmpty() || output.getNames().first() == '$out') ) {
result = (output[0] as DataflowVariable).get()
result = readChannels[0].getVal()
}

// otherwise, construct map of process emits
Expand All @@ -133,9 +143,9 @@ class ProcessEntryHandler {

// combine process emits into map
final combinedOutputs = new LinkedHashMap<String, Object>(output.size())
for( final ch : output ) {
final name = reverseLookup.get(ch)
combinedOutputs.put(name, (ch as DataflowVariable).get())
for( int i = 0; i < output.size(); i++ ) {
final name = reverseLookup.get(output[i])
combinedOutputs.put(name, readChannels[i].getVal())
}

result = combinedOutputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
package nextflow.script

import java.nio.file.Path
import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.Session
import nextflow.extension.CH
import nextflow.script.params.FileInParam
import nextflow.script.params.InParam
import nextflow.script.params.TupleInParam
Expand Down Expand Up @@ -212,6 +218,25 @@ class ProcessEntryHandlerTest extends Specification {
tupleElements[1].toString().contains('file.fa')
}

def 'should create read channels from mixed channel types' () {
given:
'createReadChannels handles DataflowVariable, DataflowQueue, and DataflowBroadcast'
def var = new DataflowVariable()
var.bind('value1')
def broadcast = new DataflowBroadcast()
def output = new ChannelOut([var, broadcast] as List<DataflowWriteChannel>)

when:
def readChannels = ProcessEntryHandler.createReadChannels(output)

then:
readChannels.size() == 2
// DataflowVariable is returned as-is (it implements DataflowReadChannel)
readChannels[0].getVal() == 'value1'
// DataflowBroadcast is converted to a read channel via createReadChannel
readChannels[1] != null
}

def 'should parse file input correctly' () {
given:
def session = Mock(Session)
Expand Down
Loading