`
adapterofcoms
  • 浏览: 72270 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

c[not c++]版 ThreadPool base on winnt

阅读更多

QueueUserWorkItem,CreateTimerQueueTimer,BindIoCompletionCallback,RegisterWaitForSingleObject你用过吗,性能如何?CreateThreadpool你想用吗,那你就去安装Windows Vista,or 2008吧.

下面的车轮模型来自java util concurrent's thread pool模型.

作者:一个java程序员.

下面的程序没有安装logger,只是简单的printf.

 

threadpool.h

#ifndef _WHEELSOFT_THREADPOOL_H_
#define _WHEELSOFT_THREADPOOL_H_

 

#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  60*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32


typedef void (* runnable_t)(void);

int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long  keepAliveTime);
int addTaskToThreadPool(void *tp,runnable_t toRun);
int shutdownThreadPool(void *tp);

 

#endif /* !_WHEELSOFT_THREADPOOL_H_ */

 

queuedthreadpool.c
#include "threadpool.h"
#include "stdio.h"
#include "windows.h"
 
/*QTP:QueuedThreadPool Model*/
 
typedef struct QTPTaskNode{
 runnable_t task;
 struct QTPTaskNode * next;
}QTPTaskNode;

typedef struct {
 volatile int counter;
 QTPTaskNode * head;
 QTPTaskNode * tail;

}QTPTaskQueue;

typedef struct {
 int minPoolSize;
 int maxPoolSize;
 long  keepAliveTime;//wait timeout
 
 volatile int   currentPoolSize;
 volatile int   currentBusyWorkers;
 volatile int shouldTerminate;

 QTPTaskQueue * p_tasks;
 
 CRITICAL_SECTION  putLock;
 CRITICAL_SECTION  takeLock;
 CRITICAL_SECTION  poolLock;
 
 HANDLE takeEvent;
 HANDLE terminateEvent; 

}QueuedThreadPool;

typedef struct {
 int counter;
 CRITICAL_SECTION  mainLock;
 QueuedThreadPool * QTPHandlers[MAX_THREADPOOLS];
 
}QTPMarshall;

static volatile long QTPMarshallerInitToken=0;
static volatile int QTPMarshallerInitedToken=0;

static QTPMarshall QTPMarshaller={0};

static DWORD workerForQueuedThreadPool(LPVOID);

static SYSTEM_INFO QTP_SystemInfoGetter;

 

/*return index+1;*/
static int isAvailableQTP(void * tp){ 
 if(tp!=NULL){
  QTPMarshall * _p=&QTPMarshaller;
  int index=0;
  for(;index<_p->counter;)
   if(_p->QTPHandlers[index++]==tp)
    return index+1;  
 }
 return 0;
}

 

static void * allocQTPFromQTPMarshall(){ 
  QTPMarshall * _p=&QTPMarshaller;
  void *tp=NULL;
  EnterCriticalSection(&_p->mainLock);
  if(_p->counter<MAX_THREADPOOLS){
   tp=malloc(sizeof(QueuedThreadPool));
   _p->QTPHandlers[_p->counter++]=tp;
  }   
  LeaveCriticalSection(&_p->mainLock);  
  return tp;
}


static int deleteQTPFromQTPMarshall(void * tp){
 if(tp!=NULL){
  QTPMarshall * _p=&QTPMarshaller;
  int index=0;
  EnterCriticalSection(&_p->mainLock);
  index=isAvailableQTP(tp);
  if(index--){
   for(;index<_p->counter-1;index++)
   _p->QTPHandlers[index]=_p->QTPHandlers[index+1];
   _p->counter--;
   LeaveCriticalSection(&_p->mainLock);//attention: don't forget LeaveCriticalSection when return.
   return 1;
  }
  LeaveCriticalSection(&_p->mainLock);  
 }
 return 0;
}


static void freeQTPFromQTPMarshall(void * tp){
  deleteQTPFromQTPMarshall(tp);
  free(tp);
}

 

static void adjustQTPLimits(QueuedThreadPool *tp){
  /*adjust size and timeout.*/
  if(tp->maxPoolSize <= 0) {
   tp->maxPoolSize = MAX_THREADS;
        } else if (tp->maxPoolSize < MAX_THREADS_MIN) {           
         tp->maxPoolSize = MAX_THREADS_MIN;
        }
        if(tp->minPoolSize >  tp->maxPoolSize) {
         tp->minPoolSize =  tp->maxPoolSize;
        }
        if(tp->minPoolSize <= 0) {
            if(1 == tp->maxPoolSize) {
             tp->minPoolSize = 1;
            } else {
             tp->minPoolSize = tp->maxPoolSize/2;
            }
        }       
        if(tp->keepAliveTime<MIN_WORKER_WAIT_TIMEOUT)
         tp->keepAliveTime=MIN_WORKER_WAIT_TIMEOUT;
        if(tp->keepAliveTime>MAX_WORKER_WAIT_TIMEOUT)
         tp->keepAliveTime=MAX_WORKER_WAIT_TIMEOUT;       
}

 

typedef struct{
 QueuedThreadPool * tp;
 HANDLE th;
}QTPWORKER_PARAM;

 

static int addQTPWorker(QueuedThreadPool *tp){
 //don't check the tp's state.
 if(tp->currentPoolSize<tp->minPoolSize||
  tp->currentPoolSize<tp->maxPoolSize&&
  (tp->currentPoolSize==tp->currentBusyWorkers||tp->p_tasks->counter>=((int)(QTP_SystemInfoGetter.dwNumberOfProcessors)+1)*tp->currentPoolSize)){
  int _tid; 
  EnterCriticalSection(&tp->poolLock);
  if(tp->currentPoolSize<tp->maxPoolSize){
   QTPWORKER_PARAM * _param=malloc(sizeof(QTPWORKER_PARAM));
   if(_param){
    _param->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)workerForQueuedThreadPool,_param,CREATE_SUSPENDED,&_tid);
    _param->tp=tp;
    ResumeThread(_param->th);
    tp->currentPoolSize++;
   }else{
    LeaveCriticalSection(&tp->poolLock);
    return 0;
   }    

  }  
  LeaveCriticalSection(&tp->poolLock);  
 }  
 return 1;
}

 

int initializeThreadPool(void **tpp,int minPoolSize,int maxPoolSize,long  keepAliveTime){ 
 //init QTPMarshall once.
 if(!InterlockedCompareExchange(&QTPMarshallerInitToken,1,0)){
  GetSystemInfo(&QTP_SystemInfoGetter);
  //QTPMarshaller.counter=0;
  InitializeCriticalSection(&QTPMarshaller.mainLock); 
  QTPMarshallerInitedToken=1;//don't Interlocked ?
 }

 if(tpp==NULL||!QTPMarshallerInitedToken||QTPMarshaller.counter>=MAX_THREADPOOLS)
  return 0;

 {//initialize QTP
  QueuedThreadPool *tp=allocQTPFromQTPMarshall();
  if(tp==NULL)
   return 0;

  tp->shouldTerminate=0;
  tp->minPoolSize=minPoolSize,tp->maxPoolSize=maxPoolSize,tp->keepAliveTime=keepAliveTime,tp->currentPoolSize=tp->currentBusyWorkers=0;
  adjustQTPLimits(tp); 
 
  {//init taskQueue
   QTPTaskNode * nullNode=malloc(sizeof(QTPTaskNode));
   QTPTaskQueue * queue=malloc(sizeof(QTPTaskQueue));  
   
   nullNode->task=NULL,nullNode->next=NULL;
   queue->counter=0,queue->head=queue->tail=nullNode;
   
   tp->p_tasks=queue;   
  }
 
  InitializeCriticalSection(&tp->putLock); 
  InitializeCriticalSection(&tp->takeLock);
  InitializeCriticalSection(&tp->poolLock);
  tp->takeEvent=CreateEvent(NULL,FALSE,FALSE,NULL);
  tp->terminateEvent=CreateEvent(NULL,TRUE,FALSE,NULL); 
      
  *tpp=tp;//last to do this.
  printf("initializeTP error:%d,\n",GetLastError());

 } 
 return 1;
}


static void insertQTPTaskNode(QTPTaskQueue * queue,QTPTaskNode * node){ 
 queue->tail->next=node,queue->tail->task=node->task,queue->tail=node,node->task=NULL;
}

 

static QTPTaskNode * extractQTPTaskNode(QTPTaskQueue * queue){
 QTPTaskNode * res=queue->head;  
 queue->head=res->next;
 res->next=NULL;
 return res;
}


int addTaskToThreadPool(void *tp,runnable_t toRun){
 if(isAvailableQTP(tp)){
  QueuedThreadPool * _tp=(QueuedThreadPool *)tp;  
  int oldCounter=0;  
  QTPTaskNode * task=NULL;
  
  if(_tp->shouldTerminate)
   return 0;
  //addQTPWorker
  addQTPWorker(tp);
  task=malloc(sizeof(QTPTaskNode));//note: free it when it end.
  if(task==NULL) //may be out of memory
   return 0;
  task->task=toRun,task->next=NULL;  
  //put task to taskQueue. must check the QTP's status !
  EnterCriticalSection(&_tp->putLock);
  if(!_tp->shouldTerminate){
   insertQTPTaskNode(_tp->p_tasks,task);
   oldCounter=InterlockedExchangeAdd(&_tp->p_tasks->counter,1);
   LeaveCriticalSection(&_tp->putLock);
  }else{
   LeaveCriticalSection(&_tp->putLock);
   free(task);
   return 0;
  }
  
  if(oldCounter<=0)
   SetEvent(_tp->takeEvent);   
  
  return 1;
 }else
  return 0; 
}


static void clearupQTPTaskQueue(QTPTaskQueue * queue){
 if(queue->counter<=0){
  if(queue->head!=queue->tail||queue->head->next!=NULL||queue->tail->next!=NULL)
  ;//log error:for test

  free(queue->tail);
  queue->head=queue->tail=NULL;
 
 }else{  
  //log error:for test
 }
}


/*it is asynchronous.*/
int shutdownThreadPool(void *tp){
 if(deleteQTPFromQTPMarshall(tp)){
  QueuedThreadPool * _tp=(QueuedThreadPool *)tp;
  
  EnterCriticalSection(&_tp->putLock/*just for prevent putting task .*/);
  _tp->shouldTerminate=1;
  LeaveCriticalSection(&_tp->putLock);

  //notify the last worker to clearup QTP.
  SetEvent(_tp->terminateEvent);
  return 1;
 }else
  return 0;
}

 

//Call it just once for per QTP.
static void clearupQueuedThreadPool(QueuedThreadPool * tp){
 if(tp->shouldTerminate){
 
  clearupQTPTaskQueue(tp->p_tasks);
  free(tp->p_tasks);
  
  DeleteCriticalSection(&tp->putLock);
  DeleteCriticalSection(&tp->takeLock);
  DeleteCriticalSection(&tp->poolLock);
  CloseHandle(tp->takeEvent);
  CloseHandle(tp->terminateEvent);

  freeQTPFromQTPMarshall(tp); 
 }
}

 

DWORD workerForQueuedThreadPool(LPVOID pparam){ 

 QTPWORKER_PARAM * param=(QTPWORKER_PARAM *)pparam;
 QueuedThreadPool * tp=param->tp; 

 QTPTaskNode * _task=NULL;
 HANDLE EVENTS[2];
 int curTaskNum=0,shouldTerminate=0,shouldClearupQTP=0;
 DWORD waitRtn; 
 if(!isAvailableQTP(tp))
  return 0;
 
 EVENTS[0]=tp->terminateEvent,EVENTS[1]=tp->takeEvent;//note the sequence.
 shouldTerminate=tp->shouldTerminate;
 
 while(!shouldTerminate){  
   
  if(GetLastError()!=0){
   printf("curThreadId:%d,err:%d\n",GetCurrentThreadId(),GetLastError());
   //shouldTerminate=1;
   //break;
   exit(0);//for test.
  }   
  
  while(1){  
   EnterCriticalSection(&tp->takeLock);
   if(tp->p_tasks->counter<=0)
   {
    LeaveCriticalSection(&tp->takeLock);    
    waitRtn=WaitForMultipleObjects(2,EVENTS,FALSE,tp->keepAliveTime);

    if(waitRtn==WAIT_OBJECT_0){     
     if(tp->p_tasks->counter>0)
      continue;
     else{
      shouldTerminate=1;
      break;
     }      
      
    }else if(waitRtn==WAIT_OBJECT_0+1){
      continue;
    }else if(waitRtn==WAIT_TIMEOUT){

     if(tp->currentBusyWorkers<=0&&tp->currentPoolSize>tp->minPoolSize){      
      EnterCriticalSection(&tp->poolLock);
      if(tp->currentPoolSize>tp->minPoolSize){
       shouldTerminate=2;
       /*Decrement currentPoolSize and shouldClearupQTP ? ,when timeout check.*/
       if(!--tp->currentPoolSize&&tp->shouldTerminate){
        shouldClearupQTP=1;
       }
       LeaveCriticalSection(&tp->poolLock);
       break;
      }else{
       LeaveCriticalSection(&tp->poolLock);
       continue;
      }      
     }else{
      continue;
     }
     
    }else{
     shouldTerminate=3;
     if(GetLastError()!=0){
      printf("curThreadId:%d,WaitForMultipleObjects_err:%d,waitRtn=%d\n",GetCurrentThreadId(),GetLastError(),waitRtn);
      exit(0);//for test.
     }
     break;
    }//end wait handle.       

   }else{
    //take task to _task
    _task=extractQTPTaskNode(tp->p_tasks); 
    curTaskNum=InterlockedExchangeAdd(&tp->p_tasks->counter,-1);
    LeaveCriticalSection(&tp->takeLock);
    
    if(curTaskNum>=2)
     SetEvent(tp->takeEvent);
    break;
   }
   
  }//end get _task loop;  
  
  //toRun the _task.
  if(_task!=NULL){
   InterlockedIncrement(&tp->currentBusyWorkers);
   addQTPWorker(tp);//adjust
   _task->task();
   free(_task);
   _task=NULL;
   InterlockedDecrement(&tp->currentBusyWorkers);
  }
 
 }//end thread main loop. 
  
 if(shouldTerminate!=2){
  EnterCriticalSection(&tp->poolLock);
  if(!--tp->currentPoolSize&&tp->shouldTerminate)
   shouldClearupQTP=1;
  LeaveCriticalSection(&tp->poolLock); 
 } 

 if(shouldClearupQTP)
  clearupQueuedThreadPool(tp); 

 CloseHandle(param->th);
 free(param);
 //CloseHandle(GetCurrentThread()); //<---it is last error:6
 return 1;
}

 

 

 

1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics