JCU → java.util.concurrent
, Java 的并发库主包名, 用于实现高性能并发和多线程控制.
其实相关接口类不算很多, 90 个左右, 主要分布在以下 package:
java.util.concurrent
: 核心库java.util.concurrent.atomic
: 原子库java.util.concurrent.locks
: 锁库
1) JCU 核心库
核心包主要包含: 线程池、并发集合、阻塞队列、同步工具、响应式流 这些大类工具, 以及几个辅助类.
**4. 同步工具类**
**6. 响应式流 (Flow)**
* Flow, SubmissionPublisher
1.1) 线程池与任务控制
Executor 接口和抽象类:
- Executor: 最顶层接口, 只含一个 execute 方法, 抽象任务的提交执行.
- ExecutorService: 二级扩展接口, 抽象扩展==状态返回==版任务提交执行, 以及执行器的生命周期.
- ScheduledExecutorService: 三级扩展接口, 抽象扩展 ==定时和周期性任务 ==提交执行.
- AbstractExecutorService: 三级抽象类, 并未完整实现线程池, 提供线程池基础工具方法实现, 给实际线程池类继承使用.
Executor 实现
JDK 标准库中只提供了三个 Executor 的实现, 两个线程池的实现, 以及一个新建线程执行器.
- ThreadPoolExecutor: 继承 AbstractExecutorService 的实现, 最常用的线程池实现.
- ScheduledThreadPoolExecutor: 继承 ThreadPoolExecutor 的扩展线程池, 额外支持定时和周期性任务.
- ThreadPerTaskExecutor: 每个任务创建一个线程执行, 通常将 factory 属性设置为 VirtualThreadFactory 来与虚拟线程结合使用.
异常/策略
- RejectedExecutionHandler: Reject 策略接口, 抽象执行器无法接收任务时的处理.
- RejectedExecutionException: 内置默认 Reject 策略 AbortPolicy 下, 直接抛出的异常.
工具/工厂
- ThreadFactory: 线程工厂接口, 抽象了根据任务创建线程的方法.
- Executors: 提供了 Executor 工厂方法、包装器、适配器, 以及 ThreadFactory 的实现.
- 工厂方法:
- newFixedThreadPool: 固定线程线程池.
- newSingleThreadExecutor: 单线程执行器.
- newCachedThreadPool: 缓存线程池, 线程空闲存活 60 秒, 上限 Integer.MAX_VALUE, 基于 SynchronousQueue 实现.
- newScheduledThreadPool: 定时、周期任务线程池.
- newSingleThreadScheduledExecutor: 定时、周期任务单线程执行器.
- newWorkStealingPool: 工作窃取线程池, 本质是 ForkJoinPool.
- newThreadPerTaskExecutor: 任务新建线程执行器.
- newVirtualThreadPerTaskExecutor: 任务新建虚拟线程执行器(虚拟线程).
- 包装器: 部分工厂方法并不直接使用 ThreadPoolExecutor 的实现, 而是提供了基于 ExecutorService 的特殊包装类, 如单线程执行器的创建.
- DelegatedExecutorService: 代理并隐藏 ThreadPoolExecutor 特有方法.
- DelegatedScheduledExecutorService: 代理并隐藏 ScheduledThreadPoolExecutor 特有方法.
- AutoShutdownDelegatedExecutorService: newSingleThreadExecutor 会使用此包装, GC 时自动回收.
- 适配器: Callable 适配器, 将 Runnable 包装为 Callable.
- ThreadFactory 实现:
- defaultThreadFactory: 默认线程工厂, 创建名字类似 pool-{p_num}-thread-{t_num} 的线程, 无 Daemon, 正常优先级 NORM_PRIORITY.
privilegedThreadFactory: 已过时, 权限和类加载器增强
1.2) 任务和异步计算
接口
- Callable: 带返回值且可抛出异常的任务接口.
- Future: 异步任务提交结果接口, 提供了任务状态、结果访问等操作.
- RunnableFuture: Runnable/Callable 包装接口, ExecutorService 执行时使用该包装.
- RunnableScheduledFuture: 支持定时、周期任务的 RunnableFuture 子接口.
- FutureTask: 可取消的异步任务提交结果, 即 Future/RunnableFuture 的实现之一.
- ScheduledFuture: 定时、周期任务的异步执行结果接口, 继承 Delayed、Future, 可获取剩余延迟时间.
异步工具
- CompletionStage: 链式的异步任务执行工具接口.
- CompletableFuture: 带完成扩展的 Future/CompletionStage 实现, 可手动完成/异常完成, 且支持再各种状态下的回调.
- CompletionService: 抽象了 异步任务生成 与 已完成任务结果 解耦的接口, 往下只有一个实现.
- ExecutorCompletionService: CompletionService 实现, 代理 Executor 并维护了已完成任务队列, 默认采用
ForkJoinPool
, 但其并没有采用 CompletableFuture 而是基于 FutureTask 实现.
- ExecutorCompletionService: CompletionService 实现, 代理 Executor 并维护了已完成任务队列, 默认采用
异常
- CancellationException: 任务已取消但调用 get 时抛出.
- ExecutionException: 任务执行中抛出异常时包装抛出.
- CompletionException: 链式异步工具 CompletableFuture 中抛出的封装异常.
- TimeoutException: FutureTask.get(long timeout, TimeUnit unit) 超时抛出.
Fork/Join 库
- ForkJoinPool: Fork/Join 库线程池, 管理执行 ForkJoinTask.
- ForkJoinTask: 实现了 Future 的任务抽象类.
- RecursiveAction: 无返回值任务, ForkJoinTask 实现.
- RecursiveTask: 有返回值任务, ForkJoinTask 实现, 可以将子任务结果合并.
- CountedCompleter: 复杂任务, ForkJoinTask 实现.
- ForkJoinWorkerThread: ForkJoinPool 工作线程, 维护任务队列可窃取其他线程任务. 一般通过 ForkJoinPool 使用, 不直接使用.
- StructuredTaskScope: 结构化并发任务管理.
- StructureViolationException: StructuredTaskScope 相关异常.
ForkJoin 基本用法:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws Exception {
// Tasks build
List<Integer> tasks = new ArrayList<>();
long expectedResult = 0L;
for (int i = 0; i < 103; i++) {
expectedResult += i;
tasks.add(i);
}
// Pool build.
try (ForkJoinPool forkJoinPool = ForkJoinPool.commonPool()) {
// Tasks submit.
Long result = forkJoinPool.invoke(new MyTask(tasks, 0, tasks.size() - 1));
System.out.printf("Expected result: %d, result: %d.%n", expectedResult, result);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static class MyTask extends RecursiveTask<Long> {
private final List<Integer> items;
private final Integer startIndex;
private final Integer endIndex;
public MyTask(List<Integer> items, Integer startIndex, Integer endIndex) {
this.items = items;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
protected Long compute() {
long sum = 0;
if (1 + endIndex - startIndex < 100) {
for (int i = startIndex; i <= endIndex; i++) {
sum += items.get(i);
}
System.out.printf("Small task scale, task result: %d.%n", sum);
return sum;
}
int middleIndex = (startIndex + endIndex) / 2;
MyTask leftTask = new MyTask(items, startIndex, middleIndex);
MyTask rightTask = new MyTask(items, middleIndex + 1, endIndex);
invokeAll(leftTask, rightTask);
Long leftResult = leftTask.join();
Long rightResult = rightTask.join();
long result = leftResult + rightResult;
System.out.printf("Large task scale, task merge result: %d.%n", result);
return result;
}
}
}
1.3) 并发集合
Map:
- ConcurrentMap: Map 并发安全语义接口.
- ConcurrentHashMap: 最经典的并发 Map, CAS+分段锁+红黑树实现.
- ConcurrentNavigableMap: 有序导航 Map 语义接口.
- ConcurrentSkipListMap: 基于跳表并发有序 Map, CAS+链表节点标记实现.
List:
- CopyOnWriteArrayList: 写时复制 ArrayList.
Set:
- CopyOnWriteArraySet: 线程安全去重 Set, 代理了 CopyOnWriteArrayList 并扩展 add 做扫描去重.
- ConcurrentSkipListSet: 代理了 ConcurrentSkipListMap 的并发有序 Set.
Queue/Deque:
接口:
- BlockingQueue: 阻塞队列接口.
- BlockingDeque: 二级阻塞双端队列接口.
- TransferQueue: 二级接口, 扩展了 transfer 方法, 可确保生产者插入的元素被消费者接收.
实现:
- ArrayBlockingQueue: 数组有界阻塞队列.
- LinkedBlockingQueue: 链表阻塞队列.
- SynchronousQueue: 同步移交队列, 生产阻塞等待消费, 0 容量不能且多 add.
- PriorityBlockingQueue: 优先级排序阻塞队列.
- DelayQueue: 代理了 PriorityQueue 的延时优先级排序阻塞队列.
- LinkedTransferQueue: 链表队列 Transfer 版.
- ConcurrentLinkedQueue: 链表队列 CAS 非阻塞版.
- LinkedBlockingDeque: 双端链表队列阻塞版.
- ConcurrentLinkedDeque: 双端链表队列 CAS 非阻塞版.
1.4) 同步工具
- CountDownLatch: 计数器闭锁, 阻塞线程, 等待其他线程将计数器归零, 不可复用.
- CyclicBarrier: 栅栏, 阻塞线程, 直到目标线程数满放行.
- Semaphore: 信号量, 限制同时执行的线程数量, 溢出阻塞.
- Exchanger: 交换器, 两线程交换数据.
- Phaser
- BrokenBarrierException
1.5) 响应式流
1.6) 其余零散辅助类
- TimeUnit
- Delayed
- ThreadLocalRandom
- Helpers
2) 原子库
java.util.concurrent.atomic 提供原子操作类,可支持无锁并发编程。
- AtomicBoolean
- AtomicInteger
- AtomicIntegerArray
- AtomicIntegerFieldUpdater
- AtomicLong
- AtomicLongArray
- AtomicLongFieldUpdater
- AtomicMarkableReference
- AtomicReference
- AtomicReferenceArray
- AtomicReferenceFieldUpdater
- AtomicStampedReference
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
- Striped64
3) 锁库
java.util.concurrent.locks 提供显式锁、读写锁和条件对象,功能比 synchronized 更灵活。 代表类:ReentrantLock、ReentrantReadWriteLock、StampedLock、Condition。
- AbstractOwnableSynchronizer
- AbstractQueuedLongSynchronizer
- AbstractQueuedSynchronizer
- Condition
- Lock
- LockSupport
- ReadWriteLock
- ReentrantLock
- ReentrantReadWriteLock
- StampedLock
评论(0)
暂无评论