
JUC (java.util.concurrent)JUC 是 Java 标准库中用于并发编程的核心工具包,大概包含以下组件:
ExecutorService、ThreadPoolExecutor 等类实现高效的线程复用和任务调度Lock 接口(如ReentrantLock),支持更灵活的锁控制Condition 类实现线程间精细化的等待/唤醒机制ConcurrentHashMap、CopyOnWriteArrayList,兼顾性能与安全性BlockingQueue 接口支持生产者-消费者模式的高效实现AtomicInteger、AtomicReference 等提供无锁化的线程安全数值操作Callable 接口Callable 接口是 Java 并发编程中用于表示可以由线程执行的任务的一种接口,它与 Runnable 接口类似,但功能更强大。
Callable是任务的生产者,负责定义需要执行的任务并生成结果;Future是任务的消费者,用于获取任务执行的结果或状态。两者通过 异步执行 和 结果传递 实现协作。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}特性 | Runnable | Callable |
|---|---|---|
返回结果 | 无 | 有(泛型支持) |
抛出异常 | 不能抛出受检异常 | 可以抛出受检异常 |
核心方法 | void run() | V call() throws Exception |
引入版本 | Java 1.0 | Java 5 |
相当于在 Runnable 的基础上,Callable 多了泛型返回值、异常处理的机制,更加强大!
Thread 启动任务,需要适配器类 FutureTask在使用 Callable 的时候,由于 Thread 类在构造的时候只能传入 Runnable,而不能直接传入 Callable,但可以通过使用 适配器类 FutureTask 来作为桥梁搭建关系。
原因如下所示:
FutureTask 实现了 Runnable,可以塞进 ThreadFutureTask 的构造函数接受 Callable,可以让你写返回值逻辑FutureTask 实现了 Future,你可以 get() 拿到结果public static void main(String[] args) throws Exception {
Callable<String> cable = () -> {
Thread.sleep(1000);
return "使用Callable执行完成";
};
// 使用FutureTask包装Callable
FutureTask<String> ft = new FutureTask<>(cable);
// 传给Thread进行构造
Thread thread = new Thread(ft);
thread.start();
// 使用get()方法获取返回值
String ret = ft.get();
System.out.println("结果:" + ret);
}Future(推荐⭐⭐⭐)Future 是一个接口,表示 "未来某个时间的计算结果";而 FutureTask 是它的一个实现类,同时也实现了 Runnable。
实际开发中更推荐这种方式,如下所示:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(5);
// submit一个Callable任务,然后返回Future<String>类型,需要强转
Future<String> ft = pool.submit(() -> {
try {
Thread.sleep(1000);
return "使用Callable执行完成";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String ret = ft.get();
System.out.println("结果:" + ret);
}ReentrantLockReentrantLock 是一把可重入独占锁,相比 synchronized 来说使用方式要传统、复杂一点,但是更灵活强大,一般业务简单用 synchronized 足够,但并发复杂或需要更强控制力时,选 ReentrantLock。
方法名 | 作用 | 说明 |
|---|---|---|
lock() | 获取锁 | 不可中断,阻塞直到拿到锁 |
unlock() | 释放锁 | 必须手动调用,一般写在 finally 块中 |
tryLock() | 尝试获取锁 | 不阻塞,立即返回 true/false,适合失败降级处理 |
tryLock(long time, TimeUnit unit) | 限时尝试获取锁 | 在指定时间内等待,超时返回 false |
lockInterruptibly() | 可中断地获取锁 | 允许在等待锁时响应中断(用于避免死锁) |
isLocked() | 查询是否被锁定 | 返回当前锁是否被任意线程持有 |
isHeldByCurrentThread() | 是否被当前线程持有 | 可用于调试/逻辑判断 |
getHoldCount() | 当前线程持锁次数 | 多用于递归加锁场景 |
newCondition() | 获取 Condition 条件变量 | 替代 Object.wait()/notify() 的高级用法 |
💥注意事项:
lockInterruptibly() 和 lock() 两者都会在获取不到锁的时候进行阻塞挂起,区别是如果发生了死锁,前者可以通过监测线程调用 Thread.interrupt() 来唤醒,破除循环等待条件;而后者没办法唤醒,如果发生了死锁的话则只能关闭线程等来挽救,损失比较大。lock() 和 unlock():lock.lock(); // 尽量不要写在try里面,因为如果lock抛异常了,然后去finally中unlock的话,会直接报错的
try {
// 临界区
} finally {
lock.unlock();
}2. 对于 trylock():
if (lock.tryLock()) {
try {
// 获取锁成功
} finally {
lock.unlock();
}
} else {
// 获取失败,降级处理;也可以搞个计数重新trylock,直到次数达到某个上限就退出
}3. 对于 lockInterruptibly():
try {
lock.lockInterruptibly(); // 被唤醒的话,会抛出InterruptedException,然后进行特殊处理即可
try {
// 临界区
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
// 响应中断逻辑(释放资源等)
}4. 配合条件变量:
Condition condition = lock.newCondition();
lock.lock();
try {
while (!条件成立) {
condition.await(); // 相当于 wait()
}
// 继续执行
condition.signal(); // 相当于 notify()
} finally {
lock.unlock();
}ReentrantLock 和 synchronized 的区别synchronized | ReentrantLock | |
|---|---|---|
基本功能 | JVM 内置关键字 | JDK 提供的类 |
可重入性 | ✅ | ✅ |
可中断性 | ❌ | ✅ lockInterruptibly() |
尝试获取锁 | ❌ | ✅ tryLock() |
超时获取锁 | ❌ | ✅ tryLock(timeout) |
公平锁支持 | ❌ | ✅ 构造时指定 new ReentrantLock(true) |
必须手动释放锁 | ❌(自动) | ✅ 必须手动 unlock() |
条件变量 | 仅一个(wait/notify) | 支持多个独立的 Condition 对象(newCondition()) |
性能差异 | JVM 优化后性能很好 | 功能强大但稍慢 |
如何选择使用哪个锁❓❓❓
synchronized,效率更高,自动释放更方便。ReentrantLock,搭配 trylock() 更灵活控制加锁的行为,而不是死等。ReentrantLock。Atomic Java 的原子类是 java.util.concurrent.atomic 包中的核心工具之一,用于在并发环境下进行线程安全的变量操作,利用了 CAS 来实现,而不依赖 synchronized 或锁机制,所以性能也比较高!
下面是常见的原子类及其方法:
原子类 | 方法名 | 说明 |
|---|---|---|
AtomicInteger / AtomicLong | get() | 获取当前值 |
set(newValue) | 设置新值 | |
getAndSet(newValue) | 获取旧值并设置新值 | |
getAndIncrement() | 相当于 i++ | |
incrementAndGet() | 相当于 ++i | |
getAndAdd(delta) | 获取当前值后加 delta | |
addAndGet(delta) | 加 delta 后返回值 | |
compareAndSet(expect, update) | 原子条件更新 | |
AtomicBoolean | get() | 获取布尔值 |
set(newValue) | 设置布尔值 | |
compareAndSet(expect, update) | CAS 操作 | |
getAndSet(newValue) | 获取并设置布尔值 | |
AtomicReference<T> | get() | 获取引用 |
set(newRef) | 设置引用 | |
getAndSet(newRef) | 获取并替换引用 | |
compareAndSet(expect, update) | CAS 引用替换 | |
AtomicIntegerArray / AtomicLongArray | get(index) | 获取指定位置的值 |
set(index, newValue) | 设置指定位置的值 | |
getAndIncrement(index) | 获取并自增指定位置的值 | |
incrementAndGet(index) | 自增后返回值 | |
compareAndSet(index, expect, update) | CAS 更新某个位置的值 | |
AtomicStampedReference<T> | getReference() | 获取引用值 |
getStamp() | 获取当前 stamp(版本号) | |
set(newRef, newStamp) | 设置引用和版本号 | |
compareAndSet(expectRef, newRef, expectStamp, newStamp) | CAS 替换引用+版本号 | |
LongAdder | add(x) | 增加指定值 |
increment() | 增加 1 | |
sum() | 返回当前总和 | |
reset() | 重置为 0(非线程安全) |
SemaphoreSemaphore 是 信号量机制 的实现,用于控制对某些资源的访问数量。它的核心作用是:允许多个线程同时访问某个资源,但限制最大线程数。
方法签名 | 返回值类型 | 说明 |
|---|---|---|
Semaphore(int permits) | - | 创建一个非公平信号量,最多允许 permits 个许可 |
Semaphore(int permits, boolean fair) | - | 创建一个公平或非公平信号量 |
void acquire() | void | 获取一个许可,如果无可用许可则阻塞 |
void acquire(int permits) | void | 获取多个许可,若不足则阻塞 |
boolean tryAcquire() | boolean | 尝试获取一个许可,立即返回是否成功 |
boolean tryAcquire(long timeout, TimeUnit unit) | boolean | 在给定时间内尝试获取许可,超时失败 |
void release() | void | 释放一个许可,供其他线程使用 |
void release(int permits) | void | 释放多个许可 |
int availablePermits() | int | 返回当前可用许可数 |
int drainPermits() | int | 一次性获取并清空所有许可,返回清空前数量 |
boolean hasQueuedThreads() | boolean | 是否有线程在等待许可 |
int getQueueLength() | int | 当前等待许可的线程数(估计值) |
典型场景一:限制时访问某个服务的线程数(如 API 接口限流),避免系统因突发流量崩溃
Semaphore semaphore = new Semaphore(100); // 允许 100 个并发请求
public void handleRequest() {
semaphore.acquire();
try {
// 处理请求
} finally {
semaphore.release();
}
}经典场景二:控制同时执行的任务数量(如批量处理),平衡资源占用与处理效率。
// 限制同时处理的任务数为 5
Semaphore taskSemaphore = new Semaphore(5);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
taskSemaphore.acquire();
try {
processTask();
} finally {
taskSemaphore.release();
}
}).start();
}经典场景三:协调生产者和消费者的执行节奏,避免生产者过快或消费者过慢导致的阻塞。
Semaphore emptySlots = new Semaphore(10); // 缓冲区容量 10
Semaphore filledSlots = new Semaphore(0);
// 生产者
void produce() throws InterruptedException {
emptySlots.acquire();
addToBuffer();
filledSlots.release();
}
// 消费者
void consume() throws InterruptedException {
filledSlots.acquire();
removeFromBuffer();
emptySlots.release();
}CountDownLatchCountDownLatch 是一种 "倒计时锁存器",它可以让一个或多个线程等待,直到其它线程完成一组操作。
形象地说:它就像一个倒计时器,初始值设为 N,每次 countDown() 值减 1,直到变成 0,所有等待的线程才会继续执行。
方法签名 | 返回类型 | 说明 |
|---|---|---|
CountDownLatch(int count) | - | 构造函数,设置初始计数值 |
void await() | void | 当前线程阻塞,直到计数器归零 |
boolean await(long timeout, TimeUnit unit) | boolean | 等待指定时间,如果超时返回 false |
void countDown() | void | 计数器减 1 |
long getCount() | long | 获取当前计数器值 |
典型场景一:主线程需要汇总多个子线程的计算结果
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 子线程执行任务
latch.countDown();
}).start();
}
latch.await(); // 主线程等待所有子线程完成
System.out.println("所有子线程已完成,主线程继续执行");典型场景二:控制依赖程序启动顺序
ountDownLatch serviceLatch = new CountDownLatch(3);
// 启动数据库服务
new Thread(() -> {
startDatabase();
serviceLatch.countDown();
}).start();
// 启动缓存服务
new Thread(() -> {
startCache();
serviceLatch.countDown();
}).start();
serviceLatch.await(); // 等待所有服务启动
startMainService();Semaphore、CountDownLatch、ReentrantLock 的区别工具类 | 控制并发数量 | 等待阻塞 | 支持释放 | 可复用 | 公平性控制 |
|---|---|---|---|---|---|
Semaphore | ✅ | ✅ | ✅ | ✅ | ✅ |
CountDownLatch | ❌(用于等待) | ✅ | ❌ | ❌(一次性) | ❌ |
ReentrantLock | ❌(互斥) | ✅ | ✅ | ✅ | ✅ |
类别 | 常用类名 | 特点和说明 |
|---|---|---|
老版同步类(Java 1.0+) | Vector、 Hashtable | 方法基本都被 synchronized 修饰,性能较差,不推荐新项目使用 |
并发集合(java.util.concurrent) | ConcurrentHashMap、ConcurrentLinkedQueue、CopyOnWriteArrayList | 为并发设计,性能更优,是现代推荐的线程安全集合 |
同步包装类(Collections.synchronizedXXX) | Collections.synchronizedList(List<T> list)、 synchronizedMap() 等 | 使用装饰器模式包装普通集合,实现线程安全,但使用时仍需手动同步遍历等操作 |
阻塞队列(BlockingQueue 系列) | ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、 DelayQueue等 | 内建锁机制,可用于线程间的安全通信,支持阻塞读写操作 |
💥注意事项:
ConcurrentLinkedQueue 是一个高性能、线程安全的非阻塞队列,它采用 CAS 保证并发安全,是非阻塞队列的经典实现。CopyOnWriteArrayList 是一种具有 "写时拷贝" 功能的容器,即每次修改操作(比如 add/remove/set)都会创建一个新的数组副本,然后在这个副本上面进行修改,最后等到没有线程在读取数组的时候,才将引用指向这个新数组。synchronizedList 是一种将普通 List 包装成线程安全版本的方式,原理是通过在所有操作的外部添加上 synchronized 关键字来保证只有一个线程可以访问集合的方法~Hashtable 与 ConcurrentHashMap 的区别Hashtable | ConcurrentHashMap(推荐⭐⭐⭐) | |
|---|---|---|
线程安全性 | ✅ | ✅ |
加锁策略 | 全表锁 | 读操作:无锁,配合 volatile 读取保证内存可见 写操作:分段锁(JDK7)或 桶锁+CAS(JDK8) |
扩容策略 | 由触发扩容的线程独自完成整个扩容过程,涉及到大量的元素拷贝,效率非常低 | 协作式扩容,即多个线程共同搬桶 (下面注意事项有过程) |
并发性能 | ❌ 性能差,每次读写都锁整个表 | ✅ 性能高,支持高并发读写 |
null 键/值支持 | ❌ 键和值都不能为 null | ❌ 键不能为 null,值可以为 null |
迭代器是否 fail-fast | ❌ | ✅ 不会严格 fail-fast,本质属于 fail-safe 即不抛异常,但也不保证看到实时最新数据 |
💥注意事项:
Hashtable 来说,它只是简单地在关键方法上添加了 synchronized 锁,粒度很粗,相当于给整个哈希表加锁,即所有读写操作都是互斥的,所以效率很低!ConcurrentHashMap 来说就不一样了,它是针对每个桶进行独立的加锁操作,在读操作时候甚至都不加锁,在写操作的时候会用上桶锁,此外在实现 size() 时候用 CAS 来保证一致性并且提高效率!ConcurrentHashMap 的线程,都会参与搬家的过程,每个操作负责搬运一小部分元素JDK 1.7 中引入的,JDK 1.8 中已经不再使用,简单的说就是把若干个哈希桶,分成一个段,然后针对每个段分别加锁,相比于桶锁来说效率要低,而且不灵活!public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread-A 持有 lock1");
try {
Thread.sleep(100); // 模拟操作耗时,增加死锁概率
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread-A 等待获取 lock2...");
synchronized (lock2) {
System.out.println("Thread-A 成功获取 lock2");
}
}
});
Thread threadB = new Thread(() -> {
synchronized (lock2) {
System.out.println("Thread-B 持有 lock2");
try {
Thread.sleep(100); // 模拟操作耗时,增加死锁概率
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread-B 等待获取 lock1...");
synchronized (lock1) {
System.out.println("Thread-B 成功获取 lock1");
}
}
});
threadA.start();
threadB.start();
}
}
// 程序会输出以下内容后卡住(死锁):
Thread-A 持有 lock1
Thread-B 持有 lock2
Thread-A 等待获取 lock2...
Thread-B 等待获取 lock1...synchronized (lock1) {
if (tryLock(lock2)) { // 尝试获取lock2
try {
// 执行任务
} finally {
unlock(lock2);
}
}
}3. 破坏不可剥夺条件:允许线程主动释放已持有的锁,如 ReentrantLock.unlock()
Lock lock = new ReentrantLock();
if (lock.tryLock()) {
try {
// 临界区
} finally {
lock.unlock(); // 主动释放
}
}4. 破坏循环等待条件:按固定顺序获取锁(全局锁排序)
// 定义锁的顺序
private static final Object[] locks = {lockA, lockB, lockC};
public void doWork() {
Arrays.sort(locks); // 按对象哈希值排序
synchronized (locks[0]) {
synchronized (locks[1]) {
synchronized (locks[2]) {
// 临界区
}
}
}
}5. 超时机制:使用 Lock.tryLock(timeout, unit) 设置超时时间
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
// 执行任务
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}ConcurrentHashMap、AtomicIntegerSemaphore:控制并发访问数量CountDownLatch/CyclicBarrier:协调线程执行顺序,避免嵌套锁原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。