線程池(英語:thread pool):一種線程使用模式。線程過多會帶來排程開銷,進而影響快取局部性和整體效能。而線程池維護着多個線程,等待着監督管理者分配可並行執行的任務。這避免了在處理短時間任務時建立與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分排程。可用線程數量應該取決於可用的並行處理器、處理器內核、記憶體、網絡sockets等的數量。 例如,對於計算密集型任務,線程數上限一般取CPU邏輯核心數+2,線程數過多會導致額外的線程切換開銷。

任務排程以執行線程的常見方法是使用同步佇列,稱作任務佇列。池中的線程等待佇列中的任務,並把執行完的任務放入完成佇列中。

線程池模式一般分為兩種:HS/HA半同步/半非同步模式、L/F領導者與跟隨者模式。

  • 半同步/半非同步模式又稱為生產者消費者模式,是比較常見的實現方式,比較簡單。分為同步層、佇列層、非同步層三層。同步層的主線程處理工作任務並存入工作佇列,工作線程從工作佇列取出任務進行處理,如果工作佇列為空,則取不到任務的工作線程進入掛起狀態。由於線程間有數據通訊,因此不適於大數據量交換的場合。
  • 領導者跟隨者模式,線上程池中的線程可處在3種狀態之一:領導者leader、追隨者follower或工作者processor。任何時刻線程池只有一個領導者線程。事件到達時,領導者線程負責訊息分離,並從處於追隨者線程中選出一個來當繼任領導者,然後將自身設置為工作者狀態去處置該事件。處理完畢後工作者線程將自身的狀態置為追隨者。這一模式實現複雜,但避免了線程間交換任務數據,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了領導者跟隨者模式實現。

線程池的伸縮性對效能有較大的影響。

  • 建立太多線程,將會浪費一定的資源,有些線程未被充分使用。
  • 銷毀太多線程,將導致之後浪費時間再次創建它們。
  • 建立線程太慢,將會導致長時間的等待,效能變差。
  • 銷毀線程太慢,導致其它線程資源飢餓。

Windows API的線程池函數 編輯

Windows作業系統API提供了一套線程池的實現介面。[1]可以方便地建立、使用線程池。Windows線程池API被設計為一組協同對象, 其中有些對象表示工作單位、計時器、非同步I/O 等等。使用下述用戶模式的對象來管理線程池及相關的數據:

  • 線程池對象(pool object)包含了一組工作線程(worker threads)。每個行程可以建立多個線程池對象,也可以直接使用行程預設的線程池。
  • 回呼環境(Threadpool Callback Environment)線程池可以關聯多個回呼環境,回呼環境可單獨指定線上程池內的排程優先級。
  • 清理群(clean-up group)對象關聯於一個回呼環境。用於追溯(track)線程緩衝池回呼。每次建立線程池的IO、work、timer、wait等對象(呼叫CreateThreadpool*等函數),會在cleanup group中增加一個成員。關閉線程池的IO、work、timer、wait等對象(呼叫CloseThreadpool*等函數),會在cleanup group中刪除相應的成員。
  • 線程池工作對象(work object) 由一個函數指標和一個被稱為上下文的 void 指標組成,線程池每次在執行時將 void 指標送入函數。可非同步多次投寄給線程池去執行它的回呼函數。實際上就是以非同步方式呼叫回呼函數。通過SubmitThreadpoolWork投寄工作對象到任務佇列。如果不新增的工作對象,則不能更改函數和上下文。
  • 線程池定時器對象(timer object)在定時器到期時投寄給線程池去執行它的回呼函數。 建立(CreateThreadpoolTimer)並設定(SetThreadpoolTimer)定時器對象。當定時器對象到期時,工作線程會執行定時器對象的回呼函數。
  • 線程池等待對象(wait object):在可等待控制代碼(waitable handle,可使用任何內核同步對象的控制代碼)被觸發(signaled)時或逾時到期時,它的回呼函數被投寄給線程池去執行。建立(CreateThreadpoolWait)並設定(SetThreadpoolWait)等待對象。線程池內部的一個線程呼叫WaitForMultipleObjects並傳入由SetThreadpoolWait函數註冊的控制代碼,不斷地組成一個控制代碼組,同時將Wait*函數的bWaitAll設為FALSE,這樣當任何一個控制代碼被觸發,線程池就會被喚醒。因WaitForMultipleObjects不允許將同一個控制代碼傳入多次,因此必須確保不會用SetThreadpoolWait來多次註冊同一個控制代碼,但可以呼叫DuplicationHandle複製控制代碼並傳給Set*函數。因WaitForMultipleObjects一次最多只能等待64個內核對象,因此線程池實際上為每64個內核對象分配一個線程來等待,所以效率比較高。如果要等待超過64個以上的內核對象,系統會每64個內核對象,就開闢一個線程來等待這些內核對象。當線程池中一個線程呼叫了傳入的回呼函數,對應的等待項將進入「不活躍」狀態;這意味着如果在同一個內核對象被觸發後如果想再次呼叫這個回呼函數,需要呼叫SetThreadpoolWait再次註冊;如果傳入的hObject為NULL,將把pWaitItem這個等待項從線程中移除。
  • 線程池I/O對象把檔案控制代碼關聯到線程池的I/O完成埠(completion port)。當非同步I/O操作完成,一個工作線程取得操作的狀態並呼叫I/O對象的回呼函數。

