ENGLISH 意见建议 网站地图 网站帮助
广泛智力汇聚   高效成果传播   先进机制培育
联盟首页  |  协同开发  |  开放源码库  |  安全告警  |  开源导航  |  文档中心  |  服务支持  |  共创论坛  |  关于联盟


注册会员 网站帮助
    您的位置 »
    今天是: 2010年11月22日    
项目搜索

完全匹配   
开源软件
软件分类表
新发布软件
其它网站镜像
代码片断
协同开发
文档
论坛
寻求协助
热点项目
站点状态
编译工厂

联系我们
关于联盟

代码片段库:
查看代码片段

浏览 | 提交新的代码片段 | 创建代码包

线程池

类型:
Class
类别:
Other
许可证:
GNU General Public License
语言:
C++
 
描述:
WIN32线程池基本实现
可向线程池传递C函数和继承于ThreadJob的类对象;

该代码片段的版本系列:

片段ID 下载版本 提交时间 提交人 删除
48520.12004-11-22 16:25forzxp

点击"下载版本"来下载该代码片段.


最新版本的代码片段: 0.1


/******************************************************************
*  Thread Pool For Win32 
*  VC++ 6, BC++ 5.5(Free), GCC(Free)
*  Update : 2004.6.9 llBird

Use:
1):
void threadfunc(void *p)
{
	//...
}
	ThreadPool tp;
	for(i=0; i<100; i++)
		tp.Call(threadfunc);

	ThreadPool tp(20);//20为初始线程池规模
	tp.Call(threadfunc, lpPara);
	tp.AdjustSize(50);//增加50
	tp.AdjustSize(-30);//减少30


2):
class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展
{
public:
	virtual void DoJob(void *p)//自定义的虚函数
	{
		//....
	}
};
	MyThreadJob mt[10];
	ThreadPool tp;
	for(i=0; i<100 i++)
		tp.Call(mt + i);//tp.Call(mt + i, para);

*******************************************************************/
#ifndef _ThreadPool_H_
#define _ThreadPool_H_

#pragma warning(disable: 4530)
#pragma warning(disable: 4786)

#include <cassert>
#include <vector>
#include <queue>
#include <windows.h>


class ThreadJob	 //工作基类
{
public:
	//供线程池调用的虚函数
	virtual void DoJob(void *pPara) = 0;
};

class ThreadPool
{
private:

	ThreadPool(const ThreadPool&);
	operator = (const ThreadPool&);

public:
	//dwNum 线程池规模
	ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
	{
		InitializeCriticalSection(&_csThreadVector);
		InitializeCriticalSection(&_csWorkQueue);

		_EventComplete = CreateEvent(0, false, false, NULL);
		_EventEnd = CreateEvent(0, true, false, NULL);
		_SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
		_SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);

		assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
		assert(_EventComplete != INVALID_HANDLE_VALUE);
		assert(_EventEnd != INVALID_HANDLE_VALUE);
		assert(_SemaphoreDel != INVALID_HANDLE_VALUE);

		AdjustSize(dwNum <= 0 ? 4 : dwNum);
	}

	virtual ~ThreadPool()
	{
		EndAndWait(100);

		DeleteCriticalSection(&_csWorkQueue);

		CloseHandle(_EventEnd);
		CloseHandle(_EventComplete);
		CloseHandle(_SemaphoreCall);
		CloseHandle(_SemaphoreDel);
		
		vector<ThreadItem*>::iterator iter;
		for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
		{
			if(*iter)
				delete *iter;
		}

		DeleteCriticalSection(&_csThreadVector);
	}
	//调整线程池规模
	int AdjustSize(int iNum)
	{
		if(iNum > 0)
		{
			ThreadItem *pNew;
			EnterCriticalSection(&_csThreadVector);
			for(int _i=0; _i<iNum; _i++)
			{
				_ThreadVector.push_back(pNew = new ThreadItem(this)); 
				assert(pNew);
				pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
				assert(pNew->_Handle);
			}
			LeaveCriticalSection(&_csThreadVector);
		}
		else
		{
			iNum *= -1;
			ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
		}
		return (int)_lThreadNum;
	}
	//调用线程池
	void Call(void (*pFunc)(void  *), void *pPara = NULL)
	{
		assert(pFunc);

		EnterCriticalSection(&_csWorkQueue);
		_JobQueue.push(new JobItem(pFunc, pPara));
		LeaveCriticalSection(&_csWorkQueue);

		ReleaseSemaphore(_SemaphoreCall, 1, NULL);
	}
	//调用线程池
	inline void Call(ThreadJob * p, void *pPara = NULL)
	{
		Call(CallProc, new CallProcPara(p, pPara));
	}
	//结束线程池, 并同步等待
	bool EndAndWait(DWORD dwWaitTime = INFINITE)
	{
		SetEvent(_EventEnd);
		return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
	}
	//结束线程池
	inline void End()
	{
		SetEvent(_EventEnd);
	}
	inline DWORD Size()
	{
		return (DWORD)_lThreadNum;
	}
	inline DWORD GetRunningSize()
	{
		return (DWORD)_lRunningNum;
	}
	bool IsRunning()
	{
		return _lRunningNum > 0;
	}

