Dask is a popular open-source library that lets you easily parallelize your Python code across a distributed cluster. It can parallelize arbitrary Python code expressed as a task DAG (Directed Acyclic Graph), and it also offers parallelized versions of popular Python data science libraries such as NumPy and pandas.
To install Dask, load a recent version of Python 3 and install Dask using pip. We strongly recommend installing Python packages inside a Python virtual environment.
# Load Python (preferably use a specific Python version)
$ ml Python3
# Create a virtual environment
$ python3 -m venv dask
# Activate and update the virtual environment
$ source dask/bin/activate
(dask) $ pip install -U setuptools pip wheel
# Install Dask
(dask) $ pip install distributed
Starting a Dask Cluster
Before you can use Dask, you need to set up a Dask cluster, which consists of a single server component which coordinates the cluster, and an arbitrary number of workers, which actually execute your tasks.
After you start a PBS job, you should therefore first start the server and the workers on the available computing nodes and then run your Python program that uses Dask. There are multiple ways of deploying the cluster. A common scenario is to run a Dask server on a single computing node, run a single worker per node on all remaining computing nodes and then run your program on the node with the server.
There are some performance considerations to take into account when deploying a Dask cluster; see the performance section below for more information.
All the following deployment methods assume that you are inside a Python environment that has Dask installed. Do not forget to load Python and activate the correct virtual environment at the beginning of your PBS job! Do the same after connecting to any worker node manually using SSH.
Both the server and the worker nodes can be started using a CLI command. If you prefer manual deployment, you can manually start the server on a selected node and then start the workers on other nodes available inside your PBS job.
# Start the server on some node N
$ dask-scheduler
# Start a single worker on some other node, pass it the address of the server
$ dask-worker tcp://<hostname-of-N>:8786
Dask actually contains built-in support for automating Dask deployment using SSH. It also supports nodefiles provided by PBS, so inside of your PBS job, you can simply run
$ dask-ssh --hostfile $PBS_NODEFILE
to start the Dask cluster on all available computing nodes. This will start the server on the first node of your PBS job and then a single worker on each node. The first node will therefore be shared by a server and a worker, which might not be ideal from a performance point of view.
Note that for this to work, the paramiko Python library has to be installed inside your Python environment (you can install it using pip install paramiko).
You can also start the cluster directly from your Python script. This way, you can start the scheduler and the workers on separate nodes to avoid overcrowding the server node.
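As a rough sketch of starting the cluster from Python, the snippet below reads the PBS nodefile, reserves the first node for the scheduler, and starts workers on the remaining nodes with `SSHCluster` from `dask.distributed`. The helper names (`hosts_from_nodefile`, `split_roles`, `start_cluster`) are hypothetical, and the sketch assumes that passwordless SSH works between the nodes and that the `asyncssh` package, which `SSHCluster` relies on, is installed in your environment.

```python
import os


def hosts_from_nodefile(path):
    """Return the unique hostnames listed in a PBS nodefile, in first-seen order."""
    with open(path) as f:
        names = [line.strip() for line in f if line.strip()]
    # dict.fromkeys drops duplicates while keeping the original order
    return list(dict.fromkeys(names))


def split_roles(hosts):
    """Use the first host for the scheduler and all remaining hosts for workers."""
    if len(hosts) < 2:
        raise ValueError("need at least two nodes to separate scheduler and workers")
    return hosts[0], hosts[1:]


def start_cluster(nodefile):
    """Start a scheduler and workers over SSH (hypothetical helper)."""
    # Imported lazily so the helpers above also work without Dask installed
    from dask.distributed import Client, SSHCluster

    scheduler_host, worker_hosts = split_roles(hosts_from_nodefile(nodefile))
    # SSHCluster uses the first address for the scheduler and the rest for workers
    cluster = SSHCluster([scheduler_host, *worker_hosts])
    return cluster, Client(cluster)


# Only attempt a real deployment when running inside a PBS job
if __name__ == "__main__" and "PBS_NODEFILE" in os.environ:
    cluster, client = start_cluster(os.environ["PBS_NODEFILE"])
    print(client)
```

Because the scheduler host is excluded from the worker list, the server node is not shared with a worker, at the cost of one node that performs no computation.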
Other Deployment Options
Dask can be deployed in many other ways, e.g. using MPI or using a shared file on the network file system. It can also submit a PBS job directly, wait for it to start, and then launch the whole cluster inside that job. You can find more information about Dask HPC deployment here.
Connecting to the Cluster
Once you have deployed your cluster, you must create a Client at the beginning of your program and pass it the address of the server.
from distributed import Client

client = Client("<hostname-of-server>:8786")
Once the client connects to the server successfully, all subsequent Dask computations will be parallelized using the cluster.
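In a PBS job, workers may still be starting when your program connects. The following minimal sketch builds the scheduler address and blocks until a given number of workers has registered, using `Client.wait_for_workers`. The helper names `scheduler_address` and `connect`, and the 60-second timeout, are illustrative assumptions; the port 8786 is the one used in the examples above.

```python
def scheduler_address(host, port=8786):
    """Build the TCP address of a Dask scheduler running on `host`."""
    return f"tcp://{host}:{port}"


def connect(host, expected_workers, port=8786, timeout=60):
    """Connect to the scheduler and wait for the workers (hypothetical helper)."""
    # Imported lazily so scheduler_address can be used without Dask installed
    from distributed import Client

    client = Client(scheduler_address(host, port))
    # Block until the expected number of workers has joined the cluster
    client.wait_for_workers(n_workers=expected_workers, timeout=timeout)
    return client
```

Waiting for the workers up front avoids submitting the whole computation to a cluster that so far consists of a single worker.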
Below are some examples of computations that you can perform with Dask. Note that the code should only be executed after a client has connected to a server!
Parallelize Arbitrary Python Code Using delayed
The delayed function (or decorator) turns a Python function into a lazy computation. If you call such a function, it will not execute right away. It only returns a lazy placeholder object (a Delayed, similar to a future) that can be composed with other such objects to build a DAG of tasks. After you describe your whole computation, you can actually execute it using dask.compute.
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

# Collect one lazy result per input element
output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

# `total` is just a lazy computation
# To get the actual value, call dask.compute
# (it returns a tuple with one entry per argument)
result = dask.compute(total)
You can find more information about the delayed API in the Dask documentation.
Dask contains a module called dataframe which mirrors the API of pandas, a popular library for tabular analysis. Using dataframe you can distribute your pandas code to multiple nodes easily. You can find more information about it here.
Here is an example of its usage:
# The conventional alias `dd` avoids shadowing pandas' own `pd`
import dask.dataframe as dd

# Load CSV
df = dd.read_csv("table.csv")
# Describe a lazy computation (this is not computed yet)
df2 = df[df.y == "a"].x + 1
# Actually compute the table operations on worker nodes
result = df2.compute()
Dask contains a module called array which mirrors the API of numpy, a popular library for n-dimensional array computations. Using array you can distribute your numpy code to multiple nodes easily. You can find more information about it here.
Here is an example of its usage:
# The conventional alias `da` avoids shadowing numpy's own `np`
import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Describe a lazy computation (this is not computed yet)
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
# Actually compute the array operations on worker nodes
result = z.compute()
Dask Performance Considerations
Dask should be fast enough by default for most use cases, but there are some considerations that should be taken into account.
Selecting the Number of Processes and Threads Per Worker
When you start a Dask worker on a node, it will by default use a number of threads equal to the number of cores on the given machine. At the same time, (C)Python uses a global interpreter lock (GIL) that prevents more than one thread from executing Python code at a time. This is fine if your computational tasks are I/O bound or if they spend most of their time inside C libraries that release the GIL.
However, if your tasks executed with Dask are heavily compute-bound, and they hold the GIL (e.g. the heavy computation is performed directly inside Python), you might not be able to fully harness all cores of the worker node.
To solve this, you can run multiple workers (each with a reduced number of threads) per node. This is a trade-off. With more workers on a node, you will be able to utilize more cores (assuming your tasks are compute-bound). However, you will also increase the pressure on the central server and on the network, because there will be more workers that will communicate with each other and with the server.
You can choose the number of workers and the number of threads per worker using the --nprocs and --nthreads parameters of dask-worker (there are similar arguments when using other deployment methods). Note that recent Dask releases rename --nprocs to --nworkers.
Some examples (assuming a node with 24 cores):
# Run a single worker using 24 threads. Reduces network and server traffic, but may not utilize all cores.
$ dask-worker --nprocs 1 --nthreads 24
# Run 24 workers, each with a single thread. Maximizes core usage, but may overload server or network.
$ dask-worker --nprocs 24 --nthreads 1
# Run 6 workers, each with 4 threads. Strikes a balance between core usage and network/server pressure.
$ dask-worker --nprocs 6 --nthreads 4
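To see which combinations are available on a given node, the following small sketch enumerates every split of the cores into workers and threads that uses all cores exactly once. The function name `worker_layouts` is hypothetical, and the printed commands only mirror the flags used in the examples above.

```python
def worker_layouts(cores):
    """List every (workers, threads_per_worker) pair whose product equals `cores`."""
    return [(n, cores // n) for n in range(1, cores + 1) if cores % n == 0]


# Print one candidate command line per layout for a 24-core node
for nworkers, nthreads in worker_layouts(24):
    print(f"dask-worker --nprocs {nworkers} --nthreads {nthreads}")
```

Layouts near the start of the list favor low server and network pressure, while layouts near the end favor core utilization for tasks that hold the GIL.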
In our experiments, running a single worker per core of a node often gave the best performance. With this configuration, we found that Dask scales reasonably up to 200 workers (e.g. <10 Barbora nodes). If you start to run into performance problems because of the number of workers, try to use RSDS to achieve better scaling.
A common reason to use Dask is that your computation does not fit inside the memory of a single node. Dask can alleviate this, but you still have to be careful about your memory usage. For example, when you use the array API, you should pay attention to how many partitions (or chunks) your data is split into. If you use a single partition, it will probably take a lot of memory, and it will not offer many possibilities for parallelization. You can find more information about chunking in the Dask documentation.
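The effect of the chunk size can be estimated with simple arithmetic before any data is created. The sketch below counts the chunks and the size of one full chunk for the array from the earlier example (a 10000 × 10000 array split into 1000 × 1000 chunks). The function name `chunk_stats` is hypothetical, and the element size of 8 bytes assumes 64-bit floats.

```python
import math


def chunk_stats(shape, chunks, itemsize=8):
    """Return (number of chunks, bytes in one full chunk) for a chunked array."""
    # Each dimension is split into ceil(size / chunk) pieces
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    bytes_per_chunk = math.prod(chunks) * itemsize
    return n_chunks, bytes_per_chunk


n, size = chunk_stats((10000, 10000), (1000, 1000))
print(f"{n} chunks of {size / 1e6} MB each")

# A single chunk covering the whole array leaves nothing to parallelize
n_single, size_single = chunk_stats((10000, 10000), (10000, 10000))
print(f"{n_single} chunk of {size_single / 1e6} MB")
```

With 1000 × 1000 chunks, there are 100 independent pieces of 8 MB each, whereas a single chunk would require one worker to hold all 800 MB at once.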
By default, multiple workers on a node will split the available memory. Therefore, if the node has 100 GiB of RAM and you start the Dask worker on a single node like this:
$ dask-worker --nprocs 10
Each worker will have around 10 GiB of memory available. You can set the memory limit of each worker manually using the --memory-limit argument.
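As an illustration of the arithmetic, the following sketch derives an explicit per-worker memory limit from the node's RAM and the number of workers, optionally keeping some memory aside for the operating system. The function name `worker_command` and the `reserve_gib` parameter are hypothetical, and the sketch assumes that `--memory-limit` accepts a size string such as `10GiB`.

```python
import math


def worker_command(node_gib, workers, reserve_gib=0):
    """Build a dask-worker command with an explicit per-worker memory limit."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    usable = node_gib - reserve_gib
    # Round down so the workers together never exceed the usable memory
    limit = math.floor(usable / workers)
    if limit < 1:
        raise ValueError("less than 1 GiB of memory left per worker")
    return f"dask-worker --nprocs {workers} --memory-limit {limit}GiB"


# 100 GiB node, 10 workers, nothing reserved: 10 GiB per worker
print(worker_command(100, 10))
# The same node with 4 GiB kept aside for the system: 9 GiB per worker
print(worker_command(100, 10, reserve_gib=4))
```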
If your program is strictly memory-bound, you can also try alternative approaches to Dask. As an example, Vaex is a library that allows you to easily process dataframes that do not fit inside your operating memory.
In some cases (especially with many workers), the network can be a bottleneck. By default, Dask uses TCP/IP for communication, but it also has support for UCX, which enables more efficient usage of the available InfiniBand interfaces. It is a bit cumbersome to set up, but if you want to try, check this tutorial.
Dask has built-in support for GPU workers. You can find more information about this use case here.
If you need to run a large number of tasks and Dask does not perform well enough, you can try to use RSDS. It is our version of Dask which is optimized for HPC use cases, and it should provide better scaling than Dask. You can read more about RSDS in this article.