相關的API函數:[2]

  • CreateThreadpool 建立一個線程池數據結構
  • CloseThreadpool 關閉一個線程池
  • SetThreadpoolThreadMaximum 設置一個線程池的線程數量上限。預設線程池的最小數量為1,最大數量為500
  • SetThreadpoolThreadMinimum 設置一個線程池的線程數量下限
  • InitializeThreadpoolEnvironment 初始化一個回呼環境。作用是回呼環境結構體中的將Version設為1,其餘為0。
  • DestroyThreadpoolEnvironment 刪除回呼環境
  • SetThreadpoolCallbackPool 把指定的線程緩衝池關聯到指定的線程池回呼環境。建立線程池的IO、work、timer、wait等對象會關聯到一個回呼環境上。如果使用系統預設的回呼環境,需要自行負責回呼函數的DLL保持裝入狀態,自行管理線程池內的各種對象的生存期。線程池可以關聯多個回呼環境,回呼環境可單獨指定線上程池內的排程優先級。
  • SetThreadpoolCallbackLibrary 指示在回呼函數未執行完的時候其所在的DLL庫不能解除安裝
  • SetThreadpoolCallbackPriority 設定回呼環境在所屬線程池內的優先級
  • SetThreadpoolCallbackRunsLong 指示當前回呼函數將執行較長時間。這有助於線程池判斷是否新增線程
  • CreateThreadpoolCleanupGroup 建立一個cleanup group數據結構,關聯到一個回呼環境上
  • SetThreadpoolCallbackCleanupGroup 把指定的cleanup group關聯到指定的回呼環境。第三個參數設置CleanupGroupCancelCallback回呼函數。當呼叫清理組的CloseThreadpoolCleanupGroupmembers函數,並為bCancelPendingCallbacks傳入TRUE來清除清理組時,如果此時尚有未被處理的工作項時,則這個回呼函數被呼叫,這個回掉函數的第一個參數pvObjectContext是通過CreateThreadpool*函數傳入的pvContext,第二個參數pvCleanupContext是由CloseThreadpoolCleanupGroupMembers的pvCleanupContext參數傳入的。
  • CloseThreadpoolCleanupGroup 關閉一個cleanup group數據結構。cleanup group不能包含任何成員。
  • CloseThreadpoolCleanupGroupMembers 把指定cleanup group的所有work、timer、wait等對象釋放掉。對於正在執行的回呼函數,會等待其執行完畢。參數fCancelPendingCallbacks確定是否尚未開始執行的任務執行。
  • CreateThreadpoolWork 建立一個線程池工作對象的數據結構
  • CloseThreadpoolWork 函數通知線程池該對象可釋放。
  • SubmitThreadpoolWork 投寄(post)一個工作對象到線程池中。一個工作線程將會呼叫執行這個工作對象的回呼函數。
  • TrySubmitThreadpoolCallback 請求線程池一個工作線程呼叫指定的回呼函數
  • CreateThreadpoolTimer 建立一個線程池定時器對象的數據結構
  • SetThreadpoolTimer 設定一個線程定時器對象的參數。當定時器到期時,這個定時器對象的回呼函數被放入任務佇列等待被一個工作線程執行。第三個參數設定周期性到時,不論之前放入任務佇列的回呼函數是否執行完畢,總是把回呼函數再一次放入任務佇列等待被執行。如果不希望一個定時器對象的回呼函數的多個實例並行重疊執行,可以不設定周期性到時參數,而在回呼函數末尾呼叫SetThreadpoolTimer重設這個定時器對象,起碼可以簡化除錯。如果呼叫SetThreadpoolTimer時當前定時器對象還沒有到期,則會修改該定時器對象的到期時間等屬性。第二個參數pftDueTime表示首次到時時間,如果為NULL,則取消還未到時的定時器的下次到期,不再向任務佇列放入定時器對象的回呼函數,但已經在任務佇列中(正在被執行)的回呼函數仍會被執行。第四個參數是到時的容限上限,涉及定時器匯聚,如計時器A會在5-7微秒內被觸發,計時器B會在6-8微秒內。因時間有重疊,所以線程池只會喚醒一個線程來處理這兩個計時器,以減少用兩個線程呼叫時產生的額外的線程上下文交換的開銷。
  • IsThreadpoolTimerSet 判斷給定的定時器對象當前是否被設置
  • CreateThreadpoolWait 建立一個線程池等待(wait)對象的數據結構
  • SetThreadpoolWait 設定一個線程等待對象與一個可等待對象(waitable object)的控制代碼繫結並設定逾時時間。當可等待對象的控制代碼被通知(signaled)或指定的逾時到期,這個等待對象的回呼函數會被入列(queue)進入任務佇列中並等待被一個工作線程執行。如果可等待對象(即函數的第二個參數)的控制代碼被設置為NULL,(由於可等待對象還沒有signaled所以)線程池等待對象的回呼函數還未放入任務佇列的都被取消,但已在任務佇列的回呼函數仍會被工作線程執行。注意不要多次使用SetThreadpoolWait來等待同一個hObject。第三個參數pftTimeout指出線程池願意花的最長時間來等待內核對象觸發,0是立即返回,負值為相對時間,正值為絕對時間,NULL表示無限等待(線程池內部呼叫了WaitForMultipleObjects)。
  • CloseThreadpoolWait 關閉一個線程等待對象,然後才能關閉可等待對象(waitable object)的控制代碼。
  • WaitForThreadpoolWaitCallbacks 等待指定的線程池等待對象執行完成或者取消還沒開始執行的線程池等待對象。
  • WaitForThreadpoolIoCallbacks 等待指定的線程池IO對象執行完成或者取消還沒開始執行的線程池IO對象。該函數須在另一個線程使用,而不能在回呼函數內部使用,因為這會造成死結。如果bCancelPendingCallbacks為TRUE,那麼當請求完成的時候,回呼函數不會被呼叫(如果尚未被呼叫)。
  • WaitForThreadpoolTimerCallbacks 等待指定的線程池定時器對象執行完成或者取消還沒開始執行的線程池定時器對象。
  • WaitForThreadpoolWorkCallbacks 等待指定的線程池工作對象執行完成或者取消還沒開始執行的線程池工作對象。
  • CreateThreadpoolIo 建立一個IO完成對象,並繫結一個非同步IO的控制代碼。
  • CloseThreadpoolIo 釋放IO完成對象。應等待所有IO操作完成,關閉IO檔案控制代碼後再呼叫本函數。解除IO對象(工作項)與線程池的關聯。
  • StartThreadpoolIo 初始化非同步IO操作前應先呼叫本函數,否則線程池會忽略IO操作的完成並記憶體泄露。
  • CancelThreadpoolIo 在IO失敗或者IO直接同步完成時,取消對線程池的通知(notification)
  • CallbackMayRunLong 回呼函數通知系統需要執行較長時間。只能在呼叫線程的回呼函數裏使用。返回TRUE時,說明線程池還有其他線程可供使用。FALSE則相反。
  • DisassociateCurrentThreadFromCallback 表示與回呼函數關聯的工作項在「邏輯上」完成了,執行當前回呼函數的線程與初始化當前回呼函數的對象去關聯。這使得所有等待當前回呼函數完成的線程被釋放。任何由於呼叫WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的線程能早一些返回,而不必等到線程從回呼函數中結束時才返回。
  • 下述函數與回呼函數實體指標PTP_CALLBACK_INSTANCE有關,用於在回呼函數內設置當該函數結束時的動作。注意只能有一個動作在回呼函數結束後被執行,不能多次呼叫下述函數,否則最後一次呼叫的函數會覆蓋之前呼叫的下述函數。當線程呼叫回呼函數時,Windows會自動傳一個pInstance參數(類型PTP_CALLBACK_INSTANCE)給回呼函數,然後回呼函數將這個參數又傳給如下的函數,執行一些相應的終止操作:
    • FreeLibraryWhenCallbackReturns 當回呼函數返回的時候,線程池會自動呼叫FreeLibrary,並在參數中傳入指定的HMOUDLE。如果回呼函數是從DLL中載入的,這個函數尤為重要,因為當線程執行完畢後,回呼函數不能自己呼叫FreeLibrary,否則回呼函數代碼將從行程中清除,這樣當FreeLibrary試圖返回到回呼函數時,會引發訪問違規
    • LeaveCriticalSectionWhenCallbackReturns 當回呼函數返回時,線程池會自動呼叫LeavCriticalSection,並在參數中傳入指定的CRITCAL_SECTION結構體。
    • ReleaseMutexWhenCallbackReturns 當回呼函數返回時,線程池會自動呼叫ReleaseMutex,並在參數中傳入指定的HANDLE
    • ReleaseSemaphoreWhenCallbackReturns 當回呼函數返回的時候,線程池會自動呼叫ReleaseSemaphore,並在參數中傳入指定的HANDLE
    • SetEventWhenCallbackReturns 當回呼函數返回的時候,線程池會自動呼叫SetEvent,並在參數中傳入指定的HANDLE

