Tuesday, April 28, 2009

ThreadPool is useful when I manage a group of threads

Creating and using threads is almost as delicate as allocating and managing memory, at least to me. So, when I program in C++, I spend more time looking at the results of memory-checking tools like “Valgrind,” or set aside more time to build a memory management mechanism. I guess these are some of the reasons why I feel more comfortable when I use Java to test the logic or algorithm of a program.

Almost every time I needed tasks to be executed concurrently, I needed a mechanism to manage a group of threads. “ThreadPool” is a simple class that takes care of a pool of threads, which run tasks fed through the “Command” interface. This pool is very simple but can be used in most common cases. If I need to prioritize the tasks, changing the data structure of the queue from list to priority_queue (with a comparator for the tasks) should be sufficient. If I need to cancel a task, I can add a method to the “ThreadPool” class that searches for the task in the queue and removes it.
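To make these two extensions concrete, here is a minimal sketch. It is not part of the actual “ThreadPool”: PrioritizedCommand, ByPriority, PriorityCommandQueue, cancelPending() and RecordingCommand are hypothetical names I made up for illustration. The sketch runs on a single thread; inside the pool, both the priority queue and the cancel helper would have to be used while holding the queue lock.

```cpp
#include <cassert>
#include <list>
#include <queue>
#include <string>
#include <vector>

// Same interface as in the post, plus a virtual destructor.
struct Command {
    virtual void run() = 0;
    virtual ~Command() {}
};

// Hypothetical wrapper: pairs a command with a priority (larger runs first).
struct PrioritizedCommand {
    int priority;
    Command* command;
    PrioritizedCommand(int p, Command* c) : priority(p), command(c) {}
};

// std::priority_queue keeps the "largest" element on top,
// so ordering by priority puts the most urgent command first.
struct ByPriority {
    bool operator()(const PrioritizedCommand& a, const PrioritizedCommand& b) const {
        return a.priority < b.priority;
    }
};

typedef std::priority_queue<PrioritizedCommand,
                            std::vector<PrioritizedCommand>,
                            ByPriority> PriorityCommandQueue;

// Cancelling with the list-based queue: remove every pending pointer to com.
// Returns true if the command was still waiting in the queue.
inline bool cancelPending(std::list<Command*>& queue, Command& com) {
    std::list<Command*>::size_type before = queue.size();
    queue.remove(&com);
    return queue.size() != before;
}

// Example command that records the order in which commands ran.
struct RecordingCommand : public Command {
    std::string name;
    std::vector<std::string>* log;
    RecordingCommand(const std::string& n, std::vector<std::string>* l)
        : name(n), log(l) {}
    void run() { log->push_back(name); }
};

// Drains a priority queue on the calling thread and returns the run order.
inline std::vector<std::string> runByPriorityDemo() {
    std::vector<std::string> log;
    RecordingCommand low("low", &log), high("high", &log), mid("mid", &log);
    PriorityCommandQueue pq;
    pq.push(PrioritizedCommand(1, &low));
    pq.push(PrioritizedCommand(9, &high));
    pq.push(PrioritizedCommand(5, &mid));
    while (!pq.empty()) {
        Command* next = pq.top().command;
        pq.pop();
        next->run();
    }
    return log;
}
```

One thing the sketch makes visible: std::priority_queue has no way to search for or remove an arbitrary element, so if I want both prioritizing and canceling, I would probably need a different container than either of these two.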

“ThreadPool” starts the specified number of workers using pthread_create(). After that, I can add tasks using addCommand(). Making a running task thread-safe, that is, the code implemented behind the “Command” interface, is completely up to the user. So, if there are any critical sections inside running tasks, use a pthread mutex to protect them.
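As an illustration of protecting a critical section inside a task, here is a small sketch. SharedCounter, CountingCommand, runCommand() and countConcurrently() are hypothetical names, not part of “ThreadPool.” To keep the example self-contained, plain threads created with pthread_create() stand in for the pool's workers.

```cpp
#include <cassert>
#include <pthread.h>

// Same interface as in the post, plus a virtual destructor.
struct Command {
    virtual void run() = 0;
    virtual ~Command() {}
};

// Hypothetical shared state, guarded by its own mutex.
struct SharedCounter {
    pthread_mutex_t lock;
    long value;
    SharedCounter() : value(0) { pthread_mutex_init(&lock, NULL); }
    ~SharedCounter() { pthread_mutex_destroy(&lock); }
};

// A task whose critical section (the increment) is protected by the mutex.
struct CountingCommand : public Command {
    SharedCounter& counter;
    int iterations;
    CountingCommand(SharedCounter& c, int n) : counter(c), iterations(n) {}
    void run() {
        for (int i = 0; i < iterations; ++i) {
            pthread_mutex_lock(&counter.lock);
            ++counter.value;                    // critical section
            pthread_mutex_unlock(&counter.lock);
        }
    }
};

// Stand-in for a pool worker: runs one command on the current thread.
inline void* runCommand(void* arg) {
    static_cast<Command*>(arg)->run();
    return NULL;
}

// Runs the same task on several threads at once and returns the final count.
inline long countConcurrently(int threads, int iterations) {
    SharedCounter counter;
    CountingCommand task(counter, iterations);
    pthread_t* ids = new pthread_t[threads];
    int started = 0;
    for (; started < threads; ++started) {
        if (pthread_create(&ids[started], NULL, runCommand,
                           static_cast<Command*>(&task)) != 0)
            break;                              // join only what really started
    }
    for (int i = 0; i < started; ++i) pthread_join(ids[i], NULL);
    delete[] ids;
    return counter.value;                       // safe: all threads were joined
}
```

Without the lock/unlock pair around the increment, the final count would usually come out lower than threads times iterations, because concurrent increments can overwrite each other.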

Here is the source code of “ThreadPool.” First, the header:

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <list>
#include <pthread.h>

struct Command {
    virtual void run() = 0;
    virtual ~Command() {} // allows safe destruction through a Command pointer
};

void* worker(void* args);

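class ThreadPool {
public:
    class ThreadCreationError {};

    // Starts poolSize worker threads. If a thread cannot be created,
    // bootUp() sets inErrorStatus.
    ThreadPool(int poolSize): poolSize(poolSize) {
        inErrorStatus = false;
        pRunners = new pthread_t[poolSize];
        bootUp(poolSize);
    }

    // Stops the workers, joins them, and releases the pthread resources.
    virtual ~ThreadPool() {
        shutDown();
        delete[] pRunners;
    }

    // Queues a command. The pool stores only a pointer, so the command
    // object must stay alive until it has run.
    void addCommand(Command& com);
private:
    // Not copyable: the pool owns threads and synchronization objects.
    ThreadPool(const ThreadPool&);
    ThreadPool& operator=(const ThreadPool&);

    int poolSize;
    bool inErrorStatus;
    bool workerLoop;            // false once shutdown has been requested
    pthread_t* pRunners;        // worker thread handles
    pthread_attr_t attr;
    pthread_mutex_t queueLock;  // protects queue
    pthread_cond_t signal;      // signaled when a command is queued
    std::list<Command*> queue;

    void bootUp(int size);
    void shutDown();
    friend void* worker(void* args);
};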

#endif /* THREADPOOL_H_ */

Next, the cpp code:

#include "ThreadPool.h"

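void ThreadPool::bootUp(int size) {
    workerLoop = true;
    pthread_mutex_init(&queueLock, NULL);
    pthread_cond_init(&signal, NULL);
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    // Note: these two calls apply to the calling thread only, not to the
    // workers, and they just restate the POSIX defaults.
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (int i = 0; i < size; ++i) {
        if (pthread_create(&pRunners[i], &attr, worker, this) != 0) {
            // Remember how many workers really started, so that shutDown()
            // (called once, from the destructor) joins only valid threads.
            // Calling shutDown() here as well would join and destroy twice.
            poolSize = i;
            inErrorStatus = true;
            break;
        }
    }
}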

void ThreadPool::shutDown() {
    // Change the flag under the lock so that a worker cannot miss the
    // wake-up between checking the flag and starting to wait.
    pthread_mutex_lock(&queueLock);
    workerLoop = false;
    pthread_cond_broadcast(&signal);
    pthread_mutex_unlock(&queueLock);
    for (int i = 0; i < poolSize; ++i) pthread_join(pRunners[i], NULL);
    pthread_attr_destroy(&attr);
    pthread_cond_destroy(&signal);
    pthread_mutex_destroy(&queueLock);
}

void ThreadPool::addCommand(Command& com) {
    pthread_mutex_lock(&queueLock);
    queue.push_back(&com);
    pthread_cond_signal(&signal);
    pthread_mutex_unlock(&queueLock);
}

void* worker(void* args) {
    ThreadPool& pool = *static_cast<ThreadPool*>(args);
    std::list<Command*>& queue = pool.queue;
    pthread_mutex_t& queueLock = pool.queueLock;
    pthread_cond_t& signal = pool.signal;
    bool& loop = pool.workerLoop;
    for (;;) {
        // The flag and the queue are only read while holding the lock.
        pthread_mutex_lock(&queueLock);
        // Wait in a loop: condition variables can wake up spuriously.
        while (loop && queue.empty())
            pthread_cond_wait(&signal, &queueLock);
        if (queue.empty()) {
            // Shutdown was requested and nothing is left to run.
            pthread_mutex_unlock(&queueLock);
            break;
        }
        Command* pWorker = queue.front();
        queue.pop_front();
        if (!queue.empty())
            pthread_cond_signal(&signal); // pass the wake-up on
        pthread_mutex_unlock(&queueLock);
        pWorker->run(); // run outside the lock so other workers can proceed
    }
    return NULL;
}

As before, usage is simple. First, I instantiate “ThreadPool” with a pool size.

ThreadPool pool(5);

Next, I prepare my task by implementing “Command.”

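// Requires <iostream> and <unistd.h> (for sleep()).
struct TestCommand: public Command {
    int val;

    TestCommand(int val): val(val) {}

    void run() {
        // cout is not locked here, so output from different workers may interleave.
        std::cout << "Worker[" << val << "," << pthread_self() << "] is running" << std::endl;
        sleep(1);
    }
};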

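Then, I run it by queuing the task with addCommand(). The pool keeps only a pointer to the command, so the command object must stay alive until the task has run; a local command should be declared before the pool so that it is destroyed after the pool.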

TestCommand com(100);
pool.addCommand(com);
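Since addCommand() returns right away, the caller has no built-in way to know when a task has finished. One possible approach, sketched below, is a command that can be waited on. WaitableCommand, execute(), wait(), SquareCommand and squareOnWorker() are hypothetical names that are not part of “ThreadPool,” and a plain thread stands in for a pool worker to keep the sketch self-contained.

```cpp
#include <cassert>
#include <pthread.h>

// Same interface as in the post, plus a virtual destructor.
struct Command {
    virtual void run() = 0;
    virtual ~Command() {}
};

// Hypothetical base class: a command the submitting thread can wait on.
class WaitableCommand : public Command {
public:
    WaitableCommand() : done(false) {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&finished, NULL);
    }
    virtual ~WaitableCommand() {
        pthread_cond_destroy(&finished);
        pthread_mutex_destroy(&lock);
    }
    // Called by the worker: do the work, then mark completion.
    void run() {
        execute();
        pthread_mutex_lock(&lock);
        done = true;
        pthread_cond_broadcast(&finished);
        pthread_mutex_unlock(&lock);
    }
    // Called by the submitting thread: block until run() has finished.
    void wait() {
        pthread_mutex_lock(&lock);
        while (!done) pthread_cond_wait(&finished, &lock);
        pthread_mutex_unlock(&lock);
    }
protected:
    virtual void execute() = 0; // the actual task goes here
private:
    bool done;
    pthread_mutex_t lock;
    pthread_cond_t finished;
};

// Example task: squares a number and keeps the result for the caller.
struct SquareCommand : public WaitableCommand {
    int input;
    int result;
    explicit SquareCommand(int in) : input(in), result(0) {}
protected:
    void execute() { result = input * input; }
};

// Stand-in for a pool worker: runs one command on the current thread.
inline void* runCommand(void* arg) {
    static_cast<Command*>(arg)->run();
    return NULL;
}

// Runs the command on another thread, waits for it, and returns the result.
inline int squareOnWorker(int input) {
    SquareCommand com(input);
    pthread_t id;
    if (pthread_create(&id, NULL, runCommand, static_cast<Command*>(&com)) != 0)
        return -1; // the thread could not be started
    com.wait();    // with the pool, this would follow pool.addCommand(com)
    pthread_join(id, NULL);
    return com.result;
}
```

With the pool, I would call pool.addCommand(com) and then com.wait() before reading the result or letting the command go out of scope.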
