{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel and Distributed Machine Learning\n", "\n", "[Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Types of Scaling\n", "\n", "There are a couple of distinct scaling problems you might face.\n", "The scaling strategy depends on which problem you're facing.\n", "\n", "1. CPU-Bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.\n", "2. Memory-bound: Data is larger than RAM, and sampling isn't an option.\n", "\n", "![](images/ml-dimensions.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* For in-memory problems, just use scikit-learn (or your favorite ML library).\n", "* For large models, use `dask_ml.joblib` and your favorite scikit-learn estimator\n", "* For large datasets, use `dask_ml` estimators" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Scikit-Learn in 5 Minutes\n", "\n", "Scikit-Learn has a nice, consistent API.\n", "\n", "1. You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the models *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.\n", "2. You call `estimator.fit(X, y)` to train the estimator.\n", "3. Use `estimator` to inspect attributes, make predictions, etc. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's generate some random data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.datasets import make_classification\n", "\n", "X, y = make_classification(n_samples=10000, n_features=4, random_state=0)\n", "X[:8]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y[:8]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll fit a Support Vector Classifier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.svm import SVC" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the estimator and fit it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator = SVC(random_state=0)\n", "estimator.fit(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Inspect the learned attributes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.support_vectors_[:4]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the accuracy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.score(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hyperparameters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Most models have *hyperparameters*. They affect the fit, but are specified up front instead of learned during training." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator = SVC(C=0.00001, shrinking=False, random_state=0)\n", "estimator.fit(X, y)\n", "estimator.support_vectors_[:4]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.score(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hyperparameter Optimization\n", "\n", "There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.\n", "As the name implies, this does a brute-force search over a grid of hyperparameter combinations." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import GridSearchCV" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "estimator = SVC(gamma='auto', random_state=0, probability=True)\n", "param_grid = {\n", " 'C': [0.001, 10.0],\n", " 'kernel': ['rbf', 'poly'],\n", "}\n", "\n", "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)\n", "grid_search.fit(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Single-machine parallelism with scikit-learn\n", "\n", "![](images/unmerged_grid_search_graph.svg)\n", "\n", "Scikit-Learn has nice *single-machine* parallelism, via Joblib.\n", "Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword.\n", "This controls the number of CPU cores that will be used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)\n", "grid_search.fit(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Multi-machine parallelism with Dask\n", "\n", "![](images/merged_grid_search_graph.svg)\n", "\n", "Dask can talk to scikit-learn (via joblib) so that your *cluster* is used to train a model. \n", "\n", "If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a disrtibuted cluster. That would mean putting something in the call to `Client` something like\n", "\n", "```\n", "c = Client('tcp://my.scheduler.address:8786')\n", "```\n", "\n", "Details on the many ways to create a cluster can be found [here](https://docs.dask.org/en/latest/setup/single-distributed.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try it on a larger problem (more hyperparameters)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import joblib\n", "import dask.distributed\n", "\n", "c = dask.distributed.Client()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "param_grid = {\n", " 'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],\n", " # Uncomment this for larger Grid searches on a cluster\n", " # 'kernel': ['rbf', 'poly', 'linear'],\n", " # 'shrinking': [True, False],\n", "}\n", "\n", "grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "with joblib.parallel_backend(\"dask\", scatter=[X, y]):\n", " grid_search.fit(X, y)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "grid_search.best_params_, grid_search.best_score_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Training on Large Datasets\n", "\n", "Sometimes you'll want to train on a larger than memory dataset. `dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine's RAM." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.array as da\n", "import dask.delayed\n", "from sklearn.datasets import make_blobs\n", "import numpy as np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll make a small (random) dataset locally using scikit-learn." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n_centers = 12\n", "n_features = 20\n", "\n", "X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)\n", "\n", "centers = np.zeros((n_centers, n_features))\n", "\n", "for i in range(n_centers):\n", " centers[i] = X_small[y_small == i].mean(0)\n", " \n", "centers[:4]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The small dataset will be the template for our large random dataset.\n", "We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n_samples_per_block = 200000\n", "n_blocks = 500\n", "\n", "delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,\n", " centers=centers,\n", " n_features=n_features,\n", " random_state=i)[0]\n", " for i in range(n_blocks)]\n", "arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)\n", " for obj in delayeds]\n", "X = da.concatenate(arrays)\n", "X" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X = X.persist() # Only run this on the cluster." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.\n", "\n", "They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask_ml.cluster import KMeans" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clf = KMeans(init_max_iter=3, oversampling_factor=10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time clf.fit(X)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clf.labels_" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clf.labels_[:10].compute()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" }, "nbsphinx": { "execute": "never" } }, "nbformat": 4, "nbformat_minor": 4 }