範例 編輯

範例程式1如下:

#include <windows.h>
#include <tchar.h>
#include <stdio.h>

//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
	// Instance, Parameter, Wait, and WaitResult not used in this example.
	UNREFERENCED_PARAMETER(Instance); 	UNREFERENCED_PARAMETER(Parameter);	UNREFERENCED_PARAMETER(Wait); 	UNREFERENCED_PARAMETER(WaitResult);

	// Do something when the wait is over.
	_tprintf(_T("MyWaitCallback: wait is over.\n"));
}


//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
	// Instance, Parameter, and Timer not used in this example.
	UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);

	// Do something when the timer fires.
	_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}


//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
	// Instance, Parameter, and Work not used in this example.
	UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);

	// Do something when the work callback is invoked.
	_tprintf(_T("MyWorkCallback: Task performed.\n"));
}

 

int main(void)
{
	PTP_WAIT Wait = NULL;
	PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
	HANDLE hEvent = NULL;
	UINT i = 0;
	UINT rollback = 0;

	// Create an auto-reset event and initialized as nonsignaled.
	hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

	if (NULL == hEvent) {
		// Error Handling
		return 0;
	}

	rollback = 1; // CreateEvent succeeded

	Wait = CreateThreadpoolWait(waitcallback,
		NULL,     // 回调函数的输入参数
		NULL);    // 使用缺省的回调环境

	if (NULL == Wait) {
		_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
		goto new_wait_cleanup;
	}

	rollback = 2; // CreateThreadpoolWait succeeded

				  // must re-register the event with the wait object before signaling it each time to trigger the wait callback
				  // each time before signaling the event to trigger the wait callback.
	for (i = 0; i < 5; i++) {
		SetThreadpoolWait(Wait, //线程池等待对象
			hEvent,             //内核等待对象
			NULL);              //超时设定

		SetEvent(hEvent);       //触发内核等待对象

		// Delay for the waiter thread to act if necessary.
		Sleep(500);

		// Block here until the callback function is done executing.
		WaitForThreadpoolWaitCallbacks(Wait, FALSE);
	}

new_wait_cleanup:
	switch (rollback) {
	case 2:
		// Unregister the wait by setting the event to NULL.
		SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur

		// Close the wait.
		CloseThreadpoolWait(Wait);

	case 1:
		// Close the event.
		CloseHandle(hEvent);

	default:
		break;
	}
	

	BOOL bRet = FALSE;
	PTP_WORK work = NULL;
	PTP_TIMER timer = NULL;
	PTP_POOL pool = NULL;
	PTP_WORK_CALLBACK workcallback = MyWorkCallback;
	PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
	TP_CALLBACK_ENVIRON CallBackEnviron;
	PTP_CLEANUP_GROUP cleanupgroup = NULL;
	FILETIME FileDueTime;
	ULARGE_INTEGER ulDueTime;
	rollback = 0;

	InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用缺省的线程池与缺省的回调环境

	// Create a custom, dedicated thread pool.
	pool = CreateThreadpool(NULL);

	if (NULL == pool) {
		_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 1; // pool creation succeeded

				  // The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
	SetThreadpoolThreadMaximum(pool, 1);
	bRet = SetThreadpoolThreadMinimum(pool, 1);

	if (FALSE == bRet) {
		_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
		goto main_cleanup;
	}

	// Create a cleanup group for this thread pool.
	cleanupgroup = CreateThreadpoolCleanupGroup();

	if (NULL == cleanupgroup) {
		_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
		goto main_cleanup;
	}

	rollback = 2;  // Cleanup group creation succeeded

				   // Associate the callback environment with our thread pool.
	SetThreadpoolCallbackPool(&CallBackEnviron, pool);

	// Associate the cleanup group with our thread pool.
	// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
	SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
		cleanupgroup,                                   //Cleanup Group
		NULL);                                          //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数


	// Create work with the callback environment.
	work = CreateThreadpoolWork(workcallback, //回调函数
		NULL,                                 //回调函数的输入参数
		&CallBackEnviron);                    //回调环境

	if (NULL == work) {
		_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 3;  // Creation of work succeeded


				   // Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
	SubmitThreadpoolWork(work);	

	// Create a timer with the same callback environment.
	timer = CreateThreadpoolTimer(timercallback, //回调函数
		NULL,                                    //回调函数的输入参数
		&CallBackEnviron);                       //回调环境


	if (NULL == timer) {
		_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 4;  // Timer creation succeeded

				   // Set the timer to fire in one second.
	ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
	FileDueTime.dwHighDateTime = ulDueTime.HighPart;
	FileDueTime.dwLowDateTime = ulDueTime.LowPart;

	SetThreadpoolTimer(timer, //线程池定时器对象
		&FileDueTime,         //到期时间
		0,                    //周期时期,为0则表示一次性定时器
		0);                   //操作系统调用回调函数的最大延迟时间


	// Delay for the timer to be fired
	Sleep(1500);


	// Wait for all callbacks to finish.
	// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group, 
	// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
	CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
		FALSE,                                       //为真则取消还未开始执行的pending的回调函数
		NULL);                                       //CleanupGroup回调函数的输入参数

	// Already cleaned up the work item with the
	// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
	rollback = 2;
	goto main_cleanup;

main_cleanup:
	// Clean up any individual pieces manually
	// Notice the fall-through structure of the switch.
	// Clean up in reverse order.

	switch (rollback) {
	case 4:
	case 3:
		// Clean up the cleanup group members.
		CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
	case 2:
		// Clean up the cleanup group.
		CloseThreadpoolCleanupGroup(cleanupgroup);
	case 1:
		// Clean up the pool.
		CloseThreadpool(pool);
	default:
		break;
	}

	return 0;
}

關於IO線程池的一個範例:

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> 
#include <iostream>
#include <limits>
 

void PressEnterToContinue()
{
	std::cout << "Press ENTER to continue... " << std::flush;
	std::cin.ignore( (std::numeric_limits<std::streamsize>::max)( ), '\n');
}
//////////////////////////////////////////////////////////////////////////
#define  QMLX_ALLOC(sz)    HeapAlloc(GetProcessHeap(),0,sz)   
#define  QMLX_CALLOC(sz)   HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define  QMLX_SAFEFREE(p)  if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}

#define  QMLX_ASSERT(s)    if(!(s)){DebugBreak();}
#define  QMLX_BEGINTHREAD(Fun,Param)  CreateThread(NULL,0,\
                                     (LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);

//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 2  //每个线程最大写入次数
#define MAXWRITETHREAD    2   //写入线程的数量

#define OP_READ    0x01  //读操作
#define OP_WRITE   0x02  //写操作
//#pragma pack(show)
//单IO数据
typedef struct __declspec(align(16)) _tagPerIoData {
	OVERLAPPED  m_ol;
	HANDLE      m_hFile;   //操作的文件句柄
	DWORD       m_dwOp;    //操作类型,OP_READ或OP_WRITE
	LPVOID      m_pData;   //操作的数据
	UINT        m_nLen;    //操作的数据长度
	DWORD       m_dwWrite; //写入的字节数 
	DWORD       m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA, *PPER_IO_DATA;

//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
	ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio);

//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);

//当前操作的文件对象的指针
LARGE_INTEGER  g_liFilePointer = { 0 };

//IOCP线程池
PTP_IO  g_pThreadpoolIo = NULL;

//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID  GetAppPath(LPTSTR pszBuffer) {
	DWORD dwLen = 0;
	if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
		return;

	for (DWORD i = dwLen; i > 0; i--) {
		if ('\\' == pszBuffer[i]) {
			pszBuffer[i + 1] = '\0';
			break;
		}
	}
}

