diff --git a/content/conf.py b/content/conf.py
index 86a9452..edd9513 100644
--- a/content/conf.py
+++ b/content/conf.py
@@ -107,11 +107,14 @@
     'python': ('https://docs.python.org/3', None),
 #    'sphinx': ('https://www.sphinx-doc.org/', None),
     'numpy': ('https://numpy.org/doc/stable/', None),
+    'scipy': ('https://docs.scipy.org/doc/scipy-1.15.0/', None),
 #    'scipy': ('https://docs.scipy.org/doc/scipy/reference/', None),
-#    'pandas': ('https://pandas.pydata.org/docs/', None),
+    'pandas': ('https://pandas.pydata.org/docs/', None),
 #    'matplotlib': ('https://matplotlib.org/', None),
 #    'seaborn': ('https://seaborn.pydata.org/', None),
     'ipython': ('https://ipython.readthedocs.io/en/stable/', None),
+    'dask': ('https://docs.dask.org/en/stable/', None),
+    'xarray': ('https://docs.xarray.dev/en/stable/', None),
 }

 # sphinx-hoverxref
diff --git a/content/dask.rst b/content/dask.rst
index 9a5a94d..0ca9375 100644
--- a/content/dask.rst
+++ b/content/dask.rst
@@ -81,40 +81,74 @@ Dask provides four different schedulers:

 Here we will focus on using a ``LocalCluster``, and it is recommended to use
-a distributed sceduler ``dask.distributed``. It is more sophisticated, offers more features,
+a distributed scheduler ``dask.distributed``. It is more sophisticated, offers more features,
 but requires minimum effort to set up. It can run locally on a laptop and scale up to a cluster.

-We can start a ``LocalCluster`` scheduler which makes use of all the cores and RAM
-we have on the machine by:
-
-.. code-block:: python
-
-   from dask.distributed import Client, LocalCluster
-   # create a local cluster
-   cluster = LocalCluster()
-   # connect to the cluster we just created
-   client = Client(cluster)
-   client
+.. callout:: Alternative 1: Initializing a Dask ``LocalCluster`` via JupyterLab
+   :class: dropdown
+
+   This makes use of the ``dask-labextension`` which is pre-installed in our conda environment.
+
+   #. Start a new Dask cluster from the sidebar by clicking on the ``+ NEW`` button.
+   #. Click on the ``< >`` button to inject the client code into a notebook cell. Execute it.
+
-Or you can simply lauch a Client() call which is shorthand for what is described above.
+   |dask-1| |dask-2|

-.. code-block:: python
+   3. You can scale the cluster for more resources or launch the dashboard.

-   from dask.distributed import Client
-   client = Client()   # same as Client(processes=True)
-   client
+   |dask-3|

+   .. |dask-1| image:: ./img/jlab-dask-1.png
+      :width: 49%

-We can also specify the resources to be allocated to a Dask cluster by:
+   .. |dask-2| image:: ./img/jlab-dask-2.png
+      :width: 49%

-.. code-block:: python
-
-   from dask.distributed import Client, LocalCluster
-   # create a local cluster with
-   # 4 workers
-   # 1 thread per worker
-   # 4 GiB memory limit for a worker
-   cluster = LocalCluster(n_workers=4,threads_per_worker=1,memory_limit='4GiB')
+   .. |dask-3| image:: ./img/jlab-dask-3.png
+      :width: 100%
+
+**Alternative 2**: We can also start a ``LocalCluster`` scheduler manually, which makes use of:
+
+.. tabs::
+
+   .. tab:: all resources
+
+      all the cores and RAM we have on the machine by:
+
+      .. code-block:: python
+
+         from dask.distributed import Client, LocalCluster
+         # create a local cluster
+         cluster = LocalCluster()
+         # connect to the cluster we just created
+         client = Client(cluster)
+         client
+
+      Or you can simply launch a ``Client()`` call, which is shorthand for what is described above.
+
+      .. code-block:: python
+
+         from dask.distributed import Client
+         client = Client()   # same as Client(processes=True)
+         client
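+
+      When you are done, you can release the resources again; a minimal
+      sketch (both objects provide a ``close()`` method):
+
+      .. code-block:: python
+
+         # shut down the workers and the connection to them
+         client.close()
+         cluster.close()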
+   .. tab:: specified resources
+
+      which limits the compute resources available as follows:
+
+      .. code-block:: python
+
+         from dask.distributed import Client, LocalCluster
+
+         cluster = LocalCluster(
+             n_workers=4,
+             threads_per_worker=1,
+             memory_limit='4GiB'  # memory limit per worker
+         )
+         client = Client(cluster)
+         client

 .. note::

@@ -136,6 +170,7 @@
 you can modify the number of workers manually or automatically based on workload:

    cluster.adapt(minimum=1, maximum=10)  # Allows the cluster to auto scale to 10 when tasks are computed

+
 Dask distributed scheduler also provides live feedback via its interactive dashboard.
 A link that redirects to the dashboard will prompt in the terminal
 where the scheduler is created, and it is also shown when you create a Client
 and connect the scheduler.
@@ -232,23 +267,23 @@ Let us further calculate the sum of the dask array:

    sum_da = ones.sum()

-So far, it is only a symbolic representation of the array.
-One way to trigger the computation is to call :meth:`compute`:
+So far, only a task graph of the computation is prepared.
+We can visualize the task graph by calling :meth:`visualize`:

 .. code-block:: python

-   dask.compute(sum_da)
-   # or
-   sum_da.compute()
+   dask.visualize(sum_da)
+   # or
+   sum_da.visualize()

-We can visualize the symbolic operations by calling :meth:`visualize`:
+One way to trigger the computation is to call :meth:`compute`:

 .. code-block:: python

-   dask.visualize(sum_da)
-   # or
-   sum_da.visualize()
+   dask.compute(sum_da)
+   # or
+   sum_da.compute()

 You can find additional details and examples here
@@ -276,16 +311,24 @@ a Dask dataframe:

    import dask.dataframe as dd

    url = "https://raw.githubusercontent.com/pandas-dev/pandas/master/doc/data/titanic.csv"
-   # read from Pandas DataFrame
+   df = pd.read_csv(url, index_col="Name")
+   # read a Dask DataFrame from a Pandas DataFrame
    ddf = dd.from_pandas(df, npartitions=10)
-   # "blocksize=None" means a single chunk is used
+
+Alternatively you can read directly into a Dask dataframe, while also controlling
+how the dataframe is partitioned in terms of ``blocksize``::
+
+   # blocksize=None means a single chunk is used
    df = dd.read_csv(url,blocksize=None).set_index('Name')
    ddf= df.repartition(npartitions=10)
+
+   # blocksize="4MB" or blocksize=4e6
    ddf = dd.read_csv(url,blocksize="4MB").set_index('Name')
    ddf.npartitions
-   # blocksize="default" means the chunk is computed based on available memory and cores with a maximum of 64MB
+
+   # blocksize="default" means the chunk size is computed based on
+   # available memory and cores, with a maximum of 64MB
    ddf = dd.read_csv(url,blocksize="default").set_index('Name')
    ddf.npartitions
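+
+To see how the rows end up spread over the partitions, one possible check
+(a sketch; ``map_partitions`` applies a function to every partition):
+
+.. code-block:: python
+
+   # number of rows that landed in each partition
+   ddf.map_partitions(len).compute()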
@@ -319,9 +362,13 @@ preprocessing log files, JSON records, or other user defined Python objects.
 We will content ourselves with implementing a dask version of the word-count
 problem, specifically the step where we count words in a text.

+.. _word-count-problem:
+
 .. demo:: Demo: Dask version of word-count

-   First navigate to the ``word-count-hpda`` directory. The serial version (wrapped in
+   If you have not already cloned or downloaded the ``word-count-hpda`` repository,
+   `get it from here `__.
+   Then, navigate to the ``word-count-hpda`` directory. The serial version (wrapped in
    multiple functions in the ``source/wordcount.py`` code) looks like this:

    .. code-block:: python

@@ -344,7 +391,11 @@
             else:
                 counts[word] = 1

-   sorted_counts = sorted(list(counts.items()), key=lambda key_value: key_value[1], reverse=True)
+   sorted_counts = sorted(
+       list(counts.items()),
+       key=lambda key_value: key_value[1],
+       reverse=True
+   )

    sorted_counts[:10]

@@ -357,15 +408,32 @@ specifically the step where we count words in a text.

       DELIMITERS = ". , ; : ? $ @ ^ < > # % ` ! * - = ( ) [ ] { } / \" '".split()

       text = db.read_text(filename, blocksize='1MiB')
-      sorted_counts = text.filter(lambda word: word not in DELIMITERS).str.lower().str.strip().str.split().flatten().frequencies().topk(10,key=1).compute()
+      sorted_counts = (
+          text
+          .filter(lambda word: word not in DELIMITERS)
+          .str.lower()
+          .str.strip()
+          .str.split()
+          .flatten()
+          .frequencies().topk(10, key=1)
+          .compute()
+      )
       sorted_counts

    The last two steps of the pipeline could also have been done with a dataframe:

    .. code-block:: python
-
-      filtered = text.filter(lambda word: word not in DELIMITERS).str.lower().str.strip().str.split().flatten()
+      :emphasize-lines: 9-10
+
+      filtered = (
+          text
+          .filter(lambda word: word not in DELIMITERS)
+          .str.lower()
+          .str.strip()
+          .str.split()
+          .flatten()
+      )
       ddf = filtered.to_dataframe(columns=['words'])
       ddf['words'].value_counts().compute()[:10]

    analysing a very large text file (all tweets in a year? a genome?). Dask provides
    both parallelisation and the ability to utilize RAM on multiple machines.
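+
+   If you want to peek at intermediate results without running the whole
+   pipeline, a handy method is :meth:`take` (a small sketch; it evaluates
+   just enough of the bag to return the requested items):
+
+   .. code-block:: python
+
+      # inspect the first three elements of the bag
+      text.take(3)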
+Exercise set 1
+--------------

-Dask delayed
-^^^^^^^^^^^^
-
-Sometimes problems don't fit into one of the collections like
-``dask.array`` or ``dask.dataframe``, they are not as simple as just a big array or dataframe.
-In these cases, ``dask.delayed`` may be the right choice. If the problem is paralellisable,
-we can use ``dask.delayed`` which allows users to make function calls lazy
-and thus can be put into a task graph with dependencies.
-
-
-Consider the following example. The functions are very simple, and they *sleep*
-for a prescribed time to simulate real work:
-
-.. literalinclude:: example/delay.py
-
-Let us run the example first, one after the other in sequence:
-
-.. code-block:: ipython
-
-   %%timeit
-   x = inc(1)
-   y = dec(2)
-   z = add(x, y)
-   # 902 ms ± 367 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
-
-
-Note that the first two functions ``inc`` and ``dec`` don't depend on each other,
-we could have called them in parallel. We can call ``dask.delayed`` on these funtions
-to make them lazy and tasks into a graph which we will run later on parallel hardware.
-
-.. code-block:: ipython
-
-   import dask
-   inc_delay = dask.delayed(inc)
-   dec_delay = dask.delayed(dec)
-   add_delay = dask.delayed(add)
-
-
-.. code-block:: ipython
-
-   %%timeit
-   x = inc_delay(1)
-   y = dec_delay(2)
-   z = add_delay(x, y)
-   # 59.6 µs ± 356 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
-
-
-.. code-block:: ipython
-
-   %%timeit
-   x = inc_delay(1)
-   y = dec_delay(2)
-   z = add_delay(x, y)
-   z.compute()
-   # 603 ms ± 181 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
-
-
-.. callout:: Default scheduler for dask collections
-
-   ``dask.array`` and ``dask.dataframe`` use the ``threads`` scheduler
-
-   ``dask.bag`` uses the ``processes`` scheduler
-
-   In case to change the default scheduler, using `dask.config.set` is recommanded:
-
-   .. code-block:: ipython
-
-      # To set globally
-      dask.config.set(scheduler='processes')
-      x.compute()
-
-      # To set it as a context manager
-      with dask.config.set(scheduler='threads'):
-          x.compute()
+Choose an exercise with the data structure that you are most interested in:
+:ref:`ex-dask-array`, :ref:`ex-dask-df` or :ref:`ex-dask-bag`.

+.. _ex-dask-array:

-Comparison to Spark
--------------------
-
-Dask has much in common with the `Apache Spark `__.
-Here are `some differences `__
-between the two frameworks:
-
-- Dask is smaller and more lightweight but is used together with other packages in
-  the Python ecosystem. Spark is an all-in-one project with its own ecosystem.
-- Spark is written in Scala, with some support for Python and R, while Dask is in Python.
-- Spark is more focused on business intelligence (SQL, lightweight machine learning) while
-  Dask is more general and is used more in scientific applications.
-- Both Dask and Spark can scale from one to thousands of nodes.
-- Dask supports the NumPy model for multidimensional arrays which Spark doesn't.
-- Spark generally expects users to compose computations out of high-level primitives
-  (map, reduce, groupby, join, etc.), while Dask allows to specify arbitrary task
-  graphs for more complex and custom systems.
-
-
-Exercises
----------
+1.1. using dask.array
+^^^^^^^^^^^^^^^^^^^^^

 .. challenge:: Chunk size

@@ -515,242 +495,198 @@
    with millions of tasks will lead to overhead being in the range from minutes
    to hours which is not recommended.

-.. challenge:: Dask delay
-
-   We extend the previous example a little bit more by applying the function
-   on a data array using for loop and adding an *if* condition:
-
-   .. literalinclude:: example/delay_more.py
-
-   Please add dask.delayed to parallelize the program as much as possible
-   and check graph visualizations.
-
-   .. solution::
-
-      .. literalinclude:: example/delay_more_solution.py
-
-.. challenge:: Testing different schedulers
-
-   We will test different schedulers and compare the performance on a simple task calculating
-   the mean of a random generated array.
-
-   Here is the code using NumPy:
-
-   .. literalinclude:: example/dask_gil.py
-      :language: ipython
-      :lines: 1-7
-
-   Here we run the same code using different schedulers from Dask:
-
-   .. tabs::
-
-      .. tab:: ``serial``
-
-         .. literalinclude:: example/dask_gil.py
-            :language: ipython
-            :lines: 9-12
-
-      .. tab:: ``threads``
-
-         .. literalinclude:: example/dask_gil_threads.py
-            :language: ipython
-            :lines: 1-10
-
-         .. literalinclude:: example/dask_gil_threads.py
-            :language: ipython
-            :lines: 12-15
-
-         .. literalinclude:: example/dask_gil_threads.py
-            :language: ipython
-            :lines: 17-20
-
-         .. literalinclude:: example/dask_gil_threads.py
-            :language: ipython
-            :lines: 22-25
-
-      .. tab:: ``processes``
-
-         .. literalinclude:: example/dask_gil_processes.py
-            :language: ipython
-            :lines: 1-10
-
-         .. literalinclude:: example/dask_gil_processes.py
-            :language: ipython
-            :lines: 12-15
-
-         .. literalinclude:: example/dask_gil_processes.py
-            :language: ipython
-            :lines: 17-20
-
-         .. literalinclude:: example/dask_gil_processes.py
-            :language: ipython
-            :lines: 22-25
-
-      .. tab:: ``distributed``
-
-         .. literalinclude:: example/dask_gil_distributed.py
-            :language: ipython
-            :lines: 1-14
-
-         .. literalinclude:: example/dask_gil_distributed.py
-            :language: ipython
-            :lines: 16-17
-
-         .. literalinclude:: example/dask_gil_distributed.py
-            :language: ipython
-            :lines: 19-21
-
-         .. literalinclude:: example/dask_gil_distributed.py
-            :language: ipython
-            :lines: 23-25
-
-         .. literalinclude:: example/dask_gil_distributed.py
-            :language: ipython
-            :lines: 27
+.. _ex-dask-df:

+1.2. using dask.dataframe
+^^^^^^^^^^^^^^^^^^^^^^^^^

+.. exercise:: Benchmarking DataFrame.apply()

+   Recall the
+   :ref:`word count <word-count-problem>`
+   project that we encountered earlier and the :func:`scipy.optimize.curve_fit` function.
+   The :download:`results.csv ` file contains word counts of the 10
+   most frequent words in different texts, and we want to fit a power law to the
+   individual distributions in each row.

+   Here are our fitting functions:

+   .. code-block:: python

+      from scipy.optimize import curve_fit

+      def powerlaw(x, A, s):
+          return A * np.power(x, s)

+      def fit_powerlaw(row):
+          X = np.arange(row.shape[0]) + 1.0
+          params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf))
+          return params[1]

+   Compare the performance of
+   :meth:`dask.dataframe.DataFrame.apply` with
+   :meth:`pandas.DataFrame.apply`
+   for this example. You will probably see a slowdown due to the parallelisation
+   overhead. But what if you add a ``time.sleep(0.01)`` inside :func:`fit_powerlaw` to
+   emulate a time-consuming calculation?
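+
+   One possible way to set up the timing comparison (a sketch; the file name,
+   partition count and ``%timeit`` usage are assumptions, following the hints
+   below):
+
+   .. code-block:: ipython
+
+      import pandas as pd
+      import dask.dataframe as dd
+
+      df = pd.read_csv("results.csv")
+      ddf = dd.read_csv("results.csv").repartition(npartitions=4)
+
+      # row-wise fit, skipping the first (non-numeric) column
+      %timeit df.iloc[:,1:].apply(fit_powerlaw, axis=1)
+      %timeit ddf.iloc[:,1:].apply(fit_powerlaw, axis=1, meta=(None, "float64")).compute()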
+   .. callout:: Hints
+      :class: dropdown
+
+      - You will need to call :meth:`apply` on the dataframe starting from column 1: ``dataframe.iloc[:,1:].apply()``
+      - Remember that both Pandas and Dask have the :meth:`read_csv` function.
+      - Try repartitioning the dataframe into 4 partitions with ``ddf4=ddf.repartition(npartitions=4)``.
+      - You will probably get a warning in your Dask version that `You did not provide metadata`.
+        To remove the warning, add the ``meta=(None, "float64")`` flag to :meth:`apply`. For the
+        current data, this does not affect the performance.

+   .. callout:: More hints with Pandas code
+      :class: dropdown

+      You need to reimplement the highlighted part which creates the
+      dataframe and applies the :func:`fit_powerlaw` function.

+      .. literalinclude:: exercise/apply_pd.py
+         :language: ipython
+         :emphasize-lines: 16-17

+   .. solution::

+      .. literalinclude:: exercise/apply_dask.py
+         :language: ipython

+.. _ex-dask-bag:

+1.3. using dask.bag
+^^^^^^^^^^^^^^^^^^^

+.. exercise:: Break down the dask.bag computational pipeline

+   Revisit the
+   :ref:`word count problem <word-count-problem>`
+   and the implementation with a ``dask.bag`` that we saw above.

+   - To get a feeling for the computational pipeline, break down the computation into
+     separate steps and investigate intermediate results using :meth:`.compute`.
+   - Benchmark the serial and ``dask.bag`` versions. Do you see any speedup?
+     What if you have a larger textfile? You can for example concatenate all texts into
+     a single file: ``cat data/*.txt > data/all.txt``.

+Low level interface: delayed
+----------------------------

+Sometimes problems don't fit into one of the collections like
+``dask.array`` or ``dask.dataframe``; they are not as simple as just a big array or dataframe.
+In these cases, ``dask.delayed`` may be the right choice. If the problem is parallelisable,
+we can use ``dask.delayed``, which makes function calls lazy
+so that they can be put into a task graph with dependencies.
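+
+A minimal sketch of the idea (the function here is made up for illustration;
+``dask.delayed`` can also be used as a decorator):
+
+.. code-block:: python
+
+   import dask
+
+   @dask.delayed
+   def double(x):
+       return 2 * x
+
+   y = double(double(3))   # builds a two-task graph, nothing has run yet
+   y.compute()             # executes the graph and returns 12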
-   .. solution:: Testing different schedulers
-
-      Comparing profiling from mt_1, mt_2 and mt_4: Using ``threads`` scheduler is limited by the GIL on pure Python code.
-      In our case, although it is not a pure Python function, it is still limited by GIL, therefore no multi-core speedup
-
-      Comparing profiling from mt_1, mp_1 and dis_1: Except for ``threads``, the other two schedulers copy data between processes
-      and this can introduce performance penalties, particularly when the data being transferred between processes is large.
-
-      Comparing profiling from serial, mt_1, mp_1 and dis_1: Creating and destroying threads and processes have overheads,
-      ``processes`` have even more overhead than ``threads``
-
-      Comparing profiling from mp_1, mp_2 and mp_4: Running multiple processes is only effective when there is enough computational
-      work to do i.e. CPU-bound tasks. In this very example, most of the time is actually spent on transferring the data
-      rather than computing the mean
-
-      Comparing profiling from ``processes`` and ``distributed``: Using ``distributed`` scheduler has advantages over ``processes``,
-      this is related to better handling of data copying, i.e. ``processes`` scheduler copies data for every task, while
-      ``distributed`` scheduler copies data for each worker.

+Consider the following example. The functions are very simple, and they *sleep*
+for a prescribed time to simulate real work:

+.. literalinclude:: example/delay.py

+Let us run the example first, one after the other in sequence:

+.. code-block:: ipython

+   %%timeit
+   x = inc(1)
+   y = dec(2)
+   z = add(x, y)
+   # 902 ms ± 367 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

+Note that the first two functions ``inc`` and ``dec`` don't depend on each other;
+we could have called them in parallel. We can call ``dask.delayed`` on these functions
+to make them lazy, turning the calls into tasks in a graph which we will run later on parallel hardware.

+.. code-block:: ipython

+   import dask
+   inc_delay = dask.delayed(inc)
+   dec_delay = dask.delayed(dec)
+   add_delay = dask.delayed(add)

+.. code-block:: ipython

+   %%timeit
+   x = inc_delay(1)
+   y = dec_delay(2)
+   z = add_delay(x, y)
+   # 59.6 µs ± 356 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

+.. code-block:: ipython

+   %%timeit
+   x = inc_delay(1)
+   y = dec_delay(2)
+   z = add_delay(x, y)
+   z.compute()
+   # 603 ms ± 181 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

-.. challenge:: SVD with large skinny matrix using ``distributed`` scheduler
-
-   We can use dask to compute SVD of a large matrix which does not fit into the memory of a
-   normal laptop/desktop. While it is computing, you should switch to the Dask dashboard and
-   watch column "Workers" and "Graph", so you must run this using ``distributed`` scheduler
-
-   .. code-block:: python
-
-      import dask
-      import dask.array as da
-      X = da.random.random((2000000, 100), chunks=(10000, 100))
-      X
-      u, s, v = da.linalg.svd(X)
-      dask.visualize(u, s, v)
-      s.compute()
-
-   SVD is only supported for arrays with chunking in one dimension, which requires that the matrix
-   is either *tall-and-skinny* or *short-and-fat*.
-   If chunking in both dimensions is needed, one should use approximate algorithm.
-
-   .. code-block:: python
-
-      import dask
-      import dask.array as da
-      X = da.random.random((10000, 10000), chunks=(2000, 2000))
-      u, s, v = da.linalg.svd_compressed(X, k=5)
-      dask.visualize(u, s, v)
-      s.compute()

+.. callout:: Default scheduler for dask collections

+   ``dask.array`` and ``dask.dataframe`` use the ``threads`` scheduler

+   ``dask.bag`` uses the ``processes`` scheduler

+   To change the default scheduler, using `dask.config.set` is recommended:

+   .. code-block:: ipython

+      # To set globally
+      dask.config.set(scheduler='processes')
+      x.compute()

+      # To set it as a context manager
+      with dask.config.set(scheduler='threads'):
+          x.compute()
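+
+   A scheduler can also be selected for a single call, without touching the
+   default (a small sketch):
+
+   .. code-block:: ipython
+
+      # only this one computation uses the threaded scheduler
+      x.compute(scheduler='threads')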
-.. callout:: Memory management
-
-   You may observe that there are different memory categories showing on the dashboard:
-
-   - process: Overall memory used by the worker process, as measured by the OS
-   - managed: Size of data that Dask holds in RAM, but most probably inaccurate, excluding spilled data.
-   - unmanaged: Memory that Dask is not directly aware of, this can be e.g. Python modules,
-     temporary arrays, memory leasks, memory not yet free()'d by the Python memory manager to the OS
-   - unmanaged recent: Unmanaged memory that has appeared within the last 30 seconds whch is not included
-     in the "unmanaged" memory measure
-   - spilled: Memory spilled to disk
-
-   The sum of managed + unmanaged + unmanaged recent is equal by definition to the process memory.
-
-   When the managed memory exceeds 60% of the memory limit (target threshold),
-   the worker will begin to dump the least recently used data to disk.
-   Above 70% of the target memory usage based on process memory measurment (spill threshold),
-   the worker will start dumping unused data to disk.
-
-   At 80% process memory load, currently executing tasks continue to run, but no additional tasks
-   in the worker's queue will be started.
-
-   At 95% process memory load (terminate threshold), all workers will be terminated. Tasks will be cancelled
-   as well and data on the worker will be lost and need to be recomputed.
-
-.. exercise:: Benchmarking dask.dataframes.apply()
-
-   Recall the word-count project that we encountered earlier and the :meth:`scipy.optimize.curve_fit` function.
-   The :download:`results.csv ` file contains word counts of the 10
-   most frequent words in different texts, and we want to fit a power law to the
-   individual distributions in each row.
-
-   Here are our fitting functions:
-
-   .. code-block:: python
-
-      from scipy.optimize import curve_fit
-
-      def powerlaw(x, A, s):
-          return A * np.power(x, s)
-
-      def fit_powerlaw(row):
-          X = np.arange(row.shape[0]) + 1.0
-          params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf))
-          return params[1]
-
-   Compare the performance of :meth:`dask.dataframes.apply` with :meth:`pandas.dataframes.apply`
-   for the this example. You will probably see a slowdown due to the parallelisation
-   overhead. But what if you add a ``time.sleep(0.01)`` inside :meth:`fit_powerlaw` to
-   emulate a time-consuming calculation?
-
-   .. solution:: Hints
-
-      - You will need to call :meth:`apply` on the dataframe starting from column 1: ``dataframe.iloc[:,1:].apply()``
-      - Remember that both Pandas and Dask have the :meth:`read_csv` function.
-      - Try repartitioning the dataframe into 4 partitions with ``ddf4=ddf.repartition(npartitions=4)``.
-      - You will probably get a warning in your Dask version that `You did not provide metadata`.
-        To remove the warning, add the ``meta=(None, "float64")`` flag to :meth:`apply`. For the
-        current data, this does not affect the performance.
-
-   .. solution::
-
-      .. tabs::
-
-         .. tab:: Pandas
-
-            .. literalinclude:: exercise/apply_pd.py
-               :language: ipython
-
-         .. tab:: Dask
-
-            .. literalinclude:: exercise/apply_dask.py
-               :language: ipython
-
-.. exercise:: Break down the dask.bag computational pipeline
-
-   Revisit the word-count problem and the implementation with a ``dask.bag`` that we
-   saw above.
-
-   - To get a feeling for the computational pipeline, break down the computation into
-     separate steps and investigate intermediate results using :meth:`.compute`.
-   - Benchmark the serial and ``dask.bag`` versions. Do you see any speedup?
-     What if you have a larger textfile? You can for example concatenate all texts into
-     a single file: ``cat data/*.txt > data/all.txt``.

+Comparison to Spark
+-------------------

+Dask has much in common with `Apache Spark `__.
+Here are `some differences `__
+between the two frameworks:

+- Dask is smaller and more lightweight but is used together with other packages in
+  the Python ecosystem. Spark is an all-in-one project with its own ecosystem.
+- Spark is written in Scala, with some support for Python and R, while Dask is in Python.
+- Spark is more focused on business intelligence (SQL, lightweight machine learning) while
+  Dask is more general and is used more in scientific applications.
+- Both Dask and Spark can scale from one to thousands of nodes.
+- Dask supports the NumPy model for multidimensional arrays which Spark doesn't.
+- Spark generally expects users to compose computations out of high-level primitives
+  (map, reduce, groupby, join, etc.), while Dask allows users to specify arbitrary task
+  graphs for more complex and custom systems.

+Exercise set 2
+--------------
+.. challenge:: Dask delay

+   We extend the previous example a little bit more by applying the function
+   on a data array using a for loop and adding an *if* condition:

+   .. literalinclude:: example/delay_more.py

+   Please add ``dask.delayed`` to parallelize the program as much as possible
+   and check graph visualizations.

+   .. solution::

+      .. literalinclude:: example/delay_more_solution.py

 .. challenge:: Climate simulation data using Xarray and Dask

@@ -765,7 +701,8 @@
    https://xarray.pydata.org/en/stable/dask.html#reading-and-writing-data for more details.

    Note that the NetCDF files are here https://github.com/ENCCS/hpda-python/tree/main/content/data ,
-   you need to download them to your laptop first, then depending on where you put the files,
+   you need to ``git clone`` the repository or download the files to your laptop first.
+   Then, depending on where you put the files,
    you may need to adapt the path to the data folder in the Python code.

    .. code-block:: ipython
@@ -777,11 +714,12 @@

       ds=xr.open_mfdataset('./data/tas*.nc', parallel=True,use_cftime=True)

-   ``open_mfdataset()`` is for reading multiple files and will chunk each file into a single Dask array by default.
+   :func:`xarray.open_mfdataset` is for reading multiple files and will chunk each file into a single Dask array by default.
    One could supply the chunks keyword argument to control the size of the resulting Dask arrays.
-   Passing the keyword argument ``parallel=True`` to open_mfdataset() will speed up the reading of
+   Passing the keyword argument ``parallel=True`` to :func:`xarray.open_mfdataset` will speed up the reading of
    large multi-file datasets by executing those read tasks in parallel using ``dask.delayed``.
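+
+   For example, the chunking can be set explicitly when opening the files (a
+   sketch; the chunk size along ``time`` is an arbitrary choice):
+
+   .. code-block:: ipython
+
+      ds = xr.open_mfdataset('./data/tas*.nc', parallel=True,
+                             use_cftime=True, chunks={'time': 100})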
+   Explore the following operations line-by-line:

    .. code-block:: ipython

@@ -800,4 +738,6 @@

 .. keypoints::

    - Dask uses lazy execution
+   - Dask can parallelize and perform out-of-memory computation.
+     That is, it can handle data that would not fit in memory if loaded at once.
    - Only use Dask for processing very large amount of data
diff --git a/content/dask_opt.rst b/content/dask_opt.rst
new file mode 100644
index 0000000..7029127
--- /dev/null
+++ b/content/dask_opt.rst
@@ -0,0 +1,160 @@
+Dask (II)
+=========

.. challenge:: Testing different schedulers

   We will test different schedulers and compare the performance on a simple task:
   calculating the mean of a randomly generated array.

   Here is the code using NumPy:

   .. literalinclude:: example/dask_gil.py
      :language: ipython
      :lines: 1-7

   Here we run the same code using different schedulers from Dask:

   .. tabs::

      .. tab:: ``serial``

         .. literalinclude:: example/dask_gil.py
            :language: ipython
            :lines: 9-12

      .. tab:: ``threads``

         .. literalinclude:: example/dask_gil_threads.py
            :language: ipython
            :lines: 1-10

         .. literalinclude:: example/dask_gil_threads.py
            :language: ipython
            :lines: 12-15

         .. literalinclude:: example/dask_gil_threads.py
            :language: ipython
            :lines: 17-20

         .. literalinclude:: example/dask_gil_threads.py
            :language: ipython
            :lines: 22-25

      .. tab:: ``processes``

         .. literalinclude:: example/dask_gil_processes.py
            :language: ipython
            :lines: 1-10

         .. literalinclude:: example/dask_gil_processes.py
            :language: ipython
            :lines: 12-15

         .. literalinclude:: example/dask_gil_processes.py
            :language: ipython
            :lines: 17-20

         .. literalinclude:: example/dask_gil_processes.py
            :language: ipython
            :lines: 22-25

      .. tab:: ``distributed``

         .. literalinclude:: example/dask_gil_distributed.py
            :language: ipython
            :lines: 1-14

         .. literalinclude:: example/dask_gil_distributed.py
            :language: ipython
            :lines: 16-17

         .. literalinclude:: example/dask_gil_distributed.py
            :language: ipython
            :lines: 19-21

         .. literalinclude:: example/dask_gil_distributed.py
            :language: ipython
            :lines: 23-25

         .. literalinclude:: example/dask_gil_distributed.py
            :language: ipython
            :lines: 27
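
   The included files are not shown here; the pattern they follow is roughly
   the following sketch (array size, chunking and worker counts are assumptions):

   .. code-block:: ipython

      import dask.array as da

      x = da.random.random(100_000_000, chunks=10_000_000)
      %timeit x.mean().compute(scheduler='threads', num_workers=4)
      %timeit x.mean().compute(scheduler='processes', num_workers=4)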
   .. solution:: Testing different schedulers

      Comparing profiling from mt_1, mt_2 and mt_4: using the ``threads`` scheduler is limited by the GIL on pure Python code.
      In our case, although it is not a pure Python function, it is still limited by the GIL, therefore there is no multi-core speedup.

      Comparing profiling from mt_1, mp_1 and dis_1: except for ``threads``, the other two schedulers copy data between processes,
      and this can introduce performance penalties, particularly when the data being transferred between processes is large.

      Comparing profiling from serial, mt_1, mp_1 and dis_1: creating and destroying threads and processes has overhead, and
      ``processes`` have even more overhead than ``threads``.

      Comparing profiling from mp_1, mp_2 and mp_4: running multiple processes is only effective when there is enough computational
      work to do, i.e. CPU-bound tasks. In this very example, most of the time is actually spent on transferring the data
      rather than computing the mean.

      Comparing profiling from ``processes`` and ``distributed``: using the ``distributed`` scheduler has advantages over ``processes``;
      this is related to better handling of data copying, i.e. the ``processes`` scheduler copies data for every task, while
      the ``distributed`` scheduler copies data once per worker.

.. challenge:: SVD with large skinny matrix using ``distributed`` scheduler

   We can use Dask to compute the SVD of a large matrix which does not fit into the memory of a
   normal laptop/desktop. While it is computing, you should switch to the Dask dashboard and
   watch the "Workers" and "Graph" columns, so you must run this using the ``distributed`` scheduler.

   .. code-block:: python

      import dask
      import dask.array as da
      X = da.random.random((2000000, 100), chunks=(10000, 100))
      X
      u, s, v = da.linalg.svd(X)
      dask.visualize(u, s, v)
      s.compute()

   SVD is only supported for arrays with chunking in one dimension, which requires that the matrix
   is either *tall-and-skinny* or *short-and-fat*.
   If chunking in both dimensions is needed, one should use an approximate algorithm.

   .. code-block:: python

      import dask
      import dask.array as da
      X = da.random.random((10000, 10000), chunks=(2000, 2000))
      u, s, v = da.linalg.svd_compressed(X, k=5)
      dask.visualize(u, s, v)
      s.compute()

.. callout:: Memory management

   You may observe that there are different memory categories showing on the dashboard:

   - process: overall memory used by the worker process, as measured by the OS
   - managed: size of data that Dask holds in RAM (most probably inaccurate), excluding spilled data
   - unmanaged: memory that Dask is not directly aware of; this can be e.g. Python modules,
     temporary arrays, memory leaks, or memory not yet free()'d by the Python memory manager to the OS
   - unmanaged recent: unmanaged memory that has appeared within the last 30 seconds, which is not included
     in the "unmanaged" memory measure
   - spilled: memory spilled to disk

   The sum of managed + unmanaged + unmanaged recent is by definition equal to the process memory.

   When the managed memory exceeds 60% of the memory limit (target threshold),
   the worker will begin to dump the least recently used data to disk.
   Above 70% of the target memory usage based on the process memory measurement (spill threshold),
   the worker will start dumping unused data to disk.

   At 80% process memory load, currently executing tasks continue to run, but no additional tasks
   in the worker's queue will be started.

   At 95% process memory load (terminate threshold), all workers will be terminated. Tasks will be cancelled
   as well, and data on the worker will be lost and need to be recomputed.
diff --git a/content/img/jlab-dask-1.png b/content/img/jlab-dask-1.png
new file mode 100644
index 0000000..3833594
Binary files /dev/null and b/content/img/jlab-dask-1.png differ
diff --git a/content/img/jlab-dask-2.png b/content/img/jlab-dask-2.png
new file mode 100644
index 0000000..c30d2ba
Binary files /dev/null and b/content/img/jlab-dask-2.png differ
diff --git a/content/img/jlab-dask-3.png b/content/img/jlab-dask-3.png
new file mode 100644
index 0000000..60af478
Binary files /dev/null and b/content/img/jlab-dask-3.png differ
diff --git a/content/index.rst b/content/index.rst
index dc572f2..9cf47c3 100644
--- a/content/index.rst
+++ b/content/index.rst
@@ -69,6 +69,7 @@ and distributed computing.
    GPU-computing
    parallel-computing_opt
    optimization_opt
+   dask_opt

 .. toctree::