首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ThreadPool在C++中的实现

ThreadPool在C++中的实现
EN

Code Review用户
提问于 2017-08-31 14:26:50
回答 1查看 525关注 0票数 4

我用C++编写了一个简单的线程池实现,希望您的评论能提高我的理解和改进我的实现。我知道我在重复线程池的实现,但它是为了将我的概念放到真正的视角和学习中。

"TPool.h"

代码语言:javascript
复制
#pragma once
#include <vector>
#include <thread>
#include <memory>
#include <algorithm>
#include <queue>
#include <mutex>
#include <condition_variable>

class TPool;
class Task;

//reference of this interface is passed to the tasks being executed
//so that tasks can add new tasks to thread-pool
interface ITPoolTaskQueue
{
    void enQueueTask(Task& task);
    uint32_t countPendingTasks();
};

//task base class which can be inherited to create actual
//task which will be submitted to thread-pool by client code
class Task
{
public:
    virtual Task* clone() = 0;
    virtual void destroy(Task* t) = 0;
    virtual void execute(ITPoolTaskQueue& tpTasks) = 0;
};

//thread-pool class, which allows to init with number of threads
//as an arguent and allows to submit tasks to be executed by threads
class TPool : public ITPoolTaskQueue
{
    typedef std::unique_ptr<std::thread> ThreadPtr;
    typedef std::unique_ptr<Task> TaskPtr;
    typedef std::queue<TaskPtr> TaskQueue;

protected:
    TaskQueue _taskQ;                       //queue for storing tasks
    bool _stopThreads;                      //this flag is set to inform threads to exit
    std::mutex _mutexQ;                     //mutex used by task queue
    std::condition_variable _cvThread;      //condition variable to signal run and wait to threads
    std::vector<ThreadPtr> _threads;        //threads of thread-pool

    void destroyThread(ThreadPtr& thread);
    void createThread(ThreadPtr& thread);
    bool deQueueTask(TaskPtr& task);
    void clearQueue();

public:
    explicit TPool(unsigned int numberOfThreads = 0);
    virtual ~TPool();

    void init();
    void deInit();
    void threadFunction();                  //this will be called by each thread of thread-pool

    virtual void enQueueTask(Task& task);
    virtual uint32_t countPendingTasks();
};

"TPool.cpp"

代码语言:javascript
复制
#include "stdafx.h"
#include "TPool.h"

TPool::TPool(unsigned int numberOfThreads)
{
    auto poolSize = 0 == numberOfThreads ? std::thread::hardware_concurrency() : numberOfThreads;

    while (poolSize--)
    {
        _threads.push_back(nullptr);
    }
}

TPool::~TPool()
{
    try
    {
        deInit();
    }

    catch (...)
    {}
}

void TPool::destroyThread(ThreadPtr& thread)
{
    if (thread.get() != nullptr)
    {
        if (thread->joinable())
        {
            thread->join();
        }

        thread.reset();
    }
}

void TPool::createThread(ThreadPtr& thread)
{
    thread = std::make_unique<ThreadPtr::element_type>(&TPool::threadFunction, this);
}

void TPool::init()
{
    _stopThreads = false;
    std::for_each(_threads.begin(), _threads.end(), [this](ThreadPtr& thread){createThread(thread);});
}

void TPool::deInit()
{
    _stopThreads = true;
    clearQueue();
    _cvThread.notify_all();
    std::for_each(_threads.begin(), _threads.end(), [this](ThreadPtr& thread){destroyThread(thread);});
    _threads.clear();
}

uint32_t TPool::countPendingTasks()
{
    return _taskQ.size();
}

void TPool::enQueueTask(Task& task)
{
    std::unique_lock<std::mutex> lock(_mutexQ);
    TaskPtr tptr(task.clone());
    _taskQ.push(std::move(tptr));
    _cvThread.notify_one();
}

bool TPool::deQueueTask(TaskPtr& task)
{
    std::unique_lock<std::mutex> lock(_mutexQ);

    if (_taskQ.empty())
    {
        task = nullptr;
        return false;
    }
    else
    {
        task = std::move(_taskQ.front());
        _taskQ.pop();
        return true;
    }
}

void TPool::clearQueue()
{
    std::unique_lock<std::mutex> lock(_mutexQ);
    while (!_taskQ.empty())
    {
        TaskPtr t;
        t = std::move(_taskQ.front());
        t->destroy(t.release());
        _taskQ.pop();
    }
}

void TPool::threadFunction()
{
    TaskPtr task(nullptr);

    while (!_stopThreads)
    {
        if (deQueueTask(task))
        {
            task->execute(*this);
            task->destroy(task.release());
        }
        else
        {
            std::unique_lock<std::mutex> lock(_mutexQ);
            _cvThread.wait(lock, [this]{return !_taskQ.empty() || _stopThreads; });
        }
    }
}

一些客户端测试代码

代码语言:javascript
复制
class EnmFolder : public Task
{
public:
    virtual Task* clone()
    {
        Task * t = new EnmFolder;
        return t;
    }

    virtual void destroy(Task* t)
    {
        delete t;
    }

    virtual void execute(ITPoolTaskQueue& tpTQ)
    {
    }

    std::string _folder;
};

void someTest()
{
    EnmFolder enumFolderTask;
    enumFolderTask._folder = "C:";
    TPool threadPool;

    threadPool.init();
    threadPool.enQueueTask(enumFolderTask);
    threadPool.enQueueTask(enumFolderTask);
    threadPool.enQueueTask(enumFolderTask);
    threadPool.enQueueTask(enumFolderTask);
    threadPool.enQueueTask(enumFolderTask);
    //require some way to wait here before deInit()
    threadPool.deInit();
}
EN

回答 1

Code Review用户

发布于 2017-08-31 16:57:59

  1. 如果你能帮忙的话,不要使用非标准的关键词。使用interface免费限制了可移植性。
  2. 无论如何,我不知道为什么要定义接口ITPoolTaskQueue,因为它只有一个实现。如果你放弃了它,你就可以消除那里的虚拟脱轨。
  3. 接下来是Task
    1. 我不知道你为什么要克隆一个任务。啊,因为你只排过一次队。那么,你希望在那之后原版会发生什么呢?
    2. destroy(Task* t)让我目瞪口呆。你为什么要让一个Task去摧毁另一个Task?如果它应该摧毁自己,你为什么不打电话给delete并使用一个虚拟的dtor呢?
    3. 为什么不使用operator()来执行Task?另外,您确定在这里传递线程池而不是依赖任务的创建者传递所需的一切是一个好主意吗?如果任务应该使用不同的任务池怎么办?
    4. 无论如何,考虑一下简单地使用std::function是否比定义自己的要好。

  4. 现在转到TPool
    1. TaskQueue是一个简单的、私有的、单用途的typedef。内衬它将有助于可读性。
    2. 考虑简化接口:将init()与ctor集成,deinit()与dtor集成。
    3. threadFunction()似乎是一个实现细节。考虑以这样的方式对待它。
    4. 当队列包含一个null指针时,这是一个错误,因此您可以返回一个null指针,以指示一个元素去排队失败。签订了一个更好的合同。
    5. countPendingTasks()是一种具有并发排队和脱队列的数据竞争.你得锁门。
    6. 您知道,您可以通过调用ctor来创建一个包含任意数量默认构造元素的向量吗?您也知道默认构造的std::thread并不代表线程?
票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/174478

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档