在下面的代码中,我正在创建一个producer thread
和n
consumer threads
,queue
并从其专用内容和打印内容读取到stdout
。该代码有时会在语句时崩溃consumerQueues[id]->empty()
。通过调试去我看到consumerQueues[id]
是0x0
当它崩溃。现在在init()
函数中,我在创建工作程序之前创建了ith
使用者。我不确定为什么会保留下去。请帮我弄清楚发生了什么。queue
ith
thread
consumerQueues[id]
0x0
#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
class Test
{
private:
void producer()
{
while(true)
{
std::string s = "abc";
for(const auto& q : consumerQueues)
{
std::unique_lock<std::mutex> lock(mutex);
q->push(s);
condition_variable.notify_all();
}
}
}
void consumer(int id)
{
while (true)
{
std::string job;
{
std::unique_lock<std::mutex> lock(mutex);
while(consumerQueues[id]->empty())
{
condition_variable.wait(lock);
}
job = consumerQueues[id]->front();
consumerQueues[id]->pop();
}
std::cout << "ID "<< id << " job " << job << std::endl;
}
}
std::mutex mutex;
std::condition_variable condition_variable;
std::vector<std::thread> workers;
std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
std::thread producerThread;
public:
Test(const unsigned n_threads):
workers(std::vector<std::thread>(n_threads))
{}
Test(const Test &) = delete;
Test(Test &&) = delete;
Test & operator=(const Test &) = delete;
Test & operator=(Test &&) = delete;
void init()
{
for (unsigned i = 0; i < workers.size(); ++i)
{
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
~Test()
{
producerThread.join();
for (unsigned i = 0; i < workers.size(); ++i)
{
if(workers[i].joinable())
{
workers[i].join();
}
}
}
};
int main()
{
Test t(1000);
t.init();
return 0;
}
您的init函数正在修改没有互斥量的std :: vector。这将在线程逐个启动的同时修改向量。
为了使这项工作有效,您的init函数必须是这样的:
void init() {
for (unsigned i = 0; i < workers.size(); ++i) {
std::unique_lock<std::mutex> lock(mutex);
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
来自:http : //www.cplusplus.com/reference/vector/vector/push_back/
数据竞赛
容器已修改。如果发生重新分配,则会修改所有包含的元素。否则,将不访问任何现有元素,并且安全地并发访问或修改它们。
重新分配通常从0个元素开始到1000个时发生。因此,您还可以保留向量的大小,以确保没有发生重新分配:
void init() {
consumerQueues.reserve(workers.size());
for (unsigned i = 0; i < workers.size(); ++i) {
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句