Skip to content

Java 并发编程最佳实践

多线程和并发编程是 Java 平台的核心优势,也是构建高性能、响应式应用的基础。然而,并发编程也是最容易出错的领域之一,稍有不慎就会导致死锁、活锁、竞态条件等难以调试的问题。本文将详细介绍 Java 并发编程的核心概念、常见挑战和最佳实践。

1. 并发编程基础

1.1 线程基础

Java 中创建线程的两种基本方式:

方式一:继承 Thread 类

java
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread is running: " + Thread.currentThread().getName());
    }
}

// 使用
MyThread thread = new MyThread();
thread.start(); // 不要直接调用 run() 方法

方式二:实现 Runnable 接口(推荐)

java
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable is running: " + Thread.currentThread().getName());
    }
}

// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();

// Java 8 Lambda 表达式简化
Thread thread = new Thread(() -> {
    System.out.println("Lambda Runnable is running");
});
thread.start();

1.2 线程状态与生命周期

Java 线程的生命周期包含六个状态:

  1. NEW:新创建但尚未启动的线程
  2. RUNNABLE:可运行状态,等待 CPU 分配时间片
  3. BLOCKED:线程被阻塞,等待监视器锁
  4. WAITING:无限期等待另一个线程执行特定操作
  5. TIMED_WAITING:有限期等待另一个线程执行操作
  6. TERMINATED:线程已执行完毕
java
// 获取线程状态
Thread thread = new Thread(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

System.out.println("Before starting: " + thread.getState()); // NEW
thread.start();
System.out.println("After starting: " + thread.getState()); // RUNNABLE
Thread.sleep(1000);
System.out.println("While sleeping: " + thread.getState()); // TIMED_WAITING
thread.join();
System.out.println("After completion: " + thread.getState()); // TERMINATED

1.3 线程优先级与调度

Java 线程调度是基于优先级的抢占式调度:

java
// 设置线程优先级
thread.setPriority(Thread.MIN_PRIORITY); // 1
thread.setPriority(Thread.NORM_PRIORITY); // 5 (默认)
thread.setPriority(Thread.MAX_PRIORITY); // 10

需要注意,线程优先级高并不保证一定先执行,只是获得 CPU 时间片的概率更高。优先级的效果受到操作系统和 JVM 实现的影响。

2. 线程安全与同步机制

2.1 竞态条件与临界区

竞态条件是指多个线程以不可预期的顺序访问共享资源,导致程序出现错误:

java
// 线程不安全的计数器示例
class UnsafeCounter {
    private int count = 0;
    
    public void increment() {
        count++; // 非原子操作
    }
    
    public int getCount() {
        return count;
    }
}

2.2 同步机制

解决竞态条件的主要方法是使用同步机制:

1. synchronized 关键字

java
// 同步方法
class SafeCounter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

// 同步块
class SafeCounter {
    private int count = 0;
    private final Object lock = new Object();
    
    public void increment() {
        synchronized(lock) {
            count++;
        }
    }
    
    public int getCount() {
        synchronized(lock) {
            return count;
        }
    }
}

2. volatile 关键字

volatile 保证变量的可见性,但不保证原子性:

java
class SharedData {
    private volatile boolean flag = false;
    
    public void setFlag(boolean flag) {
        this.flag = flag;
    }
    
    public boolean isFlag() {
        return flag;
    }
}

3. 原子类

java.util.concurrent.atomic 包提供了原子类,用于无锁编程:

java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger count = new AtomicInteger(0);
    
    public void increment() {
        count.incrementAndGet();
    }
    
    public int getCount() {
        return count.get();
    }
}

2.3 死锁与避免策略

死锁是指两个或多个线程互相等待对方持有的锁,导致永久阻塞:

java
// 死锁示例
public void deadlockExample() {
    final Object resource1 = new Object();
    final Object resource2 = new Object();
    
    Thread thread1 = new Thread(() -> {
        synchronized(resource1) {
            System.out.println("Thread 1: Holding resource 1");
            
            try { Thread.sleep(100); } catch (InterruptedException e) {}
            
            System.out.println("Thread 1: Waiting for resource 2");
            synchronized(resource2) {
                System.out.println("Thread 1: Holding resource 1 and 2");
            }
        }
    });
    
    Thread thread2 = new Thread(() -> {
        synchronized(resource2) {
            System.out.println("Thread 2: Holding resource 2");
            
            try { Thread.sleep(100); } catch (InterruptedException e) {}
            
            System.out.println("Thread 2: Waiting for resource 1");
            synchronized(resource1) {
                System.out.println("Thread 2: Holding resource 2 and 1");
            }
        }
    });
    
    thread1.start();
    thread2.start();
}

