Submission examples
Let's write a graph to execute a single command.
# coding: utf-8
from puliclient import Task, Graph
# First we create a graph
graph = Graph( 'a simple graph' )
# To define a Task, we need 3 arguments :
# - its name
# - a runner is a python class that defines the workflow execution for a given job type.
# Here, we will use the default runner which simply executes a command line.
# - an arguments dict
name = "list job"
runner = "puliclient.jobs.DefaultCommandRunner"
arguments = {"cmd": "ls"}
# Then add a new task to the graph
graph.addNewTask( name, runner=runner, arguments=arguments )
# Finally submit the graph to the server
graph.submit("puliserver", 8004)
Let's write a graph to execute a sequence of commands.
This is useful for repeating a given process while varying a few attributes. For instance, we might want to render a frame sequence for an animated movie.
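The example for this case relies on a decomposer that turns the start, end and packetSize arguments into several commands. As a rough illustration of that splitting (a sketch in plain Python, not puli's actual DefaultTaskDecomposer; the function name `split_into_packets` is hypothetical), frames can be grouped like this:

```python
def split_into_packets(start, end, packet_size):
    """Return inclusive (first, last) frame ranges covering start..end."""
    if packet_size < 1:
        raise ValueError("packet_size must be at least 1")
    packets = []
    first = start
    while first <= end:
        # The last packet may be shorter when the range does not divide evenly.
        last = min(first + packet_size - 1, end)
        packets.append((first, last))
        first = last + 1
    return packets


# With packetSize=1, each frame gets its own command.
print(split_into_packets(1, 10, 1))
# With packetSize=4, frames are grouped, and the last group is shorter.
print(split_into_packets(1, 10, 4))
```

Under this assumption, a larger packetSize means fewer commands, each covering more frames on a single node.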
# coding: utf-8
from puliclient import Task, Graph
# Common properties for a job are:
# - cmd: String indicating command line to execute
# - prod/shot: Strings indicating the production and shot names
# - start/end: Integers indicating start and end frame number when rendering a sequence
# - packetSize: An integer indicating the number of frames to "group" for rendering a sequence.
# Each group of frames will be rendered on a single node
defaultArgs = { "cmd":"sleep 30", "start":1, "end":10, "packetSize":1, "prod":"test", "shot":"test" }
defaultTags = { "prod":"test", "shot":"test" }
# A decomposer will analyze the arguments and decompose the process into one or several commands.
# DefaultTaskDecomposer will take into account: start, end and packetSize values to create
# several sub commands. Commands will go from start to end grouped by packetSize frames.
defaultDecomposer = "puliclient.jobs.DefaultTaskDecomposer"
# When creating a graph without a root task or taskgroup, a default taskgroup is created with the name
# of the graph
graph = Graph('simpleGraph', poolName='default', tags=defaultTags)
# A task is created, decomposed, and has its resulting commands attached
graph.addNewTask( name="my task", arguments=defaultArgs, decomposer=defaultDecomposer, tags=defaultTags )
# Finally submit the graph to the server
graph.submit("pulitest", 8004)
Let's create a graph with dependencies. The graph will have several tasks that will be executed in a specific order.
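One way to read a chain is as shorthand for a list of pairwise dependencies, where each task waits for the one before it. The sketch below shows that reading in plain Python. It assumes a chain behaves like consecutive (upstream, downstream) edges, which this page does not state explicitly, and the helper name `chain_to_edges` is hypothetical.

```python
def chain_to_edges(chain):
    """Pair each element with its successor: [a, b, c] -> [(a, b), (b, c)]."""
    return list(zip(chain, chain[1:]))


order = ["task1", "task4", "task3", "task2", "task5", "task6"]
edges = chain_to_edges(order)

for upstream, downstream in edges:
    print(f"{downstream} waits for {upstream}")
```

A chain of six tasks therefore corresponds to five dependencies under this assumption.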
# coding: utf-8
from puliclient import Task, Graph
defaultArgs = { "cmd":"sleep 15", "start":1, "end":5, "packetSize":1, "prod":"test", "shot":"test" }
defaultTags = { "prod":"test", "shot":"test" }
defaultDecomposer = "puliclient.jobs.DefaultTaskDecomposer"
graph = Graph('simpleGraph', tags=defaultTags)
task1 = graph.addNewTask(name="task1", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
task2 = graph.addNewTask(name="task2", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
task3 = graph.addNewTask(name="task3", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
task4 = graph.addNewTask(name="task4", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
task5 = graph.addNewTask(name="task5", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
task6 = graph.addNewTask(name="task6", arguments=defaultArgs, tags=defaultTags, decomposer=defaultDecomposer)
# Create a chain of dependencies, execution order will be:
# task1 > task4 > task3 > task2 > task5 > task6
# A dependency is considered valid when a specific status is reached for the target task.
# The default status value is DONE.
graph.addChain( [task1, task4, task3, task2, task5, task6] )
graph.submit(host="puliserver")
Let's create a graph with dependencies. The graph shape will look like this:
    A
   / \
  B   C
   \ /
    D
Note: In puli's GUI, however, all tasks will be presented on the same level.
This example does not group tasks into a taskgroup.
In this example we will review several ways to add tasks or taskgroups:
- Create a task object and add it to the graph. Elements can be added to the graph one at a time or as a list of elements.
- Create a task object directly on a graph (recommended)
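To make the execution order of the diamond concrete, the following sketch groups tasks into "waves", where every task in a wave only depends on tasks from earlier waves. It is plain Python and does not use puliclient. The function name `execution_waves` is hypothetical, and the real scheduler may order or interleave work differently depending on available workers.

```python
from collections import defaultdict


def execution_waves(nodes, edges):
    """Group nodes into waves; each wave depends only on earlier waves."""
    pending = {node: 0 for node in nodes}  # count of unfinished upstream tasks
    downstream = defaultdict(list)
    for src, dst in edges:
        pending[dst] += 1
        downstream[src].append(dst)

    waves = []
    ready = sorted(n for n, count in pending.items() if count == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for node in ready:
            for child in downstream[node]:
                pending[child] -= 1
                if pending[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    if sum(len(wave) for wave in waves) != len(nodes):
        raise ValueError("dependency cycle detected")
    return waves


# Each edge is (upstream, downstream): the second task waits for the first.
diamond_edges = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")]
waves = execution_waves(["A", "B", "C", "D"], diamond_edges)
print(waves)
```

In this model B and C land in the same wave, so they could run concurrently once A is done, while D waits for both.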
# coding: utf-8
from puliclient import *
defaultArguments = { "cmd":"sleep 5", "start":1, "end":1, "packetSize":1, "prod":"test", "shot":"test" }
defaultTags = { "prod":"test", "shot":"test" }
defaultDecomposer = "puliclient.jobs.DefaultTaskDecomposer"
graph = Graph('my job', tags=defaultTags)
#
# Several ways to create and add nodes
#
# Create nodes detached from the graph
A = Task(name="A", arguments=defaultArguments, tags=defaultTags, decomposer=defaultDecomposer)
B = Task(name="B", arguments=defaultArguments, tags=defaultTags, decomposer=defaultDecomposer)
C = Task(name="C", arguments=defaultArguments, tags=defaultTags, decomposer=defaultDecomposer)
# and add them in a list
graph.addList( [A, B] )
# Or elem by elem
graph.add( C )
# Or add elem directly in the graph
D = graph.addNewTask( name="D", arguments=defaultArguments, tags=defaultTags, decomposer=defaultDecomposer)
# Create complex dependencies like a diamond shaped graph
# NB: default end status is [DONE]
#     A
#    / \
#   B   C
#    \ /
#     D
graph.addEdges( [
    (A, B),
    (A, C),
    (B, D),
    (C, D)
] )
graph.submit("pulitest", 8004)
Let's create a graph composed of taskgroups and tasks. A taskgroup executes nothing by itself; it can be viewed as a container for tasks. A task is an element composed of one or several commands.
The graph will have the following structure:
multi-level
|-taskgroup1
|   |-task1.1
|   |-task1.2
|   `-task1.3
`-taskgroup2
    |-task2.1
    `-task2.2
Note: Although tasks and taskgroups are nested, nesting alone creates no dependencies between these tasks.
If puli finds enough available workers, all tasks might be processed concurrently.
There is no restriction on the depth of the resulting structure. Dependencies can be set between tasks and taskgroups without restriction.
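Because a taskgroup is only a container, the work in a nested structure sits entirely in its leaf tasks. The sketch below walks a nested structure of any depth and lists those leaves. The `(name, children)` tuple representation and the function name `leaf_tasks` are hypothetical and chosen for illustration; they are not puli's data model.

```python
def leaf_tasks(node, path=()):
    """Yield the full path of every task found under node, at any depth."""
    name, children = node
    current = path + (name,)
    if children is None:
        # A task: it has no children and carries the commands.
        yield "/".join(current)
    else:
        # A taskgroup: only a container, so recurse into its members.
        for child in children:
            yield from leaf_tasks(child, current)


structure = ("multi-level", [
    ("taskgroup1", [("task1.1", None), ("task1.2", None), ("task1.3", None)]),
    ("taskgroup2", [("task2.1", None), ("task2.2", None)]),
])

tasks = list(leaf_tasks(structure))
print(tasks)
```

With no dependencies declared, all five leaf tasks listed here are eligible to run at the same time.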
# coding: utf-8
from puliclient import *
defaultArgs = { "cmd":"sleep 5", "start":1, "end":1, "packetSize":1, "prod":"test", "shot":"test" }
defaultTags = { "prod":"test", "shot":"test" }
graph = Graph('multi-level', tags=defaultTags)
tg1 = graph.addNewTaskGroup( name="taskgroup1" )
tg1.addNewTask( name="task1.1", arguments=defaultArgs, tags=defaultTags )
tg1.addNewTask( name="task1.2", arguments=defaultArgs, tags=defaultTags )
tg1.addNewTask( name="task1.3", arguments=defaultArgs, tags=defaultTags )
tg2 = graph.addNewTaskGroup( name="taskgroup2" )
tg2.addNewTask( name="task2.1", arguments=defaultArgs, tags=defaultTags )
tg2.addNewTask( name="task2.2", arguments=defaultArgs, tags=defaultTags )
graph.submit("pulitest", 8004)
Let's review several graphs illustrating the types of dependencies that can be set. We have already seen the possibility of setting dependencies between tasks in A chain of tasks and A diamond shaped graph.
From puli's point of view, dependencies can only be set on a task, not on a taskgroup.
During the creation of a graph, however, dependencies can be set on tasks and taskgroups interchangeably.
When the graph is submitted, a preprocessing step identifies the dependencies on taskgroups and resolves them in terms of tasks.
For instance, given the following structure:
task1
taskgroup1
- task2
- task3
The dependency:
taskgroup1 depends on task1
will, at submit time, become:
task2 depends on task1
task3 depends on task1
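The rewriting described above can be sketched in plain Python. This is an illustration, not puli's implementation: the names `expand` and `resolve_edges` are hypothetical, and it assumes that when a taskgroup appears on either end of a dependency, the dependency applies to every task it contains.

```python
def expand(name, groups):
    """Return the tasks behind a name: a group's members, or the task itself."""
    if name in groups:
        tasks = []
        for member in groups[name]:
            tasks.extend(expand(member, groups))  # handles nested groups
        return tasks
    return [name]


def resolve_edges(edges, groups):
    """Rewrite (upstream, downstream) edges so both ends are plain tasks."""
    resolved = []
    for upstream, downstream in edges:
        for src in expand(upstream, groups):
            for dst in expand(downstream, groups):
                resolved.append((src, dst))
    return resolved


groups = {"taskgroup1": ["task2", "task3"]}
edges = [("task1", "taskgroup1")]  # taskgroup1 depends on task1

resolved = resolve_edges(edges, groups)
for src, dst in resolved:
    print(f"{dst} depends on {src}")
```

Under the same assumption, a dependency between two taskgroups expands to one dependency per pair of tasks, one from each group.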
Here are the types of dependencies that we will check:
- a task waits for the end of another task
- a task waits for the end of a taskgroup
- a taskgroup waits for the end of a task
- a taskgroup waits for the end of a taskgroup
# coding: utf-8
from puliclient import Task, Graph
defaultArgs = { "cmd":"sleep 8", "start":1, "end":3, "packetSize":1, "prod":"test", "shot":"test" }
defaultTags = { "prod":"test", "shot":"test" }
#
# 1. a task waits for the end of another task
#
graph = Graph('1st case', tags=defaultTags)
task1 = graph.addNewTask( name="task1", arguments=defaultArgs, tags=defaultTags )
task2 = graph.addNewTask( name="task2", arguments=defaultArgs, tags=defaultTags )
graph.addEdges( [(task1, task2)] )
graph.submit("pulitest", 8004)
#
# 2. a task waits for the end of a taskgroup
#
graph = Graph('2nd case', tags=defaultTags)
tg1 = graph.addNewTaskGroup( name="TG1" )
tg1.addNewTask( name="task1", arguments=defaultArgs, tags=defaultTags )
tg1.addNewTask( name="task2", arguments=defaultArgs, tags=defaultTags )
task3 = graph.addNewTask( name="task3", arguments=defaultArgs, tags=defaultTags )
graph.addEdges( [(tg1, task3)] )
graph.submit("pulitest", 8004)
#
# 3. a taskgroup waits for the end of a task
#
graph = Graph('3rd case', tags=defaultTags)
tg1 = graph.addNewTaskGroup( name="TG1" )
tg1.addNewTask( name="task1", arguments=defaultArgs, tags=defaultTags )
tg1.addNewTask( name="task2", arguments=defaultArgs, tags=defaultTags )
task3 = graph.addNewTask( name="task3", arguments=defaultArgs, tags=defaultTags )
graph.addEdges( [ (task3, tg1) ] )
graph.submit("pulitest", 8004)
#
# 4. a taskgroup waits for the end of another taskgroup
#
graph = Graph('4th case', tags=defaultTags)
tg1 = graph.addNewTaskGroup( name="TG1" )
tg1.addNewTask( name="task1", arguments=defaultArgs, tags=defaultTags )
tg1.addNewTask( name="task2", arguments=defaultArgs, tags=defaultTags )
tg2 = graph.addNewTaskGroup( name="TG2" )
tg2.addNewTask( name="task3", arguments=defaultArgs, tags=defaultTags )
tg2.addNewTask( name="task4", arguments=defaultArgs, tags=defaultTags )
graph.addEdges( [(tg2, tg1)] )
# Dependencies between taskgroups are in fact dependencies between the subtasks of each group.
# This resolution is performed when the graph is submitted.
graph.submit("pulitest", 8004)