Lab 6: Closures and Concurrency - Solutions
This lab explores closures and concurrency in Rust, focusing on how closures work and how they enable safe concurrent programming.
Closures in Rust
Closures are anonymous functions that can capture their environment. They provide a concise way to define inline functions and are especially useful for operations like mapping, filtering, and callbacks.
Basic Syntax and Behavior
A simple closure in Rust looks like this:
let add = |a, b| a + b;
let result = add(1, 4); // 5Closures can capture variables from their environment in three different ways:
- By reference (
&T): The default method when possible - By mutable reference (
&mut T): Used when the closure modifies captured values - By value (
T): Takes ownership of the captured values
Environment Capture Examples
Capturing by Reference
fn main() {
let colour = "green";
let closure = |s, e| {
for i in s..e {
println!("{} {} bottles", i, colour); // Uses colour by reference (&)
}
};
closure(3, 6);
}This closure borrows colour immutably. If we were to change colour between defining the closure and calling it, the compiler would reject the code:
let mut colour = "green";
let closure = |s, e| { /* uses colour */ };
colour = "red"; // Error: cannot assign to `colour` because it is borrowed
closure(3, 6);Capturing by Value with move
fn main() {
let colour = "green";
let closure = move |s, e| { // Takes ownership of captured variables
for i in s..e {
println!("{} {} bottles", i, colour);
}
};
closure(3, 6);
// colour is no longer accessible here if it's not Copy
}The move keyword forces the closure to take ownership of all captured variables, which is necessary when the closure outlives the scope in which it was created (e.g., when returning closures or using them with threads).
Closure Traits
Rust’s closures implement one or more of these traits depending on how they capture their environment:
Fn: Captures by reference (&T)FnMut: Captures by mutable reference (&mut T)FnOnce: Captures by value (T)
These traits allow closures to be passed as arguments to functions:
fn use_closure<F: Fn(u32, u32)>(closure: F) {
closure(1, 4);
}Returning Closures
Since closure types are anonymous, we need to use impl Trait syntax to return them:
fn produce_closure() -> impl FnMut(u32) -> u32 {
let mut count = 0;
move |x| {
count += 1;
x * count
}
}Concurrency with Threads and Message Passing
Rust’s approach to concurrency is centered around the ownership system, which prevents data races at compile time.
Creating Threads
The std::thread::spawn function takes a closure and executes it in a new thread:
use std::thread;
fn main() {
let handle = thread::spawn(|| {
println!("Hello from a thread!");
});
handle.join().unwrap(); // Wait for the thread to finish
}Using move with Threads
To use data from the parent thread, we need to use move to transfer ownership to the thread:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Vector in thread: {:?}", v);
});
// v is no longer accessible here
handle.join().unwrap();
}This prevents data races by ensuring the parent thread can’t access v while the spawned thread is using it.
Message Passing with Channels
Rust provides channels for safe communication between threads:
use std::thread;
use std::sync::mpsc; // multi-producer, single-consumer
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send("Hello from a thread!").unwrap();
});
let received = rx.recv().unwrap();
println!("Received: {}", received);
}Multiple Producers
Multiple threads can send messages to the same receiver by cloning the transmitter:
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
for i in 1..5 {
let tx_clone = tx.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(i * 100));
tx_clone.send(format!("Message from thread {}", i)).unwrap();
});
}
// Drop the original sender to avoid keeping the channel open forever
drop(tx);
// Receive all messages
while let Ok(message) = rx.recv() {
println!("Received: {}", message);
}
}Ownership Transfer through Channels
When a value is sent through a channel, ownership is transferred:
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let data = vec![1, 2, 3];
tx.send(data).unwrap();
// data is moved and no longer accessible here
});
let received = rx.recv().unwrap();
println!("Received: {:?}", received);
}Safety Guarantees in Concurrent Rust
Rust provides strong safety guarantees for concurrent code:
-
No data races: The ownership system ensures that mutable data is never shared between threads.
-
Controlled sharing: Data can be shared between threads using thread-safe wrappers like
Arc(Atomic Reference Counting). -
Explicit synchronization: Rust requires synchronization mechanisms like
MutexorRwLockto be used when sharing mutable data between threads.
However, Rust does not prevent:
-
Logical race conditions: Bugs that depend on the order of execution can still occur.
-
Deadlocks: Rust doesn’t prevent acquiring locks in an order that could cause deadlock.
Example: Thread-Safe Counter
Here’s an example of a thread-safe counter using Arc and Mutex:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter_clone.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}Practical Concurrency Patterns
Worker Pool
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
enum Message {
NewJob(Job),
Terminate,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Processing task {}", i);
thread::sleep(std::time::Duration::from_secs(1));
});
}
// Wait a moment for execution
thread::sleep(std::time::Duration::from_secs(5));
}This example demonstrates:
- How to build a thread pool that manages a fixed number of worker threads
- How to send jobs (closures) to the workers
- How to properly shut down the pool when it’s dropped
- The use of
moveclosures to transfer ownership to threads
Rust’s closure and ownership system makes building thread-safe concurrent applications much more manageable and less error-prone than in languages without these safety guarantees.