从spark-shell(pyspark)查询Spark Streaming应用程序

纳米un

控制台中遵循此示例pyspark并且一切正常。

之后,我将其编写为PySpark应用程序,如下所示:

# -*- coding: utf-8 -*-

import sys

import click

import logging

from pyspark.sql import SparkSession

from pyspark.sql.types import *


@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
            .builder \
            .master(master)\
            .appName("stream-test")\
            .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed 

    some_stream    = spark\
                     .readStream\
                     .option("sep", ",")\
                     .schema(some_schema)\
                     .option("maxFilesPerTrigger", 1)\
                     .csv("/data/some_stream", header=True)

    streaming_counts = (
        linkage_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream\
                            .format("memory")\
                            .queryName("counts")\
                            .outputMode("complete")\
                            .start()



    query.awaitTermination()

if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()

该应用程序执行为:

spark-submit test_stream.py --master spark://master:7077

现在,如果我在另一个终端中打开一个新的spark驱动程序:

pyspark --master spark://master:7077

并尝试运行:

spark.sql("select * from counts")

它失败并显示:

During handling of the above exception, another exception occurred:

AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()

/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    541         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    542         """
--> 543         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    544 
    545     @since(2.0)

/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Table or view not found: counts; line 1 pos 14'

我不明白发生了什么。

零323

这是预期的行为。如果您查看有关内存接收器的文档

输出作为内存表存储在内存中。支持追加和完整输出模式。由于整个输出已收集并存储在驱动程序的内存中,因此应在数据量较小时用于调试目的因此,请谨慎使用。

如您所见,内存接收器不会创建持久表或全局临时视图,而是会创建一个仅限于驱动程序的本地结构。因此,无法从另一个Spark应用程序查询它。

因此必须从驱动程序中查询内存输出,并将其写入其中。例如,您可以模拟console如下所示的模式。

一个虚拟作家:

import pandas as pd
import numpy as np
import tempfile
import shutil

def producer(path):
    temp_path = tempfile.mkdtemp()

    def producer(i):
        df = pd.DataFrame({
          "group": np.random.randint(10, size=1000)
        }) 
        df["val"] = (
            np.random.randn(1000) + 
            np.random.random(1000) * df["group"] + 
            np.random.random(1000) * i % 7
        )
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)
    return producer

Spark应用程序:

from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
   StructField("group", IntegerType()),
   StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())

还有一些事件:

from rx import Observable

timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何部署Spark Streaming应用程序?

Spark Streaming独立应用程序和依赖项

我的Spark Streaming应用程序中的OOM异常

Spark Streaming 应用程序在同时向 Cassandra 写入和读取时卡住

在Spark Streaming应用程序中联接数据的最佳方法是什么?

Spark Streaming 应用程序启动时如何得到通知?

如何配置检查点以重新部署Spark Streaming应用程序?

如果 Spark-streaming 应用程序遇到一个巨大的文件会发生什么?

如何在Spark Streaming应用程序中从Kafka接收Java对象

如何删除 spark -streaming 应用程序从 eventhub 接收消息生成的进度目录

重新启动Spark Streaming应用程序的最佳方法是什么?

如何访问Spark Streaming应用程序的统计信息终结点?

如何在IntelliJ IDEA中使用Kafka Direct Stream运行Spark Streaming应用程序?

如何为 Spark Structured Streaming 应用程序构建 uber jar 到 MongoDB 接收器

如何将Spark Streaming应用程序的输出写入单个文件

使用Flume + Spark Streaming的示例字数统计应用程序

Spark Streaming 不会在应用程序 UI 上显示任何记录

如何在NetworkWordCount Spark Streaming应用程序中修复“ org.apache.spark.shuffle.FetchFailedException:无法连接”?

从Flask应用程序访问Spark

如何打包spark scala应用程序

Spark应用程序记录器

Spark应用程序中的作业总数

从Java Spark应用程序返回List <>

如何在Spark Streaming应用程序中异步写入行以加快批处理执行速度?

远程在不同版本的 spark 上运行 spark 应用程序

Datastax Spark SQL Thriftserver与Spark应用程序

Spark应用程序杀死执行程序

Spark Streaming + Spark SQL

我的Spark Streaming程序流程