
在深入探讨线程同步的实现方式之前,我们首先要理解为什么需要线程同步。让我们从一个简单的例子开始。
假设我们有一个计数器,多个线程同时对其进行递增操作:
import lombok.extern.slf4j.Slf4j;
/**
* 线程不安全的计数器示例
*
* @author ken
*/
@Slf4j
public class UnsafeCounter {
private int count = 0;
/**
* 递增计数器
*/
public void increment() {
count++;
}
/**
* 获取当前计数
*
* @return 当前计数值
*/
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
UnsafeCounter counter = new UnsafeCounter();
int threadCount = 10;
int incrementPerThread = 10000;
// 创建并启动多个线程
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
log.info("预期结果: {}", threadCount * incrementPerThread);
log.info("实际结果: {}", counter.getCount());
}
}
运行这段代码,你会发现实际结果往往小于预期的 100000。这是因为count++操作并不是原子操作,它实际上包含三个步骤:
当多个线程同时执行这三个步骤时,就可能出现交错执行的情况,导致最终结果不正确。例如:
这种多个线程访问共享资源时可能出现的数据不一致问题,称为线程不安全。而线程同步就是为了解决这类问题。
线程同步的核心目标是保证多线程环境下共享资源的正确访问,具体来说包括:
这三个特性通常被称为并发编程的 "三要素",也是我们实现线程同步时需要保证的基本要求。
Java 提供了多种线程同步机制,每种机制都有其适用场景和优缺点。下面我们将逐一介绍这些机制。
synchronized是 Java 中最基本的线程同步机制,它通过对象锁的方式来实现同步。
synchronized可以用于修饰方法或代码块:
import lombok.extern.slf4j.Slf4j;
/**
* synchronized关键字使用示例
*
* @author ken
*/
@Slf4j
public class SynchronizedDemo {
private int count = 0;
/**
* 同步实例方法
*/
public synchronized void synchronizedMethod() {
count++;
}
/**
* 同步代码块(使用this作为锁对象)
*/
public void synchronizedBlockThis() {
synchronized (this) {
count++;
}
}
private final Object lock = new Object();
/**
* 同步代码块(使用指定对象作为锁)
*/
public void synchronizedBlockObject() {
synchronized (lock) {
count++;
}
}
/**
* 同步静态方法
*/
public static synchronized void synchronizedStaticMethod() {
// 静态方法的同步锁是类对象本身
}
}
synchronized的实现基于对象头和监视器锁(Monitor)。在 JVM 中,每个对象都有一个对象头,其中包含了锁的状态信息。当线程进入同步代码块时,会尝试获取对象的监视器锁;当线程退出同步代码块时,会释放监视器锁。
流程图如下:

