分段故障多线程C ++ 11

哈尼·格克(Hani Goc)

介绍

我有一个entities包含4400万个名字的向量我想将其分为4个部分,并并行处理每个部分。Freebase包含loadData()用于拆分向量和调用函数multiThread以进行处理的函数

  • loadEntities()读取包含名称的文本文件。我没有将实现放在类中,因为它并不重要
  • loadData()entities在构造函数中初始化的向量分为4部分,并将各部分相加vector<thread> threads,如下所示:

threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
  • multiThread是我处理文件的功能
  • i并且i+right是在多线程的for循环中用于遍历实体的索引
  • returnValues是的子函数,multiThread用于调用外部函数。

问题

cout <<"Entity " << entities[i] << endl; 显示以下结果:

  • 实体m.0rzf6wv(确定)
  • 实体m.0rzf70(确定)
  • 实体m.068s4h9 m.0n_k8bz(WRONG)
  • 实体实体m.068s5_1(错误)

最后2个输出错误。输出应为:

  • Entity name entity entity name也不entity name name

在将输入发送给function时,这会导致分段错误returnValues我该如何解决?


源代码

#ifndef FREEBASE_H
#define FREEBASE_H

class Freebase
{
 public:
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
    void loadData();
 private:
   std::string _serverURL;
   std::string _entities;
   std::string _xmlFile;
   void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
   //private data members
   std::vector<std::string> entities;
};

#endif

#include "Freebase.h"
#include "queries/SparqlQuery.h"

Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
  entities = loadEntities();
}

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     data.push_back(make_pair(entities[i], desc));
  }
}


void Freebase::loadData()
{
  vector<pair<string, string>> data;
  vector<thread> threads;
  int Size = entities.size();
  //split database into 4 parts
  int p = 4;
  int right = round((double)Size / (double)p);
  int left = Size % p;
  float totalduration = 0;
  
  vector<pair<int, int>> coordinates;
  int counter = 0;
  for(int i = 0; i < Size; i += right)
  {

      if(i < Size - right)
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
      }
      else
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
      }
      
  }//end outer for
  
   for(auto &t : threads)
   {
      t.join();
   }
   
}


vector<pair<string, string>>  Freebase::returnValues(const string & query)
{
  vector<pair<string, string>> data;
  SparqlQuery sparql(query, _serverURL);
  string result = sparql.retrieveInformations();
  istringstream str(result);
  string line;
  //skip first line
  getline(str,line);
  while(getline(str, line))
  {
    vector<string> values;
    line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
    
    boost::split(values, line, boost::is_any_of("\t"));
    if(values.size() == 2)
    {
      pair<string,string> fact = make_pair(values[0], values[1]);
      data.push_back(fact);
    }
    else
    {
      data.push_back(make_pair(line, ""));
    }
  }
  
  return data;
}//end function
蒂莫

编辑:Arnon Zilca在他的评论中是正确的。您正在从多个线程(在中Freebase::multiThread()写入单个向量,这是灾难的根源。您可以如下所述使用互斥锁来保护push_back操作。

有关容器上线程安全的更多信息,请参见std :: vector或boost :: vector线程安全吗?

所以:

mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();

另一个选择是使用与returnValues相同的策略,在multiThread中创建局部向量,并且仅在线程完成处理后才将内容推入数据向量。

所以:

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  vector<pair<string,string>> threadResults;
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     threadResults.push_back(make_pair(entities[i], desc));
  }
  mtx.lock()
  data.insert(data.end(), threadResults.begin(), threadResults.end());
  mtx.unlock()
}

注意:我建议使用与用于cout的互斥锁不同的互斥锁。总体结果向量data与资源是不同的资源cout因此,想要使用的线程cout不必等待其他线程结束data

/编辑

您可以在周围使用互斥锁

cout <<"Entity " << entities[i] << endl;

这样可以防止多个线程“同时”使用cout。这样,您可以确保在另一个线程可以打印一条消息之前,一个线程可以打印一条完整的消息。请注意,这将影响性能,因为线程必须等待互斥体可用后才允许打印。

注意:保护cout仅会清除流中的输出,不会影响其余代码的行为,请参见上文。

有关示例,请参见http://www.cplusplus.com/reference/mutex/mutex/lock/

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章