Skip to content
This repository was archived by the owner on Sep 1, 2025. It is now read-only.

Commit bb29132

Browse files
authored
Merge pull request #4 from nextflow-io/3-create-a-channel-extension-point
feature: create a hello channel extension point
2 parents 22935dc + 11ab862 commit bb29132

File tree

10 files changed

+469
-4
lines changed

10 files changed

+469
-4
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
This project shows how to implement a simple Nextflow plugin named `nf-hello` that intercepts
44
workflow execution events to print a message when the execution starts and on workflow completion.
55

6+
Also, this plugin enriches the `channel` with a `producer` a `consumer` methods (`sayHello` and `goodbye`)
7+
allowing to include them into the script
8+
69
## Plugin assets
710
811
- `settings.gradle`
@@ -38,6 +41,18 @@ workflow execution events to print a message when the execution starts and on wo
3841

3942
The plugin unit tests.
4043

44+
## ExtensionPointS
45+
46+
ExtensionPoint is the basic interface who use nextflow-core to integrate plugins into it.
47+
It's only a basic interface and serves as starting point for more specialized extensions.
48+
49+
Among others, nextflow-core integrate following sub ExtensionPointS:
50+
51+
- `TraceObserverFactory` to provide a list of TraceObserverS
52+
- `ChannelExtensionPoint` to enrich the channel with custom methods
53+
54+
In this plugin you can find examples for both of them
55+
4156
## Compile & run unit tests
4257

4358
Run the following command in the project root directory (ie. where the file `settings.gradle` is located):