int _tmain() {
	 
	_tsetlocale(LC_ALL, _T("chs"));
	TCHAR pFileName[MAX_PATH] = {};
	GetAppPath(pFileName);
	StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));

	HANDLE ahWThread[MAXWRITETHREAD] = {};
	DWORD dwWrited = 0;

	//创建文件
	HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
		CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
	if (INVALID_HANDLE_VALUE == hTxtFile) {
		_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
		_tsystem(_T("PAUSE"));
		return 0;
	}

	//初始化线程池回调环境
	TP_CALLBACK_ENVIRON poolEnv = {};
	InitializeThreadpoolEnvironment(&poolEnv);

	//创建IOCP线程池
	g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback, hTxtFile, &poolEnv);

	//启动IOCP线程池
	StartThreadpoolIo(g_pThreadpoolIo);

	//写入UNICODE文件的前缀码,以便正确打开
	PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
	QMLX_ASSERT(pIo != NULL);

	pIo->m_dwOp = OP_WRITE;
	pIo->m_hFile = hTxtFile;
	pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
	QMLX_ASSERT(pIo->m_pData != NULL);
	*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
	pIo->m_nLen = sizeof(WORD);

	//偏移文件指针
	pIo->m_ol.Offset = g_liFilePointer.LowPart;
	pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
	g_liFilePointer.QuadPart += pIo->m_nLen;
	pIo->m_dwTimestamp = GetTickCount(); //记录时间戳

	WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
		&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);

	//等待IOCP线程池完成操作
	WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);

	//启动写入线程进行日志写入操作
	for (int i = 0; i < MAXWRITETHREAD; i++) {
		ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
	}

	//让主线程等待这些写入线程结束
	WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);

	for (int i = 0; i < MAXWRITETHREAD; i++) {
		CloseHandle(ahWThread[i]);
	}

	//关闭IOCP线程池
	CloseThreadpoolIo(g_pThreadpoolIo);

	//关闭日志文件
	if (INVALID_HANDLE_VALUE != hTxtFile) {
		CloseHandle(hTxtFile);
		hTxtFile = INVALID_HANDLE_VALUE;
	}

	_tsystem(_T("PAUSE"));

	return 0;
}

