Write a pipeline
Introduction
The Mantle Pipelines feature allows you to run Nextflow pipelines within Mantle, without using the command line. When you upload a pipeline to Mantle, you specify the input configuration, which is turned into a graphical user interface within Mantle. In addition to the typical input types (strings, integers, floats, and Booleans), you can also use files and Mantle datasets as inputs. Within the scripts in your pipeline, you can use the Mantle SDK to access Mantle dataset inputs. Furthermore, you can use the SDK to create output datasets, and to link output files and datasets to the pipeline to ensure that you always know your data’s lineage.
Existing Nextflow pipelines can be adapted to be compatible with Mantle by writing two new Nextflow processes and adding them to your pipeline. The first is for pre-processing, in which you will use the Mantle SDK to get the Mantle dataset inputs of the pipeline and download the associated files. The second is for post-processing, in which you will create any dataset outputs, as well as register output files and datasets to the pipeline. We provide a Docker image for the Mantle SDK that can be used with these two processes.
Nextflow
If you need help getting started with Nextflow, you may find the Jupyter to Nextflow guide to be useful.
For more information on writing a Nextflow pipeline, see the Nextflow documentation.
Nextflow also has a large community building open source pipelines that can be found on nf-core.
Mantle Default Inputs
When Mantle kicks off a Nextflow pipeline, it adds the following inputs:
pipeline_run_id: The ID of the pipeline run being executed.
outdir: The directory where the outputs of the pipeline should be written.
We provide examples below for how to use these parameters in your pipeline.
Nextflow Configuration
Mantle provides a base configuration file to run your pipeline, which you can augment with your own nextflow.config file.
If you installed the AWS CLI on your AMI when setting up your Batch environment for Mantle, you need to add the following block to your configuration file:
aws {
    batch.cliPath = '/miniconda/bin/aws'
}
If you followed the instructions in the Set Up guide, you can use the above block as is.
If you are unsure of the cliPath, run which aws on the EC2 instance that was used to create the AMI.
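If you prefer to check from Python (for example, in a setup script run on that same instance), the standard library's shutil.which performs the same PATH lookup as which aws. A minimal sketch; the function name is illustrative:

```python
import shutil
from typing import Optional


def find_aws_cli() -> Optional[str]:
    """Return the path of the `aws` executable found on PATH, or None if absent.

    Run this on the EC2 instance used to build the AMI; a non-None result is
    the value to use for aws.batch.cliPath.
    """
    return shutil.which("aws")


if __name__ == "__main__":
    path = find_aws_cli()
    print(path if path else "aws CLI not found on PATH")
```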
Mantle runs your pipeline with the following base configuration:
params {
    pipeline_run_id = "<pipeline_run_id>"
    mantle_env = "<env>"
    mantle_tenant = "<tenant>"
    outdir = "<bucket_dir>/results"
}

env {
    TENANT = "<tenant>"
    ENV = "<env>"
    MANTLE_TENANT = "<tenant>"
    MANTLE_ENV = "<env>"
}

process {
    executor = 'awsbatch'
    queue = "<aws_queue>"
}

plugins {
    id 'nf-amazon'
    id 'nf-mantle@1.0.12'
}

aws {
    region = "<aws_region>"
    batch {
        workDir = "<bucket_dir>"
        jobQueue = "<aws_queue_arn>"
    }
    accessKey = "<access_key_id>"
    secretKey = "<secret_access_key>"
}
Using Docker
Mantle recommends using Docker to ensure that your pipeline is reproducible by running in a consistent environment.
To use Docker in your pipeline, specify the Docker image either in your nextflow.config file or in each process block of your Nextflow modules.
Make sure that the Docker image is built for the architecture of your AMI.
Here is an example of how to specify a Docker image in your nextflow.config file:
// The docker image to use for the pipeline
process.container = 'docker_image:tag'
Here is an example of how to specify a Docker image in an individual Nextflow process:
process PROCESS_NAME {
    ...
    container 'docker_image:tag'
    ...
}
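To confirm that an image matches your AMI, compare the architecture Docker reports for the image (docker image inspect --format '{{.Architecture}}' docker_image:tag prints names such as amd64 or arm64) with what uname -m reports on the instance (names such as x86_64 or aarch64). The sketch below normalizes the two naming schemes so they can be compared; the mapping table is an assumption that covers only the common cases, not an exhaustive list.

```python
import platform
from typing import Optional

# Map `uname -m` style machine names to Docker's architecture names.
# Covers common cases only; extend the table if you use other platforms.
_MACHINE_TO_DOCKER_ARCH = {
    "x86_64": "amd64",
    "amd64": "amd64",
    "aarch64": "arm64",
    "arm64": "arm64",
}


def docker_arch_for_machine(machine: str) -> str:
    """Translate a machine name (e.g. from `uname -m`) to Docker's naming."""
    key = machine.strip().lower()
    try:
        return _MACHINE_TO_DOCKER_ARCH[key]
    except KeyError:
        raise ValueError(f"unrecognized machine architecture: {machine!r}") from None


def image_matches_host(image_arch: str, machine: Optional[str] = None) -> bool:
    """Return True if an image's Docker architecture matches the given (or current) machine."""
    machine = machine or platform.machine()
    return image_arch.strip().lower() == docker_arch_for_machine(machine)
```

For example, image_matches_host("arm64", "aarch64") is True, while an amd64 image on an aarch64 AMI returns False.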
Pushing Docker containers to AWS ECR
If you are using AWS, you can push your Docker containers to ECR and use them in your pipeline.
Prerequisites:
- You need to have the AWS CLI installed on your machine.
- You need to have an ECR repository set up.
- You need to have the AWS CLI configured with the correct credentials.
Here is an example of how to push a Docker container to ECR:
# Log in to ECR
aws ecr get-login-password --region <aws_region> | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<aws_region>.amazonaws.com
# Build the Docker container
# From within the directory containing the Dockerfile
docker build -t <aws_account_id>.dkr.ecr.<aws_region>.amazonaws.com/<repository_name>:<tag> .
# Push the Docker container to ECR
docker push <aws_account_id>.dkr.ecr.<aws_region>.amazonaws.com/<repository_name>:<tag>
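The image URI in these commands follows a fixed pattern, and the same string is what goes in a container directive. If you script builds for several processes, a small helper can assemble it. This is a sketch; the validation is a simplified assumption (12-digit account ID, lowercase repository name), not the full ECR naming rules, and the example values are hypothetical.

```python
import re


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build <account_id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>."""
    # AWS account IDs are 12 digits.
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("AWS account IDs are 12 digits")
    # ECR repository names may not contain uppercase letters.
    if repository != repository.lower():
        raise ValueError("ECR repository names must be lowercase")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"
```

For example, ecr_image_uri("123456789012", "us-east-1", "mantle-tools", "1.0") returns 123456789012.dkr.ecr.us-east-1.amazonaws.com/mantle-tools:1.0, which can be used both in docker build -t and in the process container directive.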
Accessing Inputs Within a Pipeline
When you upload a pipeline to Mantle, you specify the inputs and their types using the input configuration.
To access Mantle dataset inputs in a script within your pipeline, use the Mantle SDK (see below for more information).
All other inputs are stored as Nextflow parameters. For example, if your pipeline takes an input named classification_model of type string, you can access its value using params.classification_model.
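When testing a pipeline outside Mantle, you can supply the same parameters yourself with Nextflow's -params-file option, which reads parameter values from a JSON or YAML file. A minimal sketch that writes such a file; the function name and all values are hypothetical placeholders.

```python
import json
from pathlib import Path


def write_params_file(path: str, **params) -> dict:
    """Write Nextflow parameters to a JSON file for use with `-params-file`."""
    Path(path).write_text(json.dumps(params, indent=2))
    return params
```

For example, write_params_file("params.json", pipeline_run_id="test-run", outdir="s3://my-bucket/results", classification_model="my-model") followed by nextflow run . -params-file params.json makes the values available as params.pipeline_run_id, params.outdir, and params.classification_model. The pipeline_run_id only matters if your pipeline also calls the Mantle SDK.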
Using the Mantle SDK in a Nextflow Process
Passing variables to the SDK
To use the Mantle SDK within a Nextflow process, add the following to the process definition:
process PROCESS_NAME {
    ...
    secret 'MANTLE_USER'
    secret 'MANTLE_PASSWORD'

    input:
    val pipelineId
    ...

    script:
    """
    your_script.py ${pipelineId} # Add any additional arguments here
    """
}
MANTLE_USER and MANTLE_PASSWORD are used for authentication and are passed to your pipeline by Mantle as Nextflow secrets.
The pipelineId identifies the pipeline run and is passed in as the process input.
Within your script, you can use the SDK functions to interact with the pipeline run.
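Nextflow exposes each secret declared with the secret directive to the task as an environment variable of the same name. The skeleton below is a hypothetical starting point for your_script.py: it parses the pipeline run ID and fails fast if either credential is missing. The point where you would log in with the Mantle SDK is left as a comment rather than guessing at SDK calls.

```python
import argparse
import os
import sys
from typing import List, Mapping, Optional

REQUIRED_SECRETS = ("MANTLE_USER", "MANTLE_PASSWORD")


def missing_secrets(environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of required Mantle secrets that are absent or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_SECRETS if not environ.get(name)]


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Example Mantle pipeline step.")
    parser.add_argument("pipeline_id", help="ID of the pipeline run, passed in by Nextflow")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    args = parse_args(argv)
    missing = missing_secrets()
    if missing:
        print(f"Missing Nextflow secrets: {', '.join(missing)}", file=sys.stderr)
        return 1
    # Log in with the Mantle SDK and work with args.pipeline_id here.
    print(f"Credentials found for pipeline run {args.pipeline_id}")
    return 0
```

To run it as a script, end the file with if __name__ == "__main__": sys.exit(main()) so that a missing secret produces a non-zero exit code and Nextflow marks the task as failed.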
SDK functions
For more information on functions available in the Mantle SDK, see the Mantle SDK documentation.
Updating an Existing Pipeline
To adapt an existing pipeline, add a pre-processing process (MANTLE_STAGE_INPUTS below) that uses the Mantle SDK to fetch the pipeline's Mantle dataset inputs and download their files, and a post-processing process (MANTLE_UPLOAD_RESULTS below) that creates any dataset outputs and registers output files and datasets with the pipeline run.
The recommended workflow looks like this:
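workflow {
    // Get data files and sample metadata using the pipeline run ID from the Mantle SDK
    MANTLE_STAGE_INPUTS (
        params.pipeline_run_id
    )

    // ... add your pipeline modules here ...

    // Sync outputs back into Mantle.
    // Nextflow starts a process as soon as its inputs are available, so make sure
    // this step waits for your modules to finish (for example, by depending on their outputs).
    MANTLE_UPLOAD_RESULTS (
        params.pipeline_run_id,
        params.outdir
    )
}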
Pre-processing
Adding this process ensures that the pipeline has access to the necessary input data and that it is organized appropriately for processing. We will use the Mantle SDK to get the input Mantle datasets and stage the associated files in the current working directory.
Here is an example for a pipeline that takes FASTQ files as input:
Nextflow process
process MANTLE_STAGE_INPUTS {
    tag "${pipelineId}-mantleSDK_stageInputs"

    /*
    Secrets are used for authentication.
    Provided by Mantle.
    */
    secret 'MANTLE_USER'
    secret 'MANTLE_PASSWORD'

    // Use the Mantle SDK Docker container.
    container 'public.ecr.aws/c7j2m0e6/mantle-sdk:latest'

    /*
    The pipelineId is used to identify the pipeline that is being run,
    and is passed in by Mantle via the input variable.
    Provided by Mantle.
    */
    input:
    val pipelineId

    /*
    The process outputs a tuple consisting of the pipelineId and
    the paths to the staged input FASTQ files.
    The emit option assigns a name identifier to the output channel.
    */
    output:
    tuple val(pipelineId), path('*R1*.fastq.gz'), path('*R2*.fastq.gz'), emit: staged_fastqs

    /*
    The process runs the script mantle_stage_inputs.py to stage
    all the input FASTQ files in the current working directory.
    */
    script:
    def stage_directory = "./"
    """
    mantle_stage_inputs.py ${pipelineId} ${stage_directory} \
        --mantle_env ${ENV} \
        --tenant ${TENANT}
    """
}
Python script
#!/usr/local/bin/python3
import argparse
import os
from typing import Optional

import mantlebio
from mantlebio.core.pipeline_run.mantle_pipeline_run import _IPipelineRun


def get_run(run_id: str, env: Optional[str] = None, tenant: Optional[str] = None) -> _IPipelineRun:
    """
    Logs into Mantle and gets the pipeline run.

    Args:
        run_id (str): ID of the pipeline run
        env (str): Mantle environment, defaults to None (uses default)
        tenant (str): Mantle tenant, defaults to None
            (valid for users with only one tenant)
    """
    # Log into Mantle
    mantle = mantlebio.MantleClient(
        env=env, tenant_id=tenant
    )
    return mantle.pipeline_run.get(run_id)


def pull_datasets(run: _IPipelineRun, stage_dir: str):
    """
    Helper function for downloading files from datasets.
    You need to implement the logic here based on your specific requirements.
    This example is for FASTQ files from an experiment with two reads.
    """
    # Get the input datasets of type "fastqs" from the run
    datasets = run.get_input_datasets("fastqs")

    # Download the data from S3 into the staging directory.
    for dataset in datasets:
        # Download the S3 file from the "read1" property into the staging directory
        dataset.download_s3("read1", os.path.join(stage_dir, f"{dataset.get_name()}.R1-Sequences.fastq.gz"))
        # Download the S3 file from the "read2" property into the staging directory
        dataset.download_s3("read2", os.path.join(stage_dir, f"{dataset.get_name()}.R2-Sequences.fastq.gz"))


def stage_input_datasets(pipeline_id: str, stage_dir: str, env=None, tenant=None):
    """
    Download (stage) the files from the input datasets of the given pipeline_id into stage_dir.
    """
    run = get_run(pipeline_id, env, tenant)
    pull_datasets(run, stage_dir)


def main():
    parser = argparse.ArgumentParser(
        description="Download files for a given pipeline_id into a specified directory.")
    parser.add_argument("pipeline_id", type=str, help="The ID of the pipeline run")
    # nargs="?" makes the positional optional so that the default is actually used
    parser.add_argument("stage_dir", type=str, nargs="?", default=".",
                        help="The directory where files should be downloaded")
    parser.add_argument("--mantle_env", default=None, required=False,
                        help="Mantle environment")
    parser.add_argument("--tenant", default=None, required=False,
                        help="Mantle tenant")
    args = parser.parse_args()

    # Call the download function
    stage_input_datasets(args.pipeline_id, args.stage_dir,
                         args.mantle_env, args.tenant)


if __name__ == "__main__":
    main()
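If your datasets carry more than two file properties, the repeated download_s3 calls in pull_datasets can be driven by a mapping from property name to filename suffix. This sketch assumes, as the script above does, that each dataset exposes get_name() and download_s3(property, local_path). FakeDataset is a hypothetical stand-in so the staging logic can be exercised without Mantle credentials, and the property names are only examples.

```python
import os
from typing import Dict, Iterable, List

# Dataset property -> filename suffix; adjust to match your dataset type.
FASTQ_PROPERTIES: Dict[str, str] = {
    "read1": "R1-Sequences.fastq.gz",
    "read2": "R2-Sequences.fastq.gz",
}


def stage_datasets(datasets: Iterable, stage_dir: str,
                   properties: Dict[str, str] = FASTQ_PROPERTIES) -> List[str]:
    """Download each mapped property of each dataset into stage_dir.

    Returns the local paths written, in download order.
    """
    os.makedirs(stage_dir, exist_ok=True)
    staged = []
    for dataset in datasets:
        for prop, suffix in properties.items():
            local_path = os.path.join(stage_dir, f"{dataset.get_name()}.{suffix}")
            dataset.download_s3(prop, local_path)
            staged.append(local_path)
    return staged


class FakeDataset:
    """Hypothetical stand-in for an SDK dataset, for local testing only."""

    def __init__(self, name: str):
        self._name = name

    def get_name(self) -> str:
        return self._name

    def download_s3(self, prop: str, local_path: str) -> None:
        # Write a small placeholder file instead of downloading from S3.
        with open(local_path, "w") as handle:
            handle.write(f"{self._name}:{prop}\n")
```

In the real script, stage_datasets(run.get_input_datasets("fastqs"), stage_dir) would replace the loop in pull_datasets. The resulting filenames still match the *R1*.fastq.gz and *R2*.fastq.gz globs declared in the MANTLE_STAGE_INPUTS output block.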
Post-processing
This process uses the Mantle SDK to register the outputs of the pipeline with Mantle, so that Mantle can track them and users can view them in the Mantle interface.
Here is an example of registering output files to a pipeline:
Nextflow process
process MANTLE_UPLOAD_RESULTS {
    tag "${pipelineId}-mantleSDK_uploadResults"

    // Files created in this process will be published to this directory
    publishDir "${params.outdir}/mantle_upload_results", mode: 'copy'

    /*
    Secrets are used for authentication.
    Provided by Mantle.
    */
    secret 'MANTLE_USER'
    secret 'MANTLE_PASSWORD'

    // Use the Mantle SDK Docker container.
    container 'public.ecr.aws/c7j2m0e6/mantle-sdk:latest'

    /*
    The pipelineId is used to identify the pipeline that is being run,
    and is passed in by Mantle via the input variable.
    Provided by Mantle.
    */
    input:
    val pipelineId
    path outdir, stageAs: 'results/*'

    /*
    This process outputs the pipelineId
    and a text file containing the timestamp of pipeline completion.
    */
    output:
    tuple val(pipelineId), path('*.txt'), emit: completion_timestamp

    script:
    """
    mantle_upload_results.py ${pipelineId} ${outdir} \
        --mantle_env ${ENV} \
        --tenant ${TENANT}
    date > results_uploaded_mantle.txt
    """
}
Python script
#!/usr/local/bin/python3
import argparse
import os
from typing import Optional

import mantlebio
from mantlebio.core.pipeline_run.mantle_pipeline_run import _IPipelineRun


def get_run(run_id: str, env: Optional[str] = None, tenant: Optional[str] = None) -> _IPipelineRun:
    """
    Logs into Mantle and gets the pipeline run.

    Args:
        run_id (str): ID of the pipeline run
        env (str): Mantle environment, defaults to None (uses default)
        tenant (str): Mantle tenant, defaults to None
            (valid for users with only one tenant)
    """
    # Log into Mantle
    mantle = mantlebio.MantleClient(
        env=env, tenant_id=tenant
    )
    return mantle.pipeline_run.get(run_id)


def upload_outputs(pipeline_id, directory, env=None, tenant=None):
    """
    Upload all output files in the given directory to the given pipeline_id.
    """
    run = get_run(pipeline_id, env, tenant)
    for root, _, files in os.walk(directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            if os.path.isfile(file_path):
                run.add_file_output(filename, file_path)


def main():
    parser = argparse.ArgumentParser(description="Process pipeline files.")
    parser.add_argument("pipeline_id", type=str, help="The ID of the pipeline")
    parser.add_argument("results_dir", type=str,
                        help="The directory of results")
    parser.add_argument(
        '--mantle_env', help='Mantle environment', default=None, required=False)
    parser.add_argument('--tenant', help='Mantle tenant', default=None, required=False)
    args = parser.parse_args()

    # Upload the output files to the pipeline_id
    # Assumes that all output files have been published to the results_dir
    upload_outputs(args.pipeline_id, args.results_dir, args.mantle_env, args.tenant)


if __name__ == "__main__":
    main()
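upload_outputs above registers each file under its bare filename, so files with the same name in different subdirectories (for example, sample1/report.html and sample2/report.html) end up with identical names, which may be hard to tell apart. One alternative, sketched here as a suggestion rather than documented SDK behavior, is to collect the files first and name each by its path relative to the results directory, skipping hidden files. Each (name, path) pair can then be passed to run.add_file_output in place of the inner loop of upload_outputs.

```python
import os
from typing import List, Tuple


def collect_output_files(directory: str, include_hidden: bool = False) -> List[Tuple[str, str]]:
    """Return sorted (name, path) pairs for every file under directory.

    name is the path relative to directory, using forward slashes.
    """
    outputs = []
    for root, dirs, files in os.walk(directory):
        if not include_hidden:
            # Prune hidden directories in place so os.walk does not descend into them.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            files = [f for f in files if not f.startswith(".")]
        for filename in files:
            file_path = os.path.join(root, filename)
            name = os.path.relpath(file_path, directory).replace(os.sep, "/")
            outputs.append((name, file_path))
    return sorted(outputs)
```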
Using a Monorepo
Mantle runs Nextflow pipelines via a GitHub integration, which requires a single main.nf file to run.
To keep multiple pipelines in a single repository, write one main.nf file that imports the other pipelines and defines their entry points.
Here is an example of a main.nf file that imports two pipelines and defines their entry points:
include { WGS_WORKFLOW } from './workflows/WGS'
include { RNASEQ_WORKFLOW } from './workflows/RNASEQ'

// Named workflows
workflow WGS {
    WGS_WORKFLOW()
}

workflow RNASEQ {
    RNASEQ_WORKFLOW()
}

// Main workflow required by Nextflow.
// Used if no entry point is specified.
workflow {
    WGS()
}
Each named workflow can be run in Mantle as a separate pipeline.
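To try a named workflow locally before adding it to Mantle, Nextflow's -entry option selects which workflow in main.nf to run. The helper below is hypothetical; it only builds the command line (using the entry names from the example above) and leaves running it, for example with subprocess.run, to you.

```python
import shlex
from typing import List, Optional

# Named workflows defined in the example main.nf.
ENTRY_POINTS = ("WGS", "RNASEQ")


def nextflow_command(entry: Optional[str] = None, params_file: Optional[str] = None) -> List[str]:
    """Build a `nextflow run` command for main.nf, optionally selecting a named workflow."""
    cmd = ["nextflow", "run", "main.nf"]
    if entry is not None:
        if entry not in ENTRY_POINTS:
            raise ValueError(f"unknown entry point {entry!r}; expected one of {ENTRY_POINTS}")
        cmd += ["-entry", entry]
    if params_file is not None:
        cmd += ["-params-file", params_file]
    return cmd


if __name__ == "__main__":
    print(shlex.join(nextflow_command("RNASEQ")))
```

Omitting entry runs the unnamed workflow, which in the example above calls WGS().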