在 JDK 6 及以后,synchronized 进行了重大优化,引入了偏向锁、轻量级锁和重量级锁三种锁状态,根据竞争情况自动升级,大大提高了性能。
我们使用 synchronized 来修复前面的计数器问题:
import lombok.extern.slf4j.Slf4j;
/**
* 使用synchronized解决线程安全问题
*
* @author ken
*/
@Slf4j
public class SynchronizedCounter {
private int count = 0;
/**
* 同步递增计数器
*/
public synchronized void increment() {
count++;
}
/**
* 获取当前计数
*
* @return 当前计数值
*/
public synchronized int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedCounter counter = new SynchronizedCounter();
int threadCount = 10;
int incrementPerThread = 10000;
// 创建并启动多个线程
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
log.info("预期结果: {}", threadCount * incrementPerThread);
log.info("实际结果: {}", counter.getCount());
}
}
运行这段代码,你会发现实际结果总是等于预期结果,说明线程安全问题得到了解决。
优点:
缺点:
java.util.concurrent.locks.Lock接口是 Java 5 引入的更灵活的线程同步机制,它提供了比synchronized更丰富的功能。
public interface Lock {
/**
* 获取锁,如果锁被占用则等待
*/
void lock();
/**
* 获取锁,如果线程被中断则停止等待
*/
void lockInterruptibly() throws InterruptedException;
/**
* 尝试获取锁,立即返回结果,不等待
*
* @return 如果获取到锁则返回true,否则返回false
*/
boolean tryLock();
/**
* 在指定时间内尝试获取锁,如果超时则返回
*
* @param time 等待时间
* @param unit 时间单位
* @return 如果获取到锁则返回true,否则返回false
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
* 创建一个与该锁关联的条件对象
*
* @return 条件对象
*/
Condition newCondition();
}
ReentrantLock是Lock接口的最常用实现类,它提供了与synchronized类似的功能,但更灵活。
使用ReentrantLock解决计数器问题:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用ReentrantLock解决线程安全问题
*
* @author ken
*/
@Slf4j
public class ReentrantLockCounter {
private int count = 0;
private final Lock lock = new ReentrantLock();
/**
* 递增计数器
*/
public void increment() {
// 获取锁
lock.lock();
try {
// 执行临界区代码
count++;
} finally {
// 确保锁被释放
lock.unlock();
}
}
/**
* 获取当前计数
*
* @return 当前计数值
*/
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockCounter counter = new ReentrantLockCounter();
int threadCount = 10;
int incrementPerThread = 10000;
// 创建并启动多个线程
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
log.info("预期结果: {}", threadCount * incrementPerThread);
log.info("实际结果: {}", counter.getCount());
}
}
/**
* ReentrantLock可中断特性示例
*
* @author ken
*/
public class InterruptibleLockDemo {
private final Lock lock = new ReentrantLock();
public void longRunningTask() throws InterruptedException {
// 使用可中断的方式获取锁
lock.lockInterruptibly();
try {
// 模拟长时间运行的任务
Thread.sleep(10000);
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
InterruptibleLockDemo demo = new InterruptibleLockDemo();
// 第一个线程获取锁并执行长时间任务
Thread thread1 = new Thread(() -> {
try {
demo.longRunningTask();
} catch (InterruptedException e) {
log.info("线程1被中断");
}
});
thread1.start();
// 让线程1有时间获取锁
Thread.sleep(1000);
// 第二个线程尝试获取锁
Thread thread2 = new Thread(() -> {
try {
log.info("线程2尝试获取锁");
demo.longRunningTask();
} catch (InterruptedException e) {
log.info("线程2被中断");
}
});
thread2.start();
// 让线程2尝试获取锁一段时间后中断它
Thread.sleep(2000);
thread2.interrupt();
}
}
/**
* ReentrantLock超时获取锁示例
*
* @author ken
*/
public class TimeoutLockDemo {
private final Lock lock = new ReentrantLock();
public boolean tryTaskWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
// 尝试在指定时间内获取锁
if (lock.tryLock(timeout, unit)) {
try {
// 执行任务
log.info("获取锁成功,执行任务");
return true;
} finally {
lock.unlock();
}
} else {
// 获取锁超时
log.info("获取锁超时");
return false;
}
}
public static void main(String[] args) throws InterruptedException {
TimeoutLockDemo demo = new TimeoutLockDemo();
// 第一个线程获取锁并保持一段时间
Thread thread1 = new Thread(() -> {
demo.lock.lock();
try {
log.info("线程1获取锁");
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
demo.lock.unlock();
log.info("线程1释放锁");
}
});
thread1.start();
// 让线程1有时间获取锁
Thread.sleep(1000);
// 第二个线程尝试在3秒内获取锁
Thread thread2 = new Thread(() -> {
try {
demo.tryTaskWithTimeout(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread2.start();
}
}
ReentrantLock默认是非公平锁,即线程获取锁的顺序不一定按照申请锁的顺序。我们可以通过构造函数创建公平锁:
// 创建公平锁
private final Lock fairLock = new ReentrantLock(true);
公平锁可以保证线程按照申请锁的顺序获取锁,但会带来一定的性能开销。
特性 | synchronized | ReentrantLock |
|---|---|---|
锁获取方式 | 隐式获取 | 显式调用 lock () 方法 |
锁释放方式 | 自动释放 | 显式调用 unlock () 方法 |
可中断性 | 不可中断 | 可中断(lockInterruptibly ()) |
超时获取 | 不支持 | 支持(tryLock (time, unit)) |
公平性 | 非公平 | 可选择公平或非公平 |
条件变量 | 不支持 | 支持(通过 Condition) |
性能 | 低并发下较好,高并发下较差 | 高并发下性能更稳定 |
Java 的java.util.concurrent.atomic包提供了一系列原子类,这些类利用CAS(Compare-And-Swap) 操作来实现线程安全,无需使用锁。
CAS 操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相等,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。
流程图如下:

CAS 是一种乐观锁策略,它假设多个线程访问共享资源时不会发生冲突,因此不需要加锁,只是在更新时检查是否有冲突。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 使用AtomicInteger解决线程安全问题
*
* @author ken
*/
@Slf4j
public class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
/**
* 递增计数器
*/
public void increment() {
count.incrementAndGet();
}
/**
* 获取当前计数
*
* @return 当前计数值
*/
public int getCount() {
return count.get();
}
public static void main(String[] args) throws InterruptedException {
AtomicCounter counter = new AtomicCounter();
int threadCount = 10;
int incrementPerThread = 10000;
// 创建并启动多个线程
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementPerThread; j++) {
counter.increment();
}
});
threads[i].start();
}
// 等待所有线程执行完毕
for (Thread thread : threads) {
thread.join();
}
log.info("预期结果: {}", threadCount * incrementPerThread);
log.info("实际结果: {}", counter.getCount());
}
}
优点:
缺点:
在很多场景下,读操作的频率远高于写操作。ReentrantReadWriteLock提供了一种读写分离的锁机制,允许多个线程同时读取共享资源,但只有一个线程可以写入共享资源。
流程图如下:

import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 读写锁使用示例
*
* @author ken
*/
@Slf4j
public class ReadWriteLockDemo {
private final Map<String, Object> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁
private final java.util.concurrent.locks.Lock readLock = rwLock.readLock();
// 写锁
private final java.util.concurrent.locks.Lock writeLock = rwLock.writeLock();
/**
* 从缓存中获取数据
*
* @param key 键
* @return 对应的值,如果不存在则返回null
*/
public Object get(String key) {
readLock.lock();
try {
log.info("读取缓存,key: {}", key);
return cache.get(key);
} finally {
readLock.unlock();
}
}
/**
* 向缓存中写入数据
*
* @param key 键
* @param value 值
*/
public void put(String key, Object value) {
writeLock.lock();
try {
log.info("写入缓存,key: {}, value: {}", key, value);
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
/**
* 从缓存中移除数据
*
* @param key 键
*/
public void remove(String key) {
writeLock.lock();
try {
log.info("移除缓存,key: {}", key);
cache.remove(key);
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReadWriteLockDemo cache = new ReadWriteLockDemo();
// 启动多个读线程
for (int i = 0; i < 5; i++) {
int readerId = i;
new Thread(() -> {
for (int j = 0; j < 10; j++) {
cache.get("key" + (j % 3));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Reader-" + readerId).start();
}
// 启动写线程
new Thread(() -> {
for (int j = 0; j < 3; j++) {
cache.put("key" + j, "value" + j);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Writer").start();
// 等待所有线程执行完毕
Thread.sleep(5000);
}
}
信号量是一种计数器,用于控制同时访问某个资源的线程数量。它可以看作是一种广义的锁,允许指定数量的线程同时访问共享资源。
信号量维护了一个许可集,线程可以通过acquire()方法获取许可,通过release()方法释放许可。如果没有可用许可,acquire()方法会阻塞直到有许可可用。
流程图如下:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
/**
* 信号量使用示例
*
* @author ken
*/
@Slf4j
public class SemaphoreDemo {
// 限制同时只有3个线程可以访问资源
private static final Semaphore SEMAPHORE = new Semaphore(3);
private static final int THREAD_COUNT = 10;
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
int threadId = i;
new Thread(() -> {
try {
// 获取许可
SEMAPHORE.acquire();
log.info("线程{}获取到许可,开始访问资源", threadId);
// 模拟访问资源的操作
Thread.sleep(1000);
log.info("线程{}访问资源完毕,释放许可", threadId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可
SEMAPHORE.release();
}
}).start();
}
}
}
运行这段代码,你会发现同一时间最多只有 3 个线程在访问资源,其他线程需要等待已有线程释放许可后才能获取许可。
CountDownLatch允许一个或多个线程等待其他线程完成一组操作后再继续执行。
CountDownLatch初始化时指定一个计数,等待的线程调用await()方法进入等待状态,其他线程完成操作后调用countDown()方法 decrement 计数。当计数减到 0 时,所有等待的线程被唤醒。
流程图如下:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch使用示例
*
* @author ken
*/
@Slf4j
public class CountDownLatchDemo {
private static final int WORKER_COUNT = 5;
public static void main(String[] args) throws InterruptedException {
// 初始化CountDownLatch,计数为工作线程数量
CountDownLatch latch = new CountDownLatch(WORKER_COUNT);
log.info("主线程开始,等待所有工作线程完成任务");
// 启动工作线程
for (int i = 0; i < WORKER_COUNT; i++) {
int workerId = i;
new Thread(() -> {
try {
log.info("工作线程{}开始执行任务", workerId);
// 模拟任务执行
Thread.sleep((long) (Math.random() * 1000));
log.info("工作线程{}完成任务", workerId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 任务完成,计数减1
latch.countDown();
}
}).start();
}
// 等待所有工作线程完成任务
latch.await();
log.info("所有工作线程都已完成任务,主线程继续执行");
}
}
CyclicBarrier允许一组线程相互等待,直到所有线程都到达某个屏障点后再同时继续执行。与CountDownLatch不同的是,CyclicBarrier可以重复使用。
CyclicBarrier初始化时指定参与的线程数量,每个线程到达屏障点时调用await()方法等待其他线程。当所有线程都到达屏障点后,所有线程被唤醒并继续执行。如果构造CyclicBarrier时指定了一个 Runnable,该 Runnable 会在所有线程到达屏障点后、线程被唤醒前执行。
流程图如下:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier使用示例
*
* @author ken
*/
@Slf4j
public class CyclicBarrierDemo {
private static final int PARTY_COUNT = 3;
private static final int ROUND_COUNT = 2;
public static void main(String[] args) {
// 初始化CyclicBarrier,指定参与线程数量和屏障动作
CyclicBarrier barrier = new CyclicBarrier(PARTY_COUNT, () -> {
log.info("所有线程都已到达屏障点,执行屏障动作");
});
// 启动参与线程
for (int i = 0; i < PARTY_COUNT; i++) {
int threadId = i;
new Thread(() -> {
try {
for (int round = 1; round <= ROUND_COUNT; round++) {
log.info("线程{}开始第{}轮任务", threadId, round);
// 模拟任务执行
Thread.sleep((long) (Math.random() * 1000));
log.info("线程{}完成第{}轮任务,等待其他线程", threadId, round);
// 到达屏障点,等待其他线程
barrier.await();
}
log.info("线程{}所有轮次任务完成", threadId);
} catch (Exception e) {
log.error("线程{}发生异常", threadId, e);
}
}).start();
}
}
}
运行这段代码,你会看到所有线程完成第一轮任务后,会执行屏障动作,然后继续执行第二轮任务,这展示了CyclicBarrier可以重复使用的特性。
ThreadLocal提供了一种线程隔离的机制,它为每个线程创建一个独立的变量副本,从而避免了线程间的共享和同步问题。
ThreadLocal内部维护了一个ThreadLocalMap,每个线程都有一个关联的ThreadLocalMap,其中存储了该线程的变量副本。当线程访问ThreadLocal的get()方法时,会获取当前线程的ThreadLocalMap,并从中取出该ThreadLocal对应的变量副本;当调用set()方法时,会将变量存储到当前线程的ThreadLocalMap中。
架构图如下:

import lombok.extern.slf4j.Slf4j;
/**
* ThreadLocal使用示例
*
* @author ken
*/
@Slf4j
public class ThreadLocalDemo {
// 创建ThreadLocal变量
private static final ThreadLocal<Integer> THREAD_LOCAL = ThreadLocal.withInitial(() -> 0);
/**
* 递增当前线程的计数器
*/
public static void increment() {
THREAD_LOCAL.set(THREAD_LOCAL.get() + 1);
}
/**
* 获取当前线程的计数器值
*
* @return 计数器值
*/
public static int get() {
return THREAD_LOCAL.get();
}
/**
* 清除当前线程的计数器
*/
public static void clear() {
THREAD_LOCAL.remove();
}
public static void main(String[] args) {
int threadCount = 3;
int incrementPerThread = 5;
// 启动多个线程
for (int i = 0; i < threadCount; i++) {
int threadId = i;
new Thread(() -> {
try {
log.info("线程{}开始,初始值: {}", threadId, get());
// 执行递增操作
for (int j = 0; j < incrementPerThread; j++) {
increment();
log.info("线程{}第{}次递增后的值: {}", threadId, j + 1, get());
}
} finally {
// 清除ThreadLocal,避免内存泄漏
clear();
}
}).start();
}
}
}
运行这段代码,你会发现每个线程的计数器都是独立的,不会相互影响。
ThreadLocalMap中的 Entry 是弱引用(WeakReference),但 Entry 中的 value 是强引用。如果线程长时间存活(如线程池中的核心线程),且ThreadLocal对象被回收,value 会一直存在,导致内存泄漏。因此,使用完ThreadLocal后,应调用remove()方法清除 value。ThreadLocal中的值,后一个任务可能会读取到错误的值。因此,在线程池环境中使用ThreadLocal时,必须在任务执行完毕后清除 value。掌握了 Java 提供的各种线程同步机制后,我们还需要了解一些高级技巧和最佳实践,以写出更高效、更安全的并发代码。
将一个大锁拆分为多个小锁,减少锁竞争。例如,ConcurrentHashMap 使用分段锁(Segment)来提高并发性能。
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
/**
* 减小锁粒度示例
*
* @author ken
*/
@Slf4j
public class ReduceLockGranularity {
// 锁的数量
private static final int LOCK_COUNT = 16;
// 存储数据的Map数组
private final Map<String, Object>[] segments;
// 锁数组
private final ReentrantLock[] locks;
@SuppressWarnings("unchecked")
public ReduceLockGranularity() {
segments = new HashMap[LOCK_COUNT];
locks = new ReentrantLock[LOCK_COUNT];
for (int i = 0; i < LOCK_COUNT; i++) {
segments[i] = new HashMap<>();
locks[i] = new ReentrantLock();
}
}
/**
* 根据key计算对应的段索引
*
* @param key 键
* @return 段索引
*/
private int getSegmentIndex(String key) {
return Math.abs(key.hashCode() % LOCK_COUNT);
}
/**
* 存储数据
*
* @param key 键
* @param value 值
*/
public void put(String key, Object value) {
int index = getSegmentIndex(key);
ReentrantLock lock = locks[index];
lock.lock();
try {
segments[index].put(key, value);
} finally {
lock.unlock();
}
}
/**
* 获取数据
*
* @param key 键
* @return 值
*/
public Object get(String key) {
int index = getSegmentIndex(key);
ReentrantLock lock = locks[index];
lock.lock();
try {
return segments[index].get(key);
} finally {
lock.unlock();
}
}
}
根据操作类型分离读写锁,如前面介绍的ReentrantReadWriteLock。
如果一系列连续的操作都需要获取同一把锁,应该将锁的范围扩大,减少锁的获取和释放次数。
/**
* 锁粗化示例
*
* @author ken
*/
public class LockCoarsening {
private final Object lock = new Object();
private int count = 0;
/**
* 不推荐:频繁获取和释放锁
*/
public void badPractice() {
synchronized (lock) {
count++;
}
// 其他操作...
synchronized (lock) {
count++;
}
// 其他操作...
synchronized (lock) {
count++;
}
}
/**
* 推荐:扩大锁范围,减少锁操作
*/
public void goodPractice() {
synchronized (lock) {
count++;
// 其他操作...
count++;
// 其他操作...
count++;
}
}
}
死锁是指两个或多个线程相互等待对方释放锁而陷入无限等待的状态。避免死锁的方法包括:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 避免死锁示例
*
* @author ken
*/
@Slf4j
public class AvoidDeadlock {
private final ReentrantLock lock1 = new ReentrantLock();
private final ReentrantLock lock2 = new ReentrantLock();
/**
* 按顺序获取锁,避免死锁
*/
public void safeOperation() {
// 总是先获取编号小的锁
ReentrantLock firstLock = lock1;
ReentrantLock secondLock = lock2;
firstLock.lock();
try {
secondLock.lock();
try {
// 执行需要两个锁的操作
log.info("执行安全操作");
} finally {
secondLock.unlock();
}
} finally {
firstLock.unlock();
}
}
/**
* 使用超时机制避免死锁
*/
public boolean tryOperationWithTimeout() throws InterruptedException {
// 尝试获取第一个锁
if (!lock1.tryLock(1, TimeUnit.SECONDS)) {
log.warn("获取锁1超时");
return false;
}
try {
// 尝试获取第二个锁
if (!lock2.tryLock(1, TimeUnit.SECONDS)) {
log.warn("获取锁2超时");
return false;
}
try {
// 执行需要两个锁的操作
log.info("执行超时安全操作");
return true;
} finally {
lock2.unlock();
}
} finally {
lock1.unlock();
}
}
}
不同的同步机制有不同的适用场景,选择合适的同步机制可以提高程序的性能和可读性:
synchronized,代码更简洁,不易出错。ReentrantLock,如需要中断、超时、公平锁等特性。ReentrantReadWriteLock,提高并发读性能。Semaphore。CountDownLatch。CyclicBarrier。ThreadLocal,避免共享变量。Java 的java.util.concurrent包提供了多种线程安全的容器类,可以大大简化并发编程:
示例:使用 BlockingQueue 实现生产者 - 消费者模型
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 使用BlockingQueue实现生产者-消费者模型
*
* @author ken
*/
@Slf4j
public class ProducerConsumerDemo {
// 缓冲区大小
private static final int BUFFER_SIZE = 10;
// 产品数量
private static final int PRODUCT_COUNT = 20;
// 阻塞队列作为缓冲区
private final BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
/**
* 生产者线程
*/
class Producer implements Runnable {
private final int producerId;
Producer(int producerId) {
this.producerId = producerId;
}
@Override
public void run() {
try {
for (int i = 0; i < PRODUCT_COUNT; i++) {
int product = producerId * 100 + i;
// 将产品放入缓冲区,如果缓冲区满则等待
buffer.put(product);
log.info("生产者{}生产了产品{},当前缓冲区大小: {}",
producerId, product, buffer.size());
// 模拟生产时间
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
}
log.info("生产者{}完成生产任务", producerId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* 消费者线程
*/
class Consumer implements Runnable {
private final int consumerId;
Consumer(int consumerId) {
this.consumerId = consumerId;
}
@Override
public void run() {
try {
int count = 0;
while (count < PRODUCT_COUNT) {
// 从缓冲区取出产品,如果缓冲区空则等待
Integer product = buffer.take();
count++;
log.info("消费者{}消费了产品{},当前缓冲区大小: {}",
consumerId, product, buffer.size());
// 模拟消费时间
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 150));
}
log.info("消费者{}完成消费任务", consumerId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
ProducerConsumerDemo demo = new ProducerConsumerDemo();
// 启动生产者
new Thread(demo.new Producer(1)).start();
new Thread(demo.new Producer(2)).start();
// 启动消费者
new Thread(demo.new Consumer(1)).start();
new Thread(demo.new Consumer(2)).start();
}
}
可见性问题是指一个线程对共享变量的修改,其他线程不能立即看到。这通常是由于 CPU 缓存、编译器优化或指令重排序导致的。
解决方案:
volatile关键字:volatile可以保证变量的可见性和有序性(禁止指令重排序),但不能保证原子性。import lombok.extern.slf4j.Slf4j;
/**
* volatile可见性示例
*
* @author ken
*/
@Slf4j
public class VolatileVisibilityDemo {
// 使用volatile保证可见性
private volatile boolean flag = false;
public void setFlag(boolean flag) {
this.flag = flag;
log.info("设置flag为: {}", flag);
}
public static void main(String[] args) throws InterruptedException {
VolatileVisibilityDemo demo = new VolatileVisibilityDemo();
// 监控线程
Thread monitorThread = new Thread(() -> {
log.info("监控线程开始运行");
// 循环检查flag的值
while (!demo.flag) {
// 如果flag为false,则空循环
}
log.info("监控线程检测到flag变为true,退出循环");
});
monitorThread.start();
// 主线程休眠一段时间后修改flag的值
Thread.sleep(1000);
demo.setFlag(true);
// 等待监控线程结束
monitorThread.join();
}
}
synchronized或Lock:同步机制不仅可以保证原子性,还可以保证可见性,因为释放锁时会将线程的本地缓存刷新到主内存,获取锁时会从主内存加载最新值。原子性问题是指一个操作不能被中断,要么全部执行,要么全部不执行。
解决方案:
synchronized或Lock保证代码块的原子性。AtomicReference结合 CAS 操作实现原子性。import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
* 使用AtomicReference实现复合操作的原子性
*
* @author ken
*/
@Slf4j
public class AtomicReferenceDemo {
// 存储账户余额的原子引用
private final AtomicReference<Integer> balance = new AtomicReference<>(100);
/**
* 转账操作
*
* @param amount 转账金额,正数表示转入,负数表示转出
* @return 操作是否成功
*/
public boolean transfer(int amount) {
while (true) {
int current = balance.get();
int next = current + amount;
// 如果余额不足,转账失败
if (next < 0) {
log.info("余额不足,转账失败");
return false;
}
// 使用CAS操作更新余额
if (balance.compareAndSet(current, next)) {
log.info("转账成功,金额: {}, 当前余额: {}", amount, next);
return true;
}
// CAS操作失败,重试
}
}
public static void main(String[] args) {
AtomicReferenceDemo account = new AtomicReferenceDemo();
// 启动多个线程进行转账操作
for (int i = 0; i < 5; i++) {
int threadId = i;
new Thread(() -> {
// 随机转账金额,-50到50之间
int amount = (int) (Math.random() * 101) - 50;
account.transfer(amount);
}, "TransferThread-" + threadId).start();
}
}
}
有序性问题是指程序执行的顺序与代码的先后顺序不一致,这通常是由于编译器或 CPU 的指令重排序优化导致的。
解决方案:
volatile关键字:volatile可以禁止指令重排序。synchronized或Lock:同步机制可以保证代码块内的有序性。java.util.concurrent包中的工具类:这些工具类内部实现了有序性保证。典型的有序性问题示例是单例模式的双重检查锁定(DCL):
/**
* 单例模式的双重检查锁定
*
* @author ken
*/
public class Singleton {
// 必须使用volatile关键字,防止指令重排序
private static volatile Singleton instance;
private Singleton() {
// 私有构造方法
}
/**
* 获取单例实例
*
* @return 单例实例
*/
public static Singleton getInstance() {
// 第一次检查,避免每次都加锁
if (instance == null) {
// 同步块,保证只有一个线程进入
synchronized (Singleton.class) {
// 第二次检查,防止多个线程等待锁后重复创建实例
if (instance == null) {
// 如果没有volatile,这里可能发生指令重排序
instance = new Singleton();
}
}
}
return instance;
}
}
instance = new Singleton()实际上包含三个步骤:
如果没有volatile关键字,步骤 2 和步骤 3 可能会被重排序,导致其他线程获取到未初始化的对象。
线程同步是 Java 并发编程的核心内容,也是难点所在。本文详细介绍了 Java 中各种线程同步机制,包括synchronized、Lock、原子类、读写锁、信号量、倒计时门闩、循环屏障和线程本地存储等,并通过实例代码展示了它们的使用方法和适用场景。