Parallel Computing with Ray: A Coffee Shop Story
Imagine you run a coffee shop. Orders come in throughout the day. You have multiple baristas behind the counter, each capable of making any drink. As drinks get made, your cash register keeps a running total of the day's revenue.
This is exactly how Ray thinks about distributed computing. You have work that can be done in parallel (making drinks), workers that can do it (baristas), and shared state that needs careful tracking (the cash register). Ray gives you a clean way to express this pattern in Python, whether you're running on a laptop or a thousand-machine cluster.
Let's build this system and see how Ray's pieces fit together.
You can find the complete code on GitHub.
Opening the Shop: Ray clusters and workers
Before we can serve customers, we need to open the shop and staff it with baristas. In Ray terms, this means starting a cluster. When you call ray.init(), Ray spins up a head node (the manager coordinating everything) and a pool of worker processes (the baristas ready to work).
The num_cpus parameter tells Ray how many parallel tasks it can run at once. Think of it as hiring four baristas for the morning shift. Ray will schedule up to four pieces of work simultaneously across these workers.
```python
import ray
import ray.data as rd
import time
import logging

logging.getLogger("ray").setLevel(logging.WARNING)

ray.init(
    num_cpus=4,
    include_dashboard=False,
    logging_level=logging.WARNING,
    _system_config={"metrics_report_interval_ms": 0},
)
```
Once initialized, you can verify the cluster state:
```python
print(f"Shop open with {ray.available_resources().get('CPU', 0)} baristas ready")
print(f"Running on {len(ray.nodes())} node(s)")
```
Output:
```
Shop open with 4.0 baristas ready
Running on 1 node(s)
```
Making Drinks: Ray Tasks
Each drink order is independent work. A barista takes an order, makes the drink, and hands it back. No state to maintain, no coordination needed. This is what Ray calls a task: a stateless function that can run anywhere in the cluster.
You define a task by decorating a normal Python function with @ray.remote. When you call make_drink.remote(order), Ray doesn't run it immediately. Instead, it schedules the work on an available worker and gives you back a reference to the future result (think of it as the receipt for the drink). You only wait when you call ray.get(), which blocks until the result is ready.
This non-blocking design is key. You can fire off dozens of drink orders instantly, and Ray will parallelize them across your baristas without you managing threads or processes.
```python
@ray.remote
def make_drink(order: dict) -> dict:
    """A barista makes a drink. Takes time, but no shared state."""
    drink_name = order["drink"]
    price = order["price"]
    # Simulate the time it takes to make a drink
    time.sleep(1)
    return {
        "drink": drink_name,
        "price": price,
        "status": "ready",
    }
```
Tracking Revenue: Ray Actors
Unlike making drinks, tracking daily revenue requires state. You can't just have four separate cash registers. You need one centralized place that accumulates every sale. This is where Ray actors come in.
An actor is a Python class that Ray turns into a long-lived, stateful process living somewhere in the cluster. Every method call is routed to the same instance, executed serially, so there are no race conditions. You get stateful computation without worrying about locks or consistency.
Like tasks, actor method calls return immediately with a reference. The actual update happens asynchronously, but because all calls to the same actor are serialized, your state stays consistent.
```python
@ray.remote
class CashRegister:
    """Tracks total revenue for the day. Stateful and centralized."""

    def __init__(self):
        self.total_revenue = 0.0
        self.drinks_sold = 0

    def record_sale(self, price: float) -> None:
        """Add a sale to today's total."""
        self.total_revenue += price
        self.drinks_sold += 1

    def get_summary(self) -> dict:
        """Return the current revenue summary."""
        return {
            "total_revenue": self.total_revenue,
            "drinks_sold": self.drinks_sold,
        }
```
Handling the Morning Rush: Ray Data
During the morning rush, orders flood in. You could manage them with Python lists, manually slicing and distributing work. But Ray Data gives you a cleaner abstraction: a distributed dataset that Ray can partition, stream, and iterate over efficiently.
You create a dataset from your orders, and Ray handles the sharding and distribution. When you iterate over it or map functions across it, Ray manages the parallel execution. It's the difference between manually coordinating work and letting the runtime do it for you.
```python
# The morning rush: a batch of orders comes in
orders = [
    {"drink": "Cappuccino", "price": 4.50},
    {"drink": "Latte", "price": 5.00},
    {"drink": "Espresso", "price": 3.00},
    {"drink": "Americano", "price": 3.50},
    {"drink": "Mocha", "price": 5.50},
    {"drink": "Flat White", "price": 4.75},
]

# Create a Ray dataset from the orders
orders_dataset = rd.from_items(orders)
```
Running the Shop
Now we put it all together. Orders come in through the dataset. We send each one to a barista (a Ray task). As drinks get made, we record each sale in the cash register (a Ray actor). Finally, we check the day's totals.
Notice the pattern: we submit work without waiting, collect references to the results, then block only when we actually need the values. This lets Ray parallelize while keeping our code simple and sequential-looking.
```python
# Open the cash register for the day
register = CashRegister.remote()

# Process each order: send to baristas (tasks)
print("Processing orders...")
start_time = time.time()

drink_refs = []
for order in orders_dataset.iter_rows():
    # Non-blocking: schedule the drink to be made
    drink_ref = make_drink.remote(order)
    drink_refs.append(drink_ref)

print(f"Sent {len(drink_refs)} orders to baristas")
```
Now wait for the drinks to be ready:
```python
# Wait for all drinks to be ready (blocking)
completed_drinks = ray.get(drink_refs)
parallel_time = time.time() - start_time

print("\nDrinks ready:")
for drink in completed_drinks:
    print(f"  {drink['drink']} - ${drink['price']:.2f} [{drink['status']}]")

print(f"\nTime with Ray (4 baristas working in parallel): {parallel_time:.2f} seconds")
```
Output:
```
Drinks ready:
  Cappuccino - $4.50 [ready]
  Latte - $5.00 [ready]
  Espresso - $3.00 [ready]
  Americano - $3.50 [ready]
  Mocha - $5.50 [ready]
  Flat White - $4.75 [ready]

Time with Ray (4 baristas working in parallel): 2.11 seconds
```
The Power of Parallelism
That execution happened fast because four baristas worked simultaneously. But what if you only had one barista handling orders sequentially? Let's compare by simulating the same work without Ray's parallelization.
```python
# Sequential processing: one barista handling all orders
def make_drink_sequential(order: dict) -> dict:
    """Same work, but running locally without Ray."""
    drink_name = order["drink"]
    price = order["price"]
    time.sleep(1)  # Same 1 second per drink
    return {"drink": drink_name, "price": price, "status": "ready"}
```
```python
print("\nNow let's see what happens with just ONE barista (sequential)...")
sequential_start = time.time()

sequential_drinks = []
for order in orders:
    drink = make_drink_sequential(order)
    sequential_drinks.append(drink)

sequential_time = time.time() - sequential_start
print(f"Time without Ray (1 barista, sequential): {sequential_time:.2f} seconds")

print(f"\n{'═'*50}")
print(f"SPEEDUP: {sequential_time / parallel_time:.1f}x faster with Ray!")
print(f"{'═'*50}")
print(f"Sequential: {sequential_time:.2f}s | Parallel: {parallel_time:.2f}s")
```
Output:
```
Time without Ray (1 barista, sequential): 6.01 seconds

══════════════════════════════════════════════════
SPEEDUP: 2.8x faster with Ray!
══════════════════════════════════════════════════
Sequential: 6.01s | Parallel: 2.11s
```
With 6 drinks taking 1 second each, sequential processing takes about 6 seconds. With Ray distributing work across 4 workers, the same work completes in roughly 2 seconds (since 6 drinks / 4 baristas ≈ 2 batches). The more work you have and the more workers you add, the more dramatic the speedup.
This is the fundamental value of Ray: you get near-linear scaling without writing complex threading or multiprocessing code. Just decorate your function with @ray.remote and Ray handles the rest.
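The expected wall-clock time is easy to reason about: with n tasks of equal duration t and w workers, the tasks run in ceil(n / w) waves. A back-of-the-envelope check of the numbers above (ignoring Ray's scheduling overhead, which is why the measured speedup is 2.8x rather than the ideal 3x):

```python
import math

def ideal_parallel_time(n_tasks: int, task_seconds: float, n_workers: int) -> float:
    # Tasks execute in waves: each wave fills every worker,
    # so total time is (number of waves) * (time per task)
    waves = math.ceil(n_tasks / n_workers)
    return waves * task_seconds

sequential = ideal_parallel_time(6, 1.0, 1)  # one barista: 6 waves
parallel = ideal_parallel_time(6, 1.0, 4)    # four baristas: 2 waves
speedup = sequential / parallel
```
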
Recording Sales
With drinks made, we record each sale in our centralized cash register:
```python
# Record each sale in the cash register (actor)
print("\nRecording sales...")
sale_refs = []
for drink in completed_drinks:
    # Non-blocking: record the sale
    sale_ref = register.record_sale.remote(drink["price"])
    sale_refs.append(sale_ref)

# Wait for all sales to be recorded (blocking)
ray.get(sale_refs)
print("All sales recorded")
```
Check the daily summary:
```python
# Check today's totals
summary = ray.get(register.get_summary.remote())

print("\n" + "═" * 40)
print("DAILY SUMMARY")
print("═" * 40)
print(f"Drinks sold: {summary['drinks_sold']}")
print(f"Total revenue: ${summary['total_revenue']:.2f}")
print("═" * 40)
```
Output:
```
════════════════════════════════════════
DAILY SUMMARY
════════════════════════════════════════
Drinks sold: 6
Total revenue: $26.25
════════════════════════════════════════
```
What Just Happened
You built a distributed system without thinking about distributed systems. Ray handled the complexity:
- Workers (baristas) ran your tasks in parallel across available CPUs
- Tasks (`make_drink`) executed independently, scaling naturally with your worker pool
- Actors (`CashRegister`) maintained consistent state across concurrent operations
- Ray Data managed the input dataset, letting you iterate cleanly without manual partitioning
- The cluster (head node + workers) coordinated everything behind the scenes
The code looks almost sequential, but Ray parallelized wherever it could. Four baristas worked simultaneously. The cash register serialized updates to avoid conflicts. And you wrote maybe 50 lines of Python.
This pattern (stateless tasks for parallel work, stateful actors for shared data, datasets for structured input) scales from your laptop to hundreds of machines (in the cloud or on-prem). The abstraction stays the same. Ray handles the distribution.
Closing Time
When you're done, shut down the Ray cluster to free resources:
```python
ray.shutdown()
print("Shop closed. See you tomorrow!")
```
Where to Go from Here
This coffee shop is tiny, but the pattern extends to real workloads:
Bigger datasets: Replace rd.from_items() with rd.read_parquet(), rd.read_csv(), or rd.read_images(). Ray Data can handle terabytes of data, streaming it through transformations without loading everything into memory.
More complex tasks: Your @ray.remote functions can do anything. Train models, process images, call APIs. If it's Python code that can run independently, it can be a Ray task.
Distributed actors: Actors can live on different machines, manage GPU resources, or coordinate complex workflows. You can even have multiple actors of the same class handling different shards of work.
Multi-node clusters: Instead of ray.init(num_cpus=4), connect to a Ray cluster running across dozens of machines. Your code stays the same. Ray handles scheduling across nodes, moving data, and recovering from failures.
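As a sketch of what that looks like: on a machine that already belongs to a running cluster, initialization is the only line that changes (the head-node hostname in the comment is hypothetical):

```python
import ray

# Connect to an already-running Ray cluster instead of starting a local one.
# "auto" discovers the cluster this machine belongs to; alternatively, pass
# an explicit Ray Client address such as "ray://head-node:10001".
ray.init(address="auto")
```

Everything after this line — tasks, actors, datasets — is the same code as the coffee shop above.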
The coffee shop abstraction breaks down eventually, but the primitives don't. Tasks for stateless parallel work. Actors for stateful coordination. Datasets for distributed data. That's the core of Ray, and it's enough to build surprisingly complex systems.