Eventually every interesting workload outgrows a single machine. A query that scans a petabyte, a pipeline that reshuffles terabytes of joins, a training run that will not fit in a single GPU's memory — all of them, sooner or later, demand a set of machines cooperating. The discipline of making that cooperation fast, correct, and survivable is distributed computing, and the tools it has produced — MapReduce, Spark, Dask, Ray, DDP and FSDP for training, YARN and Kubernetes for scheduling, Raft for agreement — are the substrate on top of which every other chapter in this guide ultimately sits. This chapter traces the shape of that substrate: the laws that constrain it, the frameworks that dominate it, and the operational realities that make distributed systems the most interesting engineering problem most ML teams actually face.
The first two sections are orientation: why distribution is ever worth the complexity, and the handful of laws (Amdahl, Gustafson, CAP, FLP) that bound what any distributed system can do. Sections three through six are the analytical-compute lineage — MapReduce as the ancestor, the Hadoop/HDFS ecosystem that followed, Apache Spark as the dominant current engine, and the internals (RDDs, DataFrames, Catalyst, Tungsten) that make Spark work. Sections seven and eight go deep on the two issues that decide whether a distributed job is fast or slow: the shuffle and the partitioning-and-locality story. Sections nine and ten cover Dask and Ray — the Python-native alternatives that increasingly sit alongside Spark in ML-heavy platforms. Sections eleven and twelve are distributed training: the data-parallel / model-parallel / FSDP taxonomy, and the collective-communication primitives (all-reduce, NCCL, Horovod) underneath. Section thirteen covers schedulers and resource managers (YARN, Kubernetes, Slurm); fourteen, the consensus and coordination layer (Raft, ZooKeeper, etcd); fifteen, the newer serverless and warehouse-native execution models. Section sixteen is the operational reality — stragglers, skew, debugging, observability — and seventeen closes with where all of it compounds into ML practice.
Conventions: this chapter treats distributed computing as a single subject rather than as separate "big data" and "distributed ML" islands, because in practice the same primitives (partitioning, shuffling, collective communication, fault tolerance, scheduling) appear in both. References to "the warehouse" still assume the storage layer from Chapter 02; references to "the pipeline" assume Chapter 03; references to "the stream" assume Chapter 04. Vendor names are used where the abstraction has no generic term — Spark specifically, distributed SQL engines generically; PyTorch DDP specifically, data-parallel training generically.
A single computer has a fixed number of cores, a fixed amount of memory, a fixed disk, and a fixed network card. Distributed computing is what happens when the job outgrows those numbers — when the data will not fit, the computation will not finish, or the availability target will not tolerate a single machine's failure. Everything in this chapter follows from that constraint.
The reasons divide cleanly. The first is scale of data: a petabyte does not fit on one server's disk, and a hundred terabytes will not fit in one server's RAM, so the data itself has to live across many nodes. The second is scale of computation: even where the data fits, a single-machine run takes hours or days that the business will not wait for, and the answer is to spread the work across a cluster. The third is availability: a single machine fails, and some services cannot tolerate the downtime, so the system has to be built so that the loss of any one node is a non-event. Most real systems are some blend of the three; the design centre of gravity depends on which of them dominates.
Distribution is not free. A single-machine program has a single address space, a single clock, and a single failure domain; a distributed program has none of those. Communication happens over a network, which is orders of magnitude slower than memory and can drop, delay, or duplicate messages. Machines fail partially — one node down while the rest keep running — and the program has to notice and respond. There is no global clock, so ordering of events has to be reasoned about explicitly. These are the costs that every abstraction in this chapter is trying to hide, with varying degrees of success.
The chapter moves from the laws that constrain any distributed system, through the analytical-compute lineage (MapReduce, Hadoop, Spark) and the mechanics that make it run (shuffle, partitioning, locality), into Python-native alternatives (Dask, Ray), then into distributed training for deep learning (DP, DDP, FSDP, collectives), and finally into the cross-cutting infrastructure (schedulers, consensus, serverless) and the operational discipline the whole apparatus needs. By the end the goal is not to have memorised a framework but to be able to look at a workload and know, roughly, which piece of this machinery it belongs on.
A surprising fraction of production workloads run on Spark clusters that a single beefy machine could have handled, with less ops burden, lower latency, and lower cost. Distribution earns its place when the data genuinely does not fit, the compute genuinely does not finish, or the availability requirement genuinely rules out a single node. Until one of those three is true, a single large machine — now routinely 128 cores and 2 TB of RAM — is often the right answer.
A handful of results from the theory of parallel and distributed systems set ceilings on what any framework can do. Amdahl bounds how much parallelism can help a fixed workload; Gustafson reframes that bound when the workload scales; CAP says that under a network partition you pick consistency or availability; the end-to-end and locality arguments explain why the network, not the CPU, is usually the bottleneck. These are not frameworks to compare — they are the physics the frameworks are working inside.
Amdahl's law (1967) observes that if a fraction s of a program is inherently serial, no amount of parallelism reduces total runtime below the time that serial fraction takes — the speedup is capped at 1/s, so a workload that is 10% serial caps out at a 10x speedup no matter how many cores you throw at it. Gustafson's law (1988) rephrases the same arithmetic from the other side: in practice people do not hold the problem size fixed and add machines; they hold runtime fixed and solve bigger problems, and from that angle the serial fraction matters less. Both are true and both are useful. The practical lesson is that the serial fraction — coordinator steps, final reductions, barrier synchronisations — is what limits scaling, and reducing it is usually more valuable than adding nodes.
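The two laws are one line of arithmetic each; a small sketch makes the contrast concrete (the function names here are illustrative, not from any library):

```python
def amdahl_speedup(serial_fraction: float, n: int) -> float:
    """Speedup of a fixed-size workload on n workers (Amdahl)."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n)

def gustafson_speedup(serial_fraction: float, n: int) -> float:
    """Scaled speedup when the problem grows with the worker count (Gustafson)."""
    return serial_fraction + (1.0 - serial_fraction) * n

# A 10%-serial workload: adding cores gives sharply diminishing returns...
print(round(amdahl_speedup(0.10, 16), 2))    # 6.4  on 16 cores
print(round(amdahl_speedup(0.10, 1024), 2))  # 9.91 — capped near 1/0.10 = 10
# ...but in the scaled (Gustafson) view, solving a bigger problem in the
# same wall-clock time, useful work keeps growing with the cluster.
print(round(gustafson_speedup(0.10, 1024), 1))  # 921.7
```

Sixteen cores already deliver most of what a thousand can: the serial fraction, not the core count, is the binding constraint.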
CAP (Brewer, 2000) says that a networked system can guarantee at most two of consistency, availability, and partition tolerance — and since partitions happen in reality, the real choice is consistency-or-availability under partition. PACELC (Abadi, 2010) extends the bookkeeping: even without a partition there is still a latency-versus-consistency trade-off. These are the results that explain why different storage systems make different choices — why Spanner burns a round-trip for strong consistency and Dynamo gives it up for availability — and why there is no single "best" distributed database.
A good rule of thumb for distributed performance: computation is cheap; communication is expensive; coordination is more expensive still. Moving a gigabyte between cores on one machine is measured in milliseconds; moving it between machines on a fast data-centre network is measured in tens of milliseconds; moving it across regions is hundreds. Every theme in this chapter — data locality, partitioning, the shuffle, gradient all-reduce — is a variation on the same optimisation: compute where the data lives, and when you must move data, move as little as possible, in as few exchanges as possible.
The eight "fallacies of distributed computing" — collected by L. Peter Deutsch and colleagues at Sun — are each worth reading once and remembering forever: the network is reliable, latency is zero, bandwidth is infinite, the network is secure, topology doesn't change, there is one administrator, transport cost is zero, the network is homogeneous. Almost every distributed-systems bug is a violation of one of them made concrete.
In 2004, Jeff Dean and Sanjay Ghemawat published MapReduce: Simplified Data Processing on Large Clusters. The programming model it described — two functions, map and reduce, run across a partitioned dataset — is simple enough to teach in an afternoon, and it defined how big-data processing worked for the following decade. Nobody writes raw MapReduce jobs anymore, but almost every system in this chapter is an evolution of it.
The idea is almost embarrassingly small. Input data is split into pieces. A user-supplied map function runs on each piece and emits key-value pairs. The framework groups pairs by key (the shuffle). A user-supplied reduce function runs on each key's group and emits the output. The user writes two functions; the framework handles everything else — splitting, scheduling, retries, sorting, moving bytes across the cluster. The genius of the paper is not the model; it is the observation that an astonishing number of large-scale analytical jobs fit it.
Word count — emit (word, 1) in map, sum the counts in reduce — is the canonical example for the same reason that sorting is the canonical algorithms example: it is the smallest problem that exercises every piece of the machinery. Indexing the web, building log rollups, computing aggregate statistics, generating inverted indexes — all of them turn out to be variations of word count with richer keys and values. That was the insight that let Google run its whole crawling-and-indexing pipeline on top of a single abstraction for years.
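The whole model fits in a few lines of plain Python. The sketch below runs word count through explicit map, shuffle, and reduce phases — the function names and in-memory "shuffle" are illustrative stand-ins for what the real framework did with disk and network:

```python
from collections import defaultdict

def map_phase(chunk: str):
    # The user's map function: emit (word, 1) for every word in this split.
    return [(word, 1) for word in chunk.split()]

def shuffle_phase(pairs):
    # The framework's job: group all emitted pairs by key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    # The user's reduce function: collapse one key's group to a result.
    return (key, sum(values))

splits = ["the quick brown fox", "the lazy dog", "the fox"]
emitted = [pair for chunk in splits for pair in map_phase(chunk)]
result = dict(reduce_phase(k, vs) for k, vs in shuffle_phase(emitted).items())
print(result["the"], result["fox"])  # 3 2
```

Swap the map and reduce bodies and the same skeleton computes log rollups, inverted indexes, or aggregate statistics — which is exactly the observation the paper made.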
What MapReduce got right: the separation of what to compute from how to run it on a cluster, the use of disk-backed intermediate results for fault tolerance, the assumption that nodes will fail and the framework should re-execute their tasks. What it got wrong: the rigidity of the two-stage model forces multi-step jobs into long chains of MapReduce phases, each of which materialises all intermediate data to disk, which is expensive and slow for iterative workloads (like machine learning) that revisit the same data many times. The next decade of systems — Spark especially — is the answer to that second half.
"MapReduce: Simplified Data Processing on Large Clusters" (Dean & Ghemawat, 2004) is one of the most-cited systems papers of the century and an extraordinarily clear piece of technical writing. It pairs naturally with "The Google File System" (2003), which describes the storage layer MapReduce ran on top of; the two together explain how the industry's idea of "big data" was invented.
The MapReduce paper described Google's internal systems. Hadoop — started at Yahoo! in 2006 — re-implemented the idea in open source, complete with a storage layer (HDFS) modelled on GFS. For most of the 2010s it was how everyone outside Google did big data. Its legacy is less as running infrastructure than as the ecosystem from which almost every modern tool emerged.
HDFS — the Hadoop Distributed File System — splits files into large blocks (typically 128 MB or 256 MB), replicates each block across three machines, and exposes a single namespace over the whole cluster. A NameNode tracks which blocks are where; DataNodes hold the blocks. The original bet of HDFS was to co-locate compute and storage on the same nodes so that MapReduce tasks would run on the machine that held their input block — moving the code to the data, because the code is tiny and the data is huge. That bet held while on-prem clusters dominated and has since largely been unwound as object stores (S3, GCS) decoupled storage from compute.
Around HDFS and MapReduce grew an entire menagerie. YARN (2013) generalised the cluster manager so non-MapReduce workloads could run on the same hardware. Hive put SQL on top of MapReduce, letting analysts query the cluster. HBase added a wide-column store on HDFS. Pig, Sqoop, Oozie, ZooKeeper, Impala, Tez — for the better part of a decade, knowing Hadoop meant knowing most of a periodic table of projects, each of which solved one piece of the pipeline. That complexity is both a fair criticism and a fair measure of how much of modern data infrastructure was prototyped there.
Three things pushed Hadoop out of the centre of the story. Spark replaced MapReduce for most analytical workloads (Section 05). Cloud object stores replaced HDFS for most storage (we saw this in Chapter 02). And managed services replaced hand-run clusters for most teams. What remains, though, is significant: HDFS-style block replication and coarse partitioning, YARN's notion of a cluster resource manager, Hive's metastore (now the backbone of every lakehouse catalogue), and the basic mental model that data and compute live together across a cluster. That model did not die with Hadoop; it just moved.
If you work with Parquet, Hive-style partitioning, the Hive metastore, Iceberg/Delta/Hudi, or YARN, you are using Hadoop-era abstractions, even if you have never touched a NameNode. The ecosystem was ragged; the ideas were sound enough to outlive it.
Apache Spark started as a 2009 research project at UC Berkeley's AMPLab (Matei Zaharia and colleagues) and has since become the default engine for large-scale analytical and ML-adjacent batch processing. Its contribution is narrow and deep: take MapReduce's model, cache intermediate results in memory, expose a rich API, and run multi-stage jobs as a single DAG rather than a chain of independent MapReduces.
The original Spark abstraction is the Resilient Distributed Dataset — a partitioned, immutable collection of records with a recorded lineage of the operations that produced it. Lineage is the clever part: if a partition is lost to a failure, Spark can re-derive it by replaying the operations that produced it, rather than materialising every intermediate result to disk the way MapReduce did. That one decision — trade disk materialisation for lineage-based recovery — is what made Spark an order of magnitude faster than Hadoop MapReduce on iterative workloads, and it is what made it viable for ML pipelines.
Over time Spark's API surface moved up a level. DataFrames (with a schema) and Datasets (typed, JVM-only) became the recommended interfaces, with the RDD sitting underneath as an escape hatch. The Catalyst optimiser plans queries on DataFrames much like a database optimiser plans SQL — predicate pushdown, column pruning, join reordering, code generation — which is how Spark SQL ended up competitive with warehouse-native engines for a wide class of analytical workloads. For most users today, "Spark" in practice means DataFrames + SQL, with PySpark as the front-end and Catalyst doing the heavy lifting invisibly.
Spark has accumulated a set of libraries that ride on the same engine: Spark SQL for relational queries, Structured Streaming (we met it in Chapter 04) for micro-batch streams, MLlib for distributed ML, GraphX for graph computation, and language bindings for Scala, Java, Python, and R. The Python binding — PySpark — is by far the most used, and the Pandas-on-Spark API lets teams scale Pandas-style code to a cluster without rewriting it into a different dialect. That Pandas-shaped on-ramp is a large part of why Spark, not Hadoop, ended up as the centre of the industry stack.
For most of the 2020s, the honest default for "I have too much data for Pandas and I want SQL + DataFrames + Python at scale" has been Spark. Not because it is always the fastest — modern warehouse engines (BigQuery, Snowflake, DuckDB for single-node) often beat it — but because it is the least-surprising choice, runs on every major cloud, speaks every storage format, and has a decade of operational tooling. It is worth knowing even if you never run a cluster yourself.
A Spark program does not run when you write it. It runs when an action (count(), show(), write()) triggers the engine to compile your query into a plan, cut the plan into stages, cut the stages into tasks, and ship the tasks across the cluster. Understanding that pipeline — lazy evaluation, DAG, stages, tasks, driver, executors — is the difference between using Spark and debugging Spark.
Every transformation you write — filter, select, join, groupBy — is lazy. Nothing executes; the engine is simply recording the operations as a logical plan. When an action is called, Catalyst optimises the logical plan (predicate pushdown, column pruning, join strategy choice), compiles it into a physical plan, and hands it to the execution layer. The lazy model is what allows the whole-query optimisation to happen; it is also why inserting a print mid-pipeline does not always do what a Python programmer would expect.
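A toy model makes the laziness visible. The `LazyPipeline` class below is hypothetical — a stand-in for Spark's behaviour, not its API — but it shows the shape: transformations only append to a recorded plan, and nothing runs until an action asks for a result:

```python
class LazyPipeline:
    """Toy model of Spark-style laziness (illustrative, not Spark's API):
    transformations record a plan; only an action executes it."""
    def __init__(self, source, plan=None):
        self.source = source
        self.plan = plan or []          # the recorded logical plan

    def filter(self, pred):
        return LazyPipeline(self.source, self.plan + [("filter", pred)])

    def map(self, fn):
        return LazyPipeline(self.source, self.plan + [("map", fn)])

    def count(self):                    # an action: run the whole plan now
        rows = iter(self.source)
        for op, fn in self.plan:
            rows = filter(fn, rows) if op == "filter" else map(fn, rows)
        return sum(1 for _ in rows)

p = LazyPipeline(range(100)).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
# Nothing has executed yet — `p.plan` is just two recorded steps.
print(len(p.plan), p.count())  # 2 50
```

Having the whole plan in hand before execution is what gives an optimiser like Catalyst room to reorder, prune, and fuse — an eagerly executed pipeline has already spent the work before the optimiser sees it.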
A running Spark application has one driver process — which plans the query, tracks progress, and holds the SparkContext — and many executor processes spread across the cluster. The driver decomposes the physical plan into stages, each stage into tasks (one task per data partition), and sends the tasks to executors. Executors run tasks, hold cached partitions in memory, and report back. If an executor fails, the driver re-schedules its tasks on other executors and uses RDD lineage to recompute any lost partitions. The driver is a single point of failure for the job; executors are fungible.
Stages are divided by wide dependencies — operations that need data from many input partitions to produce one output partition, which forces a shuffle across the network. Narrow operations (filter, map, select) stay inside a stage; wide operations (groupBy, join on mismatched keys, repartition) create a new stage boundary. Reading Spark's UI is, more than anything else, reading the stage graph and noticing which stages are shuffles, which are skewed, and which are spending their time waiting for the slowest task. The details of that shuffle are important enough to get the next section on their own.
Most Spark performance problems are visible in the UI: a stage with a long tail (skew), a shuffle moving tens of gigabytes (bad partitioning), a task failing repeatedly (bad data), a job with an unreasonable number of stages (unnecessary shuffles). Learning to read the UI pays back faster than any other skill you can pick up around Spark.
When data has to be regrouped by key across the cluster — for a groupBy, a join, a distinct, a repartition — every executor has to write its output to local disk, partitioned by the destination, and every executor has to read back the slice addressed to it. That step, the shuffle, is the single most expensive operation in most distributed analytical jobs, and understanding it is how you tune them.
A shuffle does three expensive things. It writes intermediate data to local disk (I/O), it transfers that data across the network (bandwidth), and it requires a synchronisation barrier between stages (the downstream stage cannot start until the upstream stage has finished, so the slowest task on the slowest node gates everything). On top of that, the intermediate data is often many times larger than the final result — before aggregation, a groupBy may be shipping every record to its key's destination partition. Most performance pathologies in Spark jobs are shuffle pathologies.
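The mechanics reduce to hash partitioning. The sketch below (an in-memory stand-in — real shuffles go through local disk and the network) shows the map side bucketing records by destination and the reduce side pulling its bucket from every mapper:

```python
from collections import defaultdict

def shuffle(mapper_outputs, num_partitions):
    """Hash-shuffle sketch: each mapper buckets its records by destination
    partition; each reducer reads its own bucket from every mapper."""
    # Map side: partition every record by hash of its key.
    mapper_buckets = []
    for records in mapper_outputs:
        buckets = defaultdict(list)
        for key, value in records:
            buckets[hash(key) % num_partitions].append((key, value))
        mapper_buckets.append(buckets)
    # Reduce side: partition p pulls bucket p from every mapper — this is
    # the all-to-all network exchange that makes shuffles expensive.
    return [
        [kv for buckets in mapper_buckets for kv in buckets[p]]
        for p in range(num_partitions)
    ]

maps = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
partitions = shuffle(maps, num_partitions=4)
# Every record with the same key lands in the same output partition.
all_a = [v for part in partitions for k, v in part if k == "a"]
print(sorted(all_a))  # [1, 3]
```

The guarantee that matters — same key, same destination — is also the origin of skew: the hash sends every record for a hot key to a single partition.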
Skew is when the data is not evenly distributed across keys: 90% of events belong to one customer, one country, one hour. After a shuffle those records all end up on one executor, and that executor takes ten times longer than the rest. The symptom is a stage with a long tail — 200 tasks finish in seconds, one task runs for an hour — and it is the single most common cause of slow Spark jobs. Mitigations include salting keys, using skew-aware joins (Spark 3's adaptive query execution helps here), and pre-aggregating with a map-side combine before the shuffle.
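Salting is simple enough to show in plain Python. The sketch below (illustrative names, in-memory stand-in for two shuffle rounds) spreads a hot key over several salted sub-keys in round one, then merges the partial counts in a much smaller round two:

```python
import random
from collections import Counter

def salted_counts(records, hot_key_salts=8):
    """Two-round aggregation with key salting, a common skew mitigation."""
    # Round 1: aggregate on (key, salt) — the hot key's records now spread
    # across `hot_key_salts` destination partitions instead of one.
    partials = Counter()
    for key in records:
        partials[(key, random.randrange(hot_key_salts))] += 1
    # Round 2: strip the salt and merge the partials — at most
    # `hot_key_salts` records per key cross the network this time.
    merged = Counter()
    for (key, _salt), count in partials.items():
        merged[key] += count
    return merged

records = ["hot"] * 1000 + ["cold"] * 10
counts = salted_counts(records)
print(counts["hot"], counts["cold"])  # 1000 10
```

The trade is one extra (tiny) shuffle for the elimination of the long-tail task — almost always worth it when one key dominates.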
The cheapest shuffle is the one that does not happen. If inputs are already partitioned by the join key, a broadcast join (for a small side) or a bucketed join (for two pre-partitioned sides) skips the shuffle entirely. Pre-aggregating (reduceByKey rather than groupByKey) ships less data. Co-locating datasets on the same partitioning scheme pays dividends for every join that follows. The rule of thumb is: look for shuffles first, and for each one ask whether it can be eliminated, reduced, or made non-blocking.
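The reduceByKey-versus-groupByKey difference is easy to quantify with a toy count of shuffled records (the helper names below are illustrative, not Spark's internals):

```python
from collections import Counter

def shuffled_records_group_by_key(partitions):
    # groupByKey-style: every (key, value) record crosses the network.
    return sum(len(part) for part in partitions)

def shuffled_records_reduce_by_key(partitions):
    # reduceByKey-style: each partition pre-aggregates locally first, so at
    # most one record per distinct key leaves each partition.
    total = 0
    for part in partitions:
        local = Counter()
        for key, value in part:
            local[key] += value       # the map-side combine
        total += len(local)
    return total

parts = [[("a", 1)] * 500 + [("b", 1)] * 500, [("a", 1)] * 1000]
print(shuffled_records_group_by_key(parts))   # 2000 records shuffled
print(shuffled_records_reduce_by_key(parts))  # 3 records shuffled
```

Same answer, three orders of magnitude less traffic — and the gap widens with the ratio of records to distinct keys.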
Spark 3 introduced adaptive query execution (AQE), which re-plans parts of the query at runtime using statistics gathered from the first stages — switching join strategies, coalescing shuffle partitions, splitting skewed partitions. It is on by default in recent versions and quietly fixes a large fraction of the manual tuning older teams used to do by hand.
The fastest way to make a distributed job faster is to avoid moving data you did not need to move. Partitioning decides how the data is divided across the cluster; locality decides whether the compute runs near its data or far from it. Together they govern how much of a job is honest work and how much is shuffling bytes.
Storage-level partitioning — Hive-style directory partitioning on low-cardinality columns, Iceberg's hidden partitioning, Parquet's row groups and column chunks — determines which bytes the engine has to read for a given predicate. A query filtering to one day should read one day's worth of files, not the whole table. This is the cheapest possible form of parallelism: before the compute starts, most of the data is excluded. Getting the partitioning right on the storage side is usually a bigger win than any amount of executor tuning.
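The pruning itself is just string matching on directory names, which is why it is so cheap. A minimal sketch of Hive-style pruning (hypothetical helper, assuming `dt=` directory partitioning):

```python
def prune_partitions(partition_dirs, wanted_dt):
    """Hive-style partition pruning sketch: select only the directories
    whose dt= value matches the predicate, before reading any file."""
    selected = []
    for path in partition_dirs:
        # Directory names carry the partition value, e.g. "dt=2024-01-01".
        kv = dict(part.split("=", 1)
                  for part in path.strip("/").split("/") if "=" in part)
        if kv.get("dt") == wanted_dt:
            selected.append(path)
    return selected

dirs = [
    "events/dt=2024-01-01/",
    "events/dt=2024-01-02/",
    "events/dt=2024-01-03/",
]
print(prune_partitions(dirs, "2024-01-02"))  # ['events/dt=2024-01-02/']
```

Two-thirds of the table never gets opened — no executor tuning can match excluding the bytes before the job starts.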
In-job partitioning decides how the data is divided across the executors while the computation runs. Spark's default (usually 200 shuffle partitions) is almost never the right number; too few and individual tasks are huge, too many and scheduling overhead dominates. Explicitly repartitioning by a key before a join can be worth the one-time shuffle cost if many downstream joins share that key. Bucketing pins a partitioning scheme into the table itself, so every future read inherits it. These are the places where understanding your data's keys — which ones are common, which ones are skewed, which ones the queries actually filter on — pays off.
Data locality is the principle of running compute on the machine that already holds the data. In the HDFS era this was literal: MapReduce tried to schedule each task onto a DataNode that had a local replica of its input block. In the cloud era it is less literal — storage and compute are separated — but the same principle reappears higher up the stack: cache hot tables on cluster-local disk, co-locate executors in the same availability zone as the storage bucket, keep a worker's working set in memory between stages. The specific mechanism changed; the rule "compute where the data is" did not.
The partitioning scheme of a large table is a durable design decision: once data is written, repartitioning costs a full rewrite. Choosing on the basis of how the data will be queried — the common predicate columns, the common join keys, the expected data skew — and not on the basis of how it was generated is the difference between a table that ages well and one that has to be rebuilt every six months.
Dask is what you reach for when the workload is Python, the APIs you want are NumPy/Pandas/scikit-learn, and a JVM engine like Spark is more friction than the problem is worth. It is a task-graph scheduler and a set of collection APIs that parallelise familiar Python libraries across cores, threads, and clusters, without asking the user to leave the Python stack.
At its core, Dask is two things. The task graph: a directed acyclic graph of Python function calls with their data dependencies, built lazily as you write code, executed only when you ask for results. And the scheduler: single-threaded, multi-threaded, multi-process, or distributed (over a cluster of workers managed by dask.distributed), any one of which can execute the same graph. The separation of "describe the work" from "choose how to run it" is what lets the same Dask code run on a laptop for dev and a hundred-node cluster for production.
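The task-graph idea is concrete enough to execute by hand. Dask's internal representation is essentially a dict mapping keys to values or `(function, *argument_keys)` tuples; the tiny recursive executor below is a sketch of that model (Dask's real schedulers run the same kind of graph with parallelism, caching, and spilling):

```python
from operator import add, mul

def execute(graph, key):
    """Minimal executor for a Dask-style task graph."""
    node = graph[key]
    if isinstance(node, tuple) and callable(node[0]):
        fn, *dep_keys = node
        # Recursively resolve dependencies, then apply the function.
        return fn(*(execute(graph, dep) for dep in dep_keys))
    return node  # a literal value

graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),       # sum = x + y
    "result": (mul, "sum", "y"),  # result = sum * y
}
print(execute(graph, "result"))  # 6
```

Everything above the graph layer — Dask Array, Dask DataFrame, Dask Bag — is machinery for generating graphs like this one from familiar NumPy and Pandas calls.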
On top of the graph layer sit Dask's collection APIs. Dask Array partitions a NumPy array into blocks and runs NumPy operations on each block, stitching the results. Dask DataFrame does the same for Pandas, partitioning by index across workers. Dask Bag parallelises over arbitrary Python objects for log-processing-style workloads. Dask-ML wraps scikit-learn. The surface is close enough to the originals that a working Pandas script often needs only its import line changed to scale up; close enough, not identical, and the mismatches are where most Dask bugs come from.
Dask's sweet spot is medium-to-large data where the team is pure-Python, the workload is numeric/scientific (image stacks, geospatial arrays, genomics), or the pipeline already leans on Pandas idioms that a JVM engine would not express cleanly. Spark's sweet spot is heavy relational work (SQL, big joins, Catalyst's optimiser is stronger), multi-language teams (Scala/Java as well as Python), and workloads where the storage stack is already Spark-shaped (Delta, Iceberg, the Databricks platform). Both run on the same clusters; the choice is more about idiom than about raw capability.
Before reaching for a cluster, it is worth asking whether Dask in single-machine mode — just multi-threaded or multi-process scheduling on a laptop — would already be enough. A 64-core workstation with a fast NVMe SSD can comfortably handle hundreds of gigabytes with Dask, and the operational complexity is roughly zero. Escalating to a cluster is a last resort, not a first step.
Ray (originally from UC Berkeley's RISELab, commercialised as Anyscale) is a more general distributed-computing framework than Spark or Dask: a runtime for stateless tasks, stateful actors, and a shared object store, on top of which a set of ML-focused libraries (RLlib, Ray Tune, Ray Train, Ray Serve) have become part of how many teams do distributed ML.
Three primitives do most of the work. Tasks: stateless functions decorated with @ray.remote, scheduled onto any worker, each returning a future. Actors: stateful classes, each pinned to a worker, holding local state across calls — useful for things like parameter servers, simulation environments, or GPU-resident models. The object store: a shared-memory zero-copy store (Plasma) where task outputs and large objects live, referenced by ID across the cluster. The mental model is "a distributed Python runtime with explicit dataflow", which is closer to a general distributed computing substrate than a data-processing engine.
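The tasks-and-actors model can be mimicked with the standard library, which is a useful way to internalise it before touching Ray itself. The sketch below is a stand-in, not Ray's API: `remote` here is a hypothetical decorator that fakes `@ray.remote` with a thread pool, and the "actor" is just an ordinary object that keeps state across calls:

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def remote(fn):
    """Stand-in for @ray.remote on a function: .submit() returns a future
    immediately; the work runs on a worker (here, a thread)."""
    fn.submit = lambda *args: pool.submit(fn, *args)
    return fn

@remote
def square(x):
    return x * x

class CounterActor:
    """Stand-in for a Ray actor: state lives with one worker and
    survives across method calls."""
    def __init__(self):
        self.total = 0
    def add(self, x):
        self.total += x
        return self.total

futures = [square.submit(i) for i in range(4)]  # scheduled, not yet awaited
print([f.result() for f in futures])            # [0, 1, 4, 9]

counter = CounterActor()
for i in range(3):
    counter.add(i)
print(counter.total)  # 3
```

In real Ray the futures are object references resolvable from any node, and the actor is pinned to a specific worker process — the cluster-wide versions of exactly these two shapes.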
The reason Ray shows up in this chapter and not only in more esoteric systems discussions is the ML library stack built on it. Ray Tune runs distributed hyperparameter search across many trials. Ray Train wraps PyTorch/TensorFlow distributed training with a uniform API. RLlib is one of the most-used reinforcement-learning training libraries, with Ray's actor model mapping naturally to RL rollouts. Ray Serve handles model serving with autoscaling and composition of multiple models. Each of these could in principle be written on another substrate; having all of them on the same runtime is what made the bundle sticky.
Ray is not a drop-in replacement for either. Its strengths are workloads that do not fit the dataframe/array shape — RL, hyperparameter search, model serving, agentic pipelines, simulations — where tasks and actors are the natural primitives. Its weaknesses are the same: if the workload is a big join or a columnar scan, Spark's query planner will usually beat a Ray implementation that reproduces the same logic by hand. A common pattern is Spark (or Dask, or a warehouse) for the analytical pipeline, Ray for the training and serving layer that sits on top of its outputs.
Much of modern ML is less "scan a table" and more "run a thousand variations of an experiment, coordinate a dozen stateful simulators, serve a composed graph of models". Those are actor workloads, and Ray's rise inside ML infrastructure teams tracks exactly the rise of those kinds of problems.
Modern deep learning stopped fitting on a single GPU years ago. Distributed training is the set of techniques that take a model and its optimiser across multiple devices — first across the GPUs of a single host, then across hosts in a cluster, then across clusters and data centres. Each step of that ladder adds new vocabulary (DP, DDP, ZeRO, FSDP, TP, PP) and new failure modes.
Data parallelism is the simplest and by far the most common pattern. The model is replicated on every device; each device sees a different slice of the minibatch; gradients are averaged across devices at each step (an all-reduce) so the replicas stay synchronised. PyTorch's DistributedDataParallel (DDP) is the standard implementation — faster and more correct than the older DataParallel, which ran on a single host and had significant overhead. For models that fit on one GPU, DDP is close to free scaling: add devices, add minibatch throughput, communication cost is modest.
When the model itself does not fit on a single GPU, data parallelism alone is not enough. Tensor parallelism splits individual layers across devices (each device holds a slice of the weight matrices; matmuls become collective operations); this is what Megatron-LM popularised for very large transformers. Pipeline parallelism splits the layers into groups assigned to different devices, passing activations along a pipeline; it requires micro-batching to keep every stage busy. ZeRO (DeepSpeed) and FSDP (PyTorch) shard the optimiser state, gradients, and (in the largest-stage variants) the parameters themselves across the data-parallel group, so the memory pressure per device drops by a factor equal to the world size. Training frontier-scale models uses all of these in combination — 3D parallelism.
Two techniques that are not "distributed" in the strict sense but come up constantly alongside it. Mixed precision (fp16 / bf16 for most compute, fp32 for the master weights) halves memory and often more than doubles throughput on GPUs with tensor cores; it is close to free on modern hardware. Gradient accumulation runs several small minibatches forward/backward, accumulates gradients, and steps the optimiser once — letting you simulate a large effective minibatch on hardware that cannot hold it. Together they are how medium-sized teams get close to the effective scale of much larger clusters.
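Gradient accumulation is arithmetic, not magic, and a one-parameter model shows why it reproduces the big batch exactly. The function below is an illustrative sketch (a linear model y = w·x with squared loss, no framework):

```python
def train_step_with_accumulation(micro_batches, weight, lr=0.1):
    """Accumulate gradients over several micro-batches, then take ONE
    optimiser step on the mean — equivalent to one big-batch step."""
    grad_sum, n = 0.0, 0
    for batch in micro_batches:
        for x, y in batch:                         # forward + backward
            grad_sum += 2 * (weight * x - y) * x   # d/dw of (w*x - y)^2
            n += 1
    return weight - lr * grad_sum / n              # single optimiser step

data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
big_batch = train_step_with_accumulation([data], weight=0.0)
micro = train_step_with_accumulation([data[:2], data[2:]], weight=0.0)
print(round(big_batch, 6) == round(micro, 6))  # True — same effective batch
```

The equivalence holds because the gradient of a batch loss is the mean of per-sample gradients; splitting the sum across micro-batches changes memory pressure, not the result (batch-norm statistics are the notable exception in real models).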
Start with DDP; add FSDP/ZeRO when memory pressure forces it; add tensor parallelism when single-device memory is not enough even with sharding; add pipeline parallelism only at cluster scales where per-layer splits are not sufficient. Each added dimension multiplies debugging difficulty; resist adding one until the previous has genuinely run out.
Underneath DDP, FSDP, Horovod, and every other distributed-training framework sits a small set of collective communication primitives — all-reduce, all-gather, reduce-scatter, broadcast, barrier — implemented by libraries like NCCL (for NVIDIA GPUs), MPI (for HPC), and Gloo (as a fallback). The performance of distributed training is, to a first approximation, the performance of these collectives.
A broadcast sends one rank's value to all other ranks. A reduce aggregates values from all ranks onto one (sum, max, min). All-reduce aggregates across all ranks and leaves the result on all ranks — it is the operation that averages gradients in DDP. All-gather collects each rank's shard and gives every rank the full concatenated result — used by FSDP to reassemble sharded weights before a forward. Reduce-scatter is the inverse: every rank contributes, and each rank ends up with a sum over one slice. Barrier synchronises; send/recv give explicit point-to-point. That handful of primitives is the whole vocabulary.
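The semantics are easy to pin down with lists standing in for per-rank buffers — a sketch of what each collective computes, not how NCCL moves the bytes:

```python
def all_reduce(values):
    # Every rank contributes one value; every rank gets the full reduction.
    total = sum(values)
    return [total] * len(values)

def all_gather(shards):
    # Every rank contributes its shard; every rank gets the concatenation.
    full = [x for shard in shards for x in shard]
    return [list(full) for _ in shards]

def reduce_scatter(per_rank_vectors):
    # Every rank contributes a full vector; rank i ends up with the
    # reduction of slice i only.
    n = len(per_rank_vectors)
    return [sum(vec[i] for vec in per_rank_vectors) for i in range(n)]

grads = [1.0, 2.0, 3.0, 6.0]            # one gradient value per rank
print(all_reduce(grads))                 # [12.0, 12.0, 12.0, 12.0] — DDP's step
print(reduce_scatter([[1, 2], [3, 4]]))  # [4, 6] — rank 0 gets 4, rank 1 gets 6
```

Note the composition hiding in plain sight: an all-reduce is a reduce-scatter followed by an all-gather, which is exactly how the ring algorithm implements it.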
A naive all-reduce centralises on one node — bandwidth-limited by that node's link. The ring all-reduce (popularised by Baidu and Horovod) arranges the ranks in a logical ring; each rank sends to the next and receives from the previous, in two passes (reduce-scatter then all-gather), so the total traffic per link approaches twice the buffer size, essentially independent of the number of ranks. It is bandwidth-optimal and is the default pattern in NCCL and most modern implementations. Every time a DDP run scales cleanly to hundreds of GPUs, this is part of the reason.
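Here is a toy single-process simulation of the ring algorithm, a sketch of the bookkeeping rather than anything performance-relevant: the buffer is split into N chunks, N-1 reduce-scatter steps leave each rank holding one fully summed chunk, and N-1 all-gather steps circulate the finished chunks. It checks both that every rank ends with the full sum and that each rank sends 2*(N-1) chunks, roughly twice the buffer regardless of ring size.

```python
# Toy, single-process ring all-reduce over plain Python lists.

def ring_all_reduce(bufs):
    n = len(bufs)
    size = len(bufs[0])
    assert size % n == 0
    step = size // n
    # chunks[r][c] is rank r's current copy of chunk c of the buffer.
    chunks = [[list(b[c * step:(c + 1) * step]) for c in range(n)]
              for b in bufs]
    sent = [0] * n   # chunks each rank has pushed onto its outgoing link

    # Phase 1, reduce-scatter: at step s, rank r-1 sends the chunk it last
    # accumulated; after n-1 steps rank r holds the full sum of chunk (r+1)%n.
    for s in range(n - 1):
        incoming = [chunks[(r - 1) % n][(r - 1 - s) % n] for r in range(n)]
        for r in range(n):
            c = (r - 1 - s) % n
            chunks[r][c] = [a + b for a, b in zip(chunks[r][c], incoming[r])]
            sent[(r - 1) % n] += 1

    # Phase 2, all-gather: circulate the finished chunks around the ring.
    for s in range(n - 1):
        incoming = [list(chunks[(r - 1) % n][(r - s) % n]) for r in range(n)]
        for r in range(n):
            chunks[r][(r - s) % n] = incoming[r]
            sent[(r - 1) % n] += 1

    return [[x for c in ch for x in c] for ch in chunks], sent

bufs = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
out, sent = ring_all_reduce(bufs)
assert all(row == [28, 32, 36, 40] for row in out)
assert sent == [6, 6, 6, 6]   # 2*(N-1) chunk-sends per rank, ~2x the buffer
```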
Collectives are fast because the hardware cooperates. NVLink and NVSwitch connect GPUs within a server at hundreds of GB/s, far faster than PCIe. InfiniBand (and RoCE on Ethernet) carries traffic between servers with microsecond latency and tens of GB/s of bandwidth; GPUDirect RDMA lets the network card read GPU memory directly without bouncing through host RAM. These are the reasons a well-configured cluster can run all-reduces that stay comfortably below the per-step compute time; on cheaper networking, the same code is communication-bound.
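A back-of-envelope sketch makes the point about why the interconnect decides whether the same code is compute-bound or communication-bound. Every number below (model size, link bandwidths) is an illustrative assumption, not a measurement:

```python
# Back-of-envelope sketch; all numbers are illustrative assumptions.
# Ring all-reduce moves ~2*(N-1)/N of the gradient buffer over each link,
# so the communication time per step is roughly that volume divided by
# the link bandwidth.

def all_reduce_seconds(grad_bytes, n_ranks, link_bytes_per_s):
    volume = 2 * (n_ranks - 1) / n_ranks * grad_bytes
    return volume / link_bytes_per_s

grad_bytes = 2e9                                  # hypothetical 1B-param model, fp16 grads
fast = all_reduce_seconds(grad_bytes, 8, 300e9)   # assumed NVLink-class link: 300 GB/s
slow = all_reduce_seconds(grad_bytes, 8, 1.25e9)  # assumed 10 GbE link: 1.25 GB/s
print(f"fast link: {fast * 1e3:.0f} ms, slow link: {slow:.1f} s")
# → fast link: 12 ms, slow link: 2.8 s
```

Milliseconds hide comfortably behind a forward/backward pass; whole seconds per step do not, and no amount of framework tuning recovers that gap.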
If you train on NVIDIA GPUs and use PyTorch distributed, NCCL is doing most of your networking. It is usually invisible when it works and extremely visible when it does not: flaky runs, timeouts, and hangs on collectives are some of the most common distributed-training failure modes, and learning to read NCCL logs pays for itself quickly once you are on a cluster.
A cluster is a pool of machines; a scheduler is the layer that decides which of those machines runs which job at which time. YARN, Mesos, Kubernetes, Slurm, Borg, Omega — each scheduler encodes a different set of assumptions about workloads, isolation, and resource granularity, and every framework in this chapter is running on top of one of them.
Google's Borg (described publicly in 2015) has run Google's internal workloads for two decades and is the direct ancestor of Kubernetes. Apache YARN (2013) split the Hadoop JobTracker into a ResourceManager and per-application ApplicationMasters, letting non-MapReduce workloads (Spark especially) share the cluster. Apache Mesos took a different approach with two-level scheduling; it powered large deployments at Twitter and Apple before Kubernetes largely displaced it. The shared idea across all of them is the separation of resource allocation (here is a machine-slice) from application scheduling (here is how the app uses it).
Kubernetes — open-sourced from Borg's lineage in 2014 — has become the default cluster manager for most of the industry, including increasingly for data workloads. Spark on Kubernetes, Ray on Kubernetes, Flink on Kubernetes, Airflow on Kubernetes, and MLOps stacks (Kubeflow, KServe) all run their work as pods on a K8s cluster. The appeal is operational uniformity: one scheduler, one networking model, one way to roll out a new version. The cost is complexity: Kubernetes itself is a non-trivial distributed system, and operating it well is its own specialisation.
High-performance-computing clusters — the kind that run weather forecasts and physics simulations — use schedulers from a different tradition: Slurm, PBS, LSF. These are optimised for tightly coupled MPI-style jobs that need all N nodes at once for H hours, and they have batch queues, backfill, and fair-share accounting. Large ML training runs look much more like HPC jobs than like web services, and it is increasingly common to see Slurm and Kubernetes running side by side on the same AI cluster — Slurm for the big training jobs, Kubernetes for everything else.
For analytical batch on the cloud, you rarely touch the scheduler directly; the managed service (EMR, Dataproc, Databricks) does it. For ML training at scale, the choice of Slurm vs Kubernetes is a real engineering decision with consequences for how you submit jobs, how you handle preemption, and how your stack interoperates with storage and networking. For production services, Kubernetes is the default for reasons that have more to do with the rest of the company's infrastructure than with anything ML-specific.
A distributed system needs to agree on things: who is the leader, what order did these writes happen in, which version of a config is current. The theoretical result (FLP) is that perfect consensus is impossible in an asynchronous system with even one faulty node. The practical result is a handful of algorithms — Paxos, Raft, Zab, view-stamped replication — that achieve consensus under realistic assumptions, and whose implementations are the beating heart of every serious distributed platform.
Ordinary coordination fails in clever ways. A node that appears dead may just be slow; a network that appears partitioned may be intermittent; a leader that has lost quorum may not know it yet. Any algorithm has to tolerate nodes lying silent, nodes coming back after long delays, and messages arriving out of order or duplicated. The FLP impossibility result (Fischer, Lynch, Paterson, 1985) proves that no deterministic protocol can guarantee consensus in a purely asynchronous system with a crashed process — so every working algorithm either adds timing assumptions, gives up liveness under adversarial conditions, or both.
Paxos (Lamport, 1998) is the algorithm everyone cites and few people implement by hand; it is correct and famously hard to follow. Raft (Ongaro and Ousterhout, 2014) was explicitly designed for understandability: the same guarantees as multi-Paxos, decomposed into leader election, log replication, and safety as three smaller problems. Most modern distributed systems that need strong consistency use Raft — etcd, Consul, CockroachDB, TiKV — and the Raft paper has become the standard reading for anyone who wants to understand how these systems stay correct under failures.
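A toy sketch of the leader-election piece, with no RPCs, no timeouts, and no log-freshness check (so it is a fragment of Raft's mechanics, not Raft): because each node grants at most one vote per term and winning requires a strict majority, two candidates can never both win the same term, since any two majorities of the same cluster intersect.

```python
# Toy, single-process sketch of Raft-style voting (invented class names;
# real Raft adds timeouts, RPCs, and a log-freshness check on votes).

class Node:
    def __init__(self):
        self.current_term = 0
        self.voted_for = None

    def request_vote(self, term, candidate):
        if term > self.current_term:          # newer term: forget the old vote
            self.current_term, self.voted_for = term, None
        if term == self.current_term and self.voted_for in (None, candidate):
            self.voted_for = candidate        # at most one vote per term
            return True
        return False

def run_election(nodes, term, candidate):
    votes = sum(node.request_vote(term, candidate) for node in nodes)
    return votes > len(nodes) // 2            # strict majority wins

cluster = [Node() for _ in range(5)]
assert run_election(cluster, term=1, candidate="A") is True
assert run_election(cluster, term=1, candidate="B") is False  # votes spent
assert run_election(cluster, term=2, candidate="B") is True   # new term, new votes
```

The second assert is the safety property in miniature; the third shows why a deposed leader with a stale term number can no longer win anything.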
Consensus is the quiet foundation under a lot of infrastructure. Kubernetes's control plane stores its state in etcd, whose job is to replicate that state using Raft. ZooKeeper (using Zab) provides leader election and coordination for everything from Kafka (until recent versions) to HBase to Storm. Spanner, CockroachDB, and TiDB use Paxos or Raft to give globally consistent transactions. Even within a single service, leader election for "who is the master for this shard" is usually a consensus problem in disguise. Wherever there is a single source of truth in a distributed system, there is almost certainly a consensus algorithm underneath it.
If a system design seems to call for a custom leader-election or distributed-locking protocol, the right next step is almost never to implement one. The right next step is to put etcd, ZooKeeper, or Consul behind the problem and use the primitive they already provide. Every year someone writes a "simple" ad-hoc coordination scheme and every year it fails in a way Paxos and Raft already know how to avoid.
Serverless is the name for compute models where the cluster is run by the provider, billing is by invocation or by second of running function, and the user never sees a machine. Lambda, Cloud Functions, and Cloud Run on the function-execution side; Athena, BigQuery, and Snowflake on the query-execution side. It is not the right shape for every workload, but for a lot of them it is the simplest distributed computing has ever been.
AWS Lambda (2014) pioneered the idea: deploy a function, get a URL and an event source, pay per invocation with duration metered in milliseconds and priced by configured memory. Google Cloud Functions, Azure Functions, and Cloud Run (and its open-source cousin Knative) follow the same pattern, with Cloud Run relaxing the constraints enough to host long-running containers. The appeal is that the user writes a function and the provider handles scaling, routing, patching, and keeping machines alive. The costs are cold starts, opaque limits, and a programming model that punishes stateful workloads.
On the analytical side, the same shift happened. BigQuery (from its launch in 2011) handed a serverless query engine to the user; provision nothing, write SQL, pay per terabyte scanned. Snowflake followed with virtual warehouses that suspend and resume automatically. Athena ran Presto/Trino on top of S3 with no cluster at all. These engines are deeply distributed underneath — hundreds of workers may touch a single query — but the user sees nothing of it. For many teams, "distributed analytical compute" has gone from "we run a Spark cluster" to "we write SQL" without passing through the middle step.
Fits: bursty workloads, event-driven glue, pipelines where the control plane is more code than the data plane, analytical SQL on cloud-lake data, small APIs that spend most of their time idle. Does not fit: steady workloads with a known throughput (cheaper on a VM), latency-critical hot paths (cold starts matter), GPU training (mostly not available), long-running stateful services (container services or VMs are the right shape). A good heuristic is: if the workload is predictable enough to rent a machine for a month, serverless is probably more expensive; if the workload is spiky or idle, serverless usually wins.
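The heuristic can be made concrete with a break-even sketch. Every price below is a made-up assumption for illustration, not a real cloud rate:

```python
# Illustrative break-even sketch (all prices are invented assumptions):
# compare duration-times-memory serverless billing with a flat monthly VM
# for the same workload.

def serverless_monthly_cost(reqs_per_month, ms_per_req, gb_ram,
                            price_per_gb_second=0.0000167):  # assumed rate
    gb_seconds = reqs_per_month * (ms_per_req / 1000) * gb_ram
    return gb_seconds * price_per_gb_second

vm_monthly = 70.0   # assumed flat monthly price for a small always-on VM

spiky = serverless_monthly_cost(reqs_per_month=100_000, ms_per_req=200, gb_ram=1)
steady = serverless_monthly_cost(reqs_per_month=50_000_000, ms_per_req=200, gb_ram=1)
assert spiky < vm_monthly    # mostly-idle workload: serverless wins
assert steady > vm_monthly   # steady high throughput: the VM wins
```

The crossover sits wherever the function's busy-time approaches the VM's wall-clock month; below it you are paying for work done, above it you are paying a premium for elasticity you no longer need.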
Behind the abstraction, a serverless function at scale hits all the same problems this chapter has already discussed: cold starts are scheduler decisions, throttling is quota, regional failover is a consensus decision somewhere. The abstraction is good enough that most users never see those mechanics, but when it fails, the failures are recognisable as the classic ones.
Running a distributed system well is not an engineering problem and not an ops problem; it is both, and the teams that do it well treat observability, capacity, and failure as first-class parts of the system, not things to be bolted on after the first outage.
Single-machine intuitions — top, ps, stepping through a debugger — do not scale to a cluster. Observability for distributed systems rests on three pillars: metrics (time series of counts, latencies, utilisations per node, aggregated), logs (structured, searchable, centralised so you can correlate across nodes), and traces (per-request spans across service boundaries, showing which hop was slow). Open standards — Prometheus for metrics, OpenTelemetry for traces and logs — have largely won; the framework-specific UIs (Spark UI, Flink UI, Ray Dashboard) are on top of this layer, not a replacement for it.
Cluster capacity has two axes: total size (how many machines can the cluster grow to) and elasticity (how fast can it add or shed them). Getting either wrong is expensive. Underprovisioned clusters queue jobs forever; overprovisioned clusters burn money idle. Autoscaling — pod autoscalers on Kubernetes, Spark dynamic allocation, Snowflake's auto-suspend, Lambda's provider-managed scaling — closes that loop imperfectly but meaningfully. The corresponding discipline is cost observability: per-team, per-job, per-query cost attribution, because the cluster bill is usually the first place engineering decisions show up in the P&L.
In a cluster of a thousand machines, something is always broken. A disk is slow, a NIC is flapping, a node is stuck, a rack's top-of-rack switch has been wobbling for an hour. Good distributed systems are designed so that these events are non-events — retries, timeouts, health checks, quorum-based failover, graceful draining. Bad ones are the ones where a single degraded node takes down a whole pipeline. The cultural expression of this is the game day / chaos engineering practice (Netflix's Chaos Monkey is the canonical example): deliberately injecting failures into production to discover which parts of the system do not actually survive them.
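The retry half of that toolbox fits in a few lines. A minimal sketch with invented helper names, showing the three ingredients that turn a transient node failure into a non-event: bounded attempts, exponential backoff with jitter, and a total deadline.

```python
# Minimal retry sketch (invented helper names): bounded attempts,
# exponential backoff with full jitter (to avoid synchronised retry
# storms), and a total deadline after which we give up regardless.

import random
import time

def call_with_retries(fn, attempts=4, base_delay=0.05, deadline_s=2.0,
                      sleep=time.sleep, rng=random.random):
    start = time.monotonic()
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1 or time.monotonic() - start > deadline_s:
                raise
            sleep(rng() * base_delay * (2 ** attempt))

calls = {"n": 0}
def flaky():
    # Simulates a service that fails twice, then recovers.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

assert call_with_retries(flaky, sleep=lambda s: None) == "ok"
assert calls["n"] == 3
```

The design choice worth noticing is that `sleep` and `rng` are injected, which is what makes the behaviour testable without real waiting; production retry layers in mature RPC libraries follow the same shape.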
The honest test of a distributed system is not whether it runs when everything is healthy; it is whether it behaves sensibly when something is broken, and whether the people responsible will know about it in time. Every team that has built this discipline has done so by watching the first few failures go badly and refusing to let the second wave go the same way.
Machine learning touches every layer of the distributed-computing stack this chapter has walked through. The dataset pipelines run on Spark or Dask; the training runs on clusters of GPUs with DDP or FSDP under Slurm or Kubernetes; the serving runs on Ray or Kubernetes with autoscaled replicas; the coordination of it all leans on consensus systems (etcd) and observability stacks. The competitiveness of a modern ML team is not any one of those layers — it is how cleanly the whole ladder composes.
The empirical scaling laws for deep learning (Kaplan et al., 2020; Hoffmann et al., 2022) say that loss improves predictably with model size and dataset size, given enough compute. "Enough compute" in practice means tens of thousands of GPU-hours for medium models and millions for frontier ones, and there is no single-machine version of those numbers. The whole distributed-training stack — DDP, FSDP, 3D parallelism, high-bandwidth interconnects, fault-tolerant checkpointing — exists because the scaling laws make the cluster unavoidable. A research lab that cannot run a clean FSDP job on a thousand GPUs is a research lab that cannot reproduce the frontier; that is a consequence of the laws, not a choice.
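The arithmetic behind "no single-machine version of those numbers" can be sketched with the commonly used C ≈ 6·N·D approximation for dense-transformer training FLOPs (N parameters, D tokens); the sustained per-GPU throughput assumed below is illustrative, not a benchmark of any particular device.

```python
# Back-of-envelope sketch using the common C ~= 6*N*D approximation for
# dense-transformer training compute; the sustained per-GPU FLOP/s figure
# is an illustrative assumption.

def training_gpu_hours(params, tokens, flops_per_gpu=150e12):
    flops = 6 * params * tokens
    return flops / flops_per_gpu / 3600

small = training_gpu_hours(params=1e9, tokens=20e9)     # 1B model, ~20 tokens/param
large = training_gpu_hours(params=70e9, tokens=1.4e12)  # 70B model, same ratio
print(f"1B: ~{small:,.0f} GPU-hours; 70B: ~{large:,.0f} GPU-hours")
```

Even at these rough numbers, the larger run is on the order of a million GPU-hours: a thousand GPUs for more than a month, which is a cluster-scheduling and fault-tolerance problem before it is anything else.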
Training is episodic; serving is continuous, and serving large models has become its own distributed-systems discipline. Tensor-parallel inference on a multi-GPU host, KV-cache management for attention, continuous batching (vLLM, TensorRT-LLM, SGLang) to pack many concurrent requests into a single forward pass, speculative decoding, sharding across hosts, routing between replicas for latency — every one of these is a distributed-systems problem wearing ML clothes. The teams that run inference well usually look more like high-performance-systems teams than like classical ML teams.
The practical observation, across most mature ML teams: the competitive moat is rarely the model architecture and rarely the individual component. It is the end-to-end pipeline — data collection, storage, batch and streaming pipelines, training infrastructure, evaluation harnesses, serving infrastructure, observability — working as one system, running many experiments in parallel, and shipping changes into production weekly instead of quarterly. That pipeline is the subject of this entire Part III, and distributed computing is the physical substrate it all runs on.
Collection, storage, pipelines, streaming, and now distributed computing are the five layers we have walked through. The next two chapters turn from the systems that move data to the systems that guard it (data quality and governance) and the systems that let people answer questions with it (analytics and BI). Together they are the full surface of the data-engineering stack a modern ML team relies on.
Distributed computing has an unusually dense canon: a handful of systems papers that defined the field, a short shelf of books that work through its hard parts carefully, and the official documentation of the projects that working engineers run in production. The list below picks the references the field actually returns to, moving from foundational books and papers, through the analytical-compute and ML-training stacks, into the schedulers and consensus layer underneath.
torch.distributed (collectives and process groups), DistributedDataParallel, Fully Sharded Data Parallel (FSDP), RPC, and the pipeline/tensor-parallel primitives that stitch together to train very large models. The tutorials at pytorch.org/tutorials are the most useful companion.

This page is the fifth chapter of Part III: Data Engineering & Systems. The next — Cloud Platforms & Infrastructure — moves from the abstract substrate of distributed computing to the concrete cloud platforms (AWS, GCP, Azure) on which most teams actually assemble these pieces: managed services for storage, compute, streaming, and ML, the identity and networking layers under them, and the cost discipline that keeps the bill honest. After that comes the governance layer — data contracts, lineage, catalogs, observability — that keeps the whole apparatus auditable.