避免死锁的策略:

  1. 锁顺序:始终按照相同的顺序获取锁
  2. 锁超时:使用带超时的锁获取方法
  3. 死锁检测:使用线程转储和监控工具检测死锁
  4. 使用 tryLock():尝试获取锁,如果不可用则执行其他操作

3. Java并发工具类

3.1 Lock 接口

java.util.concurrent.locks 包提供了比 synchronized 更灵活的锁机制:

java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockCounter {
    private int count = 0;
    private final Lock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // 确保在异常情况下也能释放锁
        }
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

Lock 接口相比 synchronized 的优势:

  1. 非阻塞获取锁 (tryLock())
  2. 可中断获取锁 (lockInterruptibly())
  3. 超时获取锁 (tryLock(long timeout, TimeUnit unit))
  4. 多条件变量 (newCondition())

3.2 读写锁

读写锁允许多个线程同时读取,但只允许一个线程写入:

java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteMap<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public V get(K key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    
    public V put(K key, V value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

3.3 并发集合

java.util.concurrent 包提供了线程安全的集合类:

  1. ConcurrentHashMap:线程安全的哈希表,比 Hashtable 性能更好
  2. CopyOnWriteArrayList:适用于读多写少的场景
  3. ConcurrentLinkedQueue:无界线程安全队列
  4. BlockingQueue:支持阻塞操作的队列,常用于生产者-消费者模式
java
// ConcurrentHashMap 示例
Map<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key", "value");

// CopyOnWriteArrayList 示例
List<String> list = new CopyOnWriteArrayList<>();
list.add("item");

// BlockingQueue 示例
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);
queue.put(new Task()); // 如果队列满,则阻塞
Task task = queue.take(); // 如果队列空,则阻塞

3.4 线程池

使用线程池可以减少线程创建和销毁的开销,提高资源利用率:

java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);

// 缓存线程池(根据需要创建新线程,空闲线程会被复用)
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 单线程执行器
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

// 提交任务
fixedPool.submit(() -> {
    System.out.println("Task executed by " + Thread.currentThread().getName());
});

// 关闭线程池
fixedPool.shutdown();

自定义线程池

java
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10,                         // 核心线程数
    20,                         // 最大线程数
    60, TimeUnit.SECONDS,       // 空闲线程存活时间
    new LinkedBlockingQueue<>(100), // 工作队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

4. 并发编程设计模式

4.1 生产者-消费者模式

生产者和消费者通过共享队列进行通信:

