You can run this notebook in a live session on Binder or view it on GitHub.

Distributed, Advanced

Distributed futures

[1]:
from dask.distributed import Client
c = Client(n_workers=4)
c.cluster

In the previous chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.

To begin, the futures interface (derived from the built-in concurrent.futures) allows map-reduce-like functionality. We can submit an individual function for evaluation with one set of inputs using submit(), or evaluate a function over a sequence of inputs using map(). Notice that the calls return immediately, giving one or more futures, whose status begins as “pending” and later becomes “finished”. There is no blocking of the local Python session.

Here is the simplest example of submit in action:

[2]:
def inc(x):
    return x + 1

fut = c.submit(inc, 1)
fut
[2]:
Future: inc status: pending, key: inc-5265354abff49c6ac83830d2f726c1ca

We can re-execute the following cell as often as we want as a way to poll the status of the future. This could of course be done in a loop, pausing for a short time on each iteration. We could continue with our work, or view a progress bar of work still going on, or force a wait until the future is ready.
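A minimal polling loop would look like the following sketch (checking the future's status attribute rather than re-running a cell by hand); in practice you would more often use the tools shown further below.

import time

while fut.status == 'pending':   # status becomes 'finished' (or 'error') when done
    time.sleep(0.1)
print(fut.status)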

In the meantime, the status dashboard (link above next to the Cluster widget) has gained a new element in the task stream, indicating that inc() has completed, and the progress section at the bottom shows one task complete and held in memory.

[3]:
fut
[3]:
Future: inc status: finished, type: builtins.int, key: inc-5265354abff49c6ac83830d2f726c1ca

Possible alternatives you could investigate:

from dask.distributed import wait, progress
progress(fut)

would show a progress bar in this notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn’t block the execution of other code in the meanwhile.

wait(fut)

would block and force the notebook to wait until the computation pointed to by fut was done. However, note that the result of inc() is already sitting on the cluster, so executing the computation now would take essentially no time, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.
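As a small illustration of this (a sketch, assuming fut from the cell above is still alive): submitting the same pure function with the same argument yields a future with the same key, so the result already held on the cluster is simply reused.

fut2 = c.submit(inc, 1)   # same function, same argument -> same key as fut
fut2.result()             # 2, returned essentially immediately; nothing is recomputed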

[4]:
# grab the information back - this blocks if fut is not ready
c.gather(fut)
# equivalent action when only considering a single future
# fut.result()
[4]:
2

Here we see an alternative way to execute work on the cluster: when you submit or map with the inputs as futures, the computation moves to the data rather than the other way around, and the client, in the local Python session, need never see the intermediate values. This is similar to building the graph using delayed, and indeed, delayed can be used in conjunction with futures. Here we use the delayed object total from before.

[5]:
# Some trivial work that takes time
# repeated from the Distributed chapter.

from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
[6]:
# notice the difference from total.compute()
# notice that this cell completes immediately
fut = c.compute(total)
fut
[6]:
Future: add status: pending, key: add-969d7f9e-c2d5-4908-b186-2cf582431d9e
[7]:
c.gather(fut) # waits until result is ready
[7]:
3

Client.submit

submit takes a function and arguments, pushes these to the cluster, returning a Future representing the result to be computed. The function is passed to a worker process for evaluation. Note that this cell returns immediately, while computation may still be ongoing on the cluster.

[8]:
fut = c.submit(inc, 1)
fut
[8]:
Future: inc status: pending, key: inc-4bd9c3b4b694029c8be2532e6cc62286

This looks a lot like doing compute(), above, except now we are passing the function and arguments directly to the cluster. To anyone used to concurrent.futures, this will look familiar. This new fut behaves the same way as the one above. Note that we have now overwritten the previous definition of fut, which will get garbage-collected, and, as a result, the previous result is released by the cluster.

Exercise: Rebuild the above delayed computation using Client.submit instead

The arguments passed to submit can be futures from other submit operations or delayed objects. The former, in particular, demonstrates the concept of moving the computation to the data, which is one of the most powerful elements of programming with Dask.

[9]:
# Your code here
[10]:
x = c.submit(inc, 1)
y = c.submit(dec, 2)
total = c.submit(add, x, y)

print(total)     # This is still a future
c.gather(total)   # This blocks until the computation has finished

<Future: pending, key: add-d5618e271129402960f64674e9b3c788>
[10]:
3

Each future represents a result held by, or being evaluated on, the cluster. Thus we can control the caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the solution above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.

We can explicitly pass data from our local session into the cluster using scatter(), but it is usually better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialise and communicate the data. Most of the loading functions within Dask, such as dd.read_csv, work this way. Similarly, we normally don’t want to gather() results that are too big to fit comfortably in local memory.
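For completeness, here is a minimal sketch of scatter() (assuming the client c from above); for real workloads, prefer loading inside the workers as just described.

import numpy as np

data = np.arange(1000)
[remote_data] = c.scatter([data])        # ship the array to a worker; get back a future
fut_sum = c.submit(np.sum, remote_data)  # the function runs where the data already lives
c.gather(fut_sum)                        # 499500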

The full API of the distributed scheduler gives details of interacting with the cluster, which, remember, can be on your local machine or possibly on a massive computational resource.

The futures API offers a work-submission style that can easily emulate the map/reduce paradigm (see c.map()) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to pull them locally from the cluster, and new work can be assigned to operate on the output of previous jobs that haven’t even begun yet.
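A minimal map/reduce sketch along these lines (using a hypothetical square function, to avoid the deliberately slow inc defined earlier):

def square(x):
    return x ** 2

futs = c.map(square, range(8))    # one future per input, computed on the workers
total_fut = c.submit(sum, futs)   # reduce step: futures passed directly, never gathered locally
c.gather(total_fut)               # 140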

Generally, any Dask operation that is executed using .compute() can be submitted for asynchronous execution using c.compute() instead, and this applies to all collections. Here is an example with the calculation previously seen in the Bag chapter. We have replaced the .compute() method there with the distributed client version, so, again, we could continue to submit more work (perhaps based on the result of the calculation), or, in the next cell, follow the progress of the computation. A similar progress-bar appears in the monitoring UI page.

[11]:
%run prep.py -d accounts
[12]:
import dask.bag as db
import os
import json
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
js = lines.map(json.loads)

f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
       .pluck('transactions')
       .flatten()
       .pluck('amount')
       .mean())
[13]:
from dask.distributed import progress
# note that progress must be the last line of a cell
# in order to show up
progress(f)
[14]:
# get result.
c.gather(f)
[14]:
1415.7387784576792
[15]:
# release values by deleting the futures
del f, fut, x, y, total

Persist

Considering which data should be loaded by the workers, as opposed to passed to them, and which intermediate values to persist in worker memory, will in many cases determine the computational efficiency of a process.

In the example here, we repeat a calculation from the Array chapter - notice that each call to compute() is roughly the same speed, because the loading of the data is included every time.

[16]:
%run prep.py -d random
[17]:
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']
import dask.array as da
x = da.from_array(dset, chunks=(1000000,))

%time x.sum().compute()
%time x.sum().compute()
CPU times: user 37.9 ms, sys: 0 ns, total: 37.9 ms
Wall time: 375 ms
CPU times: user 20.8 ms, sys: 0 ns, total: 20.8 ms
Wall time: 61.2 ms
[17]:
4998647.0

If, instead, we persist the data to RAM up front (this takes a few seconds to complete - we could wait() on this process), then further computations will be much faster.

[18]:
# changes x from a set of delayed prescriptions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = c.persist(x)
[19]:
%time x.sum().compute()
%time x.sum().compute()
CPU times: user 16.7 ms, sys: 3.93 ms, total: 20.6 ms
Wall time: 47.6 ms
CPU times: user 16 ms, sys: 209 µs, total: 16.2 ms
Wall time: 36.3 ms
[19]:
4998647.0

Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often.
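As a sketch of that pattern (the file pattern and column name here are hypothetical, not part of this notebook's data): persist a cleaned dataframe once, then issue several cheap queries against it.

import dask.dataframe as dd

df = dd.read_csv('data/my_records.*.csv')   # hypothetical input files
cleaned = df[df.amount > 0].dropna()        # the expensive cleaning steps
cleaned = c.persist(cleaned)                # cleaning runs once; partitions stay in worker memory
print(cleaned.amount.mean().compute())      # subsequent queries reuse the persisted data
print(cleaned.amount.max().compute())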

Exercise: how is the memory associated with x released, once we know we are done with it?

[ ]:

Asynchronous computation

One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in, and submit new points to compute as others are still running.

Watching the diagnostics dashboard as this runs, you can see that computations are being run concurrently while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.

Let’s perform a very simple minimization, submitting new tasks dynamically as results arrive. The function of interest is the Rosenbrock function:

[20]:
# a simple function with interesting minima
import time

def rosenbrock(point):
    """Compute the rosenbrock function and return the point and result"""
    time.sleep(0.1)
    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
    return point, score

Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in.

[21]:
from bokeh.io import output_notebook, push_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, show
import numpy as np
output_notebook()

# set up plot background
N = 500
x = np.linspace(-5, 5, N)
y = np.linspace(-5, 5, N)
xx, yy = np.meshgrid(x, y)
d = (1 - xx)**2 + 2 * (yy - xx**2)**2
d = np.log(d)

p = figure(x_range=(-5, 5), y_range=(-5, 5))
p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");

We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as results come in, we test whether we have a new best point, and choose random points around that new best point as the search box shrinks.

We print the function value and current best location each time we have a new best value.

[22]:
from dask.distributed import as_completed
from random import uniform

scale = 5                  # Initial random perturbation scale
best_point = (0, 0)        # Initial guess
best_score = float('inf')  # Best score so far
startx = [uniform(-scale, scale) for _ in range(10)]
starty = [uniform(-scale, scale) for _ in range(10)]

# set up plot
source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
p.circle(source=source, x='x', y='y', color='c')
t = show(p, notebook_handle=True)

# initial 10 random points
futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
iterator = as_completed(futures)

for res in iterator:
    # take a completed point, is it an improvement?
    point, score = res.result()
    if score < best_score:
        best_score, best_point = score, point
        print(score, point)

    x, y = best_point
    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))

    # update plot
    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
    push_notebook(document=t)

    # add new point, dynamically, to work on the cluster
    new_point = c.submit(rosenbrock, (newx, newy))
    iterator.add(new_point)  # Start tracking new task as well

    # Narrow search and consider stopping
    scale *= 0.99
    if scale < 0.001:
        break
point
16.37780931753531 (-1.7964066652277988, 1.1585122982386151)
8.205532166645092 (1.3285042379121244, 3.7770886540090576)
2.2259922226131548 (2.1084134385698645, 5.151598374803416)
0.329540864018972 (1.4882156329620768, 2.0012601748295236)
0.004096606890854935 (0.9971543104971037, 1.039530151361363)
0.0020793017435729727 (1.0391815459068776, 1.0963923479809146)
3.1298099557463704e-05 (0.9944475678832885, 0.988441921290214)
2.3295254452837808e-05 (1.0047542261726639, 1.0101195224926616)
3.3665671061465274e-06 (1.0016437706041885, 1.0038666914996335)
2.903914480967777e-06 (1.0003924532958632, 1.0019576421418575)
2.5027976202682037e-07 (1.0002590862161775, 1.0002156225603063)
5.624244386472204e-08 (0.9998474668075431, 0.9995665511396431)
[22]:
(0.999671991052999, 0.9991110718961499)

Debugging

When something goes wrong in a distributed job, it is hard to figure out what the problem was and what to do about it. When a task raises an exception, the exception will show up when that result, or any other result that depends upon it, is gathered.

Consider the following delayed calculation to be computed by the cluster. As usual, we get back a future, which the cluster is working on to compute (this happens very quickly for such a trivial procedure).

[23]:
@delayed
def ratio(a, b):
    return a // b

ina = [5, 25, 30]
inb = [5, 5, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
f
[23]:
Future: sum status: pending, key: sum-303bb98b-d422-41ad-b2c9-c93b53a72ad4

We only get to know what happened when we gather the result (this is also true for out.compute(), except we could not have done other stuff in the meantime). For the first set of inputs, it works fine.

[24]:
c.gather(f)
[24]:
11

But if we introduce bad input, an exception is raised. The exception happens in ratio, but only comes to our attention when calculating the sum.

[25]:
ina = [5, 25, 30]
inb = [5, 0, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
c.gather(f)
distributed.worker - WARNING -  Compute Failed
Function:  ratio
args:      (25, 0)
kwargs:    {}
Exception: ZeroDivisionError('integer division or modulo by zero')

---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
/tmp/ipykernel_7746/4286036942.py in <module>
      3 out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
      4 f = c.compute(out)
----> 5 c.gather(f)

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1974             else:
   1975                 local_worker = None
-> 1976             return self.sync(
   1977                 self._gather,
   1978                 futures,

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    829             return future
    830         else:
--> 831             return sync(
    832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1839                             exc = CancelledError(key)
   1840                         else:
-> 1841                             raise exception.with_traceback(traceback)
   1842                         raise exc
   1843                     if errors == "skip":

/tmp/ipykernel_7746/1636418833.py in ratio()
      1 @delayed
      2 def ratio(a, b):
----> 3     return a // b
      4
      5 ina = [5, 25, 30]

ZeroDivisionError: integer division or modulo by zero

The display in this case makes the origin of the exception obvious, but this is not always the case. How should this be debugged? How would we go about finding out the exact conditions that caused the exception?

The first step, of course, is to write well-tested code which makes appropriate assertions about its input and clear warnings and error messages when something goes wrong. This applies to all code.

The most typical thing to do is to execute some portion of the computation in the local thread, so that we can run the Python debugger and query the state of things at the time that the exception happened. Obviously, this cannot be performed on the whole data-set when dealing with Big Data on a cluster, but a suitable sample will probably do even then.

[26]:
import dask
with dask.config.set(scheduler="sync"):
    # do NOT use c.compute(out) here - we specifically do not
    # want the distributed scheduler
    out.compute()
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
/tmp/ipykernel_7746/1072099285.py in <module>
      3     # do NOT use c.compute(out) here - we specifically do not
      4     # want the distributed scheduler
----> 5     out.compute()

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    525     """
    526     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 527     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    528
    529

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    492
    493                 while state["ready"] and len(state["running"]) < num_workers:
--> 494                     fire_task()
    495
    496             succeeded = True

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in fire_task()
    454                 )
    455                 # Submit
--> 456                 apply_async(
    457                     execute_task,
    458                     args=(

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    514 def apply_sync(func, args=(), kwds={}, callback=None):
    515     """ A naive synchronous version of apply_async """
--> 516     res = func(*args, **kwds)
    517     if callback is not None:
    518         callback(res)

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/tmp/ipykernel_7746/1636418833.py in ratio(a, b)
      1 @delayed
      2 def ratio(a, b):
----> 3     return a // b
      4
      5 ina = [5, 25, 30]

ZeroDivisionError: integer division or modulo by zero
[27]:
# uncomment to enter post-mortem debugger
# %debug

The trouble with this approach is that Dask is meant for the execution of large datasets/computations - you probably can’t simply run the whole thing in one local thread, else you wouldn’t have used Dask in the first place. So the code above should only be used on a small part of the data that also exhibits the error. Furthermore, the method will not work when you are dealing with futures (such as f, above, or after persisting) instead of delayed-based computations.

As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependencies locally for execution.

[28]:
c.recreate_error_locally(f)
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
/tmp/ipykernel_7746/4271091791.py in <module>
----> 1 c.recreate_error_locally(f)

/usr/share/miniconda3/envs/dask-tutorial/lib/python3.8/site-packages/distributed/recreate_exceptions.py in recreate_error_locally(self, future)
    175             self.client.loop, self._recreate_error_locally, future
    176         )
--> 177         func(*args, **kwargs)

/tmp/ipykernel_7746/1636418833.py in ratio(a, b)
      1 @delayed
      2 def ratio(a, b):
----> 3     return a // b
      4
      5 ina = [5, 25, 30]

ZeroDivisionError: integer division or modulo by zero
[29]:
# uncomment to enter post-mortem debugger
# %debug

Finally, there are errors other than exceptions, for which we need to look at the state of the scheduler/workers. In the standard “LocalCluster” we started, we have direct access to these.

[30]:
[(k, v.state) for k, v in c.cluster.scheduler.tasks.items() if v.exception is not None]
[30]:
[('ratio-625dcd0c-86f7-4219-8e56-1d791c01db11', 'erred')]
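A failed future also exposes its error directly; a quick sketch, using the future f from the failing computation above:

import traceback

exc = f.exception()                # the ZeroDivisionError instance raised on the worker
traceback.print_tb(f.traceback())  # the remote traceback, available for inspection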