您现在的位置是:网站首页> 编程资料编程资料
.NET 6线程池ThreadPool实现概述_自学过程_
2023-05-24
417人已围观
简介 .NET 6线程池ThreadPool实现概述_自学过程_
前言
在即将发布的 .NET 6 runtime 中,默认的线程池实现从 C++ 代码改为了 C#,更方便我们学习线程池的设计了。
https://github.com/dotnet/runtime/tree/release/6.0/src/libraries/System.Threading.ThreadPool
新的线程池实现位于 PortableThreadPool 中,原 ThreadPool 中的对外公开的接口会直接调用 PortableThreadPool 中的实现。
通过设置环境变量 ThreadPool_UsePortableThreadPool 为 0 可以设置成使用老的线程池实现。
https://github.com/dotnet/runtime/pull/43841/commits/b0d47b84a6845a70f011d1b0d3ce5adde9a4d7b7
本文以 .NET 6 runtime 源码作为学习材料,对线程池的设计进行介绍。从目前的理解上来看,其整体的设计与原来 C++ 的实现并没有特别大的出入。
注意:
- 本文不涉及细节的代码实现,主要为大家介绍其整体设计。所展示的代码并非原封不动的源码,而是为了方便理解的简化版。
ThreadPool.SetMaxThreads(int workerThreads, int completionPortThreads)中的completionPortThreads所相关的IOCP线程池是 .NET Framework 时代的遗留产物,用于管理 Windows 平台专有的 IOCP 的回调线程池。目前没看到有什么地方在用它了,completionPortThreads 这个参数也已经没有意义,底层IO库是自己维护的IO等待线程池。本文只涉及 worker thread 池的介绍。- 本文理解并不完整也不一定完全正确,有异议的地方欢迎留言讨论。
- 为了解释问题,一部分代码会运行在 .NET 6 之前的环境中。
任务的调度
线程池的待执行任务被存放在一个队列系统中。这个系统包括一个 全局队列,以及绑定在每一个 Worker Thread 上 的 本地队列 。而线程池中的每一个线程都在执行 while(true) 的循环,从这个队列系统中领取并执行任务。


在 ThreadPool.QueueUserWorkItem 的重载方法 ThreadPool.QueueUserWorkItem 里有一个 preferLocal 参数。
调用不带 preferLocal 参数的 ThreadPool.QueueUserWorkItem 方法重载,任务会被放到全局队列。
当 preferLocal 为 true 的时候,如果调用 ThreadPool.QueueUserWorkItem 代码的线程正好是个线程池里的某个线程,则该任务就会进入该线程的本地队列中。除此之外的情况则会被放到全局队列中等待未来被某个 Worker Thread 捡走。
在线程池外的线程中调用,不管 preferLocal 传的是什么,任务都会被放到全局队列。

