{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Storage" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Efficient storage can dramatically improve performance, particularly when operating repeatedly from disk.\n", "\n", "Decompressing text and parsing CSV files is expensive. One of the most effective strategies with medium data is to use a binary storage format like HDF5. Often the performance gains from doing this is sufficient so that you can switch back to using Pandas again instead of using `dask.dataframe`.\n", "\n", "In this section we'll learn how to efficiently arrange and store your datasets in on-disk binary formats. We'll use the following:\n", "\n", "1. [Pandas `HDFStore`](http://pandas.pydata.org/pandas-docs/stable/io.html#io-hdf5) format on top of `HDF5`\n", "2. Categoricals for storing text data numerically\n", "\n", "**Main Take-aways**\n", "\n", "1. Storage formats affect performance by an order of magnitude\n", "2. Text data will keep even a fast format like HDF5 slow\n", "3. A combination of binary formats, column storage, and partitioned data turns one second wait times into 80ms wait times." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run prep.py -d accounts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read CSV" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First we read our csv data as before.\n", "\n", "CSV and other text-based file formats are the most common storage for data from many sources, because they require minimal pre-processing, can be written line-by-line and are human-readable. Since Pandas' `read_csv` is well-optimized, CSVs are a reasonable input, but far from optimized, since reading required extensive text parsing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "filename = os.path.join('data', 'accounts.*.csv')\n", "filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "df_csv = dd.read_csv(filename)\n", "df_csv.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write to HDF5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "HDF5 and netCDF are binary array formats very commonly used in the scientific realm.\n", "\n", "Pandas contains a specialized HDF5 format, `HDFStore`. The ``dd.DataFrame.to_hdf`` method works exactly like the ``pd.DataFrame.to_hdf`` method." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "target = os.path.join('data', 'accounts.h5')\n", "target" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# convert to binary format, takes some time up-front\n", "%time df_csv.to_hdf(target, '/data')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# same data as before\n", "df_hdf = dd.read_hdf(target, '/data')\n", "df_hdf.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Compare CSV to HDF5 speeds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We do a simple computation that requires reading a column of our dataset and compare performance between CSV files and our newly created HDF5 file. 
Which do you expect to be faster?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df_csv.amount.sum().compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df_hdf.amount.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sadly, they are about the same, or perhaps the HDF5 version is even slower.\n", "\n", "The culprit here is the `names` column, which is of `object` dtype and thus hard to store efficiently. There are two problems here:\n", "\n", "1. How do we store text data like `names` efficiently on disk?\n", "2. Why did we have to read the `names` column when all we wanted was `amount`?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. Store text efficiently with categoricals" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can use Pandas categoricals to replace our object dtypes with a numerical representation. This takes a bit more time up front, but results in better performance.\n", "\n", "More on categoricals at the [pandas docs](http://pandas.pydata.org/pandas-docs/stable/categorical.html) and [this blogpost](http://matthewrocklin.com/blog/work/2015/06/18/Categoricals)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Categorize data, then store in HDFStore\n", "%time df_hdf.categorize(columns=['names']).to_hdf(target, '/data2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# It looks the same\n", "df_hdf = dd.read_hdf(target, '/data2')\n", "df_hdf.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# But loads more quickly\n", "%time df_hdf.amount.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is now definitely faster than before. It tells us that it is not only the file type we use, but also how we represent our variables, that influences storage performance.\n", "\n", "How does the performance of reading depend on the scheduler we use? You can try this with the threaded, multiprocessing, and distributed schedulers.\n", "\n", "However, this can still be improved. We had to read all of the columns (`names` and `amount`) in order to compute the sum of one (`amount`). We'll improve on this further with `parquet`, an on-disk column-store. First, though, we learn how to set an index in a Dask dataframe." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`fastparquet` is a library for interacting with parquet-format files. Parquet is a very common format in the Big Data ecosystem, used by tools such as Hadoop, Spark and Impala." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "target = os.path.join('data', 'accounts.parquet')\n", "df_csv.categorize(columns=['names']).to_parquet(target, storage_options={\"has_nulls\": True}, engine=\"fastparquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Investigate the file structure in the resultant new directory - what do you suppose those files are for?\n", "\n", "`to_parquet` comes with many options, such as compression, whether to explicitly write NULLs information (not necessary in this case), and how to encode strings. You can experiment with these below, to see what effect they have on the file size and the processing times." 
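, "\n", "\n", "For example, here is a minimal sketch of one such experiment; the output path `accounts_gzip.parquet` is just an illustrative name, and gzip is one of the compression codecs that `fastparquet` understands:\n", "\n", "```python\n", "# write a second, gzip-compressed copy of the categorized data so that its\n", "# directory size and read time can be compared with data/accounts.parquet\n", "df_csv.categorize(columns=['names']).to_parquet(\n", "    os.path.join('data', 'accounts_gzip.parquet'),\n", "    compression='gzip',\n", "    engine='fastparquet',\n", ")\n", "```"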
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls -l data/accounts.parquet/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_p = dd.read_parquet(target)\n", "# note that column names shows the type of the values - we could\n", "# choose to load as a categorical column or not.\n", "df_p.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Rerun the sum computation above for this version of the data, and time how long it takes. You may want to try this more than once - it is common for many libraries to do various setup work when called for the first time." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df_p.amount.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When archiving data, it is common to sort and partition by a column with unique identifiers, to facilitate fast look-ups later. For this data, that column is `id`. Time how long it takes to retrieve the rows corresponding to `id==100` from the raw CSV, from HDF5 and parquet versions, and finally from a new parquet version written after applying `set_index('id')`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# df_p.set_index('id').to_parquet(...)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Remote files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask can access various cloud- and cluster-oriented data storage services such as Amazon S3 or HDFS\n", "\n", "Advantages:\n", "* scalable, secure storage\n", "\n", "Disadvantages:\n", "* network speed becomes bottleneck\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The way to set up dataframes (and other collections) remains very similar to before. Note that the data here is available anonymously, but in general an extra parameter `storage_options=` can be passed with further details about how to interact with the remote storage.\n", "\n", "```python\n", "taxi = dd.read_csv('s3://nyc-tlc/trip data/yellow_tripdata_2015-*.csv',\n", " storage_options={'anon': True})\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Warning**: operations over the Internet can take a long time to run. Such operations work really well in a cloud clustered set-up, e.g., amazon EC2 machines reading from S3 or Google compute machines reading from GCS." ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }