Submission to batch systems
Introduction
The command rbbt workflow task
is the basic way to run workflow tasks in
Rbbt. It starts executing the workflow steps in order, following the dependency
tree, to build up the result. If you have access to a HPC resource you can make
use of this approach by allocating enough resources for your job and running
this command.
Executing commands with rbbt workflow task
enacts the entire workflow
controlled from a single process, which means that it can make use of some of
the advanced features in Rbbt, like streaming outputs across workflow steps.
However, this approach has an important problem, which is that for
heterogeneous workloads the allocation made might not necessarily reflect the
actual needs of the different steps. In other words, one must allocate enough
resources for the most expensive steps, which might mean that when computing
steps that do not require so many resources these might end up underused.
For illustration consider a standard genome alignment workflow such as this one
defined in the HTS
workflow:
HTS#BAM HTS#BAM_rescore HTS#BAM_sorted HTS#BAM_duplicates HTS#BAM_bwa HTS#uBAM HTS#mark_adapters
The mark_adapters
and uBAM
steps are single-threaded, while the BAM_bwa
and BAM_rescore
can make use of multiple CPUs. Clearly reserving multiple
CPUs for this workflow will start by underusing the resources during the
initial steps.
Orchestration by dependency blocks
To allow more flexibility Rbbt can easily break down the execution on a particular workflow by asking the execution of intermediate dependencies in any particular way, which allows the use of other orchestration devices to replace the native approach. In fact, a hybrid approach can be used so that the dependency tree can be split into blocks of dependencies, each executing their internal steps using by the native scheduler, but using some other orchestration device for each block.
In our alignment workflow for instance we can define one single-threaded block
doing the first two steps of mark_adapters
and uBAM
, another block doing
BAM_bwa
with multiple CPUs, other single-threaded one doing BAM_duplicates
and a final multi-threaded one for the final steps for sorting and re-scoring.
To help with this process, Rbbt provides a set of functionalities that help deploy these blocks in SLURM and LSF.
The HPC subsystem
Rbbt supports SLURM and LSF almost identically through the commands under rbbt
slurm
and rbbt lsf
, which have identical sub-commands. In fact, they are
exactly the same, and calling them through one or the other serves only to
indicate rbbt which tools to use. The rbbt hpc
is exactly like the other two
but attempts to auto-detect the batch system to use. We will use rbbt hpc
here for illustration.
HPC Task
The rbbt hpc task
command is just like rbbt workflow task
but with a few
new option flags that can control the characteristics of the allocation: CPUs,
time, queue, etc. It still does no orchestration, but it is still very useful
and important to understand. It works almost exactly as the rbbt workflow
task
version, except that if it needs to execute something it sends it to the
batch system instead of running it directly. If the workflow that is required
is already computed the result will be presented as usual, and other things
like examining provenance or consulting documentation also work exactly the
same.
When rbbt hpc task
does need to execute a job it creates a script that it
submits to the batch system, this script prepares a few things and calls rbbt
workflow task
. To organize this process for you, each submission is associated
with its own directory under ~/rbbt-batch/
, which contains the command
script, a file with the job id, and, as the job gets started and completed
several other files appear. The files are the following:
command.batch
: Script submitted to the batch system callingrbbt workflow task
job.id
: Job id in the batch systemdependencies.list
: List of other job ids that need to be completed successfully before starting this one (see HPC Orchestrate)canfail_dependencies.list
: List of other job ids that need to be completed before starting this one but are allowed to fail (see HPC Orchestrate)inputs_dir
: Inputs for therbbt workflow task
called insidecommand.batch
, they are equivalent to the ones specified inrbbt hpc task
but are compatible with using contained and hardened environments (see below)std.err
: STDERR ofcommand.batch
std.out
: STDOUT ofcommand.batch
, which should only be fromrbbt workflow task
env.vars
: Environment variables before callingrbbt workflow task
exit.status
: Exit status of therbbt workflow task
commandprocpath.sqlite3
: Procpath metrics (if procpath is used)procpath.cpu.svg
: Procpath CPU usage plot (if procpath is used)procpath.rss.svg
: Procpath Memory usage plot (if procpath is used)sync.log
: When work is done in contained or hardened environments (see below), log of synchronization of resultssync.status
: Exit status of sync process
You can prepare the submission script but avoid submitting it to the batch
system by using the flag --dry_run
, which will also print the script for your
inspection.
The command rbbt hpc list
examines the content of ~/rbbt-batch
to display
the jobs that have been issued and their status: done, queue, running, error,
or aborted. This command can also be used to display the parameters specified
to the batch system (CPUs, time, queue, etc), the last few lines of the STDERR,
progress of current task, procpath metrics, etc.
The command rbbt hpc clean
can be used to clean up directories in
~/rbbt-batch
. By default it clean only error
and aborted
submissions. To
monitor the progress of a particular submission you can use rbbt hpc tail
,
which will wait until the submission starts executing and then will tail its
STDERR. You can also use the --tail
directly in rbbt hpc task
to
immediately start tailing a new submission.
HPC Orchestrate
The command rbbt hpc orchestrate
is similar to rbbt hpc task
but uses an
additional option called --orchestration_rules
which specify how the workflow
will be broken down into blocks and deployed into the batch system. The rules
are specified in a YAML file and are processed by rbbt hpc orchestrate
to
build the different submissions. All blocks will be immediately submitted,
using the features provided by SLURM and LSF to stablish dependencies between
jobs to ensure that their execution is done in order and that if one job fails
other jobs depending on it been completed successfully are abandoned. The
--orchestation_rules
file also specifies batch system parameters like CPUs
and time, as well as config_keys
specific for each task.
Consider the following --orchestration_rules
file:
--- defaults: time: 10h log: 0 config_keys: >- spark false GATK, forget_dep_tasks true forget_dep_tasks, remove_dep_tasks recursive remove_dep_tasks, remove_dep false HTS#mutect2_filters, remove_dep false HTS#BAM_sorted HTS: BAM_rescore: task_cpus: 20 config_keys: >- shard true rescore, cpus 8 rescore, cpus 20 sort, samtools_sort true bam_sort, threads 15 samtools_sort_threads, max_mem 2G samtools_sort_max_mem BAM_sorted: task_cpus: 4 config_keys: >- samtools_sort true bam_sort, threads 4 samtools_sort_threads, max_mem 3G samtools_sort_max_mem BAM_orientation_model: task_cpus: 4 BAM_duplicates: task_cpus: 20 BAM_multiplex: task_cpus: 20 BAM_bwa: task_cpus: 20 config_keys: cpus 18 bwa mutect2_pre: task_cpus: 20 config_keys: shard true mutect2, cpus 8 mutect2 haplotype: task_cpus: 20 config_keys: shard true haplotype, cpus 10 haplotype strelka_pre: task_cpus: 20 config_keys: cpus 20 strelka muse: task_cpus: 20 config_keys: cpus 20 muse svABA: config_keys: cpus 20 svABA Sample: defaults: task_cpus: 1 skip: true chains: pre_align: workflow: HTS tasks: uBAM, mark_adapters task_cpus: 1 post_mutect2: workflow: HTS tasks: mutect2_filtered, mutect2_clean, mutect2 task_cpus: 4 post_strelka: workflow: HTS tasks: >- HTS#strelka_pre_indels, HTS#strelka_filtered_indels, HTS#strelka_filtered, HTS#strelka task_cpus: 2
This file defines first a set of default values: 2 hours for each job, log
level 0 (DEBUG
) and some config_keys
that deactivate spark for all GATK
functions, and indicate that dependency_tasks
[^footnote_dep_task] must be
forgotten and recursively erased except for mutect2_filters
and BAM_sorted
,
which I’m interested to keep around for debug purposes.
The next section override and extend the defaults for particular tasks from the
HTS
workflow. For the Sample
workflow tasks are generally
dependency_tasks
and are immediate, so they are assigned skip: true
, which
instructs the orchestrator to avoid making a submission with them alone, and
always try to piggyback them on another submission. The final section indicates
some explicit bundles of tasks that should be joined into the same submissions
since they require similar allocations. Joining different steps into the same
submission is used mainly to reduce the number of submissions made and the time
spent queuing, but it could also allow some of the advanced streaming features
across workflow tasks; although currently streaming features are not used on
the HTS
pipelines.
To see what the bundled look like you can use the --dry_run
flag. When used
with the rbbt hpc orchestrate
command it will list the submission bundles.
For instance for the ARGO-NEAT using the previous YAML file rbbt hpc
orchestrate Sample -W HTS mutect2 -jn ARGO-NEAT --dry_run --orchestration_rules
~/.rbbt/etc/slurm/varcalling.yaml
it shows the following submissions
Manifest: HTS#mark_adapters, HTS#uBAM - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: Manifest: HTS#BAM_bwa - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,cpus 18 bwa Deps: HTS#mark_adapters Manifest: HTS#BAM_duplicates - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: HTS#BAM_bwa Manifest: HTS#BAM_sorted - tasks: 4 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,samtools_sort true bam_sort,threads 4 samtools_sort_threads,max_mem 3G samtools_sort_max_mem Deps: HTS#BAM_duplicates Manifest: Sample#BAM_normal, Sample#BAM, HTS#BAM, HTS#BAM_rescore - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,shard true rescore,cpus 8 rescore,cpus 20 sort,samtools_sort true bam_sort,threads 15 samtools_sort_threads,max_mem 2G samtools_sort_max_mem Deps: HTS#BAM_sorted Manifest: HTS#mark_adapters, HTS#uBAM - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: Manifest: HTS#BAM_bwa - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,cpus 18 bwa Deps: HTS#mark_adapters Manifest: HTS#BAM_duplicates - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: HTS#BAM_bwa Manifest: HTS#BAM_sorted - tasks: 4 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,samtools_sort true bam_sort,threads 4 samtools_sort_threads,max_mem 3G samtools_sort_max_mem Deps: HTS#BAM_duplicates Manifest: Sample#BAM, HTS#BAM, HTS#BAM_rescore - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,shard true rescore,cpus 8 rescore,cpus 20 sort,samtools_sort true bam_sort,threads 15 samtools_sort_threads,max_mem 2G samtools_sort_max_mem Deps: HTS#BAM_sorted Manifest: HTS#BAM_pileup_sumaries - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: Sample#BAM Manifest: HTS#BAM_pileup_sumaries - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: Sample#BAM_normal Manifest: HTS#contamination - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: HTS#BAM_pileup_sumaries, Sample#BAM, Sample#BAM_normal Manifest: HTS#mutect2_pre - tasks: 20 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted,shard true mutect2,cpus 8 mutect2 Deps: Sample#BAM, Sample#BAM_normal Manifest: HTS#BAM_orientation_model - tasks: 4 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: HTS#mutect2_pre Manifest: HTS#mutect2_filters - tasks: 1 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: canfail:HTS#contamination, HTS#BAM_orientation_model, HTS#mutect2_pre, Sample#BAM, Sample#BAM_normal Manifest: Sample#mutect2, HTS#mutect2, HTS#mutect2_clean - tasks: 4 - time: 48h - config: spark false GATK,forget_dep_tasks true forget_dep_tasks,remove_dep_tasks recursive remove_dep_tasks,remove_dep false HTS#mutect2_filters,remove_dep false HTS#BAM_sorted Deps: HTS#mutect2_filters
The output is a bit verbose due to the config_keys
; the output in the
terminal is colored and should be easier to parse by eye. Note however how some
submissions list several tasks in the manifest, and the dependencies
established between them. All the submissions are sent at the same time, and
there is not process monitoring their progress, the batch system itself will
take care of starting a job when the dependencies finish. Note also how some
dependencies have the prefix canfail:
which indicate that work may continue
even if the dependency fails, this is used for instance for contamination
calculations, which do not make sense for models organisms like mouse.