我用 C++ 编写了一个简单的线程池实现,希望大家的评审意见能帮助我加深理解并改进这个实现。我知道线程池已经有很多现成的实现,我这是在重复造轮子,但这样做是为了把学到的概念付诸实践,在实践中学习。
#pragma once
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
class TPool;
class Task;

// Narrow view of the thread pool that is handed to every executing task
// (see Task::execute) so that a running task can submit further tasks.
//
// Declared as a standard abstract class: `interface` is not a C++ keyword
// (it is only available as a vendor extension / Windows-header macro), and
// the members must be virtual for calls made through this type to reach the
// pool's implementation.
class ITPoolTaskQueue
{
public:
    virtual ~ITPoolTaskQueue() = default;

    // Submits `task` to the pool for later execution.
    virtual void enQueueTask(Task& task) = 0;

    // Returns the number of tasks that are queued but not yet picked up.
    virtual uint32_t countPendingTasks() = 0;
};
// Base class for a unit of work. Client code derives from it and submits
// instances to the thread pool.
class Task
{
public:
    // Virtual so that destroying a derived task through a Task* (for example
    // from a std::unique_ptr<Task>) is well-defined.
    virtual ~Task() = default;

    // Returns a newly allocated copy of this task. The pool stores the copy,
    // not the object that was passed to enQueueTask().
    virtual Task* clone() = 0;

    // Releases a task previously produced by clone(). The pool invokes it on
    // the task itself, i.e. as t->destroy(t).
    virtual void destroy(Task* t) = 0;