基本调度单元
本地队列和全局队列的元素类型被定义为 object,实际的任务类型分为两类,在从队列系统取到任务之后会判断类型并执行对应的方法。
IThreadPoolWorkItem 实现类的实例。
///Represents a work item that can be executed by the ThreadPool. public interface IThreadPoolWorkItem { void Execute(); }
执行 Execute 方法也就代表着任务的执行。
IThreadPoolWorkItem 的具体实现有很多,例如通过 ThreadPool.QueueUserWorkItem(WaitCallback callBack) 传入的 callBack 委托实例会被包装到一个 QueueUserWorkItemCallback 实例里。QueueUserWorkItemCallback 是 IThreadPoolWorkItem 的实现类。
Task
class Task { internal void InnerInvoke(); }执行 InnerInvoke 会执行 Task 所包含的委托。
全局队列
全局队列 是由 ThreadPoolWorkQueue 维护的,同时它也是整个队列系统的入口,直接被 ThreadPool 所引用。
public static class ThreadPool { internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue(); public static bool QueueUserWorkItem(WaitCallback callBack, object state) { object tpcallBack = new QueueUserWorkItemCallback(callBack!, state); s_workQueue.Enqueue(tpcallBack, forceGlobal: true); return true; } } internal sealed class ThreadPoolWorkQueue { // 全局队列 internal readonly ConcurrentQueue本地队列
线程池中的每一个线程都会绑定一个 ThreadPoolWorkQueueThreadLocals 实例,在 workStealingQueue 这个字段上保存着本地队列。
internal sealed class ThreadPoolWorkQueueThreadLocals { // 绑定在线程池线程上 [ThreadStatic] public static ThreadPoolWorkQueueThreadLocals threadLocals; // 持有全局队列的引用,以便能在需要的时候将任务转移到全局队列上 public readonly ThreadPoolWorkQueue workQueue; // 本地队列的直接维护者 public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue; public readonly Thread currentThread; public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { workQueue = tpq; workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); // WorkStealingQueueList 会集中管理 workStealingQueue ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); currentThread = Thread.CurrentThread; } // 提供将本地队列中的任务转移到全局队列中去的功能, // 当 ThreadPool 通过后文将会介绍的 HillClimbing 算法判断得出当前线程是多余的线程后, // 会调用此方法对任务进行转移 public void TransferLocalWork() { while (workStealingQueue.LocalPop() is object cb) { workQueue.Enqueue(cb, forceGlobal: true); } } ~ThreadPoolWorkQueueThreadLocals() { if (null != workStealingQueue) { // TransferLocalWork 真正的目的并非是为了在这里被调用,这边只是确保任务不会丢的 fallback 逻辑 TransferLocalWork(); ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } }偷窃机制
这里思考一个问题,为什么本地队列的名字会被叫做 WorkStealingQueue 呢?
所有 Worker Thread 的 WorkStealingQueue 都被集中在 WorkStealingQueueList 中。对线程池中其他所有线程可见。
Worker Thread 的 while(true) 中优先会从自身的 WorkStealingQueue 中取任务。如果本地队列已经被清空,就会从全局队列中取任务。例如下图的 Thread1 取全局队列中领取了一个任务。
同时 Thread3 也没活干了,但是全局队列中的任务被 Thread1 抢走了。这时候就会去 从 Thread2 的本地队列中抢 Thread2 的活。
Worker Thread 的生命周期管理
接下来我们把格局放大,关注点从 Worker Thread 的打工日常转移到对它们的生命周期管理上来。
为了更方便的解释线程管理的机制,这边使用下面使用一些代码做演示。
代码参考自 https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/。
线程注入实验
Task.Run 会将 Task 调度到线程池中执行,下面的示例代码中等效于 ThreadPool.QueueUserWorkItem(WaitCallback callBack),会把 Task 放到队列系统的全局队列中(顺便一提,如果在一个线程池线程中执行 Task.Run 会将 Task 调度到此线程池线程的本地队列中)。
.NET 5 实验一 默认线程池配置
static void Main(string[] args) { var sw = Stopwatch.StartNew(); var tcs = new TaskCompletionSource(); var tasks = new List(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.Task.Wait(); })); } tasks.Add(Task.Run(() => { Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.SetResult(); })); Task.WaitAll(tasks.ToArray()); Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}"); } static int GetBusyThreads() { ThreadPool.GetAvailableThreads(out var available, out _); ThreadPool.GetMaxThreads(out var max, out _); return max - available; } 首先在代码在 .NET 5 环境中运行以下代码,CPU 逻辑核心数 12。
Loop Id: 01 | 0.000 | Busy Threads: 0 Loop Id: 02 | 0.112 | Busy Threads: 1 Loop Id: 03 | 0.112 | Busy Threads: 2 Loop Id: 04 | 0.113 | Busy Threads: 4 Loop Id: 05 | 0.113 | Busy Threads: 7 Loop Id: 06 | 0.113 | Busy Threads: 10 Loop Id: 07 | 0.113 | Busy Threads: 10 Task Id: 01 | 0.113 | Busy Threads: 11 Task Id: 02 | 0.113 | Busy Threads: 12 Task Id
相关内容
- System.Diagnostics.Metrics .NET 6 全新指标API讲解_基础应用_
- 理解ASP.NET Core 错误处理机制(Handle Errors)_实用技巧_
- .NET 6全新配置对象ConfigurationManager介绍_基础应用_
- 解析.netcore项目中IStartupFilter使用教程_实用技巧_
- 三种方法解决ASP.NET Core 6中的依赖项_基础应用_
- python安装pillow的三种方法_实用技巧_
- 在.NET 6中使用日志组件log4net的方法_实用技巧_
- .NET6新特性之 隐式命名空间引用_ASP.NET_
- .NET多种数据库大数据批量插入、更新(支持SqlServer、MySql、PgSql和Oracle)_自学过程_
- .NET Core 中对象池 Object Pool的使用_ASP.NET_
点击排行
本栏推荐