protected:

	//工作线程
	static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
	{
		ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
		assert(pThread);

		ThreadPool *pThreadPoolObj = pThread->_pThis;
		assert(pThreadPoolObj);

		InterlockedIncrement(&pThreadPoolObj->_lThreadNum);

		HANDLE hWaitHandle[3];
		hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
		hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
		hWaitHandle[2] = pThreadPoolObj->_EventEnd;

		JobItem *pJob;
		bool fHasJob;
		
		for(;;)
		{
			DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);

			//响应删除线程信号
			if(wr == WAIT_OBJECT_0 + 1)	 
				break;
			
			//从队列里取得用户作业
			EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
			if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
			{
				pJob = pThreadPoolObj->_JobQueue.front();
				pThreadPoolObj->_JobQueue.pop();
				assert(pJob);
			}
			LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);

			//受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
			if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)	 
				break;

			if(fHasJob && pJob)
			{
				InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
				pThread->_dwLastBeginTime = GetTickCount();
				pThread->_dwCount++;
				pThread->_fIsRunning = true;
				pJob->_pFunc(pJob->_pPara); //运行用户作业
				delete pJob; 
				pThread->_fIsRunning = false;
				InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
			}
		}

		//删除自身结构
		EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
		pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
		LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);

		delete pThread;

		InterlockedDecrement(&pThreadPoolObj->_lThreadNum);

		if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
			SetEvent(pThreadPoolObj->_EventComplete);

		return 0;
	}
	//调用用户对象虚函数
	static void CallProc(void *pPara) 
	{
		CallProcPara *cp = static_cast<CallProcPara *>(pPara);
		assert(cp);
		if(cp)
		{
			cp->_pObj->DoJob(cp->_pPara);
			delete cp;
		}
	}
	//用户对象结构
	struct CallProcPara  
	{
		ThreadJob* _pObj;//用户对象 
		void *_pPara;//用户参数
		CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
	};
	//用户函数结构
	struct JobItem 
	{
		void (*_pFunc)(void  *);//函数
		void *_pPara; //参数
		JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
	};
	//线程池中的线程结构
	struct ThreadItem
	{
		HANDLE _Handle;	//线程句柄
		ThreadPool *_pThis;	 //线程池的指针
		DWORD _dwLastBeginTime; //最后一次运行开始时间
		DWORD _dwCount; //运行次数
		bool _fIsRunning;
		ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
		~ThreadItem()
		{
			if(_Handle)
			{
				CloseHandle(_Handle);
				_Handle = NULL;
			}
		}
	};
	
	std::queue<JobItem *> _JobQueue;	 //工作队列
	std::vector<ThreadItem *>  _ThreadVector; //线程数据

	CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界

	HANDLE	_EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
	long _lThreadNum, _lRunningNum; //线程数, 运行的线程数

};

#endif //_ThreadPool_H_

		

提交新版本

如果您修改了一个代码片段并且觉得很应该让别人共享,您可以把这作为这个代码片段的最新版本提交上来.


联盟团体会员
合作伙伴
© 共创软件联盟 版权所有
联盟服务条款 | 联盟隐私权规则 | 联系我们
电话: (8610)68313388-5949 | 传真: (8610)88377936
京ICP备05056057号