HPC deployments with Dask-Jobqueue


Loïc Estève

Inria


About me

Particle Physics background
PhD main achievement: measured a cos and a sin to ±0.8
3 years in finance
Last 4 years @ Inria, working on open-source Python

Audience survey

Main Dask use cases around me

  • user-friendly way of using the cluster: you can stay in Python (i.e. no need to write shell submission scripts)
  • easy transition of your existing single-machine code to the cluster (and back for debugging)
  • embarrassingly parallel use cases for grid search or Reinforcement Learning
  • client.submit, Dask collections not used at all (see the sketch below)
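
A minimal sketch of that client.submit pattern (run_one and param_grid are placeholder names; the cluster object comes from dask-jobqueue, shown later in these slides):

from dask.distributed import Client

client = Client(cluster)  # cluster: e.g. a dask-jobqueue cluster, see later slides

# one task per parameter setting; run_one and param_grid are placeholders
futures = [client.submit(run_one, params) for params in param_grid]
results = client.gather(futures)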

Other use case: Dask integration in scikit-learn

from sklearn.ensemble import RandomForestClassifier

# X, y: your training data, defined elsewhere
clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)
clf.fit(X, y)

also look at dask-ml

Other use case: Dask integration in scikit-learn

from sklearn.ensemble import RandomForestClassifier
import joblib
from dask.distributed import Client

client = Client('scheduler-address')
clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)

# scatter=[X, y] sends the data to the workers up front;
# the fit is then distributed over the Dask cluster
with joblib.parallel_backend("dask", scatter=[X, y]):
    clf.fit(X, y)

also look at dask-ml
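
For example, a hedged sketch of the dask-ml route (assumes dask-ml is installed; its GridSearchCV mirrors scikit-learn's API and runs the search on the connected Dask cluster; X, y are your data):

from dask.distributed import Client
from dask_ml.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

client = Client('scheduler-address')

param_grid = {'n_estimators': [100, 200], 'max_depth': [None, 10]}
search = GridSearchCV(RandomForestClassifier(), param_grid)
search.fit(X, y)  # each candidate fit runs as a task on the Dask cluster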

dask-jobqueue

from dask_jobqueue import SLURMCluster
# Each job will use one core and 4GB
cluster = SLURMCluster(cores=1, memory='4GB', **cluster_specific_kwargs)
cluster.scale(jobs=4)  # launch 4 jobs

from dask.distributed import Client
client = Client(cluster)
# user code

The tricky bit is cluster_specific_kwargs

Example (on our team SGE cluster)

from dask_jobqueue import SGECluster

resource_spec = 'h_vmem=10G,mem_req=1G'
queue = 'gaia.q'

cluster = SGECluster(queue=queue,
                     cores=1, processes=1,
                     memory='16GB',
                     resource_spec=resource_spec,
                     interface='ib0')

Behind the scenes

(for the SLURMCluster example shown earlier)

In [5]: print(cluster.job_script())
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=4G
#SBATCH -t 00:30:00
JOB_ID=${SLURM_JOB_ID%;*}

python -m distributed.cli.dask_worker tcp://192.168.0.11:43725 --nthreads 1 \
  --memory-limit 4.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60

Supported job schedulers

  • HTCondor
  • LSF
  • OAR
  • PBS (+ Moab variant)
  • SGE
  • SLURM

contribute your favourite one if not there already!
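
Each scheduler has a corresponding cluster class with the same interface; a minimal sketch ('myqueue' is a placeholder queue name):

from dask_jobqueue import PBSCluster  # or LSFCluster, OARCluster, HTCondorCluster, ...

# same API as the SLURM/SGE examples above
cluster = PBSCluster(cores=1, memory='4GB', queue='myqueue')
cluster.scale(jobs=4)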

Tips

  • an SSH tunnel is handy for Jupyter + the diagnostics dashboard (jupyter-server-proxy is useful too)
  • sshuttle is neat: a poor man's VPN over SSH. Very useful if SSH tunnels are blocked by the cluster sys-admins.
  • a .yaml configuration file for default cluster parameters (see the sketch below)
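
A minimal sketch of setting such defaults, assuming the usual dask config conventions: the same jobqueue.<scheduler>.<parameter> keys can live in a YAML file (typically ~/.config/dask/jobqueue.yaml) or be set programmatically:

import dask
from dask_jobqueue import SLURMCluster

# These keys would normally live in a YAML file such as
# ~/.config/dask/jobqueue.yaml (assumed default dask config location);
# setting them programmatically is equivalent.
dask.config.set({
    "jobqueue.slurm.cores": 1,
    "jobqueue.slurm.memory": "4GB",
    "jobqueue.slurm.walltime": "01:00:00",
})

cluster = SLURMCluster()  # picks up the defaults set above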

Where to run your main script

i.e. the script where the Dask cluster is created

  • cluster login node: easiest, but not best practice. Be careful not to consume too much CPU or RAM in the script, or your cluster sys-admin may shout at you.
  • interactive job: if you lose your scheduler because of walltime, you lose all your work. Interactive job walltime limits are generally quite low.
  • script that you submit via the job scheduler: same caveat as the previous bullet point, but the walltime is generally less limited (see the sketch below).
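
A sketch of such a submitted main script, under the assumption that you write results to disk before the job's own walltime is reached (my_function and my_args are placeholder names):

# main.py, submitted to the job scheduler: it creates the Dask cluster itself
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(cores=1, memory='4GB', walltime='02:00:00')
cluster.scale(jobs=4)
client = Client(cluster)

# my_function and my_args stand in for your own code
futures = [client.submit(my_function, arg) for arg in my_args]
results = client.gather(futures)

# save results to disk here, before this script's walltime is reached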

Testing

  • docker-compose setup for some job schedulers (SGE, SLURM, PBS)
  • contributors would be needed for the others: LSF, HTCondor, OAR. Rare intersection of skills: Docker, HPC cluster sys-admin, interest in Dask-Jobqueue
  • tests are quite rudimentary. Tricky to test some cluster-specific quirks (e.g. non-uniform network interfaces on scheduler and worker nodes)
  • we would need cluster-specific "champions" (LSF? HTCondor?) who are willing to test non-trivial changes

Approach

personal opinion

  • keep dask-jobqueue as small as possible
  • most tricky functionality should live in dask.distributed
  • avoid cluster-specific tricks/features; the hope is that equivalent functionality can be achieved through Dask. Example: job arrays.

Challenges

  • cluster-specific problems ("but it does not work on my cluster!"). Very hard to debug through GitHub; cluster sys-admin involvement is needed in some cases.
  • job-scheduler-specific quirks. Impossible to remember all of them.
  • limited resources (~4 hours per week between the two active maintainers). Contributions more than welcome!
  • how to get cluster IT folks involved/interested? They have very varied users and different metrics (e.g. cluster utilization).

Improvements?

  • please tell us! Suggestions/complaints more than welcome!
  • documentation, esp. for people who are not very familiar with Dask or with HPC clusters

Possible suggestions for this workshop

  • heterogeneous Dask workers (e.g. some without GPUs, some with GPUs in a different scheduler queue) are not very easy to set up at the moment. Ideally you could scale them separately with .scale.
  • Dask resilience when workers die around the same time because of walltime. Pass --lifetime through the extra parameter (see the sketch below). See #122.
  • Examples on Binder that people can run and play with before trying it on their cluster. Attempt in #276
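
A sketch of that --lifetime workaround (flag values are illustrative; --lifetime and --lifetime-stagger are dask-worker CLI options forwarded via the extra parameter, and adaptive scaling brings replacement workers back up):

from dask_jobqueue import SLURMCluster

# With a 30-minute walltime, workers retire cleanly after ~25 minutes
# (staggered by up to 4 minutes) instead of being killed mid-task.
cluster = SLURMCluster(
    cores=1,
    memory='4GB',
    walltime='00:30:00',
    extra=['--lifetime', '25m', '--lifetime-stagger', '4m'],
)
cluster.adapt(minimum=0, maximum=4)  # replaces retired workers with new jobs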