plugins/nf-hello/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ sourceSets {
5353

5454
dependencies {
5555
// This dependency is exported to consumers, that is to say found on their compile classpath.
56-
compileOnly 'io.nextflow:nextflow:21.04.0'
56+
compileOnly 'io.nextflow:nextflow:22.04.0'
5757
compileOnly 'org.slf4j:slf4j-api:1.7.10'
5858
compileOnly 'org.pf4j:pf4j:3.4.1'
5959
// add here plugins depepencies
6060

6161
// test configuration
6262
testImplementation "org.codehaus.groovy:groovy:3.0.8"
6363
testImplementation "org.codehaus.groovy:groovy-nio:3.0.8"
64-
testImplementation 'io.nextflow:nextflow:21.04.0'
64+
testImplementation 'io.nextflow:nextflow:22.04.0'
6565
testImplementation ("org.codehaus.groovy:groovy-test:3.0.8") { exclude group: 'org.codehaus.groovy' }
6666
testImplementation ("cglib:cglib-nodep:3.3.0")
6767
testImplementation ("org.objenesis:objenesis:3.1")
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package nextflow.hello
2+
3+
import groovy.util.logging.Slf4j
4+
import groovyx.gpars.dataflow.DataflowReadChannel
5+
import groovyx.gpars.dataflow.DataflowWriteChannel
6+
import groovyx.gpars.dataflow.expression.DataflowExpression
7+
import nextflow.Channel
8+
import nextflow.Global
9+
import nextflow.Session
10+
import nextflow.extension.ChannelExtensionPoint
11+
import nextflow.extension.CH
12+
import nextflow.NF
13+
import nextflow.extension.DataflowHelper
14+
import nextflow.plugin.Scoped
15+
16+
import java.util.concurrent.CompletableFuture
17+
18+
/**
19+
* @author : jorge <[email protected]>
20+
*
21+
*/
22+
@Slf4j
23+
@Scoped('hello')
24+
class HelloExtension extends ChannelExtensionPoint{
25+
26+
/*
27+
* A session hold information about current execution of the script
28+
*/
29+
private Session session
30+
31+
/*
32+
* nf-core initializes the plugin once loaded and session is ready
33+
* @param session
34+
*/
35+
@Override
36+
protected void init(Session session) {
37+
this.session = session
38+
}
39+
40+
/*
41+
* reverse is a `producer` method and will be available to the script because:
42+
*
43+
* - it's public
44+
* - it returns a DataflowWriteChannel
45+
*
46+
* nf-core will inspect the extension class and allow the script to call all these kind of methods
47+
*
48+
* the method can require arguments but it's not mandatory, it depends of the business logic of the method
49+
*
50+
* business logic can write into the channel once ready and values will be consumed from it
51+
*/
52+
DataflowWriteChannel reverse(String message) {
53+
createReverseChannel(message)
54+
}
55+
56+
static String goodbyeMessage
57+
58+
/*
59+
* goodbye is a `consumer` method as it receives values from a channel to perform some logic.
60+
*
61+
* Consumer methods are introspected by nextflow-core and include into the DSL if the method:
62+
*
63+
* - it's public
64+
* - it returns a DataflowWriteChannel
65+
* - it has only one arguments of DataflowReadChannel class
66+
*
67+
* a consumer method needs to proporcionate 2 closures:
68+
* - a closure to consume items (one by one)
69+
* - a finalizer closure
70+
*
71+
* in this case `goodbye` will consume a message and will store it as an upper case
72+
*/
73+
DataflowWriteChannel goodbye(DataflowReadChannel source) {
74+
final target = CH.createBy(source)
75+
final next = {
76+
goodbyeMessage = "$it".toString().toUpperCase()
77+
target.bind(it)
78+
}
79+
final done = {
80+
target.bind(Channel.STOP)
81+
}
82+
DataflowHelper.subscribeImpl(source, [onNext: next, onComplete: done])
83+
target
84+
}
85+
86+
protected DataflowWriteChannel createReverseChannel(final String message){
87+
final channel = CH.create()
88+
if( NF.isDsl2() ){
89+
session.addIgniter { ->
90+
businessLogicHere(channel, message)
91+
}
92+
}else{
93+
businessLogicHere(channel, message)
94+
}
95+
channel
96+
}
97+
98+
/*
99+
* businessLogicHere will send, across the channel, the message reversed
100+
* and after will send an STOP signal to let know the channel it has been finished
101+
*/
102+
protected static businessLogicHere(final DataflowWriteChannel channel, final String message){
103+
def future = CompletableFuture.runAsync({
104+
channel.bind(message.reverse())
105+
channel.bind(Channel.STOP)
106+
})
107+
future.exceptionally(this.&handlerException)
108+
}
109+
110+
/*
111+
* an util class to trace exceptions
112+
*/
113+
static private void handlerException(Throwable e) {
114+
final error = e.cause ?: e
115+
log.error(error.message, error)
116+
final session = Global.session as Session
117+
session?.abort(error)
118+
}
119+
}

plugins/nf-hello/src/main/nextflow/hello/HelloPlugin.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package nextflow.hello
1818

1919
import groovy.transform.CompileStatic
2020
import nextflow.plugin.BasePlugin
21+
import nextflow.plugin.Scoped
2122
import org.pf4j.PluginWrapper
2223

2324
/**
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Manifest-Version: 1.0
22
Plugin-Id: nf-hello
3-
Plugin-Version: 0.1.0
3+
Plugin-Version: 0.2.0
44
Plugin-Class: nextflow.hello.HelloPlugin
55
Plugin-Provider: nextflow
6-
Plugin-Requires: >=21.04.0
6+
Plugin-Requires: >=22.04.0
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
nextflow.hello.HelloFactory
2+
nextflow.hello.HelloExtension
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package nextflow.hello
2+
3+
import groovyx.gpars.dataflow.DataflowQueue
4+
import nextflow.Channel
5+
import nextflow.Session
6+
import nextflow.extension.ChannelExtensionDelegate
7+
import spock.lang.Specification
8+
9+
10+
/**
11+
* @author : jorge <[email protected]>
12+
*
13+
*/
14+
class ChannelExtensionHelloTest extends Specification{
15+
16+
def "should create a channel from hello"(){
17+
18+
given:
19+
def session = Mock(Session)
20+
21+
and:
22+
def helloExtension = new HelloExtension(); helloExtension.init(session)
23+
24+
when:
25+
def result = helloExtension.reverse("Hi")
26+
27+
then:
28+
result.val == 'iH'
29+
result.val == Channel.STOP
30+
}
31+
32+
def "should consume a message from script"(){
33+
34+
given:
35+
def session = Mock(Session)
36+
37+
and:
38+
def helloExtension = new HelloExtension(); helloExtension.init(session)
39+
40+
and:
41+
def ch = new DataflowQueue()
42+
ch.bind('Goodbye folks')
43+
ch.bind( Channel.STOP )
44+
45+
when:
46+
def result = helloExtension.goodbye(ch)
47+
48+
then:
49+
result.val == 'Goodbye folks'
50+
result.val == Channel.STOP
51+
helloExtension.goodbyeMessage == 'Goodbye folks'.toUpperCase()
52+
}
53+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package nextflow.hello
2+
3+
import nextflow.Channel
4+
import nextflow.extension.ChannelExtensionDelegate
5+
import nextflow.plugin.Plugins
6+
import spock.lang.Specification
7+
import spock.lang.Timeout
8+
9+
10+
/**
11+
* @author : jorge <[email protected]>
12+
*
13+
*/
14+
@Timeout(10)
15+
class HelloDslTest extends Specification{
16+
17+
def setup () {
18+
ChannelExtensionDelegate.reloadExtensionPoints()
19+
}
20+
21+
def 'should perform a hi and create a channel' () {
22+
when:
23+
def SCRIPT = '''
24+
channel.hello.reverse('hi!')
25+
'''
26+
and:
27+
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
28+
then:
29+
result.val == '!ih'
30+
result.val == Channel.STOP
31+
}
32+
33+
def 'should store a goodbye' () {
34+
when:
35+
def SCRIPT = '''
36+
channel
37+
.of('Bye bye folks')
38+
.goodbye()
39+
'''
40+
and:
41+
def result = new MockScriptRunner([:]).setScript(SCRIPT).execute()
42+
then:
43+
result.val == 'Bye bye folks'
44+
result.val == Channel.STOP
45+
46+
and:
47+
HelloExtension.goodbyeMessage == 'Bye bye folks'.toUpperCase()
48+
}
49+
}

0 commit comments

Comments
 (0)