Dask
Overview
Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
- •Larger-than-memory execution on single machines for data exceeding available RAM
- •Parallel processing for improved computational speed across multiple cores
- •Distributed computation supporting terabyte-scale datasets across multiple machines
Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.
When to Use This Skill
Use this skill to:
- •Process datasets that exceed available RAM
- •Scale pandas or NumPy operations to larger datasets
- •Parallelize computations for performance improvements
- •Process multiple files efficiently (CSVs, Parquet, JSON, text logs)
- •Build custom parallel workflows with task dependencies
- •Distribute workloads across multiple cores or machines
Core Capabilities
Dask provides five main components, each suited to different use cases:
1. DataFrames - Parallel Pandas Operations
Purpose: Scale pandas operations to larger datasets through parallel processing.
When to Use:
- •Tabular data exceeds available RAM
- •Need to process multiple CSV/Parquet files together
- •Pandas operations are slow and need parallelization
- •Scaling from pandas prototype to production
Reference Documentation: For comprehensive guidance on Dask DataFrames, refer to references/dataframes.md which includes:
- •Reading data (single files, multiple files, glob patterns)
- •Common operations (filtering, groupby, joins, aggregations)
- •Custom operations with map_partitions
- •Performance optimization tips
- •Common patterns (ETL, time series, multi-file processing)
Quick Example:
import dask.dataframe as dd
# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
Key Points:
- •Operations are lazy (build task graph) until .compute() is called
- •Use map_partitions for efficient custom operations
- •Convert to DataFrame early when working with structured data from other sources
2. Arrays - Parallel NumPy Operations
Purpose: Extend NumPy capabilities to datasets larger than memory using blocked algorithms.
When to Use:
- •Arrays exceed available RAM
- •NumPy operations need parallelization
- •Working with scientific datasets (HDF5, Zarr, NetCDF)
- •Need parallel linear algebra or array operations
Reference Documentation: For comprehensive guidance on Dask Arrays, refer to references/arrays.md which includes:
- •Creating arrays (from NumPy, random, from disk)
- •Chunking strategies and optimization
- •Common operations (arithmetic, reductions, linear algebra)
- •Custom operations with map_blocks
- •Integration with HDF5, Zarr, and XArray
Quick Example:
import dask.array as da
# Create large array with chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Operations are lazy
y = x + 100
z = y.mean(axis=0)
# Compute result
result = z.compute()
Key Points:
- •Chunk size is critical (aim for ~100 MB per chunk)
- •Operations work on chunks in parallel
- •Rechunk data when needed for efficient operations
- •Use map_blocks for operations not available in Dask
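A minimal sketch of map_blocks, assuming a tiny 4 x 4 array split into 2 x 2 chunks. The clip_block helper is hypothetical and simply stands in for a NumPy routine you want to apply chunk by chunk.

```python
import numpy as np
import dask.array as da

# Small array split into four 2x2 chunks so the sketch runs instantly.
x = da.from_array(np.arange(16, dtype="float64").reshape(4, 4), chunks=(2, 2))

def clip_block(block: np.ndarray) -> np.ndarray:
    """Hypothetical per-chunk function; receives one NumPy array per chunk."""
    return np.clip(block, 2.0, 10.0)

# dtype is passed explicitly so Dask does not have to infer it.
clipped = x.map_blocks(clip_block, dtype=x.dtype)
result = clipped.compute()
```

This pattern only fits functions whose output for a chunk depends on that chunk alone; operations that need neighbouring values require a different approach.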
3. Bags - Parallel Processing of Unstructured Data
Purpose: Process unstructured or semi-structured data (text, JSON, logs) with functional operations.
When to Use:
- •Processing text files, logs, or JSON records
- •Data cleaning and ETL before structured analysis
- •Working with Python objects that don't fit array/dataframe formats
- •Need memory-efficient streaming processing
Reference Documentation: For comprehensive guidance on Dask Bags, refer to references/bags.md which includes:
- •Reading text and JSON files
- •Functional operations (map, filter, fold, groupby)
- •Converting to DataFrames
- •Common patterns (log analysis, JSON processing, text processing)
- •Performance considerations
Quick Example:
import dask.bag as db
import json
# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)
# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})
# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
Key Points:
- •Use for initial data cleaning, then convert to DataFrame/Array
- •Use foldby instead of groupby for better performance
- •Operations are streaming and memory-efficient
- •Convert to structured formats (DataFrame) for complex operations
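The foldby recommendation can be illustrated with a short sketch. The records and field names below are made up for the example, and the threaded scheduler is requested explicitly only to keep this tiny example cheap to run.

```python
import dask.bag as db

# Hypothetical records; real input would usually come from db.read_text(...).
records = [
    {"category": "a", "value": 1},
    {"category": "b", "value": 10},
    {"category": "a", "value": 3},
    {"category": "b", "value": 20},
]
bag = db.from_sequence(records, npartitions=2)

# foldby reduces within each partition first, then combines the partial
# results, so it avoids the full shuffle that groupby performs.
totals = bag.foldby(
    key=lambda rec: rec["category"],
    binop=lambda total, rec: total + rec["value"],  # fold one record into a running total
    initial=0,
    combine=lambda left, right: left + right,       # merge per-partition totals
    combine_initial=0,
)

result = dict(totals.compute(scheduler="threads"))
# {'a': 4, 'b': 30}
```

The same shape works for any associative reduction per key (counts, sums, maxima) where the grouped records themselves are not needed afterwards.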
4. Futures - Task-Based Parallelization
Purpose: Build custom parallel workflows with fine-grained control over task execution and dependencies.
When to Use:
- •Building dynamic, evolving workflows
- •Need immediate task execution (not lazy)
- •Computations depend on runtime conditions
- •Implementing custom parallel algorithms
- •Need stateful computations
Reference Documentation: For comprehensive guidance on Dask Futures, refer to references/futures.md which includes:
- •Setting up distributed client
- •Submitting tasks and working with futures
- •Task dependencies and data movement
- •Advanced coordination (queues, locks, events, actors)
- •Common patterns (parameter sweeps, dynamic tasks, iterative algorithms)
Quick Example:
from dask.distributed import Client
client = Client() # Create local cluster
# Submit tasks (executes immediately)
def process(x):
    return x ** 2
futures = client.map(process, range(100))
# Gather results
results = client.gather(futures)
client.close()
Key Points:
- •Requires distributed client (even for single machine)
- •Tasks execute immediately when submitted
- •Pre-scatter large data to avoid repeated transfers
- •~1 ms overhead per task (not suitable for millions of tiny tasks)
- •Use actors for stateful workflows
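A minimal sketch of the pre-scatter advice, assuming an in-process client (processes=False) with the dashboard disabled so the example stays lightweight. The scale function and the small NumPy array are hypothetical stand-ins for real work and real data.

```python
import numpy as np
from dask.distributed import Client

def scale(data, factor):
    """Hypothetical task: uses the shared array with a different parameter each time."""
    return int((data * factor).sum())

# Threads-only local cluster; dashboard_address=None turns the dashboard off.
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)
try:
    # Send the data to the cluster once and keep a future pointing at it.
    shared = client.scatter(np.arange(5))

    # Each task receives the future, so the array is not re-sent per task.
    futures = [client.submit(scale, shared, factor) for factor in (1, 2, 3)]
    results = client.gather(futures)
finally:
    client.close()
```

Passing the raw array to every submit call would also work, but the data would then be serialized with each task rather than once.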
5. Schedulers - Execution Backends
Purpose: Control how and where Dask tasks execute (threads, processes, distributed).
When to Choose Scheduler:
- •Threads (default): NumPy/Pandas operations, GIL-releasing libraries, shared memory benefit
- •Processes: Pure Python code, text processing, GIL-bound operations
- •Synchronous: Debugging with pdb, profiling, understanding errors
- •Distributed: Need dashboard, multi-machine clusters, advanced features
Reference Documentation: For comprehensive guidance on Dask Schedulers, refer to references/schedulers.md which includes:
- •Detailed scheduler descriptions and characteristics
- •Configuration methods (global, context manager, per-compute)
- •Performance considerations and overhead
- •Common patterns and troubleshooting
- •Thread configuration for optimal performance
Quick Example:
import dask
import dask.dataframe as dd
# Use threads for DataFrame (default, good for numeric)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute() # Uses threads
# Use processes for Python-heavy work
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')
# Use synchronous for debugging
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute() # Can use pdb
# Use distributed for monitoring and scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute() # Uses distributed with dashboard
Key Points:
- •Threads: Lowest overhead (~10 µs/task), best for numeric work
- •Processes: Avoids GIL (~10 ms/task), best for Python work
- •Distributed: Monitoring dashboard (~1 ms/task), scales to clusters
- •Can switch schedulers per computation or globally
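The three ways of selecting a scheduler (per computation, within a context manager, and globally) can be sketched as below. The array is a small hypothetical workload chosen only so that every variant returns the same number.

```python
import dask
import dask.array as da

x = da.arange(1_000, chunks=100)
total = x.sum()

# 1. Per computation: applies to this compute() call only.
per_call = total.compute(scheduler="threads")

# 2. Context manager: applies to everything computed inside the block.
with dask.config.set(scheduler="synchronous"):
    scoped = total.compute()

# 3. Global: stays in effect until it is changed again.
dask.config.set(scheduler="threads")
global_default = total.compute()
```

The context-manager form is convenient for debugging, since the synchronous setting cannot leak into later code.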
Best Practices
For comprehensive performance optimization guidance, memory management strategies, and common pitfalls to avoid, refer to references/best-practices.md. Key principles include:
Start with Simpler Solutions
Before using Dask, explore:
- •Better algorithms
- •Efficient file formats (Parquet instead of CSV)
- •Compiled code (Numba, Cython)
- •Data sampling
Critical Performance Rules
1. Don't Load Data Locally Then Hand to Dask
import pandas as pd
import dask.dataframe as dd
# Wrong: Loads all data in memory first
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)
# Correct: Let Dask handle loading
ddf = dd.read_csv('large.csv')
2. Avoid Repeated compute() Calls
import dask
# Wrong: Each compute is separate
for item in items:
    result = dask_computation(item).compute()
# Correct: Single compute for all
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
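A runnable sketch of the same rule, assuming two statistics that share an intermediate result. The array contents are hypothetical.

```python
import numpy as np
import dask
import dask.array as da

x = da.from_array(np.arange(10, dtype="float64"), chunks=5)
shifted = x + 1  # intermediate result used by both statistics

# One dask.compute call lets Dask merge the two graphs, so the tasks that
# build `shifted` are shared instead of being run once per result.
mean, std = dask.compute(shifted.mean(), shifted.std())
```

Calling shifted.mean().compute() and shifted.std().compute() separately would give the same values but rebuild the shared intermediate for each call.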
3. Don't Build Excessively Large Task Graphs
- •Increase chunk sizes if millions of tasks
- •Use map_partitions/map_blocks to fuse operations
- •Check task graph size: len(ddf.__dask_graph__())
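To make the effect of chunk size on graph size concrete, here is a small sketch comparing two chunkings of the same array. The shapes are hypothetical and nothing is computed; only the graphs are inspected.

```python
import dask.array as da

# Same logical array and operation, two different chunkings.
small_chunks = da.ones((1000, 1000), chunks=(10, 10)) + 1     # 10,000 chunks
large_chunks = da.ones((1000, 1000), chunks=(500, 500)) + 1   # 4 chunks

# Number of tasks in each graph; every chunk contributes tasks per operation.
n_small = len(small_chunks.__dask_graph__())
n_large = len(large_chunks.__dask_graph__())

print(f"small chunks: {n_small} tasks, large chunks: {n_large} tasks")
```

The same check works on a Dask DataFrame, and it is a cheap way to catch an oversized graph before calling compute().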
4. Choose Appropriate Chunk Sizes
- •Target: ~100 MB per chunk, small enough that roughly 10 chunks per core fit in worker memory
- •Too large: Memory overflow
- •Too small: Scheduling overhead
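Chunk size in bytes is the product of the chunk shape and the element size, so it can be checked before anything is computed. The sketch below assumes a hypothetical 100,000 x 100,000 float64 array; the rechunk target is just one choice that lands near 100 MB.

```python
import math
import dask.array as da

# Lazy array: nothing is allocated until compute() is called.
x = da.random.random((100_000, 100_000), chunks=(10_000, 10_000))

# Bytes per chunk = elements per chunk * bytes per element (8 for float64).
chunk_mb = math.prod(x.chunksize) * x.dtype.itemsize / 1e6
# 10_000 * 10_000 * 8 bytes = 800.0 MB, well above the ~100 MB target

# Rechunking to 3,500 x 3,500 brings each full chunk to about 98 MB.
smaller = x.rechunk((3_500, 3_500))
smaller_mb = math.prod(smaller.chunksize) * smaller.dtype.itemsize / 1e6
```

Rechunking has a cost of its own, so when possible it is better to pick a suitable chunk shape at creation or load time.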
5. Use the Dashboard
from dask.distributed import Client
client = Client()
print(client.dashboard_link)
# Monitor performance, identify bottlenecks
Common Workflow Patterns
ETL Pipeline
import dask.dataframe as dd
# Extract: Read data
ddf = dd.read_csv('raw_data/*.csv')
# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Load: Aggregate and save
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
Unstructured to Structured Pipeline
import dask.bag as db
import json
# Start with Bag for unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
# Convert to DataFrame for structured analysis
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
Large-Scale Array Computation
import dask.array as da
# Load or create large array
x = da.from_zarr('large_dataset.zarr')
# Process in chunks
normalized = (x - x.mean()) / x.std()
# Save result
da.to_zarr(normalized, 'normalized.zarr')
Custom Parallel Workflow
from dask.distributed import Client
client = Client()
# Scatter large dataset once
data = client.scatter(large_dataset)
# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)
# Gather results
results = client.gather(futures)
Selecting the Right Component
Use this decision guide to choose the appropriate Dask component:
Data Type:
- •Tabular data → DataFrames
- •Numeric arrays → Arrays
- •Text/JSON/logs → Bags (then convert to DataFrame)
- •Custom Python objects → Bags or Futures
Operation Type:
- •Standard pandas operations → DataFrames
- •Standard NumPy operations → Arrays
- •Custom parallel tasks → Futures
- •Text processing/ETL → Bags
Control Level:
- •High-level, automatic → DataFrames/Arrays
- •Low-level, manual → Futures
Workflow Type:
- •Static computation graph → DataFrames/Arrays/Bags
- •Dynamic, evolving → Futures
Integration Considerations
File Formats
- •Efficient: Parquet, HDF5, Zarr (columnar, compressed, parallel-friendly)
- •Compatible but slower: CSV (use for initial ingestion only)
- •For Arrays: HDF5, Zarr, NetCDF
Conversion Between Collections
# Bag → DataFrame
ddf = bag.to_dataframe()
# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)
# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
With Other Libraries
- •XArray: Wraps Dask arrays with labeled dimensions (geospatial, imaging)
- •Dask-ML: Machine learning with scikit-learn compatible APIs
- •Distributed: Advanced cluster management and monitoring
Debugging and Development
Iterative Development Workflow
- •Test on small data with synchronous scheduler:
dask.config.set(scheduler='synchronous')
result = computation.compute() # Can use pdb, easy debugging
- •Validate with threads on sample:
sample = ddf.head(1000) # Small sample
# Test logic, then scale to full dataset
- •Scale with distributed for monitoring:
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitor performance
result = computation.compute()
Common Issues
Memory Errors:
- •Decrease chunk sizes
- •Use persist() strategically and delete when done
- •Check for memory leaks in custom functions
Slow Start:
- •Task graph too large (increase chunk sizes)
- •Use map_partitions or map_blocks to reduce tasks
Poor Parallelization:
- •Chunks too large (increase number of partitions)
- •Using threads with Python code (switch to processes)
- •Data dependencies preventing parallelism
Reference Files
All reference documentation files can be read as needed for detailed information:
- •references/dataframes.md - Complete Dask DataFrame guide
- •references/arrays.md - Complete Dask Array guide
- •references/bags.md - Complete Dask Bag guide
- •references/futures.md - Complete Dask Futures and distributed computing guide
- •references/schedulers.md - Complete scheduler selection and configuration guide
- •references/best-practices.md - Comprehensive performance optimization and troubleshooting
Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.