C#线程---ThreadPool

apple-hu / 2024-10-13 / 原文

线程池的简介

      为每个短暂的异步操作创建线程会产生显著的开销,线程池可以成功地适应于任何 需要大量短暂的开销大的资源的情形。我们事先分配一定的资源,将这些资源放入到资源 池。每次需要新的资源.只需从池中获取一个,而不用创建一个新的。当该资源不再被使用 时,就将其返回到池中。

     ThreadPool类型拥有一个QueueUserWorkltem静 态方法。该静态方法接受一个委托,代表用户自定义的一个异步操作。在该方法被调用后, 委托会进入到内部队列,如果池中没有任何线程,将创建一个新的工作者线程(worker thread)并将队列中第一个委托放入到该工作者线程中。

    如果想线程池中放入新的操作,当之前的所有操作完成后,很可能只需重用一个线程来 执行这些新的操作然而,如果放置新的操作过快,线程池将创建更多的线程来执行这些操 作创建太多的线程是有限制的,在这种情况下新的操作将在队列中等待直到线程池中的工 作者线程有能力来执行它们。

    保持线程中的操作都是短暂的是非常重要的.不要在线程池中放入长时间运行的操作,或者阻塞工作者线程。这将导致所有工作者线程变得繁忙,从而无法服务用户操 作。这会导致性能问题。

    当停止向线程池中放置新操作时,线程池最终会删除一定时间后过期的不再使用的线 程。这将释放所有那些不再需要的系统资源。

    再次强调线程池的用途是执行运行时间短的操作。使用线程池可以减少并行度耗费 及节省操作系统资源。我们只使用较少的线程,但是以比平常更慢的速度来执行异步操作, 使用一定数量的可用的工作者线程批量处理这些操作。如果操作能快速地完成则比较适用线 程池.但是执行长时间运行的计算密集型操作则会降低性能。

    在ASP.NET应用程序中使用线程池时要相当小心。ASP.NET基础设施使用自己的线程池,如果在线程池中浪费所有的工作者线程,Web服务器将不能够服务新 的请求。在ASP.NET中只推荐使用输入/输出密集型的异步操作,因为其使用了一个不同的 方式,叫做I/O线程。

    线程池中的工作者线程都是后台线程。这意味着当所有的前台线程(包括主程 序线程)完成后,所有的后台线程将停止工作。

 

