Step 12 - Map/reduce in Nextflow
Let’s implement a simple map/reduce scheme with what we developed above. Until now, we have basically covered the mapping stage: starting from 3 independent numbers, we execute a function on each branch individually. Now we want to calculate the sum at the end (the reduce phase). We do this by adding a process to the example in Step 10:
```groovy
// Step - 12
process process_step12 {
  input:
    tuple val(id), val(input), val(term)
  output:
    tuple val("${id}"), val(output), val("${term}")
  exec:
    // Reduce: input is the List of values gathered from all branches
    output = input.sum()
}

workflow step12 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ] } \
    | process_step10a \
    | toList \
    | map{
        [
          "sum",
          it.collect{ id, value, config -> value },
          [ : ]
        ] } \
    | process_step12 \
    | view{ [ it[0], it[1] ] }
}
```
Running this yields:
```
> nextflow -q run . -entry step12
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
[sum, 36]
```
A few remarks are in order here:

- We use the `toList` operator on the output of `process_step10a`. This can be regarded as merging the 3 parallel branches into one branch. The result has the signature `Channel[List[Triplet]]`. The `toList` operator only emits something on its output `Channel` once all incoming branches have finished (i.e. the input channel is complete), so that the merge can actually be performed.
- It’s important to note that what is passed through the pipe is still a `Channel`. Only the number of branches, nodes, or whatever you want to call them, differs: 3 items before `toList`, a single item after it.
- The long `map{ [ "sum", ... ] }` step may seem complex at first, but it’s really not. We take in a `List[Triplet]` and convert it to a `Triplet`. The first element of the triplet is just an identifier (`sum`). The last is the configuration map, but we don’t need any configuration for the sum, so we pass an empty map (`[ : ]`). As the second element we want a `List[Int]` containing the 2nd element of each of the original triplets. The Groovy method `collect` on a list is like `map` in many other languages.
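To make the shapes concrete, here is a plain-Python model of the reshaping done by the `map{ [ "sum", ... ] }` step and of the reduce performed by `process_step12`. It is not Nextflow code. The input triplets are an assumption: they are taken to be what `process_step10a` emits here (each number plus the term 10), which is consistent with the `[sum, 36]` result. The list comprehension with tuple unpacking plays the role of Groovy's `collect` with a three-argument closure.

```python
from typing import Any, List, Tuple

# A triplet as used throughout these steps: (id, value, config/term)
Triplet = Tuple[str, Any, Any]


def to_sum_triplet(triplets: List[Triplet]) -> Triplet:
    """Model of the map{...} step: List[Triplet] -> Triplet."""
    # Like it.collect{ id, value, config -> value } in Groovy:
    # unpack each triplet and keep only its 2nd element.
    values = [value for _id, value, _config in triplets]
    # "sum" is just an identifier; {} stands in for the empty config map [ : ]
    return ("sum", values, {})


def step12(triplet: Triplet) -> Triplet:
    """Model of process_step12: sum the list in the 2nd position."""
    id_, values, term = triplet
    return (id_, sum(values), term)


# Assumed output of process_step10a, already merged into one list by toList
merged = [("1", 11, 10), ("2", 12, 10), ("3", 13, 10)]

reshaped = to_sum_triplet(merged)
print(reshaped)  # ('sum', [11, 12, 13], {})
result = step12(reshaped)
print([result[0], result[1]])  # ['sum', 36], like view{ [ it[0], it[1] ] }
```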
The marble diagram can be depicted conceptually as follows, where we note that in effect it’s triplets rather than numbers that are contained in the marbles:
Please note that although we define the pipeline sequentially, the 3 numbers are first handled in parallel and only combined when `toList` is called. Stated differently, parallelism comes for free when defining workflows like this.
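Conceptually, the map phase followed by the `toList` barrier behaves like submitting independent tasks to a pool and then waiting for all of them. Below is a plain-Python sketch of that idea, not Nextflow code. `step10a` is a hypothetical stand-in that assumes `process_step10a` adds the term to its input, which is consistent with the `[sum, 36]` result. Threads run the branches concurrently. In Nextflow the branches may finish, and thus be merged, in any order. That is harmless here because addition is commutative.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Tuple

Triplet = Tuple[str, Any, Any]


def step10a(triplet: Triplet) -> Triplet:
    """Hypothetical stand-in for process_step10a: add the term to the input."""
    id_, value, term = triplet
    return (id_, value + term, term)


def run_step12(numbers: List[int]) -> Tuple[str, int]:
    # Channel.from([1, 2, 3]) | map{ el -> [el.toString(), el, 10] }
    triplets = [(str(n), n, 10) for n in numbers]

    with ThreadPoolExecutor() as pool:
        # Map phase: each triplet is handled independently and concurrently
        futures = [pool.submit(step10a, t) for t in triplets]
        # "toList": block until every branch has finished, then merge into one list
        merged = [f.result() for f in futures]

    # Reduce phase: keep the 2nd elements and sum them (map{...} + process_step12)
    total = sum(value for _id, value, _term in merged)
    return ("sum", total)


print(run_step12([1, 2, 3]))  # ('sum', 36)
```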
<!--
Marble spec, to be used on swirly.dev to reproduce the figure
-abc-------------|
a := 1
b := 2
c := 3
> process_10a
--def------------|
d := 11
e := 12
f := 13
> toList
-----g-----------|
g := [11, 12, 13]
> process_step12
------g----------|
g := 36
-->