{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we have seen so far, Dask allows you to simply construct graphs of tasks with dependencies, as well as have graphs created automatically for you using functional, Numpy or Pandas syntax on data collections. None of this would be very useful, if there weren't also a way to execute these graphs, in a parallel and memory-aware way. So far we have been calling `thing.compute()` or `dask.compute(thing)` without worrying what this entails. Now we will discuss the options available for that execution, and in particular, the distributed scheduler, which comes with additional functionality.\n", "\n", "Dask comes with four available schedulers:\n", "- \"threaded\" (aka \"threading\"): a scheduler backed by a thread pool\n", "- \"processes\": a scheduler backed by a process pool\n", "- \"single-threaded\" (aka \"sync\"): a synchronous scheduler, good for debugging\n", "- distributed: a distributed scheduler for executing graphs on multiple machines, see below.\n", "\n", "To select one of these for computation, you can specify at the time of asking for a result, e.g.,\n", "```python\n", "myvalue.compute(scheduler=\"single-threaded\") # for debugging\n", "```\n", "\n", "You can also set a default scheduler either temporarily\n", "```python\n", "with dask.config.set(scheduler='processes'):\n", " # set temporarily for this block only\n", " # all compute calls within this block will use the specified scheduler\n", " myvalue.compute()\n", " anothervalue.compute()\n", "```\n", "\n", "Or globally\n", "```python\n", "# set until further notice\n", "dask.config.set(scheduler='processes')\n", "```\n", "\n", "Let's try out a few schedulers on the familiar case of the flights data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run prep.py -d flights" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "import os\n", "df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),\n", " parse_dates={'Date': [0, 1, 2]},\n", " dtype={'TailNum': object,\n", " 'CRSElapsedTime': float,\n", " 'Cancelled': bool})\n", "\n", "# Maximum average non-cancelled delay grouped by Airport\n", "largest_delay = df[~df.Cancelled].groupby('Origin').DepDelay.mean().max()\n", "largest_delay" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# each of the following gives the same results (you can check!)\n", "# any surprises?\n", "import time\n", "for sch in ['threading', 'processes', 'sync']:\n", " t0 = time.time()\n", " r = largest_delay.compute(scheduler=sch)\n", " t1 = time.time()\n", " print(f\"{sch:>10}, {t1 - t0:0.4f} s; result, {r:0.2f} hours\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Some Questions to Consider:\n", "\n", "- How much speedup is possible for this task (hint, look at the graph).\n", "- Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler.\n", "- How much faster was using threads over a single thread? 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Some Questions to Consider:\n", "\n", "- How much speedup is possible for this task (hint: look at the graph)?\n", "- Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler?\n", "- How much faster was using threads over a single thread? Why does this differ from the optimal speedup?\n", "- Why is the multiprocessing scheduler so much slower here?\n", "\n", "The `threaded` scheduler is a fine choice for working with large datasets out-of-core on a single machine, as long as the functions being used release the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) most of the time. NumPy and pandas release the GIL in most places, so the `threaded` scheduler is the default for `dask.array` and `dask.dataframe`. The distributed scheduler, perhaps with `processes=False`, will also work well for these workloads on a single machine.\n", "\n", "For workloads that do hold the GIL, as is common with `dask.bag` and custom code wrapped with `dask.delayed`, we recommend using the distributed scheduler, even on a single machine. Generally speaking, it's more intelligent and provides better diagnostics than the `processes` scheduler.\n", "\n", "https://docs.dask.org/en/latest/scheduling.html provides some additional details on choosing a scheduler.\n", "\n", "For scaling out work across a cluster, the distributed scheduler is required." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Making a cluster" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Simple method" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The `dask.distributed` system is composed of a single centralized scheduler and one or more worker processes. [Deploying](https://docs.dask.org/en/latest/setup.html) a remote Dask cluster involves some additional effort, but doing things locally just involves creating a `Client` object, which lets you interact with the \"cluster\" (local threads or processes on your machine). For more information, see [here](https://docs.dask.org/en/latest/setup/single-distributed.html).\n", "\n", "Note that `Client()` takes a lot of optional [arguments](https://distributed.dask.org/en/latest/local-cluster.html#api) to configure the number of processes/threads, memory limits and other settings." ] },
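{ "cell_type": "markdown", "metadata": {}, "source": [ "For example, a local cluster could be configured explicitly along the lines of the sketch below; the values are purely illustrative, not a recommendation:\n", "```python\n", "client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')\n", "```\n", "The cell below simply uses the defaults." ] },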
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "# Set up a local cluster.\n", "# By default this sets up 1 worker per core\n", "client = Client()\n", "client.cluster" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "If you aren't in JupyterLab using the `dask-labextension`, be sure to click the `Dashboard` link to open up the diagnostics dashboard.\n", "\n", "## Executing with the distributed client" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Consider some trivial calculation, such as the ones we've used before, where we have added sleep statements in order to simulate real work being done." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask import delayed\n", "import time\n", "\n", "def inc(x):\n", "    time.sleep(5)\n", "    return x + 1\n", "\n", "def dec(x):\n", "    time.sleep(3)\n", "    return x - 1\n", "\n", "def add(x, y):\n", "    time.sleep(7)\n", "    return x + y" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "By default, creating a `Client` makes it the default scheduler. Any calls to `.compute` will use the cluster your `client` is attached to, unless you specify otherwise, as above." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x = delayed(inc)(1)\n", "y = delayed(dec)(2)\n", "total = delayed(add)(x, y)\n", "total.compute()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The tasks will appear in the web UI as they are processed by the cluster and, eventually, a result will be printed as output of the cell above. Note that the kernel is blocked while waiting for the result. The resulting task stream plot might look something like the one below; hovering over each block shows which function it relates to and how long it took to execute. ![this](images/tasks.png)\n", "\n", "You can also see a simplified version of the graph being executed on the Graph pane of the dashboard, so long as the calculation is in flight." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Let's return to the flights computation from before, and see what happens on the dashboard (you may wish to have both the notebook and dashboard side-by-side). How does this perform compared to before?" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time largest_delay.compute()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "In this particular case, this should be about as fast as, or faster than, the best case above (threading). Why do you suppose this is? You should start your reading [here](https://distributed.dask.org/en/latest/index.html#architecture), and in particular note that the distributed scheduler was a complete rewrite, with more intelligence around sharing of intermediate results and deciding which tasks run on which worker. This results in better performance in *some* cases, but the distributed scheduler still has larger latency and overhead than the threaded scheduler, so there will be rare cases where it performs worse. Fortunately, the dashboard now gives us a lot more [diagnostic information](https://distributed.dask.org/en/latest/diagnosing-performance.html). Look at the Profile page of the dashboard to find out what takes the biggest fraction of CPU time for the computation we just performed." ] },
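{ "cell_type": "markdown", "metadata": {}, "source": [ "The dashboard only shows diagnostics while the scheduler is running. If you want to keep a record, recent versions of `distributed` provide a `performance_report` context manager that saves the dashboard panes to a static HTML file. The cell below is a minimal sketch of that idea; it assumes a reasonably recent `distributed` (with `bokeh` installed), and the filename is arbitrary." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import performance_report\n", "\n", "# Re-run the flights computation and save the diagnostics to an HTML file\n", "with performance_report(filename=\"dask-report.html\"):\n", "    largest_delay.compute()" ] },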
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Number of flights\n", "_ = len(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Number of non-cancelled flights\n", "_ = len(df[~df.Cancelled])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Number of non-cancelled flights per-airport\n", "_ = df[~df.Cancelled].groupby('Origin').Origin.count().compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Average departure delay from each airport?\n", "_ = df[~df.Cancelled].groupby('Origin').DepDelay.mean().compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Average departure delay per day-of-week\n", "_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.3" } }, "nbformat": 4, "nbformat_minor": 4 }