我有一个entities
包含4400万个名字的向量。我想将其分为4个部分,并并行处理每个部分。类Freebase
包含loadData()
用于拆分向量和调用函数multiThread
以进行处理的函数。
loadEntities()
读取包含名称的文本文件。我没有将实现放在类中,因为它并不重要loadData()
将entities
在构造函数中初始化的向量分为4部分,并将各部分相加vector<thread> threads
,如下所示:threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
i
并且i+right
是在多线程的for循环中用于遍历实体的索引returnValues
是的子函数,multiThread
用于调用外部函数。cout <<"Entity " << entities[i] << endl;
显示以下结果:
最后2个输出错误。输出应为:
Entity name
不 entity entity name
也不entity name name
在将输入发送给function时,这会导致分段错误returnValues
。我该如何解决?
#ifndef FREEBASE_H
#define FREEBASE_H
class Freebase
{
public:
Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
void loadData();
private:
std::string _serverURL;
std::string _entities;
std::string _xmlFile;
void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
//private data members
std::vector<std::string> entities;
};
#endif
#include "Freebase.h"
#include "queries/SparqlQuery.h"
Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
entities = loadEntities();
}
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
data.push_back(make_pair(entities[i], desc));
}
}
void Freebase::loadData()
{
vector<pair<string, string>> data;
vector<thread> threads;
int Size = entities.size();
//split database into 4 parts
int p = 4;
int right = round((double)Size / (double)p);
int left = Size % p;
float totalduration = 0;
vector<pair<int, int>> coordinates;
int counter = 0;
for(int i = 0; i < Size; i += right)
{
if(i < Size - right)
{
threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
}
else
{
threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
}
}//end outer for
for(auto &t : threads)
{
t.join();
}
}
vector<pair<string, string>> Freebase::returnValues(const string & query)
{
vector<pair<string, string>> data;
SparqlQuery sparql(query, _serverURL);
string result = sparql.retrieveInformations();
istringstream str(result);
string line;
//skip first line
getline(str,line);
while(getline(str, line))
{
vector<string> values;
line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
boost::split(values, line, boost::is_any_of("\t"));
if(values.size() == 2)
{
pair<string,string> fact = make_pair(values[0], values[1]);
data.push_back(fact);
}
else
{
data.push_back(make_pair(line, ""));
}
}
return data;
}//end function
编辑:Arnon Zilca在他的评论中是正确的。您正在从多个线程(在中Freebase::multiThread()
)写入单个向量,这是灾难的根源。您可以如下所述使用互斥锁来保护push_back操作。
有关容器上线程安全的更多信息,请参见std :: vector或boost :: vector线程安全吗?。
所以:
mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
另一个选择是使用与returnValues相同的策略,在multiThread中创建局部向量,并且仅在线程完成处理后才将内容推入数据向量。
所以:
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
vector<pair<string,string>> threadResults;
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
threadResults.push_back(make_pair(entities[i], desc));
}
mtx.lock()
data.insert(data.end(), threadResults.begin(), threadResults.end());
mtx.unlock()
}
注意:我建议使用与用于cout的互斥锁不同的互斥锁。总体结果向量data
与资源是不同的资源cout
。因此,想要使用的线程cout
不必等待其他线程结束data
。
/编辑
您可以在周围使用互斥锁
cout <<"Entity " << entities[i] << endl;
这样可以防止多个线程“同时”使用cout。这样,您可以确保在另一个线程可以打印一条消息之前,一个线程可以打印一条完整的消息。请注意,这将影响性能,因为线程必须等待互斥体可用后才允许打印。
注意:保护cout仅会清除流中的输出,不会影响其余代码的行为,请参见上文。
有关示例,请参见http://www.cplusplus.com/reference/mutex/mutex/lock/。
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_thread_id (int id) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
std::cout << "thread #" << id << '\n';
mtx.unlock();
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句