Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package nextflow.script
import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Session
import nextflow.Nextflow
import nextflow.extension.CH
import nextflow.extension.DumpHelper
import nextflow.script.params.EnvInParam
import nextflow.script.params.FileInParam
Expand Down Expand Up @@ -118,7 +121,7 @@ class ProcessEntryHandler {

// print process output directly if it is a single expression
if( output.size() == 1 && (output.getNames().isEmpty() || output.getNames().first() == '$out') ) {
result = (output[0] as DataflowVariable).get()
result = readChannelValue(output[0] as DataflowWriteChannel)
}

// otherwise, construct map of process emits
Expand All @@ -135,7 +138,7 @@ class ProcessEntryHandler {
final combinedOutputs = new LinkedHashMap<String, Object>(output.size())
for( final ch : output ) {
final name = reverseLookup.get(ch)
combinedOutputs.put(name, (ch as DataflowVariable).get())
combinedOutputs.put(name, readChannelValue(ch as DataflowWriteChannel))
}

result = combinedOutputs
Expand All @@ -144,6 +147,18 @@ class ProcessEntryHandler {
println DumpHelper.prettyPrintJson(result)
}

/**
* Reads a single value from a channel, handling both value channels
* ({@link DataflowVariable}) and queue channels ({@link groovyx.gpars.dataflow.DataflowBroadcast}).
*/
private static Object readChannelValue(DataflowWriteChannel ch) {
if( CH.isValue(ch) ) {
return (ch as DataflowVariable).get()
}
final readCh = CH.getReadChannel(ch)
return (readCh as DataflowReadChannel).getVal()
}

/**
* Gets the input arguments for a process by parsing input parameter structures
* and mapping them from session.params, supporting dot notation for complex inputs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package nextflow.script

import java.nio.file.Path
import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowVariable
import nextflow.Session
import nextflow.script.params.FileInParam
import nextflow.script.params.InParam
Expand Down Expand Up @@ -231,4 +233,42 @@ class ProcessEntryHandlerTest extends Specification {
'/path/to/file1.txt,,/path/to/file2.txt, ,/path/to/file3.txt' | [Path.of('/path/to/file1.txt'), Path.of('/path/to/file2.txt'), Path.of('/path/to/file3.txt')]
'file1.txt,file2.txt' | [Path.of('file1.txt').toAbsolutePath(), Path.of('file2.txt').toAbsolutePath()]
}

def 'should read value from DataflowVariable channel' () {
given:
def session = Mock(Session)
def script = Mock(BaseScript)
def meta = Mock(ScriptMeta)
def handler = new ProcessEntryHandler(script, session, meta)
and:
def ch = new DataflowVariable()
ch.bind('hello')

expect:
handler.readChannelValue(ch) == 'hello'
}

def 'should read value from DataflowBroadcast channel' () {
given:
def session = Mock(Session)
def script = Mock(BaseScript)
def meta = Mock(ScriptMeta)
def handler = new ProcessEntryHandler(script, session, meta)
and:
def ch = new DataflowBroadcast()

when:
// simulate production flow: readChannelValue subscribes, then data arrives on another thread
def result = null
def thread = Thread.start {
result = handler.readChannelValue(ch)
}
// allow read channel to be created before binding
sleep(100)
ch.bind('world')
thread.join(5000)

then:
result == 'world'
}
}
Loading