//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
	ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{ 
 
	if (NO_ERROR != IoResult) {
		_tprintf(_T("I/O操作出错,错误码:%u\n"), IoResult);
		return;
	}

	PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
	DWORD dwCurTimestamp = GetTickCount();

	switch (pIo->m_dwOp)
	{
	case OP_WRITE://写操作结束
	{//写入操作结束
		_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
			GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
			pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);

		QMLX_SAFEFREE(pIo->m_pData);
		QMLX_SAFEFREE(pIo);
	}
	break;

	case OP_READ: //读操作结束
		break;

	default:
		break;
	}
}

//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
	TCHAR pTxtContext[MAX_LOGLEN] = {};
	PPER_IO_DATA pIo = NULL;
	size_t szLen = 0;
	LPTSTR pWriteText = NULL;

	StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
		GetCurrentThreadId());
	StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);

	szLen += 1;

	int i = 0;
	for (; i < MAXWRITEPERTHREAD; i++) {
		pWriteText = (LPTSTR)QMLX_CALLOC(szLen * sizeof(TCHAR));
		QMLX_ASSERT(NULL != pWriteText);
			StringCchCopy(pWriteText, szLen, pTxtContext);

		//为每个操作申请一个“单IO数据”结构体
		pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
		QMLX_ASSERT(pIo != NULL);

		pIo->m_dwOp = OP_WRITE;
		pIo->m_hFile = (HANDLE)lpParam;
		pIo->m_pData = pWriteText;
		pIo->m_nLen = (szLen - 1) * sizeof(TCHAR);

		//这里使用原子操作同步文件指针,写入不会相互覆盖
		//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
		//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
		*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
			g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
		pIo->m_dwTimestamp = GetTickCount(); //记录时间戳

		StartThreadpoolIo(g_pThreadpoolIo);

		//写入
		WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
			&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
		if (ERROR_IO_PENDING != GetLastError()) {
			CancelThreadpoolIo(g_pThreadpoolIo);
		}
	} 
	return i;
}

.NET Framework的線程池實現 編輯

命名空間System.Threading中的類ThreadPool提供一個線程池,該線程池可用於執行任務、傳送工作項、處理非同步 I/O、代表其他線程等待以及處理計時器。[3]

參看 編輯

參考文獻 編輯

  1. ^ MSDN:Using the Thread Pool Functions. [2017-02-22]. (原始內容存檔於2017-07-24). 
  2. ^ MSDN:Thread Pool API. [2017-02-28]. (原始內容存檔於2017-10-04). 
  3. ^ MSDN: ThreadPool 类. [2017-02-22]. (原始內容存檔於2017-02-23).