Lab 6: Closures and Concurrency - Solutions

This lab explores closures and concurrency in Rust, focusing on how closures work and how they enable safe concurrent programming.

Closures in Rust

Closures are anonymous functions that can capture their environment. They provide a concise way to define inline functions and are especially useful for operations like mapping, filtering, and callbacks.

Basic Syntax and Behavior

A simple closure in Rust looks like this:

let add = |a, b| a + b;
let result = add(1, 4);  // 5

Closures can capture variables from their environment in three different ways:

  1. By reference (&T): The default method when possible
  2. By mutable reference (&mut T): Used when the closure modifies captured values
  3. By value (T): Takes ownership of the captured values

Environment Capture Examples

Capturing by Reference

fn main() {
    let colour = "green";
    
    let closure = |s, e| {
        for i in s..e {
            println!("{} {} bottles", i, colour);  // Uses colour by reference (&)
        }
    };
    
    closure(3, 6);
}

This closure borrows colour immutably. If we were to change colour between defining the closure and calling it, the compiler would reject the code:

let mut colour = "green";
let closure = |s, e| { /* uses colour */ };
colour = "red";  // Error: cannot assign to `colour` because it is borrowed
closure(3, 6);

Capturing by Value with move

fn main() {
    let colour = "green";
    
    let closure = move |s, e| {  // Takes ownership of captured variables
        for i in s..e {
            println!("{} {} bottles", i, colour);
        }
    };
    
    closure(3, 6);
    // colour is no longer accessible here if it's not Copy
}

The move keyword forces the closure to take ownership of all captured variables, which is necessary when the closure outlives the scope in which it was created (e.g., when returning closures or using them with threads).

Closure Traits

Rust’s closures implement one or more of these traits depending on how they capture their environment:

  1. Fn: Captures by reference (&T)
  2. FnMut: Captures by mutable reference (&mut T)
  3. FnOnce: Captures by value (T)

These traits allow closures to be passed as arguments to functions:

fn use_closure<F: Fn(u32, u32)>(closure: F) {
    closure(1, 4);
}

Returning Closures

Since closure types are anonymous, we need to use impl Trait syntax to return them:

fn produce_closure() -> impl FnMut(u32) -> u32 {
    let mut count = 0;
    move |x| {
        count += 1;
        x * count
    }
}

Concurrency with Threads and Message Passing

Rust’s approach to concurrency is centered around the ownership system, which prevents data races at compile time.

Creating Threads

The std::thread::spawn function takes a closure and executes it in a new thread:

use std::thread;
 
fn main() {
    let handle = thread::spawn(|| {
        println!("Hello from a thread!");
    });
    
    handle.join().unwrap();  // Wait for the thread to finish
}

Using move with Threads

To use data from the parent thread, we need to use move to transfer ownership to the thread:

use std::thread;
 
fn main() {
    let v = vec![1, 2, 3];
    
    let handle = thread::spawn(move || {
        println!("Vector in thread: {:?}", v);
    });
    
    // v is no longer accessible here
    
    handle.join().unwrap();
}

This prevents data races by ensuring the parent thread can’t access v while the spawned thread is using it.

Message Passing with Channels

Rust provides channels for safe communication between threads:

use std::thread;
use std::sync::mpsc;  // multi-producer, single-consumer
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        tx.send("Hello from a thread!").unwrap();
    });
    
    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}

Multiple Producers

Multiple threads can send messages to the same receiver by cloning the transmitter:

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    for i in 1..5 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(i * 100));
            tx_clone.send(format!("Message from thread {}", i)).unwrap();
        });
    }
    
    // Drop the original sender to avoid keeping the channel open forever
    drop(tx);
    
    // Receive all messages
    while let Ok(message) = rx.recv() {
        println!("Received: {}", message);
    }
}

Ownership Transfer through Channels

When a value is sent through a channel, ownership is transferred:

use std::thread;
use std::sync::mpsc;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let data = vec![1, 2, 3];
        tx.send(data).unwrap();
        // data is moved and no longer accessible here
    });
    
    let received = rx.recv().unwrap();
    println!("Received: {:?}", received);
}

Safety Guarantees in Concurrent Rust

Rust provides strong safety guarantees for concurrent code:

  1. No data races: The ownership system ensures that mutable data is never shared between threads.

  2. Controlled sharing: Data can be shared between threads using thread-safe wrappers like Arc (Atomic Reference Counting).

  3. Explicit synchronization: Rust requires synchronization mechanisms like Mutex or RwLock to be used when sharing mutable data between threads.

However, Rust does not prevent:

  1. Logical race conditions: Bugs that depend on the order of execution can still occur.

  2. Deadlocks: Rust doesn’t prevent acquiring locks in an order that could cause deadlock.

Example: Thread-Safe Counter

Here’s an example of a thread-safe counter using Arc and Mutex:

use std::sync::{Arc, Mutex};
use std::thread;
 
fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter_clone.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("Result: {}", *counter.lock().unwrap());
}

Practical Concurrency Patterns

Worker Pool

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
 
enum Message {
    NewJob(Job),
    Terminate,
}
 
type Job = Box<dyn FnOnce() + Send + 'static>;
 
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
 
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();
 
            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
            }
        });
 
        Worker {
            id,
            thread: Some(thread),
        }
    }
}
 
struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}
 
impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
 
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
 
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
 
        ThreadPool { workers, sender }
    }
 
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}
 
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
 
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
 
fn main() {
    let pool = ThreadPool::new(4);
    
    for i in 0..8 {
        pool.execute(move || {
            println!("Processing task {}", i);
            thread::sleep(std::time::Duration::from_secs(1));
        });
    }
    
    // Wait a moment for execution
    thread::sleep(std::time::Duration::from_secs(5));
}

This example demonstrates:

  • How to build a thread pool that manages a fixed number of worker threads
  • How to send jobs (closures) to the workers
  • How to properly shut down the pool when it’s dropped
  • The use of move closures to transfer ownership to threads

Rust’s closure and ownership system makes building thread-safe concurrent applications much more manageable and less error-prone than in languages without these safety guarantees.