Link Search Menu Expand Document

Step 20 - The order of events in a stream

We touch upon a point that we have encountered but not really considered in-depth: the order of things in the Channel or stream. We’ve noticed that the order is not predictable and we’ve discussed that this is to be expected. In general, the duration of a process step may depend on the data or the number of resources available at the time of running. Also, the example where we joined the different parallel branches (Step 12 - REF) was independent of the order because it just calculated the sum.

Another consequence of the undetermined order of events is the fact that during a join or reduce phase (for instance with toList), the resulting order is undetermined and this messes up the caching functionality of NextFlow.

Let us give an example with a reduce process that does depend on the order of events. We divide the first element from the map phase by the second one:

// Step - 20a
process process_step20 {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        output = input[0] / input[1]
}
workflow step20a {
    Channel.from( [ 1, 2 ] ) \
        | map{ el -> [ el.toString(), el, 10 ] } \
        | process_step10a \
        | toList \
        | map{ [
                  "sum",
                  it.collect{ id, value, config -> value },
                  [ : ]
               ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

If you run this code like this, you get something like this when launching multiple times:

> nextflow -q run . -entry step20a -with-docker
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
[sum, 0.9166666667]
> nextflow -q run . -entry step20a -with-docker
WARN: DSL 2 IS AN EXPERIMENTAL FEATURE UNDER DEVELOPMENT -- SYNTAX MAY CHANGE IN FUTURE RELEASE
[sum, 1.0909090909]

As an illustration, I’ve added the -with-docker option.

Luckily, there is a variant of the toList channel operator that takes into account sorting: toSortedList. There are other operators as well, but we leave it as an exercise to look those up. The workflow code above simply becomes:

// Step - 20b
workflow step20b {
    Channel.from( [ 1, 2 ] ) \
        | map{ el -> [ el.toString(), el, 10 ] } \
        | process_step10a \
        | toSortedList{ a,b -> a[0] <=> b[0] } \
        | map{ [ "sum", it.collect{ id, value, config -> value }, [ : ] ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

In this example, we sort (alphabetically) on the id in the triplet.