    // Performs the work. `tpTasks` lets the task enqueue follow-up tasks.
    virtual void execute(ITPoolTaskQueue& tpTasks) = 0;
};
//thread-pool class, which allows to init with number of threads
//as an arguent and allows to submit tasks to be executed by threads
class TPool : public ITPoolTaskQueue
{
typedef std::unique_ptr<std::thread> ThreadPtr;
typedef std::unique_ptr<Task> TaskPtr;
typedef std::queue<TaskPtr> TaskQueue;
protected:
TaskQueue _taskQ; //queue for storing tasks
bool _stopThreads; //this flag is set to inform threads to exit
std::mutex _mutexQ; //mutex used by task queue
std::condition_variable _cvThread; //condition variable to signal run and wait to threads
std::vector<ThreadPtr> _threads; //threads of thread-pool
void destroyThread(ThreadPtr& thread);
void createThread(ThreadPtr& thread);
bool deQueueTask(TaskPtr& task);
void clearQueue();
public:
explicit TPool(unsigned int numberOfThreads = 0);
virtual ~TPool();
void init();
void deInit();
void threadFunction(); //this will be called by each thread of thread-pool
virtual void enQueueTask(Task& task);
virtual uint32_t countPendingTasks();
};#include "stdafx.h"
#include "TPool.h"
// Reserves one (still empty) thread slot per worker; the threads themselves
// are started later by init().
//
// numberOfThreads == 0 means "use the hardware concurrency".
// std::thread::hardware_concurrency() is allowed to return 0 when the value
// cannot be determined, so fall back to a single worker in that case rather
// than building a pool that can never run anything.
TPool::TPool(unsigned int numberOfThreads)
    : _stopThreads(false)
{
    unsigned int poolSize = numberOfThreads;
    if (poolSize == 0)
    {
        poolSize = std::thread::hardware_concurrency();
        if (poolSize == 0)
        {
            poolSize = 1;
        }
    }
    _threads.resize(poolSize); // value-initialised, i.e. null, slots
}
// Shuts the pool down on destruction. Any exception raised by deInit() is
// deliberately swallowed: a destructor must not let exceptions escape.
TPool::~TPool()
{
    try { deInit(); }
    catch (...) { /* best-effort shutdown; nothing sensible to do here */ }
}
// Waits for the worker stored in `thread` to finish (when it is joinable)
// and then releases the thread object. An empty slot is left untouched.
void TPool::destroyThread(ThreadPtr& thread)
{
    if (!thread)
    {
        return;
    }

    std::thread& worker = *thread;
    if (worker.joinable())
    {
        worker.join();
    }
    thread = nullptr; // releases the std::thread object
}
// Starts a new worker that runs threadFunction() on this pool and stores it
// in the given slot.
void TPool::createThread(ThreadPtr& thread)
{
    ThreadPtr worker = std::make_unique<std::thread>(&TPool::threadFunction, this);
    thread = std::move(worker);
}
void TPool::init()
{
_stopThreads = false;
std::for_each(_threads.begin(), _threads.end(), [this](ThreadPtr& thread){createThread(thread);});
}
void TPool::deInit()
{
_stopThreads = true;
clearQueue();
_cvThread.notify_all();
std::for_each(_threads.begin(), _threads.end(), [this](ThreadPtr& thread){destroyThread(thread);});
_threads.clear();
}
uint32_t TPool::countPendingTasks()
{
return _taskQ.size();
}
// Stores a clone of `task` in the queue and wakes one waiting worker.
//
// clone() is user code, so it runs before the queue mutex is taken: this
// keeps the critical section short and avoids a self-deadlock on the
// non-recursive mutex should clone() call back into the pool.
void TPool::enQueueTask(Task& task)
{
    TaskPtr copy(task.clone());
    {
        std::lock_guard<std::mutex> lock(_mutexQ);
        _taskQ.push(std::move(copy));
    }
    // Notifying after unlocking lets the woken worker acquire the mutex at once.
    _cvThread.notify_one();
}
// Moves the oldest pending task into `task` and removes it from the queue.
// Returns true when a task was handed out; on an empty queue `task` is reset
// to null and false is returned.
bool TPool::deQueueTask(TaskPtr& task)
{
    std::lock_guard<std::mutex> guard(_mutexQ);

    const bool hasTask = !_taskQ.empty();
    if (hasTask)
    {
        task = std::move(_taskQ.front());
        _taskQ.pop();
    }
    else
    {
        task.reset();
    }
    return hasTask;
}
void TPool::clearQueue()
{
std::unique_lock<std::mutex> lock(_mutexQ);
while (!_taskQ.empty())
{
TaskPtr t;
t = std::move(_taskQ.front());
t->destroy(t.release());
_taskQ.pop();
}
}
void TPool::threadFunction()
{
TaskPtr task(nullptr);
while (!_stopThreads)
{
if (deQueueTask(task))
{
task->execute(*this);
task->destroy(task.release());
}
else
{
std::unique_lock<std::mutex> lock(_mutexQ);
_cvThread.wait(lock, [this]{return !_taskQ.empty() || _stopThreads; });
}
}
}一些客户端测试代码
class EnmFolder : public Task
{
public:
virtual Task* clone()
{
Task * t = new EnmFolder;
return t;
}
virtual void destroy(Task* t)
{
delete t;
}
virtual void execute(ITPoolTaskQueue& tpTQ)
{
}
std::string _folder;
};
// Client smoke test: starts a pool, submits the same task five times and
// shuts the pool down again.
void someTest()
{
    EnmFolder enumFolderTask;
    enumFolderTask._folder = "C:";

    TPool threadPool;
    threadPool.init();

    const int taskCount = 5;
    for (int i = 0; i < taskCount; ++i)
    {
        threadPool.enQueueTask(enumFolderTask);
    }

    // deInit() throws away whatever is still queued, so wait until the
    // workers have picked up every task. Tasks that are already running are
    // covered by the join inside deInit().
    while (threadPool.countPendingTasks() != 0)
    {
        std::this_thread::yield();
    }

    threadPool.deInit();
}
发布于 2017-08-31 16:57:59
- 使用 interface 无谓地限制了可移植性(它不是标准 C++ 的关键字)。
- ITPoolTaskQueue 是否必要值得怀疑,因为它只有一个实现。如果去掉它,就可以省掉那里的虚函数分派。
- Task:destroy(Task* t) 让我很困惑。为什么要让一个 Task 去销毁另一个 Task?如果它本来就应该销毁自己,为什么不直接调用 delete 并使用虚析构函数呢?
- 为什么不用 operator() 来执行 Task?另外,你确定在这里把线程池传给任务是个好主意吗?为什么不让任务的创建者传入任务所需的一切?如果任务需要使用另一个线程池怎么办?
- 考虑一下使用 std::function 是否比自己定义任务类型更好。
- TPool:TaskQueue 是一个简单的、私有的、只用一次的 typedef,把它内联展开有助于提高可读性。
- 考虑把 init() 并入构造函数,把 deInit() 并入析构函数。
- threadFunction() 看起来是实现细节,请考虑按实现细节来对待它。
- countPendingTasks() 与并发的入队和出队操作之间存在数据竞争,你需要加锁。
- 难道 std::thread 本身不足以代表一个线程吗?
https://codereview.stackexchange.com/questions/174478
复制相似问题