boxmoe_header_banner_img

 

文章导读

Java JUC 库(更新)


avatar
wars 25 9 月, 2025 144

JCU → java.util.concurrent, Java 的并发库主包名, 用于实现高性能并发和多线程控制.

其实相关接口类不算很多, 90 个左右, 主要分布在以下 package:

  • java.util.concurrent: 核心库
  • java.util.concurrent.atomic: 原子库
  • java.util.concurrent.locks: 锁库

1) JCU 核心库

核心包主要包含: 线程池、并发集合、阻塞队列、同步工具、响应式流 这些大类工具, 以及几个辅助类.

**4. 同步工具类**

**6. 响应式流 (Flow)**
* Flow, SubmissionPublisher

1.1) 线程池与任务控制

Executor 接口和抽象类:

  • Executor: 最顶层接口, 只含一个 execute 方法, 抽象任务的提交执行.
    • ExecutorService: 二级扩展接口, 抽象扩展==状态返回==版任务提交执行, 以及执行器的生命周期.
    • ScheduledExecutorService: 三级扩展接口, 抽象扩展 ==定时和周期性任务 ==提交执行.
    • AbstractExecutorService: 三级抽象类, 并未完整实现线程池, 提供线程池基础工具方法实现, 给实际线程池类继承使用.

Executor 实现

JDK 标准库中只提供了三个 Executor 的实现, 两个线程池的实现, 以及一个新建线程执行器.

  • ThreadPoolExecutor: 继承 AbstractExecutorService 的实现, 最常用的线程池实现.
    • ScheduledThreadPoolExecutor: 继承 ThreadPoolExecutor 的扩展线程池, 额外支持定时和周期性任务.
  • ThreadPerTaskExecutor: 每个任务创建一个线程执行, 通常将 factory 属性设置为 VirtualThreadFactory 来与虚拟线程结合使用.

异常/策略

  • RejectedExecutionHandler: Reject 策略接口, 抽象执行器无法接收任务时的处理.
  • RejectedExecutionException: 内置默认 Reject 策略 AbortPolicy 下, 直接抛出的异常.

工具/工厂

  • ThreadFactory: 线程工厂接口, 抽象了根据任务创建线程的方法.
  • Executors: 提供了 Executor 工厂方法、包装器、适配器, 以及 ThreadFactory 的实现.
    • 工厂方法:
    • newFixedThreadPool: 固定线程线程池.
    • newSingleThreadExecutor: 单线程执行器.
    • newCachedThreadPool: 缓存线程池, 线程空闲存活 60 秒, 上限 Integer.MAX_VALUE, 基于 SynchronousQueue 实现.
    • newScheduledThreadPool: 定时、周期任务线程池.
    • newSingleThreadScheduledExecutor: 定时、周期任务单线程执行器.
    • newWorkStealingPool: 工作窃取线程池, 本质是 ForkJoinPool.
    • newThreadPerTaskExecutor: 任务新建线程执行器.
    • newVirtualThreadPerTaskExecutor: 任务新建虚拟线程执行器(虚拟线程).
    • 包装器: 部分工厂方法并不直接使用 ThreadPoolExecutor 的实现, 而是提供了基于 ExecutorService 的特殊包装类, 如单线程执行器的创建.
    • DelegatedExecutorService: 代理并隐藏 ThreadPoolExecutor 特有方法.
    • DelegatedScheduledExecutorService: 代理并隐藏 ScheduledThreadPoolExecutor 特有方法.
    • AutoShutdownDelegatedExecutorService: newSingleThreadExecutor 会使用此包装, GC 时自动回收.
    • 适配器: Callable 适配器, 将 Runnable 包装为 Callable.
    • ThreadFactory 实现:
    • defaultThreadFactory: 默认线程工厂, 创建名字类似 pool-{p_num}-thread-{t_num} 的线程, 无 Daemon, 正常优先级 NORM_PRIORITY.
    • privilegedThreadFactory: 已过时, 权限和类加载器增强

1.2) 任务和异步计算

接口

  • Callable: 带返回值且可抛出异常的任务接口.
  • Future: 异步任务提交结果接口, 提供了任务状态、结果访问等操作.
    • RunnableFuture: Runnable/Callable 包装接口, ExecutorService 执行时使用该包装.
    • RunnableScheduledFuture: 支持定时、周期任务的 RunnableFuture 子接口.
    • FutureTask: 可取消的异步任务提交结果, 即 Future/RunnableFuture 的实现之一.
  • ScheduledFuture: 定时、周期任务的异步执行结果接口, 继承 Delayed、Future, 可获取剩余延迟时间.

异步工具

  • CompletionStage: 链式的异步任务执行工具接口.
    • CompletableFuture: 带完成扩展的 Future/CompletionStage 实现, 可手动完成/异常完成, 且支持再各种状态下的回调.
  • CompletionService: 抽象了 异步任务生成 与 已完成任务结果 解耦的接口, 往下只有一个实现.
    • ExecutorCompletionService: CompletionService 实现, 代理 Executor 并维护了已完成任务队列, 默认采用 ForkJoinPool, 但其并没有采用 CompletableFuture 而是基于 FutureTask 实现.

异常

  • CancellationException: 任务已取消但调用 get 时抛出.
  • ExecutionException: 任务执行中抛出异常时包装抛出.
  • CompletionException: 链式异步工具 CompletableFuture 中抛出的封装异常.
  • TimeoutException: FutureTask.get(long timeout, TimeUnit unit) 超时抛出.

Fork/Join 库

  • ForkJoinPool: Fork/Join 库线程池, 管理执行 ForkJoinTask.
  • ForkJoinTask: 实现了 Future 的任务抽象类.
    • RecursiveAction: 无返回值任务, ForkJoinTask 实现.
    • RecursiveTask: 有返回值任务, ForkJoinTask 实现, 可以将子任务结果合并.
    • CountedCompleter: 复杂任务, ForkJoinTask 实现.
  • ForkJoinWorkerThread: ForkJoinPool 工作线程, 维护任务队列可窃取其他线程任务. 一般通过 ForkJoinPool 使用, 不直接使用.
  • StructuredTaskScope: 结构化并发任务管理.
    • StructureViolationException: StructuredTaskScope 相关异常.

ForkJoin 基本用法:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws Exception {
        // Tasks build
        List<Integer> tasks = new ArrayList<>();
        long expectedResult = 0L;
        for (int i = 0; i < 103; i++) {
            expectedResult += i;
            tasks.add(i);
        }
        // Pool build.
        try (ForkJoinPool forkJoinPool = ForkJoinPool.commonPool()) {
            // Tasks submit.
            Long result = forkJoinPool.invoke(new MyTask(tasks, 0, tasks.size() - 1));
            System.out.printf("Expected result: %d, result: %d.%n", expectedResult, result);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static class MyTask extends RecursiveTask<Long> {

        private final List<Integer> items;
        private final Integer startIndex;
        private final Integer endIndex;

        public MyTask(List<Integer> items, Integer startIndex, Integer endIndex) {
            this.items = items;
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            if (1 + endIndex - startIndex < 100) {
                for (int i = startIndex; i <= endIndex; i++) {
                    sum += items.get(i);
                }
                System.out.printf("Small task scale, task result: %d.%n", sum);
                return sum;
            }
            int middleIndex = (startIndex + endIndex) / 2;
            MyTask leftTask = new MyTask(items, startIndex, middleIndex);
            MyTask rightTask = new MyTask(items, middleIndex + 1, endIndex);
            invokeAll(leftTask, rightTask);
            Long leftResult = leftTask.join();
            Long rightResult = rightTask.join();

            long result = leftResult + rightResult;
            System.out.printf("Large task scale, task merge result: %d.%n", result);
            return result;
        }
    }
}

1.3) 并发集合

Map:

  • ConcurrentMap: Map 并发安全语义接口.
    • ConcurrentHashMap: 最经典的并发 Map, CAS+分段锁+红黑树实现.
    • ConcurrentNavigableMap: 有序导航 Map 语义接口.
    • ConcurrentSkipListMap: 基于跳表并发有序 Map, CAS+链表节点标记实现.

List:

  • CopyOnWriteArrayList: 写时复制 ArrayList.

Set:

  • CopyOnWriteArraySet: 线程安全去重 Set, 代理了 CopyOnWriteArrayList 并扩展 add 做扫描去重.
  • ConcurrentSkipListSet: 代理了 ConcurrentSkipListMap 的并发有序 Set.

Queue/Deque:

接口:

  • BlockingQueue: 阻塞队列接口.
    • BlockingDeque: 二级阻塞双端队列接口.
    • TransferQueue: 二级接口, 扩展了 transfer 方法, 可确保生产者插入的元素被消费者接收.

实现:

  • ArrayBlockingQueue: 数组有界阻塞队列.
  • LinkedBlockingQueue: 链表阻塞队列.
  • SynchronousQueue: 同步移交队列, 生产阻塞等待消费, 0 容量不能且多 add.
  • PriorityBlockingQueue: 优先级排序阻塞队列.
  • DelayQueue: 代理了 PriorityQueue 的延时优先级排序阻塞队列.
  • LinkedTransferQueue: 链表队列 Transfer 版.
  • ConcurrentLinkedQueue: 链表队列 CAS 非阻塞版.
  • LinkedBlockingDeque: 双端链表队列阻塞版.
  • ConcurrentLinkedDeque: 双端链表队列 CAS 非阻塞版.

1.4) 同步工具

  • CountDownLatch: 计数器闭锁, 阻塞线程, 等待其他线程将计数器归零, 不可复用.
  • CyclicBarrier: 栅栏, 阻塞线程, 直到目标线程数满放行.
  • Semaphore: 信号量, 限制同时执行的线程数量, 溢出阻塞.
  • Exchanger: 交换器, 两线程交换数据.
  • Phaser
  • BrokenBarrierException

1.5) 响应式流

1.6) 其余零散辅助类

  • TimeUnit
  • Delayed
  • ThreadLocalRandom
  • Helpers

2) 原子库

java.util.concurrent.atomic 提供原子操作类,可支持无锁并发编程。

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicIntegerFieldUpdater
  • AtomicLong
  • AtomicLongArray
  • AtomicLongFieldUpdater
  • AtomicMarkableReference
  • AtomicReference
  • AtomicReferenceArray
  • AtomicReferenceFieldUpdater
  • AtomicStampedReference
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder
  • Striped64

3) 锁库

java.util.concurrent.locks 提供显式锁、读写锁和条件对象,功能比 synchronized 更灵活。 代表类:ReentrantLock、ReentrantReadWriteLock、StampedLock、Condition。

  • AbstractOwnableSynchronizer
  • AbstractQueuedLongSynchronizer
  • AbstractQueuedSynchronizer
  • Condition
  • Lock
  • LockSupport
  • ReadWriteLock
  • ReentrantLock
  • ReentrantReadWriteLock
  • StampedLock


评论(0)

查看评论列表

暂无评论


发表评论

表情 颜文字

插入代码