Spark如何从多个Elastic Search集群读取

丹尼尔

我需要从两个不同的Elastic Search集群读取数据。一个用于日志,一个用于产品数据,我尝试sparkConf()在创建时进行其他设置SparkSession但它似乎仅适用于我创建的第一个SparkSession

val config1 = new SparkConf().setAppName("test")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("es.index.auto.create", "true")
  .set("es.nodes.discovery", "false")
  .set("es.nodes.wan.only", "true")
  .set("es.nodes.client.only", "false")
  .set("es.nodes", s"$esNode1:$esPort1")

val config2 = new SparkConf().setAppName("test")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("es.index.auto.create", "true")
  .set("es.nodes.discovery", "false")
  .set("es.nodes.wan.only", "true")
  .set("es.nodes.client.only", "false")
  .set("es.nodes", s"$esNode2:$esPort2")

val session1 = SparkSession.builder.master('local').config(config1).getOrCreate()
val session2 = SparkSession.builder.master('local').config(config2).getOrCreate()

session1.read.format("org.elasticsearch.spark.sql").load(path)
session2.read.format("org.elasticsearch.spark.sql").load(path)

似乎spark不支持使用相同格式的多个会话,因为我也在Mysql(jdbc)中使用了相同的SparkSession,并且效果很好。有没有其他方法可以从多个ElasticSearch集群获取数据?

增加意义

每个Spark应用程序仅创建一个会话。然后以这种方式读取2个DataFrame:

  val config = new SparkConf().setAppName("test")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("es.index.auto.create", "true")
    .set("es.nodes.discovery", "false")
    .set("es.nodes.wan.only", "true")
    .set("es.nodes.client.only", "false")

  val session = SparkSession.builder.master("local").config(config).getOrCreate

  val df1 = session.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", s"$esNode1:$esPort1").load(path)

  val df2 = session.read.format("org.elasticsearch.spark.sql")
    .option("es.nodes", s"$esNode2:$esPort2").load(path)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何使用Hive从集群读取?

覆盖Spring Data Elastic Search集群节点配置

在Kerberized集群的Spark应用程序中读取HDFS文件

MongoDB:如何确保从分片集群的特定辅助读取?

如何在Spark中读取多个线元素?

如何使用包含多个名称空间的spark读取XML文件?

如何在节点集群中安装 Search Guard 5?

如何将 Spark 与 Elastic Search 连接起来

Elastic Search NEST-如何在搜索中具有多个级别的过滤器

使用Spark独立集群如何在工作节点上管理多个执行者?

如何将多个Spark作业提交到单个AWS EMR集群

使用不同容量(CPU、RAM)的机器配置 Elastic Search 集群以进行滚动升级

集群上的Spark:读取大量小型avro文件花费的时间太长,无法列出

Elastic Search - 如何丢失记录

如何使用cloudformation模板创建Amazon RDS aurora Master并读取副本集群

如何通过Java高级Rest客户端在Elastic Search中使用多个字段进行搜索

如何在一个Docker撰写文件中包含Elastic Search(具有多个节点)和Kibana?

Spark - 如何从 S3 读取多个带有文件名的 Json 文件

如何在Spark数据框中从AWS S3读取多个文件?

如何在spark sql中从不同路径读取多个csv文件

如何在 Spark 中读取多个文本文件进行文档聚类?

Spark 2.0 - 如何获取与集群中心关联的集群 ID

如何查看Spark独立集群的聚合日志

如何提高 EMR 集群中的 Spark 性能

如何找到Spark的StreamingKMeans的集群中心?

java.io.IOException:尝试从IBM Cloud Object Storage的Spark集群读取镶木地板文件时,无法读取文件FileStatus的页脚

在Spark中发现和读取多个文件

如何通过Terraform管理多个kubernetes集群?

测试Elastic Search时如何关闭缓存