{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\"Dask\n", "\n", "\n", "# Dask DataFrames\n", "\n", "We finished Chapter 1 by building a parallel dataframe computation over a directory of CSV files using `dask.delayed`. In this section we use `dask.dataframe` to automatically build similiar computations, for the common case of tabular computations. Dask dataframes look and feel like Pandas dataframes but they run on the same infrastructure that powers `dask.delayed`.\n", "\n", "In this notebook we use the same airline data as before, but now rather than write for-loops we let `dask.dataframe` construct our computations for us. The `dask.dataframe.read_csv` function can take a globstring like `\"data/nycflights/*.csv\"` and build parallel computations on all of our data at once.\n", "\n", "## When to use `dask.dataframe`\n", "\n", "Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM. The demo dataset we're working with is only about 200MB, so that you can download it in a reasonable time, but `dask.dataframe` will scale to datasets much larger than memory." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the Pandas `DataFrame` API. One Dask `DataFrame` is comprised of many in-memory pandas `DataFrames` separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.\n", "\n", "**Related Documentation**\n", "\n", "* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)\n", "* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)\n", "* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)\n", "* [DataFrame examples](https://examples.dask.org/dataframe.html)\n", "* [Pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)\n", "\n", "**Main Take-aways**\n", "\n", "1. Dask DataFrame should be familiar to Pandas users\n", "2. The partitioning of dataframes is important for efficient execution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run prep.py -d flights" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(n_workers=4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We create artifical data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from prep import accounts_csvs\n", "accounts_csvs()\n", "\n", "import os\n", "import dask\n", "filename = os.path.join('data', 'accounts.*.csv')\n", "filename" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Filename includes a glob pattern `*`, so all files in the path matching that pattern will be read into the same Dask DataFrame." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "df = dd.read_csv(filename)\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# load and count number of rows\n", "len(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What happened here?\n", "- Dask investigated the input path and found that there are three matching files \n", "- a set of jobs was intelligently created for each chunk - one per original CSV file in this case\n", "- each file was loaded into a pandas dataframe, had `len()` applied to it\n", "- the subtotals were combined to give you the final grand total." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Real Data\n", "\n", "Lets try this with an extract of flights in the USA across several years. This data is specific to flights out of the three airports in the New York City area." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),\n", " parse_dates={'Date': [0, 1, 2]})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the respresentation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can view the start and end of the data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "raises-exception" ] }, "outputs": [], "source": [ "df.tail() # this fails" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What just happened?\n", "\n", "Unlike `pandas.read_csv` which reads in the entire file before inferring datatypes, `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.\n", "\n", "In this case, the datatypes inferred in the sample are incorrect. The first `n` rows have no value for `CRSElapsedTime` (which pandas infers as a `float`), and later on turn out to be strings (`object` dtype). Note that Dask gives an informative error message about the mismatch. When this happens you have a few options:\n", "\n", "- Specify dtypes directly using the `dtype` keyword. This is the recommended solution, as it's the least error prone (better to be explicit than implicit) and also the most performant.\n", "- Increase the size of the `sample` keyword (in bytes)\n", "- Use `assume_missing` to make `dask` assume that columns inferred to be `int` (which don't allow missing values) are actually floats (which do allow missing values). In our particular case this doesn't apply.\n", "\n", "In our case we'll use the first option and directly specify the `dtypes` of the offending columns. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),\n", " parse_dates={'Date': [0, 1, 2]},\n", " dtype={'TailNum': str,\n", " 'CRSElapsedTime': float,\n", " 'Cancelled': bool})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.tail() # now works" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Computations with `dask.dataframe`\n", "\n", "We compute the maximum of the `DepDelay` column. With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums\n", "\n", "```python\n", "maxes = []\n", "for fn in filenames:\n", " df = pd.read_csv(fn)\n", " maxes.append(df.DepDelay.max())\n", " \n", "final_max = max(maxes)\n", "```\n", "\n", "We could wrap that `pd.read_csv` with `dask.delayed` so that it runs in parallel. Regardless, we're still having to think about loops, intermediate results (one per file) and the final reduction (`max` of the intermediate maxes). This is just noise around the real task, which pandas solves with\n", "\n", "```python\n", "df = pd.read_csv(filename, dtype=dtype)\n", "df.DepDelay.max()\n", "```\n", "\n", "`dask.dataframe` lets us write pandas-like code, that operates on larger than memory datasets in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df.DepDelay.max().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This writes the delayed computation for us and then runs it. \n", "\n", "Some things to note:\n", "\n", "1. As with `dask.delayed`, we need to call `.compute()` when we're done. Up until this point everything is lazy.\n", "2. Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.\n", " - This lets us handle datasets that are larger than memory\n", " - This means that repeated computations will have to load all of the data in each time (run the code above again, is it faster or slower than you would expect?)\n", " \n", "As with `Delayed` objects, you can view the underlying task graph using the `.visualize` method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# notice the parallelism\n", "df.DepDelay.max().visualize()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises\n", "\n", "In this section we do a few `dask.dataframe` computations. If you are comfortable with Pandas then these should be familiar. You will have to think about when to call `compute`.\n", "\n", "### 1.) How many rows are in our dataset?\n", "\n", "If you aren't familiar with pandas, how would you check how many records are in a list of tuples?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Your code here" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "len(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.) In total, how many non-canceled flights were taken?\n", "\n", "With pandas, you would use [boolean indexing](https://pandas.pydata.org/pandas-docs/stable/indexing.html#boolean-indexing)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Your code here" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "len(df[~df.Cancelled])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.) In total, how many non-cancelled flights were taken from each airport?\n", "\n", "*Hint*: use [`df.groupby`](https://pandas.pydata.org/pandas-docs/stable/groupby.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Your code here" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "df[~df.Cancelled].groupby('Origin').Origin.count().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4.) What was the average departure delay from each airport?\n", "\n", "Note, this is the same computation you did in the previous notebook (is this approach faster or slower?)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Your code here" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "df.groupby(\"Origin\").DepDelay.mean().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.) What day of the week has the worst average departure delay?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Your code here" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "df.groupby(\"DayOfWeek\").DepDelay.mean().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Sharing Intermediate Results\n", "\n", "When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared, and only computed once.\n", "\n", "For example, lets compute the mean and standard deviation for departure delay of all non-canceled flights. Since dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.\n", "\n", "If we compute them with two calls to compute, there is no sharing of intermediate computations." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "non_cancelled = df[~df.Cancelled]\n", "mean_delay = non_cancelled.DepDelay.mean()\n", "std_delay = non_cancelled.DepDelay.std()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "mean_delay_res = mean_delay.compute()\n", "std_delay_res = std_delay.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But let's try by passing both to a single `compute` call." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using `dask.compute` takes roughly 1/2 the time. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice. 
In particular, using `dask.compute` only does the following once:\n", "\n", "- the calls to `read_csv`\n", "- the filter (`df[~df.Cancelled]`)\n", "- some of the necessary reductions (`sum`, `count`)\n", "\n", "To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function (we might want to use `filename='graph.pdf'` to save the graph to disk so that we can zoom in more easily):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dask.visualize(mean_delay, std_delay)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## How does this compare to Pandas?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pandas is more mature and fully featured than `dask.dataframe`. If your data fits in memory then you should use Pandas. The `dask.dataframe` module gives you a limited `pandas` experience when you operate on datasets that don't fit comfortably in memory.\n", "\n", "During this tutorial we provide a small dataset consisting of a few CSV files. This dataset is 45MB on disk that expands to about 400MB in memory. This dataset is small enough that you would normally use Pandas.\n", "\n", "We've chosen this size so that exercises finish quickly. Dask.dataframe only really becomes meaningful for problems significantly larger than this, when Pandas breaks with the dreaded \n", "\n", " MemoryError: ...\n", " \n", "Furthermore, the distributed scheduler allows the same dataframe expressions to be executed across a cluster. To enable massive \"big data\" processing, one could execute data ingestion functions such as `read_csv`, where the data is held on storage accessible to every worker node (e.g., amazon's S3), and because most operations begin by selecting only some columns, transforming and filtering the data, only relatively small amounts of data need to be communicated between the machines.\n", "\n", "Dask.dataframe operations use `pandas` operations internally. Generally they run at about the same speed except in the following two cases:\n", "\n", "1. Dask introduces a bit of overhead, around 1ms per task. This is usually negligible.\n", "2. When Pandas releases the GIL `dask.dataframe` can call several pandas operations in parallel within a process, increasing speed somewhat proportional to the number of cores. For operations which don't release the GIL, multiple processes would be needed to get the same speedup." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask DataFrame Data Model\n", "\n", "For the most part, a Dask DataFrame feels like a pandas DataFrame.\n", "So far, the biggest difference we've seen is that Dask operations are lazy; they build up a task graph instead of executing immediately (more details coming in [Schedulers](05_distributed.ipynb)).\n", "This lets Dask do operations in parallel and out of core.\n", "\n", "In [Dask Arrays](03_array.ipynb), we saw that a `dask.array` was composed of many NumPy arrays, chunked along one or more dimensions.\n", "It's similar for `dask.dataframe`: a Dask DataFrame is composed of many pandas DataFrames. For `dask.dataframe` the chunking happens only along the index.\n", "\n", "\n", "\n", "We call each chunk a *partition*, and the upper / lower bounds are *divisions*.\n", "Dask *can* store information about the divisions. 
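, "\n", "As a quick, optional illustration (nothing later in the notebook depends on this), you can inspect both directly; for CSV-backed data without a set index the divisions are unknown, so `df.divisions` is just a tuple of `None`s:\n", "\n", "```python\n", "# Number of pandas DataFrames that make up this Dask DataFrame\n", "df.npartitions\n", "\n", "# Index values at the partition boundaries (all None here, i.e. unknown)\n", "df.divisions\n", "```"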
, "\n", "\n", "For now, partitions come up when you write custom functions to apply to Dask DataFrames." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Converting `CRSDepTime` to a timestamp\n", "\n", "This dataset stores timestamps as `HHMM`, which are read in as integers in `read_csv`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crs_dep_time = df.CRSDepTime.head(10)\n", "crs_dep_time" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "To convert these to timestamps of the scheduled departure time, we need to turn these integers into `pd.Timedelta` objects and then combine them with the `Date` column.\n", "\n", "In pandas we'd do this using the `pd.to_timedelta` function and a bit of arithmetic:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "# Get the first 10 dates to complement our `crs_dep_time`\n", "date = df.Date.head(10)\n", "\n", "# Get hours as an integer, convert to a timedelta\n", "hours = crs_dep_time // 100\n", "hours_timedelta = pd.to_timedelta(hours, unit='h')\n", "\n", "# Get minutes as an integer, convert to a timedelta\n", "minutes = crs_dep_time % 100\n", "minutes_timedelta = pd.to_timedelta(minutes, unit='m')\n", "\n", "# Apply the timedeltas to offset the dates by the departure time\n", "departure_timestamp = date + hours_timedelta + minutes_timedelta\n", "departure_timestamp" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Custom code and Dask DataFrame\n", "\n", "We could swap out `pd.to_timedelta` for `dd.to_timedelta` and do the same operations on the entire Dask DataFrame. But let's say that Dask hadn't implemented a `dd.to_timedelta` that works on Dask DataFrames. What would you do then?\n", "\n", "`dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:\n", "\n", "- [`map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)\n", "- [`map_overlap`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_overlap)\n", "- [`reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)\n", "\n", "Here we'll just be discussing `map_partitions`, which we can use to implement `to_timedelta` on our own:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Look at the docs for `map_partitions`\n", "\n", "help(df.CRSDepTime.map_partitions)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The basic idea is to apply a function that operates on a DataFrame to each partition.\n", "In this case, we'll apply `pd.to_timedelta`."
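, "\n", "\n", "As a rough mental model (this is *not* how `map_partitions` is implemented internally; the real method builds tasks directly and handles metadata more carefully), you can picture it as applying the function to each constituent pandas object and then stitching the results back together:\n", "\n", "```python\n", "import dask\n", "import dask.dataframe as dd\n", "\n", "def map_partitions_sketch(series, func, **kwargs):\n", "    parts = series.to_delayed()                    # one lazy pandas object per partition\n", "    applied = [dask.delayed(func)(part, **kwargs) for part in parts]\n", "    return dd.from_delayed(applied)                # stitch the pieces back into a Dask collection\n", "\n", "# Roughly equivalent to (df.CRSDepTime // 100).map_partitions(pd.to_timedelta, unit='h')\n", "map_partitions_sketch(df.CRSDepTime // 100, pd.to_timedelta, unit='h')\n", "```"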
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hours = df.CRSDepTime // 100\n", "# hours_timedelta = pd.to_timedelta(hours, unit='h')\n", "hours_timedelta = hours.map_partitions(pd.to_timedelta, unit='h')\n", "\n", "minutes = df.CRSDepTime % 100\n", "# minutes_timedelta = pd.to_timedelta(minutes, unit='m')\n", "minutes_timedelta = minutes.map_partitions(pd.to_timedelta, unit='m')\n", "\n", "departure_timestamp = df.Date + hours_timedelta + minutes_timedelta" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "departure_timestamp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "departure_timestamp.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise: Rewrite above to use a single call to `map_partitions`\n", "\n", "This will be slightly more efficient than two separate calls, as it reduces the number of tasks in the graph." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def compute_departure_timestamp(df):\n", " pass # TODO: implement this" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "raises-exception" ] }, "outputs": [], "source": [ "departure_timestamp = df.map_partitions(compute_departure_timestamp)\n", "\n", "departure_timestamp.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "def compute_departure_timestamp(df):\n", " hours = df.CRSDepTime // 100\n", " hours_timedelta = pd.to_timedelta(hours, unit='h')\n", "\n", " minutes = df.CRSDepTime % 100\n", " minutes_timedelta = pd.to_timedelta(minutes, unit='m')\n", "\n", " return df.Date + hours_timedelta + minutes_timedelta\n", "\n", "departure_timestamp = df.map_partitions(compute_departure_timestamp)\n", "departure_timestamp.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Limitations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What doesn't work?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask.dataframe only covers a small but well-used portion of the Pandas API.\n", "This limitation is for two reasons:\n", "\n", "1. The Pandas API is *huge*\n", "2. Some operations are genuinely hard to do in parallel (e.g. sort)\n", "\n", "Additionally, some important operations like ``set_index`` work, but are slower\n", "than in Pandas because they include substantial shuffling of data, and may write out to disk." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Learn More\n", "\n", "\n", "* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)\n", "* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)\n", "* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)\n", "* [DataFrame examples](https://examples.dask.org/dataframe.html)\n", "* [Pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }