虚拟线程在JDK19中以preview状态到来,经过JDK19和JDK20的测试,接收开发者的反馈和打磨,最终在JDK21(2023-09-19)中正式发布。JDK19中带来了两个在整个最酷的特征就是JEP444 虚拟线程和JEP428 结构化并发。虚拟线程的到来将使Java编程一门真正低成本进行并发程序开发的语言。
本文将以最完整的方式来介绍虚拟线程,使本文章成为虚拟线程领域最佳文章。
准备工作
我们使用slf4j
记录日志以便观察虚拟线程的行为情况.
static final Logger logger = LoggerFactory.getLogger(App.class);
为了方便记录信息,我们在这里封装一个log
方法,方法中主要的改造点的就是输出了当前线程。
static void log(String message) {
logger.info("{} | " + message, Thread.currentThread());
}
为什么选择虚拟线程?
JVM 是一种多线程语言,众所周知,JVM通过java.util.Thread
提供了对操作系统线程的抽象。在Project Loom(虚拟线程的项目名)发布之前,每一个java.util.Thread
都是对操作系统线程的包装,它们是1:1的。将这种原始的线程称之为平台线程(Platform Thread).
使用操作系统线程的问题是会带来一系列昂贵的代价。首先, 创建线程是有成本的。每当创建一个平台线程,操作系统必须申请Mb(默认为1Mb)级别的内存来作为线程栈的存储空间,用来保存线程上下文,Java调用栈信息,native信息。
可以很轻松想象到,多线程在时间和空间上都会带来成本。事实上,栈内存的大小会严重限制可创建内存的数量。在Java中,我们只要不断创建线程,就能耗尽操作系统内存,从而轻松触发OutOfMemoryError
.
private static void stackOverFlowErrorExample() {
for (int i = 0; i < 1_000_000; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
结果取决于操作系统和硬件配置,在笔者的8Gb MBP m1 上,很快就能触发问题。
[0.949s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[0.949s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4073"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
Java 是一种追求简单的编程语言,在并发编程中,最简单的办法就是把他们写成类似同步的方式。目前并发任务最佳的方式就是为每一个任务开启一个线程(Thread-pre-request)。
在这种编程方式中,每个线程都可以使用线程的局部变量来保存信息,这使线程之间需要共享的状态减少,从而使并发编程变得简单。但是Thread-pre-request的模型,会使我们更容易到达thread数量的限制。Java中会搭配线程池来管理多线程编程中的线程,但是使用线程池会带来更多需要考虑的问题:排队延迟;线程数量设置;拒绝策略支持等,稍有不注意,一个参数配置错误,就会导致系统产生OOM,慢响应等问题。
另外一种避免多线程的方式是使用非阻塞IO,这种方式会带来大量的callback,从而形成回调地狱(callback hell),非常影响代码的可读性、可维护性,甚至是编写难度。
static void callbackHell() {
a(aInput, resultFromA ->
b(resultFromA, resultFromB ->
c(resultFromB, resultFromC ->
d(resultFromC, resultFromD ->
System.out.printf("A, B, C, D: $resultFromA, $resultFromB, $resultFromC, $resultFromD")))));
}
为了解决回调地狱问题,响应式编程,async/await 关键字等解决方案被提出。
响应式编程试图使用 DSL 语言对数据进行流式处理并且实现并发。但是DSL十分难以理解,带来非常大的学习成本和心智负担。
public Observable<String> getSmsCode(String mobile) {
return getService().getSmsCode(mobile, "GRAVIDA")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Response<String>, Observable<String>>() {
@Override
public Observable<String> call(Response<String> stringResponse) {
return flatResponse(stringResponse);
}
});
}
另外一门基于JVM的语言Kotlin,就使用了类似async/await的方案,kotlin的协程依赖 suspending
功能,suspending
底层依赖非阻塞IO ,但并不是所有功能/库都提供了非阻塞IO。这还会导致整个程序被分割为两部分,一部分是同步的,另外一部分是suspending
的.这是一件对代码维护性非常有挑战性的事。同样会使我们丢失程序的简单性。
基于上述这些原因,Java需要一个更好的方式来编写并发程序。虚拟线程就是目前最合适的方案,Java之父曾说过,虚拟线程将终结响应式编程。接下来我们详细学习虚拟线程。
创建虚拟线程
虚拟线程是一种新的线程,前面我们介绍过平台线程的概念,它会尝试解决线程的资源使用问题。作为java.util.Thread
的替代品,虚拟线程使用堆(heap)保存栈帧数据而不是栈(stack)。
虚拟线程的内存占用非常小,只有几百bytes.并且此时的栈空间可以随时调整,不需要每个线程都申请大量的内存。
创建虚拟线程非常简单。java.util.Thread
提供了新的工厂方法ofVirtual()
.下面的代码就使用该方法创建一个带有名字的虚拟线程.
private static Thread virtualThread(String name, Runnable runnable){
return Thread.ofVirtual().name(name).start(runnable);
}
我们会使用一个例子来演示虚拟线程,比如我们去洗澡.
static Thread bathTime() {
return virtualThread(
"Bath time",
() -> {
log("I'm going to take a bath");
sleep(Duration.ofMillis(500L));
log("I'm done with the bath");
});
}
然后喝一杯茶。
static Thread boilingWater() {
return virtualThread(
"Drink some tea",
() -> {
log("I'm going to drink some tea");
sleep(Duration.ofSeconds(1L));
log("I'm done with the tea");
});
}
把任务组合到一起
static void concurrentMorningRoutine() {
var bathTime = bathTime();
var boilingWater = boilingWater();
bathTime.join();
boilingWater.join();
}
通过线程的join,可以避免在main方法在异步任务完成之前结束。我们查看输出
08:34:46.217 [boilWater] INFO in.rcard.virtual.threads.App - VirtualThread[#21,boilWater]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a bath
08:34:46.218 [boilWater] INFO in.rcard.virtual.threads.App - VirtualThread[#23,boilWater]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water
08:34:46.732 [bath-time] INFO in.rcard.virtual.threads.App - VirtualThread[#21,boilWater]/runnable@ForkJoinPool-1-worker-2 | I'm done with the bath
08:34:47.231 [boilWater] INFO in.rcard.virtual.threads.App - VirtualThread[#23,boilWater]/runnable@ForkJoinPool-1-worker-2 | I'm done with the water
输出结果符合我们的预期,两个线程并发交替执行。
这里使用了Thread来创建线程,工作中我们一般会更有限使用Executors的线程池来维护和管理线程。
static void concurrentMorningRoutineUsingExecutors() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var bathTime =
executor.submit(
() -> {
log("I'm going to take a bath");
sleep(Duration.ofMillis(500L));
log("I'm done with the bath");
});
var boilingWater =
executor.submit(
() -> {
log("I'm going to boil some water");
sleep(Duration.ofSeconds(1L));
log("I'm done with the water");
});
bathTime.get();
boilingWater.get();
}
}
提供线程名命名工厂
final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
var bathTime =
executor.submit(
() -> {
log("I'm going to take a bath");
sleep(Duration.ofMillis(500L));
log("I'm done with the bath");
});
var boilingWater =
executor.submit(
() -> {
log("I'm going to boil some water");
sleep(Duration.ofSeconds(1L));
log("I'm done with the water");
});
bathTime.get();
boilingWater.get();
}
ThreadFactory
使用统一配置来创建线程,我们这里的用法中,线程拥有一个前缀routine-
来命名线程,数字从0开始。
08:44:35.390 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water
08:44:35.390 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a bath
08:44:35.900 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | I'm done with the bath
08:44:36.399 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-1 | I'm done with the water
现在我们都会创建虚拟线程了,接下来我们来看看虚拟线程是如何工作的。
虚拟线程如何工作
虚拟线程如何工作?下面的图表达了平台线程和虚拟线程之前的关系。
JVM会维护一个平台线程池,通过ForkJoinPool
(此ForkJoinPool和之前系统中的ForkJoinPool不是一个)来创建和管理,这些用来支持虚拟线程的平台线程被称为载体线程(Carry Thread).默认情况下,载体线程数等于cpu核心数,并且不会超过256.
对于创建的每个虚拟线程,JVM负责调度它的执行,从空闲的载体线程中选择一个,从堆中临时复制虚拟线程的栈信息到载体线程的栈中。
我们可以从上面的日志中分析:
08:44:35.390 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | I'm going to boil some water
我们先看|
左边的部分,这其中保护了虚拟线程的id标识符VirtualThread[#23,routine-1]
,#23
和routine-1
,其次是ForkJoinPool-1-worker-2
说明该线程worker-2
来自线程池ForkJoinPool-1
中。
当虚拟线程第一次遇到阻塞操作时,载体线程被释放,线程上的栈信息被拷贝回堆中。这表明载体线程可以运行任务虚拟线程,当被阻塞的虚拟线程结束阻塞操作时,调度器会再度运行它。可能在同一个载体线程或者其他线程上。
使用api获取当前系统的cpu数量。在现代cpu中,往往使用超线程(Hyper-Thread)技术,会使cpu的逻辑核心=物理核心*2。Java api中获取的是逻辑核心数量。比如笔者使用的笔记本CPU为Intel G7400,cpu规格为2核心4线程(也叫做2c4t).
static int numberOfCores() {
return Runtime.getRuntime().availableProcessors();
}
启动数量为cpu+1的虚拟线程
static void viewCarrierThreadPoolSize() {
final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
IntStream.range(0, numberOfCores() + 1)
.forEach(i -> executor.submit(() -> {
log("Hello, I'm a virtual thread number " + i);
sleep(Duration.ofSeconds(1L));
}));
}
}
我们期望启动5个虚拟线程,同时只有4个平台线程作为载体。
08:44:54.849 [routine-0] INFO in.rcard.virtual.threads.App - VirtualThread[#21,routine-0]/runnable@ForkJoinPool-1-worker-1 | Hello, I'm a virtual thread number 0
08:44:54.849 [routine-1] INFO in.rcard.virtual.threads.App - VirtualThread[#23,routine-1]/runnable@ForkJoinPool-1-worker-2 | Hello, I'm a virtual thread number 1
08:44:54.849 [routine-2] INFO in.rcard.virtual.threads.App - VirtualThread[#24,routine-2]/runnable@ForkJoinPool-1-worker-3 | Hello, I'm a virtual thread number 2
08:44:54.855 [routine-4] INFO in.rcard.virtual.threads.App - VirtualThread[#26,routine-4]/runnable@ForkJoinPool-1-worker-4 | Hello, I'm a virtual thread number 4
08:44:54.849 [routine-3] INFO in.rcard.virtual.threads.App - VirtualThread[#25,routine-3]/runnable@ForkJoinPool-1-worker-4 | Hello, I'm a virtual thread number 3
从结果上看,我们成功启动了5个虚拟线程,编号从routine-0到4,对应的平台线程只有4个,从ForkJoinPool-1-worker-1到-4,其中-4出现了2次,这也和我们上面的描述相匹配。
虚拟线程是如何进行调度的
虚拟线程内部使用 FIFO (先进先出) 队列和ForkJoinPool
进行消费。java.util.VirtualThread
中定义了默认的调度器。
final class VirtualThread extends BaseVirtualThread {
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
// 忽略了其他代码
private static ForkJoinPool createDefaultScheduler() {
// 忽略了其他代码
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
// 忽略了其他代码
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
}
}
使用上述系统属性指定载体线程池大小。默认池大小等于cpu数量,值最大为256.
虚拟线程实现了协作调度,虚拟线程自身决定合适让出(yield)线程.更具体一点,该操作传递到调度器,然后虚拟线程就被从载体线程上卸载。
我们可以使用sleep
方法和上述属性来验证这些观点。我们先定义一个方法,该方法会进入一个while
的死循环中,无法执行到后续的sleep
方法.
static Thread workingHard() {
return virtualThread(
"Working hard",
() -> {
log("I'm working hard");
while (alwaysTrue()) {
// Do nothing
}
sleep(Duration.ofMillis(100L));
log("I'm done with working hard");
});
}
在这个代码中,定义了一个alwaysTure
方法来绕过编译器过死代码(dead code)的检查,该方法永远返回true
.
然后我们再定义一个方法来让员工进行休息
static Thread takeABreak() {
return virtualThread(
"Take a break",
() -> {
log("I'm going to take a break");
sleep(Duration.ofSeconds(1L));
log("I'm done with the break");
});
}
将两部分功能组合起来。
static void workingHardRoutine() {
var workingHard = workingHard();
var takeABreak = takeABreak();
workingHard.join();
takeABreak.join();
}
在运行代码之前,我们先设置一下参数.
-Djdk.virtualThreadScheduler.parallelism=1
-Djdk.virtualThreadScheduler.maxPoolSize=1
-Djdk.virtualThreadScheduler.minRunnable=1
以上参数的设置效果是,载体线程池只有1个线程。并且workHard
任务会死循环占用线程,永远不会让出线程。
21:28:35.702 [Working hard] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working hard]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
--- hang住,无其他输出 ---
结果显示workHard
无限占用线程,takeABreak
虚拟线程永远无法执行。
现在改动为每次循环休眠100ms,从而使协作调度能够发生。workHard
现在会让出线程。
static Thread workingConsciousness() {
return virtualThread(
"Working consciousness,
() -> {
log("I'm working hard");
while (alwaysTrue()) {
sleep(Duration.ofMillis(100L));
}
log("I'm done with working hard");
});
}
现在workingConsciousness
方法中有阻塞操作sleep
,能够让出线程。将两部分代码合并到一起,从而方便验证。
static void workingConsciousnessRoutine() {
var workingConsciousness = workingConsciousness();
var takeABreak = takeABreak();
workingConsciousness.join();
takeABreak.join();
}
我们预测takeABreak
虚拟线程将在workingConsciousness
执行到阻塞操作后运行。结果上也表明了这一点。
21:30:51.677 [Working consciousness] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working consciousness]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
21:30:51.682 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
21:30:52.688 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
--- hang住,永远运行 ---
和预料中一样,两个虚拟线程(#21和#23)共享了载体线程worker-1
现在我们回到workHard
方法中,同时将载体线程池设置为2,此时我们可以看到两个虚拟线程同时运行,并且使用了不同的载体线程。
-Djdk.virtualThreadScheduler.parallelism=2
-Djdk.virtualThreadScheduler.maxPoolSize=2
-Djdk.virtualThreadScheduler.minRunnable=2
我们预料中的是,两个虚拟线程会同时执行并且使用worker-1
和worker-2
载体线程.
21:33:43.641 [Working hard] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Working hard]/runnable@ForkJoinPool-1-worker-1 | I'm working hard
21:33:43.641 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#24,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
21:33:44.655 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#24,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm done with the break
--- hang住,永远执行 ---
值得注意的是,只有在能够高度协作的环境下,线程协作才有意义。由于虚拟线程只有才阻塞操作时才会让出线程,所以协作调度和虚拟线程不会提高cpu密集型应用的性能。
虚拟线程挂起现象
前面提到过,虚拟线程运行在载体线程上,当产生阻塞(block)操作时,会自动让出载体线程。当虚拟线程阻塞结束时,再由调度器选择新的载体线程恢复执行。
可惜的是,到目前为止,存在一部分阻塞操作无法从载体线程上让出,导致载体线程也阻塞。这种情况被称为挂起(Pinned).这不会导致程序错误,但是会影响程序的扩展性和并发性。如果载体线程被挂起,并且载体线程池允许的话,JVM将会自动添加一个载体线程。
有两种明确的原因将导致挂起:
- 代码执行到
synchronized
代码块或者方法。或者调用了Object.wait()
- 调用了native方法或者foreign 方法。比如使用JNI调用了native库
接下来我们构造一个相应的例子,模拟只有一个厕所,所以上厕所方法添加了synchronized
关键字。
static class Bathroom {
synchronized void useTheToilet() {
log("I'm going to use the toilet");
sleep(Duration.ofSeconds(1L));
log("I'm done with the toilet");
}
}
模拟一个员工使用卫生间
static Bathroom bathroom = new Bathroom();
static Thread goToTheToilet() {
return virtualThread(
"Go to the toilet",
() -> bathroom.useTheToilet());
}
再模拟出两个员工kwn和smy,kwn使用厕所,smy等待休息。
static void twoEmployeesInTheOffice() {
var kwn = goToTheToilet();
var smy = takeABreak();
kwn.join();
smy.join();
}
为了看到占用载体线程的效果,这里将载体线程池大小设置为1。然后运行该程序。输出如下
16:29:05.548 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:29:06.558 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:29:06.559 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
16:29:07.563 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
从结果上可以看到,被synchronized
修饰的Go to the toilet
占用了线程,直到执行完成后,第二个任务才开始运行。时间刚好相差1s。
通过-Djdk.tracePinnedThreads=full/short
参数可以设置JVM跟踪挂起的线程。
full
参数会展示展示挂起虚拟线程的完全stack trace,而short
只会展示少量的信息。下面就是用short
所展示的挂起虚拟线程的信息。
16:29:05.548 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
Thread[#22,ForkJoinPool-1-worker-1,5,CarrierThreads]
virtual.threads.playground/in.rcard.virtual.threads.App$Bathroom.useTheToilet(App.java:188) <== monitors:1
16:29:06.558 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:29:06.559 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm going to take a break
16:29:07.563 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
可以配置载体线程池大小从而使JVM能够在载体线程不足时创建新的线程.
-Djdk.virtualThreadScheduler.parallelism=1
-Djdk.virtualThreadScheduler.maxPoolSize=2
-Djdk.virtualThreadScheduler.minRunnable=1
使用新的配置执行程序。
16:32:05.235 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:32:05.235 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
16:32:06.243 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm done with the toilet
16:32:06.243 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm done with the break
JVM 在无法找空闲载体线程时自动添加了新的载体线程worker-2
。
使用java.util.concurrent.locks.ReentrantLock
中的Lock API替换synchronized
可以解决该问题。
static class Bathroom {
private final Lock lock = new ReentrantLock();
@SneakyThrows
void useTheToiletWithLock() {
if (lock.tryLock(10, TimeUnit.SECONDS)) {
try {
log("I'm going to use the toilet");
sleep(Duration.ofSeconds(1L));
log("I'm done with the toilet");
} finally {
lock.unlock();
}
}
}
}
使用新版代码
static Thread goToTheToiletWithLock() {
return virtualThread("Go to the toilet", () -> bathroom.useTheToiletWithLock());
}
@SneakyThrows
static void twoEmployeesInTheOfficeWithLock() {
var riccardo = goToTheToiletWithLock();
var daniel = takeABreak();
riccardo.join();
daniel.join();
}
最终结果展示两个虚拟线程并发运行。
16:35:58.921 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-2 | I'm going to take a break
16:35:58.921 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-1 | I'm going to use the toilet
16:35:59.932 [Take a break] INFO in.rcard.virtual.threads.App - VirtualThread[#23,Take a break]/runnable@ForkJoinPool-1-worker-1 | I'm done with the break
16:35:59.933 [Go to the toilet] INFO in.rcard.virtual.threads.App - VirtualThread[#21,Go to the toilet]/runnable@ForkJoinPool-1-worker-2 | I'm done with the toilet
可以同样添加-Djdk.tracePinnedThreads=full
来查看是否存在PinnedThread情况。
ThreadLocal 和 线程池
虚拟线程曾经不支持ThreadLocal,但是在开发者的反馈下,最后JDK21发布时,支持了ThreadLocal。我们一起来看看在虚拟线程中如何使用ThreadLocal.
static ThreadLocal<String> context = new ThreadLocal<>();
static void virtualThreadLocal() {
var virtualThread1 = Thread.ofVirtual().name("thread-1").start(() -> {
context.set("thread-1");
sleep(Duration.ofSeconds(1L));
log("Hey, my name is " + context.get());
});
var virtualThread2 = Thread.ofVirtual().name("thread-2").start(() -> {
context.set("thread-2");
sleep(Duration.ofSeconds(1L));
log("Hey, my name is " + context.get());
});
virtualThread1.join();
virtualThread2.join();
}
输出为
15:08:37.142 [thread-1] INFO in.rcard.virtual.threads.App - VirtualThread[#21,thread-1]/runnable@ForkJoinPool-1-worker-1 | Hey, my name is thread-1
15:08:37.142 [thread-2] INFO in.rcard.virtual.threads.App - VirtualThread[#23,thread-2]/runnable@ForkJoinPool-1-worker-2 | Hey, my name is thread-2
和平台线程类似,使用方式一样。
但是这不代表在虚拟线程场景中使用ThreadLocal是一个很好的想法,相反,反而需要注意这种情况。原因在于现在可以轻易创建大量的虚拟线程,同时每个虚拟线程都拥有自己的ThreadLocal,这就意味着,它们占用的内存可能会意想不到的大。
未来可能会有其他方案能在虚拟线程场景代替ThreadLocal,JDK20中的特征scoped value将允许在线程中和跨线程共享不可变变量。
虚拟线程内部实现
本章节中,我们将介绍虚拟线程的原理,为什么能做到线程切换等等。
本章节不会特别深入,但是会讲解到基本的概念。
Project Loom 提供了新的API Continuation来进行线程的让出和恢复。虚拟线程存储了自身运行的必要信息。
虚拟线程被JVM天然地支持,Continuation的执行就是一堆对于JVM的native调用。
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);
// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}
this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}
之后一个Continuation
对象被创建,VThreadContinuation
.
private static class VThreadContinuation extends Continuation {
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, () -> vthread.run(task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
if (TRACE_PINNING_MODE > 0) {
boolean printAll = (TRACE_PINNING_MODE == 1);
PinnedThreadPrinter.printStackTrace(System.out, printAll);
}
}
}
上面一段代码同样展示了jdk.tracePinnedThreads
参数是如何工作的。
一旦调用start
方法,状态就变更为started
.
@Override
void start(ThreadContainer container) {
if (!compareAndSetState(NEW, STARTED)) {
throw new IllegalThreadStateException("Already started");
}
// Omissis
try {
// Omissis
// submit task to run thread
submitRunContinuation();
started = true;
} finally {
// Omissis
}
}
submitRunContinuation 方法将runContinuation
提交给调度器进行调度。
private void submitRunContinuation(boolean lazySubmit) {
try {
if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} else {
scheduler.execute(runContinuation);
}
} catch (RejectedExecutionException ree) {
// Omissis
}
}
runContinuation 方法的执行会使状态变为running
,不管之前是STARTED
或者RUNNABLE
.
private void runContinuation() {
// Omissis
if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
// first run
firstRun = true;
} else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
// consume parking permit
setParkPermit(false);
firstRun = false;
} else {
// not runnable
return;
}
// Omissis
try {
cont.run();
} finally {
// Omissis
}
}
每一次线程碰到阻塞点,线程的状态都变为PRAKING
。到达阻塞点是通过调用VirtualThread.park()
方法。
void park() {
assert Thread.currentThread() == this;
// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;
// park the thread
setState(PARKING);
try {
if (!yieldContinuation()) {
// park on the carrier thread when pinned
parkOnCarrierThread(false, 0);
}
} finally {
assert (Thread.currentThread() == this) && (state() == RUNNING);
}
}
一旦进入 PARKING
状态,yieldContinuation
方法被调用,该方法会执行虚拟线程实际的Parking操作以及从载体线程上卸载(unmount).
private boolean yieldContinuation() {
boolean notifyJvmti = notifyJvmtiEvents;
// unmount
if (notifyJvmti) notifyJvmtiUnmountBegin(false);
unmount();
try {
return Continuation.yield(VTHREAD_SCOPE);
} finally {
// re-mount
mount();
if (notifyJvmti) notifyJvmtiMountEnd(false);
}
}
Continuation.yield(VTHREAD_SCOPE)
方法调用很多JVM的native方法。如果该方法返回true
, parkOnCarrierThread
方法将被调用.
private void parkOnCarrierThread(boolean timed, long nanos) {
assert state() == PARKING;
var pinnedEvent = new VirtualThreadPinnedEvent();
pinnedEvent.begin();
setState(PINNED);
try {
if (!parkPermit) {
if (!timed) {
U.park(false, 0);
} else if (nanos > 0) {
U.park(false, nanos);
}
}
} finally {
setState(RUNNING);
}
// consume parking permit
setParkPermit(false);
pinnedEvent.commit();
}
最后,VirtualThread.afterYield()
方法被调用。该方法设置虚拟线程状态为PARKED
,通过执行lazySubmitRunContinuation
方法再次继续执行,并设置状态为RUNNABLE
.
private void afterYield() {
int s = state();
assert (s == PARKING || s == YIELDING) && (carrierThread == null);
if (s == PARKING) {
setState(PARKED);
// notify JVMTI that unmount has completed, thread is parked
if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
// may have been unparked while parking
if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
// lazy submit to continue on the current thread as carrier if possible
lazySubmitRunContinuation();
}
} else if (s == YIELDING) { // Thread.yield
setState(RUNNABLE);
// notify JVMTI that unmount has completed, thread is runnable
if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
// lazy submit to continue on the current thread as carrier if possible
lazySubmitRunContinuation();
}
}
到这里最终形成了闭环,从整个过程中会发现,虚拟线程的状态机变更机制还是挺负责的。
结论
终于写完了这个2w字的文章,本文从开始介绍虚拟线程诞生的原因,然后介绍如何创建虚拟线程,通过案例展示了虚拟线程的切换原理,挂起行为的产生,规避的方法。
随着虚拟线程生态的完善,java将会称为并发编程的搅局者。
希望本文能使读者完善对虚拟线程的认知,帮助大家在以低成本方式进行并发代码编程。