java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumer {
    private final BlockingQueue<Integer> queue;
    
    public ProducerConsumer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }
    
    public void produce() {
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(i);
                System.out.println("Produced: " + i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public void consume() {
        try {
            while (true) {
                Integer value = queue.take();
                System.out.println("Consumed: " + value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

4.2 异步任务处理模式

通过 FutureCompletableFuture 实现异步任务:

java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 使用 CompletableFuture 进行异步任务处理
ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟长时间运行的任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "Task completed";
}, executor);

// 添加回调
future.thenAccept(result -> System.out.println("Result: " + result));

// 链式调用
CompletableFuture<Integer> processedFuture = future
    .thenApply(String::length)
    .thenApply(len -> len * 2);

// 组合多个 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);

executor.shutdown();

4.3 线程局部存储模式

ThreadLocal 用于线程隔离的数据存储:

java
class UserContext {
    private static final ThreadLocal<User> userThreadLocal = new ThreadLocal<>();
    
    public static void setUser(User user) {
        userThreadLocal.set(user);
    }
    
    public static User getUser() {
        return userThreadLocal.get();
    }
    
    public static void clear() {
        userThreadLocal.remove(); // 防止内存泄漏
    }
}

// 使用
try {
    User user = new User("John");
    UserContext.setUser(user);
    
    // 在任何地方访问用户信息,而不需要参数传递
    processRequest();
} finally {
    UserContext.clear(); // 重要:清理 ThreadLocal 避免内存泄漏
}

5. Java并发编程最佳实践

5.1 避免过度同步

  1. 最小化同步范围:只同步临界区,不要同步整个方法
java
// 不好的实践
public synchronized void processAndLog(Data data) {
    // 耗时的数据处理
    process(data);
    // 日志记录
    log(data);
}

// 好的实践
public void processAndLog(Data data) {
    // 非临界区代码不需要同步
    Data processedData = process(data);
    
    // 只同步需要线程安全的操作
    synchronized(this) {
        log(processedData);
    }
}
  1. 使用并发集合代替同步集合
    • 使用 ConcurrentHashMap 代替 HashtableCollections.synchronizedMap()
    • 使用 CopyOnWriteArrayList 代替 VectorCollections.synchronizedList()

5.2 避免不必要的对象共享

  1. 使用不可变对象:不可变对象天生是线程安全的
  2. 线程封闭:确保对象只被一个线程访问
  3. 线程局部存储:使用 ThreadLocal 避免共享

5.3 正确处理线程中断

java
void interruptibleMethod() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            // 执行任务...
            
            // 阻塞操作
            Thread.sleep(1000);
        }
    } catch (InterruptedException e) {
        // 重新设置中断标志
        Thread.currentThread().interrupt();
        // 清理资源
    } finally {
        // 清理资源
    }
}

5.4 合理使用线程池

  1. 选择合适的线程池类型

    • IO密集型任务:线程数 = CPU核心数 * (1 + 等待时间/计算时间)
    • CPU密集型任务:线程数 = CPU核心数 + 1
  2. 合理设置线程池参数

    • 避免过大的队列和过多的线程
    • 使用有界队列防止内存溢出
    • 设置合理的拒绝策略
java
int cpuCores = Runtime.getRuntime().availableProcessors();

// CPU密集型任务
ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
    cpuCores + 1,
    cpuCores + 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// IO密集型任务
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
    cpuCores * 2,
    cpuCores * 4,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

5.5 使用 CompletableFuture 进行异步编程

java
// 避免嵌套回调
CompletableFuture.supplyAsync(() -> fetchUserData(userId))
    .thenApply(user -> enrichUserData(user))
    .thenApply(user -> formatUserData(user))
    .thenAccept(formattedData -> display(formattedData))
    .exceptionally(ex -> {
        handleError(ex);
        return null;
    });

// 组合多个异步操作
CompletableFuture<UserData> userFuture = CompletableFuture.supplyAsync(() -> fetchUserData(userId));
CompletableFuture<ProductData> productFuture = CompletableFuture.supplyAsync(() -> fetchProductData(productId));

CompletableFuture<Page> pageFuture = userFuture.thenCombine(productFuture, (user, product) -> {
    return createPage(user, product);
});

5.6 使用并发工具而非底层同步

  1. 优先使用高级工具

    • 使用 ConcurrentHashMap 代替手动同步的 HashMap
    • 使用 AtomicInteger 代替 synchronized 的计数器
    • 使用 BlockingQueue 代替手动实现的生产者-消费者队列
  2. 使用 CountDownLatch 协调多线程

java
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
    executor.submit(() -> {
        try {
            startSignal.await(); // 等待开始信号
            doWork();
        } finally {
            doneSignal.countDown(); // 通知任务完成
        }
    });
}

// 准备工作完成后,发送开始信号
startSignal.countDown();

// 等待所有工作线程完成
doneSignal.await();
  1. 使用 CyclicBarrier 协调多阶段计算
java
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
    // 每轮结束时执行
    System.out.println("Phase completed");
});

executor.submit(() -> {
    for (int i = 0; i < phases; i++) {
        doPhaseWork();
        barrier.await(); // 等待所有线程完成本阶段
    }
});

6. 常见并发问题诊断与处理

6.1 死锁检测与处理

检测死锁

java
// 获取Java线程转储
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();

if (deadlockedThreads != null) {
    ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads, true, true);
    for (ThreadInfo threadInfo : threadInfos) {
        System.out.println(threadInfo);
    }
}

处理策略

  1. 按照固定顺序获取锁
  2. 使用 tryLock() 方法和超时机制
  3. 使用开放调用:不要在持有锁的情况下调用外部方法

6.2 线程安全问题排查

  1. 使用静态分析工具

    • FindBugs/SpotBugs
    • SonarQube
    • IntelliJ IDEA 内置的线程安全分析
  2. 使用并发调试工具

    • Java Flight Recorder
    • VisualVM
  3. 压力测试暴露并发问题

    • JMeter
    • Gatling

