数据流GCS到BQ问题

andre622

这是一种情况:我正在GCS中压缩了一组文件,并具有一个.gz文件扩展名(即000000_ [0-5] .gz),我正尝试将其导入单个BQ表中。到目前为止,我一直在从命令行执行命令,但想通过Dataflow来完成此操作,将来可能会添加一些转换。

压缩的GCS文件中的数据是一个复杂的JSON结构,该结构经常更改架构,因此最简单的做法是将整个文件作为TSV仅包含一列称为TSV的形式带入BigQuery record,然后在BQ中使用JSON_EXTRACT函数解析值在需要它们时需要。

问题:我已经编写了一个Dataflow管道,它将在这种情况下进行最少的工作;从GCS读取并写入BigQuery表。但是,当我执行此管道时,出现了JSON解析错误,如下所示:

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object.

以下是我的Dataflow脚本,其中一些变量已匿名。

from __future__ import absolute_import

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET_NAME/input-data/000000_0.gz',
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=GCP_PROJECT_NAME',
      '--staging_location=gs://BUCKET_NAME/dataflow-staging',
      '--temp_location=gs://BUCKET_NAME/dataflow-temp',
      '--job_name=gcs-gzcomp-to-bq1',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

如您所见,我尝试通过指定仅包含一个字符串类型的列的模式来完成与传统加载作业中相同的操作,但是仍然失败。

有没有一种方法可以明确地告诉Dataflow有关我要如何导入GCS文件的更多详细信息?即指定TSV,即使它是每一行上的有效JSON对象?

另外,如果此错误与我可能搞砸的其他任何事情有关,也请予以指出;我是Dataflow的超级新手,但是对BQ和其他一些GCP工具非常有经验,因此希望将其添加到我的工具栏中。

k

我相信输入集合WriteToBigQuery应该是字典的集合(每个键都映射到BigQuery列),而不是字符串的集合。尝试通过类似的内容| beam.Map(lambda line: dict(record=line))

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章