1. 线程池中调用委托(异步编程模型 (Asynchronous Programming Model,简称APM,.NET历史中第一个异步编程模式)

    a. 使用旧的方式创建了一个线程,然后启动它并等待完成(通过Thread. CurrentThread.IsThreadPoolThread属性可以知道不是线程池的线程)
    由于线程的 构造函数只接受一个无任何返回结果的方法,我们使用了 lambda表达式来将对Test方法 的调用包起来(这样可以把线程ID传回来)。

    b. 定义了一个委托并调用Beginlnvoke方法来运行该委托。Beginlnvoke方法接受一 个回调函数。该回调函数在异步操作完成后会被调用,并且一个用户自定义的状态会传给该 回调函数。

    Beginlnvoke立即返回了结果,当线程池中的工作者线程在执行异步操作时,仍 允许我们继续其他工作。当需要异步操作的结果时,可以使用Beginlnvoke方法调用返回的 result对象°

    我们可以使用result对象的IsCompleted属性轮询结果。可以通 过委托调用Endlnvoke方法,传递委托参数和lAsyncResult对象。

    当操作完成后,传递给Beginlnvoke方法的回调函数将被放置到线程池中,确切地说是一个工作者线程中。

    如果在Main方法定义的结尾注释掉Thread.Sleep方法调用,回调函 数将不会被执行。这是因为当主线程完成后,所有的后台线程会被停止,包括该回调函数。 对委托和回调函数的异步调用很可能会被同一个工作者线程执行。通过工作者线程ID可以 容易地看出。

      BeginOperationName/EndOperationName 方法和.NET 中的 lAsyncResult 对象等方 式被称为异步编程模型(或APM模式),这样的方法对称为异步方法"该模式也被应用于多 个.NET类库的API中,但在现代编程中,更推荐使用任务并行库(Task Parallel Library,简 称TPL)  

using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建一个线程,看这个线程是否是线程池中的线程:不是
            int threadId = 0;
            var t = new Thread(() => Test(out threadId));
            t.Start();
            t.Join();
            WriteLine($"Thread id: {threadId}");

            //委托实例化(把test赋值给委托)
            RunOnThreadPool poolDelegate = Test;
            //委托还可以有回调,并传递一个对象给回调
            IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call");
            r.AsyncWaitHandle.WaitOne();

            string result = poolDelegate.EndInvoke(out threadId, r);

            WriteLine($"8. Thread pool worker thread id: {threadId}");
            WriteLine("9."+result);

            Sleep(TimeSpan.FromSeconds(2));
           
            Read();
        }
        private delegate string RunOnThreadPool(out int threadId);
        private static void Callback(IAsyncResult ar)
        {
            WriteLine("4. Starting a callback...");
            WriteLine($"5. State passed to a callbak: {ar.AsyncState}");
            WriteLine($"6. Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
            WriteLine($"7. Thread pool worker thread id: {CurrentThread.ManagedThreadId}");
        }
        private static string Test(out int threadId)
        {
            WriteLine("1. Test Starting...");
            WriteLine($"2. Test Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
            Sleep(TimeSpan.FromSeconds(2));
            threadId = CurrentThread.ManagedThreadId;
            return $"3. Thread pool worker thread id was: {threadId}";
        }
    }
}
线程中调用委托和回调

 

2. 向线程池中放入异步操作(描述如何向线程池中放入异步操作,闭包操作可以避免上面回调那么麻烦)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            const int x = 1;
            const int y = 2;
            const string lambdaState = "lambda state 2";
            //使用QueueUser- Workltem方法将该方法放到线程池中运行
            ThreadPool.QueueUserWorkItem(AsyncOperation);
            //在操作完成后让线程睡眠一秒钟,从而让线程池拥有为后面的新操作重用线程的可能性
            Sleep(TimeSpan.FromSeconds(1));

            //给方法调用传入了一 个state对象 该对象将作为state参数传递。
            ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");
            Sleep(TimeSpan.FromSeconds(1));

            //使用了 lambda表 达式语法,从而无须定义一个单独的方法
            ThreadPool.QueueUserWorkItem(state =>
            {
                WriteLine($"Operation state: {state}");
                WriteLine($"Worker thread id: {CurrentThread.ManagedThreadId}");
                Sleep(TimeSpan.FromSeconds(2));
            }, "lambda state");

            //使用闭包机制,从而无须传递lambda表达式的状态。闭包更灵活,允许我 们向异步操作传递一个以上的对象而且这些对象具有静态类型。
            //所以之前介绍的传递对象给方法回调的机制既冗余又过时。在C#中有了闭包后就不再需要使用它了。
            ThreadPool.QueueUserWorkItem(_ =>
            {
                WriteLine($"Operation state: {x + y}, {lambdaState}");
                WriteLine($"Worker thread id: {CurrentThread.ManagedThreadId}");
                Sleep(TimeSpan.FromSeconds(2));
            }, "lambda state");

            Sleep(TimeSpan.FromSeconds(2));
            Read();
        }
        //定义了 AsyncOperation方法,它接受单个object类型的参数
        private static void AsyncOperation(object state)
        {
            WriteLine($"Operation state: {state ?? "(null)"}");
            WriteLine($"Worker thread id: {CurrentThread.ManagedThreadId}");
            Sleep(TimeSpan.FromSeconds(2));
        }
    }
}
线程池中放入异步操作