6.3 性能问题优化

  1. 减少锁争用

    • 使用分段锁(如 ConcurrentHashMap 的实现)
    • 使用高效的并发数据结构
    • 减小锁粒度
  2. 合理使用 volatile

    • 对于简单的标志变量,使用 volatile 比同步更轻量
    • 但请记住 volatile 不保证原子性
  3. 优化线程池配置

    • 监控线程池使用情况
    • 根据实际负载调整线程池大小

7. Java 并发编程实战案例

7.1 高性能缓存实现

java
public class ConcurrentCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, Long> timestamps = new ConcurrentHashMap<>();
    private final long expirationTimeMs;
    
    public ConcurrentCache(long expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
        
        // 启动清理线程
        startCleanupThread();
    }
    
    public V get(K key) {
        Long timestamp = timestamps.get(key);
        if (timestamp == null) {
            return null;
        }
        
        if (System.currentTimeMillis() - timestamp > expirationTimeMs) {
            cache.remove(key);
            timestamps.remove(key);
            return null;
        }
        
        return cache.get(key);
    }
    
    public void put(K key, V value) {
        cache.put(key, value);
        timestamps.put(key, System.currentTimeMillis());
    }
    
    private void startCleanupThread() {
        Thread cleanupThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(expirationTimeMs / 2);
                    
                    long currentTime = System.currentTimeMillis();
                    for (Map.Entry<K, Long> entry : timestamps.entrySet()) {
                        if (currentTime - entry.getValue() > expirationTimeMs) {
                            K key = entry.getKey();
                            cache.remove(key);
                            timestamps.remove(key);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }
}

7.2 并发请求限流器

java
public class RateLimiter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final int limit;
    private final long timeWindowMs;
    private final AtomicLong lastResetTime;
    
    public RateLimiter(int limit, long timeWindowMs) {
        this.limit = limit;
        this.timeWindowMs = timeWindowMs;
        this.lastResetTime = new AtomicLong(System.currentTimeMillis());
    }
    
    public boolean allowRequest() {
        long currentTime = System.currentTimeMillis();
        long lastReset = lastResetTime.get();
        
        if (currentTime - lastReset > timeWindowMs) {
            // 尝试重置计数器,使用 CAS 避免竞态条件
            if (lastResetTime.compareAndSet(lastReset, currentTime)) {
                count.set(0);
            }
        }
        
        return count.incrementAndGet() <= limit;
    }
}

7.3 并发数据处理流水线

java
public class DataProcessingPipeline<T, R> {
    private final BlockingQueue<T> inputQueue;
    private final BlockingQueue<R> outputQueue;
    private final ExecutorService executor;
    private final Function<T, R> processingFunction;
    private final AtomicBoolean running = new AtomicBoolean(false);
    
    public DataProcessingPipeline(int queueSize, int workerCount, Function<T, R> processingFunction) {
        this.inputQueue = new LinkedBlockingQueue<>(queueSize);
        this.outputQueue = new LinkedBlockingQueue<>(queueSize);
        this.processingFunction = processingFunction;
        this.executor = Executors.newFixedThreadPool(workerCount);
    }
    
    public void start() {
        if (running.compareAndSet(false, true)) {
            IntStream.range(0, executor.getCorePoolSize()).forEach(i -> {
                executor.submit(this::processItems);
            });
        }
    }
    
    public void stop() {
        running.set(false);
        executor.shutdownNow();
    }
    
    public CompletableFuture<Void> submit(T item) {
        return CompletableFuture.runAsync(() -> {
            try {
                inputQueue.put(item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
    }
    
    public CompletableFuture<R> getOutput() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return outputQueue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
    }
    
    private void processItems() {
        while (running.get() && !Thread.currentThread().isInterrupted()) {
            try {
                T input = inputQueue.poll(100, TimeUnit.MILLISECONDS);
                if (input != null) {
                    R result = processingFunction.apply(input);
                    outputQueue.put(result);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // 处理错误,可能记录日志或重试
            }
        }
    }
}

总结

Java 并发编程既是一个强大的工具,也是一个充满挑战的领域。通过掌握核心概念、遵循最佳实践、使用正确的工具和模式,可以开发出高效、可靠的并发应用。

关键要点:

  1. 理解并发基础概念,如线程状态和同步机制
  2. 合理使用并发工具类,如 Lock、原子类和并发集合
  3. 遵循最佳实践,如避免过度同步、正确处理中断
  4. 使用高级 API,如 CompletableFuture 进行异步编程
  5. 实施积极的调试和监控策略

随着 Java 语言的发展,并发 API 也在不断演进,保持学习新特性和最佳实践将帮助开发者构建更优秀的并发应用。