我有两个大型Hive表,我想将它们与spark.sql连接。假设我们有表1和表2,表1中有500万行,表2中有7000万行。表是活泼的格式,并作为拼花文件存储在Hive中。
我想加入它们并在某些列上进行一些聚合,可以说计算所有行和列的平均值(例如doubleColumn),同时使用两个条件进行过滤(在col1,col2上说)。
注意:我在单台机器上进行测试安装(虽然功能很强大)。我希望集群中的性能可能会有所不同。
我的第一次尝试是使用spark sql像这样:
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
不幸的是,即使我为每个执行程序和驱动程序提供至少8 GB的内存,运行时间也只有大约5分钟,非常差。我还尝试使用数据帧语法,并尝试首先过滤行,然后仅选择特定的列以具有更好的选择性,例如:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
但这并没有明显的性能提升。如何提高联接的性能?
哪种方法是执行spark.sql或dataframe语法的最佳方法?
给更多的执行者或记忆会有所帮助吗?
我应该使用缓存吗?
我同时缓存了两个数据帧tab1,tab2,并且联接聚合显着提高了速度,但是我认为缓存我的数据帧不切实际,因为我们对并发性感兴趣,许多用户同时询问一些分析查询。
因为我在单节点上工作,而当我进入集群中的生产环境时,我的问题会消失,这无可奈何吗?
奖励问题:我用Impala尝试了此查询,它执行了大约40秒,但比spark.sql更好。Impala如何比Spark更好?
哪种方法是执行spark.sql或dataframe语法的最佳方法?
没有任何区别。
给更多的执行者或记忆会有所帮助吗?
仅当问题不是由数据偏斜引起的并且您正确调整配置时。
我应该使用缓存吗?
如果输入数据被多次重用,那么从性能角度考虑(在您已经确定的情况下)可能是明智的。
因为我在单节点上工作,而当我进入集群中的生产环境时,我的问题会消失,这无可奈何吗?
通常,在单个节点上进行性能测试是完全没有用的。它错过了瓶颈(网络IO /通信)和优势(摊销的磁盘I / O和资源使用)。
然而,你可以显著降低parallelsm( spark.sql.shuffle.partitions
,sql.default.parallelism
并增加了输入分配的大小)。专为分配负载而设计的Counterintuitiv Spark风格的并行性在单台计算机上的责任多于资产。与共享内存相比,它依赖于随机播放(磁盘写入!)进行通信,从而使事情变得极其缓慢,并且调度开销非常大。
Impala如何比Spark更好?
因为它是专为低延迟并发查询而设计的。这不是Spark的目标(数据库与ETL框架)。
当你
因为我们对并发感兴趣,所以许多用户同时询问一些分析性查询。
Spark听起来似乎不是正确的选择。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句