3. 线程池与并行度(展示线程池如何工作于大量的异步操作,以及它与创建大量单独的线程的方式有 何不同)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            const int numberOfOperations = 500;
            var sw = new Stopwatch();
            sw.Start();
            UseThreads(numberOfOperations);
            sw.Stop();
            WriteLine($"Execution time using threads: {sw.ElapsedMilliseconds}");

            sw.Reset();
            sw.Start();
            UseThreadPool(numberOfOperations);
            sw.Stop();
            WriteLine($"Execution time using the thread pool: {sw.ElapsedMilliseconds}");
            Read();
        }
        /// <summary>
        /// 创建了很多不同的线程,每个线程都运行一个操作。该操作打印出线 程ID并阻塞线程100毫秒。
        /// 结果我们创建了 500个线程,全部并行运行这些操作。
        /// 虽然在我的机器上的总耗时是300毫秒,但是所有线程消耗了大量的操作系统资源。
        /// </summary>
        /// <param name="numberOfOperations"></param>

        static void UseThreads(int numberOfOperations)
        {
            using (var countdown = new CountdownEvent(numberOfOperations))
            {
                WriteLine("Scheduling work by creating threads");
                for (int i = 0; i < numberOfOperations; i++)
                {
                    var thread = new Thread(() =>
                    {
                        Write($"{i}---{CurrentThread.ManagedThreadId},");
                        Sleep(TimeSpan.FromSeconds(0.1));
                        countdown.Signal();
                    });
                    thread.Start();
                }
                countdown.Wait();
                WriteLine();
            }
        }

        /// <summary>
        /// 将它们放入到线程池中,然后线程池开始执行这些操作。线程池花费了更多的时间,但我们为操作系统节省了内存和线程数。
        /// </summary>
        /// <param name="numberOfOperations"></param>
        static void UseThreadPool(int numberOfOperations)
        {
            using (var countdown = new CountdownEvent(numberOfOperations))
            {
                WriteLine("Starting work on a threadpool");
                for (int i = 0; i < numberOfOperations; i++)
                {
                    ThreadPool.QueueUserWorkItem(_ =>
                    {
                        Write($"{i}---{CurrentThread.ManagedThreadId},");
                        Sleep(TimeSpan.FromSeconds(0.1));
                        countdown.Signal();
                    });
                }
                countdown.Wait();
                WriteLine();
            }
        }
    }
}
线程池并行度

4. 实现一个取消选项(三种不同的CancellationToken使用)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var cts = new CancellationTokenSource())
            {
                CancellationToken token = cts.Token;
                ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token));
                Sleep(TimeSpan.FromSeconds(2));
                cts.Cancel();
            }

            using (var cts = new CancellationTokenSource())
            {
                CancellationToken token = cts.Token;
                ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token));
                Sleep(TimeSpan.FromSeconds(2));
                cts.Cancel();
            }

            using (var cts = new CancellationTokenSource())
            {
                CancellationToken token = cts.Token;
                ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token));
                Sleep(TimeSpan.FromSeconds(2));
                cts.Cancel();
            }

            Sleep(TimeSpan.FromSeconds(2));
            Read();
        }
        static void AsyncOperation1(CancellationToken token)
        {
            WriteLine("Starting the first task");
            for (int i = 0; i < 5; i++)
            {
                //轮询来检查CancellationToken. IsCancellationRequested属性。
                //如果该属性为true,则说明操作需要被取消,我们必须放弃该 操作
                if (token.IsCancellationRequested)
                {
                    WriteLine("The first task has been canceled.");
                    return;
                }
                Sleep(TimeSpan.FromSeconds(1));
            }
            WriteLine("The first task has completed succesfully");
        }

        static void AsyncOperation2(CancellationToken token)
        {
            try
            {
                WriteLine("Starting the second task");

                for (int i = 0; i < 5; i++)
                {
                    //抛出一个OperationCancelledException异常。这允许在操作之外控制取消过程,取消时通过操作之外的代码来处理
                    token.ThrowIfCancellationRequested();
                    Sleep(TimeSpan.FromSeconds(1));
                }
                WriteLine("The second task has completed succesfully");
            }
            catch (OperationCanceledException)
            {
                WriteLine("The second task has been canceled.");
            }
        }

        static void AsyncOperation3(CancellationToken token)
        {
            bool cancellationFlag = false;
            //注册一个回调函数。当操作被取消时,在线程池将调用该回调函数。这允许链式传递一个取消逻辑到另一个异步操作中
            token.Register(() => cancellationFlag = true);
            WriteLine("Starting the third task");
            for (int i = 0; i < 5; i++)
            {
                if (cancellationFlag)
                {
                    WriteLine("The third task has been canceled.");
                    return;
                }
                Sleep(TimeSpan.FromSeconds(1));
            }
            WriteLine("The third task has completed succesfully");
        }
    }
}
线程取消CancellationToken

