Apache Spark在完全分布式模式下对执行程序采取行动

Aya Ayaz:

我是新手,我对转换和动作如何工作有基本的了解(指南)。我正在文本文件的每行(基本上是段落)上尝试一些NLP操作。处理后,应将结果发送到服务器(REST Api)进行存储。该程序在yarn模式下的10个节点的群集上作为Spark作业(使用spark-submit提交)运行。这是我到目前为止所做的。

...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
    .map(line -> {
        // processed here
        return result;
    });
processedLines.foreach(line -> {
    // Send to server
});

这可以工作,但是foreach循环似乎是顺序的,似乎它不在工作节点上以分布式模式运行。我对么?

我尝试了以下代码,但是它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)显然,它的错误是因为它map是一种转变,而不是一种行动。

lines.map(line -> { /* processing */ })
     .map(line -> { /* Send to server */ });

我也尝试过take(),但要求intprocessedLines.count()is类型long

processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });

数据量巨大(大于100gb)。我想要的是处理和将其发送到服务器均应在工作程序节点上完成。map反过来,处理部分发生在工作节点上。但是我如何将处理后的数据从辅助节点发送到服务器,因为foreach似乎顺序循环发生在驱动程序中(如果我是对的)。简而言之,如何action在辅助节点而不是驱动程序中执行。

任何帮助将不胜感激。

Ajay Kr Choudhary:

foreach是火花中的动作。它基本上采用了RDD的每个元素,并对该元素应用了功能。

foreach在执行程序节点或工作程序节点上执行。不会应用在驱动程序节点上。请注意,在运行spark的本地执行模式下,驱动程序和执行程序节点都可以驻留在同一JVM上。

检查此作为参考以获取每个说明

在尝试映射RDD的每个元素然后应用于每个元素的地方,您的方法看起来不错foreach我之所以会花时间,是因为您要处理的数据大小(〜100GB)。

对此进行优化的一种方法是repartition输入数据集。理想情况下,每个分区的大小应为128MB,以获得更好的性能结果。您将找到许多有关进行数据重新分区的最佳实践的文章。我建议您遵循它们,它将带来一些性能上的好处。

您可以想到的第二个优化是分配给每个执行者节点的内存。在进行火花调整时,它起着非常重要的作用。

您可以想到的第三个优化是将网络呼叫批处理到服务器。当前,您正在为RDD的每个元素对服务器进行网络调用。如果您的设计允许您批处理这些网络调用,则可以在一个网络调用中发送多个元素。如果产生的延迟主要是由于这些网络调用,这也可能有所帮助。

我希望这有帮助。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

非分布式场景下的Apache Storm

Apache Spark:设置执行程序实例不会更改执行程序

使用Apache Spark进行分布式Web爬网-是否可能?

结合使用Apache Spark和HDFS与其他分布式存储

Spark 矢量 UDF 和分布式计算的 Apache Ignite 模拟

Apache Drill嵌入式和分布式系统

通过SSL的Apache Solr分布式搜索

Apache Ignite在分布式缓存中存储JSON Map

Apache Spark与Apache Ignite

Apache Spark:驱动程序(而不只是执行程序)尝试连接到Cassandra

Apache Spark-为什么要删除执行程序?“空闲”是什么意思?

Apache Spark:逐步执行

Apache Spark与Apache Spark 2

apache / nginx-执行程序,从http请求获取参数

是否应在YARN模式下将Apache Spark的执行者核心数设置为1?

Apache Spark使用在一个执行程序上运行一个任务

如何确定我的Apache Ignite 2.x分布式缓存放置异步

如何处理Apache Curator分布式锁连接丢失

Near缓存上的Apache Ignite查询不能提供比分布式缓存更好的性能

我在哪里可以找到使用 Apache HAWQ 的从站上的分布式文件的位置?

Apache TomEE外部ActiveMQ资源不会在分布式事务中回滚

Apache Pig-如何维护供我的python UDF访问的分布式查找表?

除了HDFS,Apache Mesos最好的分布式文件系统是什么?

集群模式下的Apache Storm

纱线群集模式下的Apache Spark抛出Hadoop FileAlreadyExistsException

独立集群模式下具有Apache Spark的Docker容器

在独立模式下对分布式文件使用Spark Shell(CLI)

通过Apache HBase 2.0的Apache Spark 2.3

Apache Kudu 与 Apache Spark NoSuchMethodError:exportAuthenticationCredentials