A plugin that enables provenance tracking in Dask
Download the repo from https://github.com/HPCI-Lab/yprov4dask and install the
plugin in your Python environment by running `pip install .` from within the
root folder of the repository.
Install the package in development mode with `pip install -e .` if you want
modifications to the code to be reflected immediately in your environment.
Without the `-e` option, you will have to reinstall the plugin every time
you modify it.
To use the plugin, simply import it into your code, instantiate it, and register it with your Dask client:
```python
from dask.distributed import Client
from prov_tracking import ProvTracker

# The guard can be omitted when working in Jupyter notebooks
if __name__ == '__main__':
    # Create your Dask client with any client options you want
    client = Client()
    # Creating the plugin is as simple as this,
    # no parameter is required
    plugin = ProvTracker()
    # The plugin requires a reference to the scheduler,
    # but you must register it with the client before providing it
    client.register_plugin(plugin)
    plugin.start(client.scheduler)

    # Your analysis...

    # When the client is closed, the provenance document is
    # automatically generated. If you define your own custom cluster,
    # the document is generated when the cluster is closed.
    client.close()
```

The plugin can only track what comes through the Dask scheduler, so if no
computation translates into Dask tasks, the provenance document will remain empty.
For example, if you open a dataset with xarray and you want to track its
provenance, make sure that it uses a Dask array under the hood. With
`xr.open_dataset`, you can enable the use of Dask structures by providing some
value for the `chunks` argument. `chunks={}` is sufficient to enable Dask, but
know that this value might produce a very inefficient arrangement of data in
memory.
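As a minimal sketch (the file name here is just a placeholder):

```python
import xarray as xr

# Any value for `chunks` makes xarray back its variables with Dask
# arrays; chunks={} uses a single chunk per variable, which may be
# inefficient for large datasets
ds = xr.open_dataset('data.nc', chunks={})
```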
Upon plugin initialization you can provide the following options:
- `destination: str`: folder in which the provenance document is saved. Defaults to `./output`.
- `file_name: str`: name of the provenance document. Defaults to `yprov4wfs.json`.
- `keep_traceback: bool`: tells whether the plugin should record the traceback of the exceptions raised by failed tasks. Defaults to `False`.
- `rich_types: bool`: tells whether the datatypes of values should be richer, e.g. for tuples, track the type of each element instead of just saying that the value is a tuple. Defaults to `False`.
- `execution_source: str`: path to the single file being executed or to the directory containing it. If provided, the plugin copies that file or the content of the entire directory into `destination` together with the provenance file.
- `jupyter_tracking: bool`: tells whether the plugin should try to record in the provenance document which line of notebook code generated each activity. Defaults to `False`. Note that this option creates an additional process that communicates with both the Jupyter kernel and the plugin.
- `jupyter_timeout: float`: how many seconds the plugin should wait for a new message from the Jupyter tracker before giving up. Defaults to `1.0`. Pass `0.0` or `None` to avoid blocking.
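For example, a sketch of a customized configuration (the argument values are only illustrative):

```python
from prov_tracking import ProvTracker

# All options are keyword arguments; the values below are examples
plugin = ProvTracker(
    destination='./provenance',       # folder for the provenance document
    file_name='analysis_prov.json',   # name of the provenance document
    keep_traceback=True,              # record tracebacks of failed tasks
    rich_types=True,                  # track per-element types of containers
)
```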
Given a Jupyter notebook, you can enable provenance tracking in exactly the same way shown above. However, if you want a richer provenance document, you can enable Jupyter tracking by just setting a flag when instantiating the plugin:
```python
from dask.distributed import Client
from prov_tracking import ProvTracker

client = Client()
# Just an additional flag
plugin = ProvTracker(jupyter_tracking=True)
client.register_plugin(plugin)
plugin.start(client.scheduler)
```

Now, as the Jupyter notebook executes, all function calls in a cell are extracted, and any function whose name is in the following list is marked as a function that can generate tasks.
| Recognized functions | Conditional |
|---|---|
| submit | |
| compute | |
| get | |
| persist | |
| load | |
| fire_and_forget | |
| store | * |
| to_zarr | * |
| to_hdf5 | * |
| to_tiledb | * |
| to_netcdf | * |
| plot | |
| hvplot | |
All functions refer to Dask APIs, while functions marked with * can be
prevented from creating tasks right away: `store` and all the `to_...`
functions accept a `compute` argument which, if set to `False`, makes them
return a `dask.Delayed` object.
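As a sketch of the deferred case (the array and target here are placeholders):

```python
import numpy as np
import dask.array as da

arr = da.ones((100, 100), chunks=(50, 50))
target = np.empty((100, 100))

# With compute=False, store returns a dask.Delayed object instead of
# executing immediately, so no tasks reach the scheduler yet
delayed = da.store(arr, target, compute=False)
delayed.compute()  # the tasks run (and are tracked) here
```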
Notice that the tracker's analysis capabilities are very limited. It extracts
function names from a static analysis of the cell code, so it cannot distinguish
functions with the same name that are called on objects of different types. For
conditional functions, it can recognize a literal `compute=False`, but if `False`
is the result of some sort of evaluation, nothing can be said about it. For this
reason, the tracker might infer wrong generating functions. To address this, the
tracker can recognize specific hints provided in the code. These are:
- `# Jupyter:ignore`: written right before the line to be ignored, makes the tracker ignore the first function call in that line.
- `# Jupyter:track`: tells the tracker to treat the first function call in the next line as a generating function.

These hints must be written exactly as they appear here to be recognized.
If some task doesn't have a generating function, it is associated with the last function used.
```python
flag = False
# Jupyter:ignore
delayed = ds.to_zarr('output.zarr', compute=flag)
```

If no hint were provided, the tracker would have considered that call to `to_zarr`
a generating call.
```python
def save_ds(delayed):
    # Jupyter:ignore
    delayed.compute()

flag = False
# Jupyter:ignore
delayed = ds.to_zarr('output.zarr', compute=flag)
# Jupyter:track
save_ds(delayed)
```

Again, the tracker's analysis capabilities are kept as minimal as possible to reduce
overhead, so here you have to ignore the call to `compute` within `save_ds` and
mark the call to `save_ds` as a generating call.