5. 在线程池中使用等待事件处理器及超时(描述如何在线程池中对操作实现超时,以及如何在线程池中正确地等待)

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            //WorkerOperation要6秒完成,所以5秒的话,就会超时完成。
            RunOperations(TimeSpan.FromSeconds(5));
            //7秒的话就是正常完成。
            RunOperations(TimeSpan.FromSeconds(7));

            Read();
        }
        static void RunOperations(TimeSpan workerOperationTimeout)
        {
            using (var evt = new ManualResetEvent(false))
            using (var cts = new CancellationTokenSource())
            {
                WriteLine("Registering timeout operation...");
                //ThreadPool.RegisterWaitForSingleObjecto该方法允许我们将回调函数放入线程池中的队列中
                //当提供的等待事件处理器收到信号或发生超时时,该回调函数将被调用。这允许我们为线程池中的操作实现超时处理
                //首先注册了处理完成(或超时)后调用的异步操作WorkerOperationWait,
                //有两种情况会触发该异步操作,一是接收到了ManualRestEvent对象的信号,当工作者操作成功完成后会发出该信号。
                //第二种情况是在第一个操作完成之前超时,如果超时了,那么会使用CancellationToken来取消第一个操作
                var worker = ThreadPool.RegisterWaitForSingleObject(evt
                    , (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut)
                    , null
                    , workerOperationTimeout
                    , true);

                WriteLine("Starting long running operation...");
                ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt));

                Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2)));
                worker.Unregister(evt);
            }
        }
        static void WorkerOperation(CancellationToken token, ManualResetEvent evt)
        {
            for (int i = 0; i < 6; i++)
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
                Sleep(TimeSpan.FromSeconds(1));
            }
            evt.Set();
        }

        static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut)
        {
            if (isTimedOut)
            {
                cts.Cancel();
                WriteLine("Worker operation timed out and was canceled.");
            }
            else
            {
                WriteLine("Worker operation succeded.");
            }
        }
    }
}
实现超时或正常完成后的回调

6. 描述如何使用System.Threading.Timer对象来在线程池中创建周期性调用的异步操作,可以动态更改下次下一次 计时器操作将被执行的时间

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        static void Main(string[] args)
        {
            WriteLine("Press 'Enter' to stop the timer...");
            DateTime start = DateTime.Now;
            //首先创建了一个Timer实例。
            //第一个参数是一个lambda表达式(我们调用TimerOperation方法并给其提供一个起始时间),将会在线程池中 被执行。
            //第二个参数为null,因为无须使用用户状态对象。
            //第三个参数指定了什么时候会第一次运行TimerOperation,说明一秒后会启动第一次操作。
            //第四个参数指定再次调用的间隔时间,每隔两秒再次运行。
            _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
            
            //等待6秒后修改计时器:在调用timer.Change方法一秒后启动TimerOperation,然 后每隔4秒再次运行。
            try
            {
                Sleep(TimeSpan.FromSeconds(6));

                _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4));


                //可以用更复杂的方式使用计时器。比如,可以通过Timeout.Infinite值提供给计时器一个间隔参数来只允许计时器操作一次
                //然后在计时器异步操作内,能够设置下一次 计时器操作将被执行的时间。
                ReadLine();
            }
            finally
            {
                _timer.Dispose();
            }


            
        }
        static Timer _timer;

        static void TimerOperation(DateTime start)
        {
            TimeSpan elapsed = DateTime.Now - start;
            WriteLine($"{elapsed.Seconds} seconds from {start}. " +
                      $"Timer thread pool thread id: {CurrentThread.ManagedThreadId}");
        }
    }
