将SQL文件导入ElasticSearch

Android风暴

我想将数据从sql文件导入到ElasticSearch。我知道通过JBDC和Logstash的方式,但首先需要将数据加载到mysql中。由于sql文件很大,因此我想跳过这一部分并直接导入。有可能这样做吗?

编辑:我偶然发现了此解决方案,但也许有一个更简单的方法:链接

dadoonet

我在此处发布我在2015年撰写的博客文章内容,因此可能有点过时了(特别是elasticsearch部分-映射和logstash elasticsearch输出:是为Elasticsearch 1.7设计的),但在该方法中仍然有效。


最近,我得到了一个数据库MySQL转储,并且正在考虑将其导入elasticsearch。

出现的第一个想法是:

  • 安装MySQL
  • 导入数据库
  • 使用Logstash读取数据库并导入elasticsearch
  • 删除数据库
  • 卸载MySQL

好。我发现确实不需要某些步骤。

我实际上可以使用Elastic stack并创建一个简单的配方,该配方可用于导入SQL转储脚本,而无需实际将数据加载到数据库中,然后从数据库中再次读取。

用logstash解析SQL脚本

我从拥有的MySQL示例数据库中导出了一些数据。您可以下载相同的数据

SQL插入脚本

我们的对象被拆分到3个表上,但是我们不在这里进行联接。我们将仅从Person表中导入数据

让我们看一下脚本的重要内容:

--
-- Table structure for table `Person`
--

DROP TABLE IF EXISTS `Person`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `Person` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `children` int(11) DEFAULT NULL,
  `dateOfBirth` datetime DEFAULT NULL,
  `gender` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `reference` varchar(255) DEFAULT NULL,
  `address_id` int(11) DEFAULT NULL,
  `marketing_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK_tagx64iglr1dxpalbgothv83r` (`address_id`),
  KEY `FK_j4ifv49erkwul9jruu15o40r4` (`marketing_id`),
  CONSTRAINT `FK_j4ifv49erkwul9jruu15o40r4` FOREIGN KEY (`marketing_id`) REFERENCES `Marketing` (`id`),
  CONSTRAINT `FK_tagx64iglr1dxpalbgothv83r` FOREIGN KEY (`address_id`) REFERENCES `Address` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10001 DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `Person`
--

LOCK TABLES `Person` WRITE;
/*!40000 ALTER TABLE `Person` DISABLE KEYS */;
INSERT INTO `Person` VALUES (1,4,'1944-07-21 00:00:00','male','Joe Smith','0',1,1),...,(10000,0,'2009-09-10 00:00:00','female','Stephanie Rebecca','9999',10000,10000);
/*!40000 ALTER TABLE `Person` ENABLE KEYS */;

此文件中的两个重要部分:

  • CREATE TABLE 给我们所有列标题
  • INSERT INTO 是我们的数据

如果实际备份中的数据要多得多,那么您可能会有多于INSERT INTO一行。

首先,我们基本上需要在这里忽略备份的前108行。

然后让我们看第一行:

cat person.sql | head -109 | tail -1

给出:

INSERT INTO `Person` VALUES (1,4,'1944-07-21 00:00:00','male','Joe Smith','0',1,1),...,(10000,0,'2009-09-10 00:00:00','female','Stephanie Rebecca','9999',10000,10000);

用logstash阅读

我们需要做的是使用以下模式拆分每一行:

INSERT INTO `Person` VALUES (DATA),(DATA),***,(DATA);

我们可以mysql.conf为logstash创建一个以对此进行解析。与往常一样,从“空”开始:

input { stdin {} }

filter {
}

output { stdout { codec => rubydebug } }

然后,让我们忽略该INSERT INTO ...零件,并在名为的新字段中提取数据extracted_sql让我们为此使用grok过滤器

grok {
  match => {
    "message" => "INSERT INTO \`Person\` VALUES (%{GREEDYDATA:extracted_sql})"
  }
  remove_field => "message"
}

执行它:

cat person.sql | head -109 | tail -1 | bin/logstash -f mysql.conf

它给出了这样的内容:

{
         "@version" => "1",
       "@timestamp" => "2015-09-14T07:32:43.495Z",
             "host" => "MacBook-Air-de-David.local",
    "extracted_sql" => "(..),..(..);
}

现在,我们需要拆分extracted_sql多个事件。让我们添加一个拆分过滤器

split {
  terminator => "),("
  field => "extracted_sql"
}

再次启动,现在每表行给出一个事件:

