Overview

If you currently process data in Jupyter notebooks, this example shows how to convert a notebook into a Nextflow pipeline that can then be deployed on Mantle.

1. Jupyter notebook

You already have Jupyter notebooks that you run to process data. In this example, we’ll use a notebook written in Python.

2. Script(s)

Turn your notebook into one or more scripts. If this is your first Nextflow pipeline, you may want to write one script instead of splitting your workflow into multiple modules. In this example, we’ll make one script.

3. Nextflow pipeline

Write a Nextflow pipeline that executes your script(s) for you.

Example Jupyter notebook

Let’s take a look at a Jupyter notebook that performs some basic image preprocessing by normalizing and thresholding images:

Preprocess images

Normalize and threshold images

import os
import glob
import tqdm
import numpy as np
import skimage.io
import cv2

!! Inputs !!

Change these for each run

input_dir = "test_images"
output_dir = "preprocessed_images"

Function for image preprocessing

def preprocess(img_path: str) -> np.ndarray:
    """
    Reads in, normalizes, and thresholds a single image.
    Returns np.ndarray of preprocessed image.
    """

    # Read in
    img = skimage.io.imread(img_path)

    # Normalize intensities to the range [0, 255]
    norm = np.zeros_like(img)
    cv2.normalize(img, norm, 0, 255, cv2.NORM_MINMAX)

    # Threshold the normalized image using Otsu's method
    _, thresh = cv2.threshold(norm, 0, 255, cv2.THRESH_OTSU)

    return thresh

Preprocess all the images in the input directory and write out to the output directory

all_image_paths = glob.glob(os.path.join(input_dir, "*.tif"))
[os.path.basename(path) for path in all_image_paths]

['t007.tif', 't006.tif', 't010.tif', 't004.tif', 't005.tif', 't001.tif', 't000.tif', 't002.tif', 't003.tif', 't008.tif', 't009.tif']

# Make sure output directory exists
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
for path in tqdm.tqdm(all_image_paths):
    
    # Apply preprocessing 
    processed_img = preprocess(path)

    # Save
    basename = os.path.basename(path)
    extension_idx = basename.rfind(".")
    fname = os.path.join(output_dir, f"{basename[:extension_idx]}_preprocessed.tif")
    skimage.io.imsave(fname, processed_img, check_contrast=False)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11/11 [00:00<00:00, 26.19it/s]

The notebook takes two inputs that the user must change every time they want to run the notebook: input_dir, the path to the directory where the input images are stored, and output_dir, the path to the directory where the processed images should be saved.
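The save step in the notebook derives each output file name from the input file name by dropping the extension and appending _preprocessed.tif. As a minimal sketch of that naming rule in isolation, assuming a hypothetical helper named output_path (it is not part of the notebook) and using pathlib in place of rfind:

```python
from pathlib import Path


def output_path(input_path: str, output_dir: str) -> Path:
    """Build the output path for one input image.

    output_path is a hypothetical helper name used for illustration only.
    """
    source = Path(input_path)
    # Path.stem drops only the final extension, matching
    # basename[:basename.rfind(".")] for names that contain a dot.
    return Path(output_dir) / f"{source.stem}_preprocessed.tif"


print(output_path("test_images/t007.tif", "preprocessed_images"))
```

Keeping the rule in one function makes it easy to reuse unchanged when the notebook is later turned into a script.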

Example script

Here, we’ve turned the notebook into an executable script:

#!/usr/bin/env python3

import argparse
import os
import glob
import tqdm
import numpy as np
import skimage.io
import cv2

def preprocess(img_path: str) -> np.ndarray:
    """
    Reads in, normalizes, and thresholds a single image.
    Returns np.ndarray of preprocessed image.
    """

    # Read in
    img = skimage.io.imread(img_path)

    # Normalize intensities to the range [0, 255]
    norm = np.zeros_like(img)
    cv2.normalize(img, norm, 0, 255, cv2.NORM_MINMAX)

    # Threshold the normalized image using Otsu's method
    _, thresh = cv2.threshold(norm, 0, 255, cv2.THRESH_OTSU)

    return thresh

def main():

    # Instantiate argument parser
    parser = argparse.ArgumentParser(
        prog='example_script',
        description="Loads images and preprocesses by normalizing and thresholding."
    )
    # Add arguments to the argument parser
    parser.add_argument(
        'input_dir',
        type=str,
        help="Directory containing input images."
    )
    parser.add_argument(
        '-o',
        '--output_dir',
        dest='output_dir',
        type=str,
        required=True,  # without it, args.output_dir would be None below
        help="Directory to which to save outputs."
    )
    # Run argument parser and extract data
    args = parser.parse_args()

    all_image_paths = glob.glob(os.path.join(args.input_dir, "*.tif"))

    # Make sure output directory exists
    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)

    # Preprocess all the images in the input directory
    # and write out to the output directory
    for path in all_image_paths:

        # Apply preprocessing
        processed_img = preprocess(path)

        # Save
        basename = os.path.basename(path)
        extension_idx = basename.rfind(".")
        fname = os.path.join(args.output_dir, f"{basename[:extension_idx]}_preprocessed.tif")
        skimage.io.imsave(fname, processed_img, check_contrast=False)

if __name__ == "__main__":
    main()

The shebang #!/usr/bin/env python3 tells the program loader which interpreter to use to run the script (Python 3 in this case).

We use the argparse library to create an argument parser so that the script can take command line arguments as inputs.

If we were to run this script on its own, the usage would be:

./example_script.py <input_dir> -o <output_dir>
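To see how argparse maps that command line onto values, the following sketch builds a parser with the same two arguments and parses a hand-written argument list instead of the real command line. The helper name build_parser and the directory names are assumptions made for illustration.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the same two arguments as the example script.

    build_parser is a hypothetical helper name, not part of the script.
    """
    parser = argparse.ArgumentParser(prog="example_script")
    parser.add_argument("input_dir", type=str,
                        help="Directory containing input images.")
    parser.add_argument("-o", "--output_dir", dest="output_dir", type=str,
                        help="Directory to which to save outputs.")
    return parser


# Passing a list to parse_args stands in for the real command line.
args = build_parser().parse_args(["test_images", "-o", "preprocessed_images"])
print(args.input_dir, args.output_dir)
```

The positional argument becomes args.input_dir, and the value following -o (or --output_dir) becomes args.output_dir.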

Example Nextflow pipeline

Now, we want to write a pipeline that will run the script we wrote in the last step.

We create a directory with the following structure:

example_nextflow_pipeline
├── main.nf [1]
├── nextflow.config [2]
├── bin
│   └── example_script.py [3]
└── modules
    └── preprocessing
        └── main.nf [4]

To run the Nextflow pipeline, change directory to example_nextflow_pipeline, then run:

nextflow run main.nf --input_dir <input_dir> --output_dir <output_dir>

Now, we’ll go through each component in detail.

[1] main.nf

This is the main pipeline script:


include { PREPROCESS_IMAGES } from './modules/preprocessing'

log.info """\
    EXAMPLE PIPELINE
    ---------------------
    input_dir: ${params.input_dir}
    output_dir: ${params.output_dir}
""".stripIndent(true)

workflow {
    PREPROCESS_IMAGES ( params.input_dir )
}

The first line imports the PREPROCESS_IMAGES process, which is contained in modules/preprocessing/main.nf.

The log.info statement prints the pipeline name and the values of the input_dir and output_dir parameters to the console.

Finally, the workflow block calls the PREPROCESS_IMAGES process, passing it the input_dir value that was specified on the command line.

params holds the pipeline parameters, which here are set on the command line with the --input_dir and --output_dir options.

[2] nextflow.config

This is the Nextflow configuration file.

process.container = "<docker_image:tag>"
docker.enabled = true
docker.runOptions = '-u $(id -u):$(id -g) -v /Users:/Users'

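In this example, we specify that processes should run in the given container. Additionally, we specify that Docker should always be used when executing the pipeline, and pass extra options to Docker: -u $(id -u):$(id -g) runs the container as the current user and group, and -v /Users:/Users mounts the host’s /Users directory into the container.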

For more information on Nextflow configuration, refer to the Nextflow documentation.

[3] bin/example_script.py

This is the example script that we wrote above.

Note that you need to make the script executable in order for Nextflow to run it. On Unix-like systems, you can do this in the shell by changing to the bin directory and running:

chmod +x example_script.py
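If you would rather set the permission from Python (for example, in a setup script), a rough equivalent of chmod a+x is sketched below. The helper name make_executable is an assumption, and the demonstration uses a throwaway file in a temporary directory rather than the real script.

```python
import os
import stat
import tempfile


def make_executable(path: str) -> None:
    """Add execute permission for user, group, and others (like chmod a+x).

    make_executable is a hypothetical helper name used for illustration.
    """
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


# Demonstrate on a throwaway file rather than the real script.
with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "example_script.py")
    with open(script, "w") as fh:
        fh.write("#!/usr/bin/env python3\n")
    make_executable(script)
    mode_bits = stat.S_IMODE(os.stat(script).st_mode)
    print(oct(mode_bits))
```

The printed permission bits should include the execute bits for user, group, and others; the remaining bits depend on your umask.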

[4] modules/preprocessing/main.nf

This is the module that contains the process that the Nextflow pipeline will execute.

process PREPROCESS_IMAGES {

    publishDir params.output_dir, mode: 'copy'
    
    input:
    path process_input_dir

    output:
    path('*')

    script:
    """
    example_script.py \\
    ${process_input_dir} \\
    -o "./"
    """
}

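The publishDir directive indicates that the output files of this process should be published to output_dir, which is passed to the Nextflow pipeline as the --output_dir parameter. Setting mode: 'copy' copies the files into that directory rather than linking to them.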

The input block of this process states that the input is the path to the input directory.

The output block of this process states that the outputs are all the paths to files produced by the process.

The script block defines the script that the process executes. In this case, we are running the example_script.py script, with the process_input_dir process input as the path to the directory with the input files, and with the current working directory ./ as the directory to which to write the output files (the output files are then published according to the publishDir directive).
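As a conceptual analogy only, publishing with mode: 'copy' behaves roughly like copying the files that match the output pattern from the task's working directory into the output directory. The Python sketch below illustrates that idea; publish_copy and the directory names are hypothetical, and this is not how Nextflow is implemented.

```python
import glob
import os
import shutil
import tempfile


def publish_copy(work_dir: str, publish_dir: str, pattern: str = "*") -> list:
    """Copy files matching pattern from work_dir into publish_dir.

    Hypothetical helper: an analogy for publishing in copy mode,
    not Nextflow's actual implementation.
    """
    os.makedirs(publish_dir, exist_ok=True)
    published = []
    # glob.glob skips hidden files (names starting with a dot) for "*".
    for src in sorted(glob.glob(os.path.join(work_dir, pattern))):
        if os.path.isfile(src):
            published.append(shutil.copy2(src, publish_dir))
    return published


with tempfile.TemporaryDirectory() as tmp:
    work = os.path.join(tmp, "work")
    os.makedirs(work)
    # Stand-ins for the files the script writes to "./".
    for name in ("t000_preprocessed.tif", "t001_preprocessed.tif"):
        open(os.path.join(work, name), "wb").close()
    copied = publish_copy(work, os.path.join(tmp, "preprocessed_images"))
    published_names = [os.path.basename(p) for p in copied]
    print(published_names)
```

The originals stay in the working directory; only copies land in the output directory.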

For more information on Nextflow processes, refer to the Nextflow documentation.