我需要从两个不同的Elastic Search集群读取数据。一个用于日志,一个用于产品数据,我尝试sparkConf()
在创建时进行其他设置,SparkSession
但它似乎仅适用于我创建的第一个SparkSession
val config1 = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")
.set("es.nodes", s"$esNode1:$esPort1")
val config2 = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")
.set("es.nodes", s"$esNode2:$esPort2")
val session1 = SparkSession.builder.master('local').config(config1).getOrCreate()
val session2 = SparkSession.builder.master('local').config(config2).getOrCreate()
session1.read.format("org.elasticsearch.spark.sql").load(path)
session2.read.format("org.elasticsearch.spark.sql").load(path)
似乎spark不支持使用相同格式的多个会话,因为我也在Mysql(jdbc)中使用了相同的SparkSession,并且效果很好。有没有其他方法可以从多个ElasticSearch集群获取数据?
每个Spark应用程序仅创建一个会话。然后以这种方式读取2个DataFrame:
val config = new SparkConf().setAppName("test")
.set("spark.driver.allowMultipleContexts", "true")
.set("es.index.auto.create", "true")
.set("es.nodes.discovery", "false")
.set("es.nodes.wan.only", "true")
.set("es.nodes.client.only", "false")
val session = SparkSession.builder.master("local").config(config).getOrCreate
val df1 = session.read.format("org.elasticsearch.spark.sql")
.option("es.nodes", s"$esNode1:$esPort1").load(path)
val df2 = session.read.format("org.elasticsearch.spark.sql")
.option("es.nodes", s"$esNode2:$esPort2").load(path)
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句