使用架构自动检测功能将数据流作业写入BigQuery

Costello:

当前,我们正在寻找将原始数据转换为通用结构以进行进一步分析的最佳方法。我们的数据是JSON文件,有些文件具有更多字段,有些文件较少,有些文件可能具有数组,但总的来说,它的结构大致相同。

为此,我正在尝试用Java构建Apache Beam管道。我所有的管道都基于以下模板:TextIOToBigQuery.java

第一种方法是将整个JSON作为字符串加载到一列中,然后在标准SQL中使用JSON函数转换为通用结构。此处对此进行了详细说明:如何在将JSON文件加载到BigQuery表中时管理/处理架构更改

第二种方法是将数据加载到适当的列中。因此,现在可以通过标准SQL查询数据。它还需要了解架构。可以通过控制台,UI和其他方法进行检测使用模式自动检测,但是我没有找到关于如何通过Java和Apache Beam管道实现此目的的任何信息。

我分析了BigQueryIO,看起来没有模式它就无法工作(只有一个例外,如果已经创建了表)

如前所述,新文件可能会带来新的字段,因此应该相应地更新架构。

假设我有三个JSON文件:

1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }

第一个创建一个具有字符串类型的字段“ field1”的新表。所以我的桌子应该像这样:

|field1  |
----------
|"value1"|

第二个功能相同,但添加新字段“ field2”。现在我的桌子应该像这样:

|field1  |field2  |
-------------------
|"value1"|null    |
-------------------
|null    |"value2"|

第三个JSON应该在架构中添加另一个字段“ field10”,依此类推。实际的JSON文件可能包含200个字段或更多。处理这种情况有多难?

哪种方法更好地进行此转换?

Guillem Xercavins:

我进行了一些测试,以模拟典型的自动检测模式:首先,我遍历所有数据以构建Map所有可能的字段和类型(这里只是考虑StringInteger为简单起见)。我使用有状态管道来跟踪已经看到的字段并将其另存为PCollectionView.withSchemaFromView()在管道构建过程中,这种方式可以用作模式是未知的。请注意,此方法仅对批处理作业有效。

首先,我创建了一些没有严格模式的伪数据,其中每行可能包含也可能不包含任何字段:

PCollection<KV<Integer, String>> input = p
  .apply("Create data", Create.of(
        KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
        KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
        KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
        KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
  );

我们将读取输入数据并构建一个Map在数据中看到的不同字段名称,并进行基本类型检查以确定其是否包含INTEGERSTRING当然,如果需要,可以扩展此范围。请注意,之前创建的所有数据都分配给了相同的键,以便将它们分组在一起,我们可以构建完整的字段列表,但这可能是性能瓶颈。我们实现输出,以便可以将其用作侧面输入:

PCollectionView<Map<String, String>> schemaSideInput = input  
  .apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {

    // A map containing field-type pairs
    @StateId("schema")
    private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
        StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    @ProcessElement
    public void processElement(ProcessContext c,
                               @StateId("schema") ValueState<Map<String, String>> schemaSpec) {
      JSONObject message = new JSONObject(c.element().getValue());
      Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());

      // iterate through fields
      message.keySet().forEach(key ->
      {
          Object value = message.get(key);

          if (!current.containsKey(key)) {
              String type = "STRING";

              try {
                  Integer.parseInt(value.toString());
                  type = "INTEGER";
              }
              catch(Exception e) {}

              // uncomment if debugging is needed
              // LOG.info("key: "+ key + " value: " + value + " type: " + type);

              c.output(KV.of(key, type));
              current.put(key, type); 
              schemaSpec.write(current);
          }
      });
    }
  })).apply("Save as Map", View.asMap());

现在,我们可以使用前面的代码Map来构建PCollectionView包含BigQuery表架构的数据库:

PCollectionView<Map<String, String>> schemaView = p
  .apply("Start", Create.of("Start"))
  .apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> schemaFields = c.sideInput(schemaSideInput);  
        List<TableFieldSchema> fields = new ArrayList<>();  

        for (Map.Entry<String, String> field : schemaFields.entrySet()) 
        { 
            fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
            // LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
        }

        TableSchema schema = new TableSchema().setFields(fields);

        String jsonSchema;
        try {
            jsonSchema = Transport.getJsonFactory().toString(schema);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));

      }}).withSideInputs(schemaSideInput))
  .apply("Save as Singleton", View.asSingleton());

相应地更改标准表名称PROJECT_ID:DATASET_NAME.dynamic_bq_schema

最后,在管道中,我们使用以下命令读取数据,将其转换TableRow为BigQuery 并将其写入BigQuery .withSchemaFromView(schemaView)

input
  .apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          JSONObject message = new JSONObject(c.element().getValue());
          TableRow row = new TableRow();

          message.keySet().forEach(key ->
          {
              Object value = message.get(key);
              row.set(key, value);
          });

        c.output(row);
      }}))
  .apply( BigQueryIO.writeTableRows()
      .to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
      .withSchemaFromView(schemaView)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

完整代码在这里

管道创建的BigQuery表架构:

在此处输入图片说明

以及产生的稀疏数据:

在此处输入图片说明

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章