{
         "@version" => "1",
       "@timestamp" => "2015-09-14T07:38:34.489Z",
             "host" => "MacBook-Air-de-David.local",
    "extracted_sql" => "1,4,'1944-07-21 00:00:00','male','Joe Smith','0',1,1"
}
// ...
{
         "@version" => "1",
       "@timestamp" => "2015-09-14T07:37:25.729Z",
             "host" => "MacBook-Air-de-David.local",
    "extracted_sql" => "8906,3,'1958-12-17 00:00:00','male','Gautier Titouan','8905',8906,8906"
}
// ...
{
         "@version" => "1",
       "@timestamp" => "2015-09-14T07:38:34.489Z",
             "host" => "MacBook-Air-de-David.local",
    "extracted_sql" => "10000,0,'2009-09-10 00:00:00','female','Stephanie Rebecca','9999',10000,10000"
}

听起来我们现在有了CSV结构...我们可以使用CSV过滤器GROK过滤器

Grok提供了更大的灵活性,因为它有助于为每个字段定义所需的正确数据类型。该CSV过滤器不能直接做的时刻Grok可以做到,但是它基于正则表达式,并且比优化分析CSV内容的CSV过滤器要慢得多。因此,我在这里进行交易的灵活性和易于使用的性能。

csv {
  source => "extracted_sql"
  quote_char => "'"
  columns => [ "id", 
    "children", "dateOfBirth", "gender", "name", 
    "reference", "address_id", "marketing_id" ]
  remove_field => "extracted_sql"
}

处理NULL

如果必须处理NULL值,只需在CSV过滤器之前添加:

mutate {
    gsub => [
      "extracted_sql", "NULL", ""
    ]
}

选择一个时间戳

我们还提供了各种格式的日期:

   "@timestamp" => "2015-09-14T07:38:34.489Z",
  "dateOfBirth" => "2009-09-10 00:00:00"

dateOfBirth显然是创建日期。@timestamp始终是内部logstash时间戳。我们想dateOfBirth成为我们的活动日期。

date {
    match => [ "dateOfBirth", "YYYY-MM-DD HH:mm:ss" ]
    remove_field => "dateOfBirth"
}

删除标题

到目前为止很好。但是标题部分呢?

好吧,我们有第一个grok模式尝试解析,INSERT ...因此如果失败,它将生成一个_grokparsefailure标记。我们可以删除包含以下内容的每一行:

# Just after the grok filter
if "_grokparsefailure" in [tags] {
  drop { }
} 

现在,我们可以在完整文件上运行logstash配置:

cat person.sql | bin/logstash -f mysql.conf

清理

我们现在输出:

{
        "@version" => "1",
      "@timestamp" => "1967-01-17T23:00:00.000Z",
            "host" => "MacBook-Air-de-David.local",
              "id" => "9999",
        "children" => "1",
          "gender" => "female",
            "name" => "Laetitia Lois",
       "reference" => "9998",
      "address_id" => "9999",
    "marketing_id" => "9999"
}

我们不需要保留@versionhost字段:

mutate {
  remove_field => [ "@version", "host" ]
}

它给:

{
      "@timestamp" => "1967-01-17T23:00:00.000Z",
              "id" => "9999",
        "children" => "1",
          "gender" => "female",
            "name" => "Laetitia Lois",
       "reference" => "9998",
      "address_id" => "9999",
    "marketing_id" => "9999"
}

连接到elasticsearch

不是最难的部分。但这可能是因为我从事弹性搜索已经有将近5年了:)!

对于新来者,您必须:

  • 下载目前最新的版本是1.7.1。
  • 解压缩: tar xzf elasticsearch-1.7.1.tar.gz
  • 安装奇迹bin/plugin install elasticsearch/marvel/latest
  • 发射: bin/elasticsearch

并连接logstash ...

output {
  elasticsearch {
    host => "localhost"
    port => "9200"
    protocol => "http"
    index => "persons-%{+YYYY}"
    document_type => "person"
    document_id => "%{id}"
    template => "person.json"
    template_name => "person"
  }

  stdout {
    codec => "dots"
  }
}

请注意,我们将按年份分组发送到一个索引中persons-YEAR4DIGITS该索引名为type,使用typeperson并将原始id文档作为文档_id

person.json文件包含我们的模板。我们定义使用1个单个分片,不需要_all字段和其他一些设置:

{
  "template": "persons-*",
  "order":    1, 
  "settings": {
    "number_of_shards": 1 
  },
  "mappings": {
    "_default_" : {
       "_all" : {"enabled" : false},
       "dynamic_templates" : [ {
         "string_fields" : {
           "match" : "*",
           "match_mapping_type" : "string",
           "mapping" : {
             "type" : "string", "index" : "analyzed", "omit_norms" : true,
               "fields" : {
                 "raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 256}
               }
           }
         }
       } ]
    },
    "person": { 
      "properties": {
        "id": {
          "type": "long",
          "index": "no"
        }
      }
    }
  }
}

发射!

cat person.sql | bin/logstash -f mysql.conf

如果要提高注入速度,只需在logstash中添加更多工人:

cat person.sql | bin/logstash -f mysql.conf -w 2

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章