温馨提示×

Linux下Rust并发编程实战

小樊
45
2025-11-14 17:26:21
栏目: 编程语言

Linux 下 Rust 并发编程实战指南

一 环境准备与项目初始化

  • Linux 上安装 Rust:执行命令:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh,随后执行 source $HOME/.cargo/env 重载环境。
  • 创建项目:cargo new concurrent_demo && cd concurrent_demo
  • 运行与调试:cargo run;需要调试时使用 gdblldb,也可借助 IDE 内置调试功能。

二 核心并发模型与最小示例

  • 线程与通道

    • 使用 std::thread 创建线程,借助 std::sync::mpsc 进行线程间消息传递(MPSC)。
    • 示例要点:将发送端 tx 通过 move 转移到子线程;主线程通过 recv() 接收数据。
  • 共享内存与同步原语

    • 使用 Arc(原子引用计数)共享所有权,配合 Mutex/RwLock 保护共享可变状态;读多写少场景优先考虑 RwLock
    • 示例要点:对共享数据加锁后操作,完成后及时释放锁;多线程 join() 等待回收。
  • 原子操作

    • 对简单计数器等场景,使用 std::sync::atomic 提供的原子类型(如 AtomicUsize)与合适的 Ordering(如 SeqCst)可避免锁开销。
  • 异步并发 I/O

    • 使用 async/awaittokio 运行时编写高并发网络服务;典型模式是 TcpListener 接收连接,并为每个连接 tokio::spawn 一个任务处理读写循环。
  • 并行数据并行

    • 使用 rayon 的并行迭代器对计算密集型数据进行分治并行,简化线程池与任务调度。
  • 第三方通道与 Actor 生态

    • 需要更灵活的线程通信时,可使用 crossbeam::channel(支持多生产者多消费者等);若偏好 Actor 模型,可选 actix

三 实战示例 多线程并行计算与原子计数

  • 目标:启动 10 个线程并发累加,分别用 MutexAtomicUsize 两种实现,最后对比结果与性能感受。
  • 代码:
// Cargo.toml
// [dependencies]

use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    const N: usize = 10;
    const INCR: usize = 100_000;

    // 1) 使用 Mutex + Arc
    let mtx = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let m = Arc::clone(&mtx);
        handles.push(thread::spawn(move || {
            for _ in 0..INCR {
                *m.lock().unwrap() += 1;
            }
        }));
    }
    for h in handles { h.join().unwrap(); }
    println!("Mutex 结果: {}", *mtx.lock().unwrap());

    // 2) 使用 AtomicUsize
    let atomic = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::with_capacity(N);
    for _ in 0..N {
        let a = Arc::clone(&atomic);
        handles.push(thread::spawn(move || {
            for _ in 0..INCR {
                a.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }
    for h in handles { h.join().unwrap(); }
    println!("Atomic 结果: {}", atomic.load(Ordering::SeqCst));
}
  • 说明:两种实现均应输出 1_000_000;计算密集型且竞争较低时,原子操作通常开销更小,Mutex 更通用但存在锁竞争与阻塞。

四 实战示例 异步高并发回显服务器

  • 目标:基于 tokio 实现一个 TCP 回显服务器,对每条连接并发处理,读到 0 字节 表示对端关闭。
  • 代码:
// Cargo.toml
// [dependencies]
// tokio = { version = "1", features = ["full"] }

use tokio::net::TcpListener;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Listening on 127.0.0.1:8080");

    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = vec![0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return, // 对端关闭
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("read error: {}", e);
                        return;
                    }
                };
                if let Err(e) = socket.write_all(&buf[..n]).await {
                    eprintln!("write error: {}", e);
                    return;
                }
            }
        });
    }
}
  • 说明:每个连接由独立任务并发处理,适合 I/O 密集型 高并发场景;运行时负责 异步任务调度I/O 多路复用

五 性能与工程实践建议

  • 选择模型
    • CPU 密集:优先考虑 rayonstd::thread + 工作窃取;共享计数用 AtomicUsize,复杂共享状态用 Arc<Mutex/RwLock>
    • I/O 密集:使用 async/await + tokio(或 async-std),以事件驱动减少线程阻塞与上下文切换。
  • 减少锁争用
    • 细化锁粒度、缩短临界区;优先使用 原子操作 或无锁数据结构;读多写少场景用 RwLock
  • 通道与背压
    • 线程/任务间通信优先 消息传递;当生产者远快于消费者时,引入 有限容量通道批量处理 形成背压,避免内存暴涨。
  • 资源与错误处理
    • 为所有线程/任务 join()/await;为 I/O 操作设置超时与重试;统一错误类型与日志。
  • 可观测性
    • 在关键路径打点与追踪(如连接数、队列长度、处理时延),配合 perf/flamegraphtokio-console 定位瓶颈。
  • 生态扩展
    • 需要更强的并发通道与无锁结构时引入 crossbeam;需要 Actor 模式时考虑 actix

0