如何在Python中合并两个IO流?

Codious-JR

我已经围绕Spark-Submit命令创建了一个包装器,以便能够通过解析日志来生成实时事件。目的是创建一个实时界面,显示Spark Job的详细进度。

因此包装器将如下所示:

  submitter = SparkSubmitter()
  submitter.submit('/path/to/spark-code.py')
  for log_event in submitter:
    if log_event:
      print('Event:', log_event)

输出将如下所示:

  Event: StartSparkContextEvent()
  Event: StartWorkEvent()
  Event: FinishWorkEvent()
  Event: StopSparkContextEvent()

在内部,SparkSubmitter类将spark-submit命令作为子进程.Popen进程启动,然后在stdout流上进行迭代,并通过解析由该进程生成的日志来返回事件,如下所示:

  class SparkSubmitter():
    def submit(self, path):
        command = self.build_spark_submit_command(path)
      self.process = Popen(command, stdout=PIPE, stderr=PIPE)

    def __iter__(self):
        return self

    def __next__(self):
        # note: this is a IO-Blocking command
        log = self.process.stdout.readline().decode('utf-8') 
      return self.parse_log_and_return_event(log)

此实现与Spark Standalone集群很好地配合。但是在纱线群集上运行时出现问题。

在“纱线群”中,“火花相关的日志”stderr而不是stdout因此,我的课程无法解析Spark生成的日志,因为它仅尝试读取stdout

问题1:是否可以将Popen的stdout和stderr作为单个流读取?

问题2:由于stdout和stderr都是Streams,是否可以合并两个Streams并将它们作为一个读取?

问题3:是否可以将所有日志重定向到仅stdout?

吹牛

的回答您的问题所有3个是肯定的,你可以使用stderr=subprocess.STDOUT作为参数Popen,以输出重定向stderrstdout

self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章