用计时器重复异步操作

7. 使用Backgroundworker组件可以将异步代码组织为一系列事件及事件处理器。使用该组件进行异步编程(基于事件的异步模式(Event-based Asynchronous Pattern, 简称EAP)

Backgroundworker组件实际上被使用于Windows窗体应用程序(Windows Forms Applications,简称WPF)中。该实现通过后台工作事件处理器的代码可以直接与UI 控制器交互。

与线程池中的线程与UI控制器交互的方式相比较,使用Backgroundworker组 件的方式更加自然和好用

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Recipe1
{
    class Program
    {
        /// <summary>
        /// 没有使用线程池和委托,而是使用了另一个C#语法, 称为事件。事件表示了一些通知的源或当通知到达时会有所响应的一系列订阅者。
        /// 在本例中,我们将订阅三个事件,当这些事件发生时,将调用相应的事件处理器一当事件通知其订阅者时,具有特殊的定义签名的方法将被调用。
        /// 因此,除了将异步API组织为Begin/End方法对,还可以只启动一个异步操作然后订 阅给不同的事件。
        /// 这些事件在该操作执行时会被触发。这种方式被称为基于事件的异步模式(Event-based Asynchronous Pattern, 简称EAP)
        /// 这是历史上第二种用来构造异步程序的方 式,现在更推荐使用TPL,第4章中将会描述该方式
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            //创建了一个Backgroundworker组件的实例。显式地指出该后台工作者 线程支持取消操作及该操作进度的通知
            var bw = new BackgroundWorker();
            bw.WorkerReportsProgress = true;
            bw.WorkerSupportsCancellation = true;

            bw.DoWork += Worker_DoWork;
            bw.ProgressChanged += Worker_ProgressChanged;
            bw.RunWorkerCompleted += Worker_Completed;

            bw.RunWorkerAsync();

            WriteLine("Press C to cancel work");
            //最后,得到结果后,将结果设置给事件参数,然后RunWorkerComplet
            do
            {
                if (ReadKey(true).KeyChar == 'C')
                {
                    bw.CancelAsync();
                }

            }
            while (bw.IsBusy);



        }
        /// <summary>
        /// 第一个是DoWork事件。当一个后台工作对象通过RunWorker- Async方法启动一个异步操作时,该事件处理器将被调用
        /// 该事件处理器将会运行在线程池中。
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void Worker_DoWork(object sender, DoWorkEventArgs e)
        {
            WriteLine($"DoWork thread pool thread id: {CurrentThread.ManagedThreadId}");
            var bw = (BackgroundWorker)sender;
            for (int i = 1; i <= 100; i++)
            {
                if (bw.CancellationPending)
                {
                    e.Cancel = true;
                    return;
                }
                if (i % 10 == 0)
                {
                    bw.ReportProgress(i);
                }

                Sleep(TimeSpan.FromSeconds(0.1));
            }

            e.Result = 42;
        }
        static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
        {
            WriteLine($"{e.ProgressPercentage}% completed. " +
                      $"Progress thread pool thread id: {CurrentThread.ManagedThreadId}");
        }
        //得到结果后,将结果设置给事件参数,然后RunWorkerComplet
        static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e)
        {
            WriteLine($"Completed thread pool thread id: {CurrentThread.ManagedThreadId}");
            if (e.Error != null)
            {
                WriteLine($"Exception {e.Error.Message} has occured.");
            }
            else if (e.Cancelled)
            {
                WriteLine($"Operation has been canceled.");
            }
            else
            {
                WriteLine($"The answer is: {e.Result}");
            }
        }
    }
}
Backgroundworker线程支持取消操作及该操作进度的通知