Cloud Dataflow で Cloud SQLからBigQueryにサーバーレスにデータ連携する その2

この記事は suusan2go Advent Calendar 2019 - Adventar の8日目の記事です。

その1はこちら

suzan2go.hatenablog.com

上記の記事では以下のようなJSONを用意してCloud SQLからBigQueryにデータ連携する仕組みを作ったのだけど、テーブル追加の度にこのJSONを書くのが面倒になって、指定したテーブルだけをガッと全部とってこれないかなと思って作り直した。

- query: "select * from users"
  output_table_name: users
  fields:
    - name: id
      type: INT64
      mode: REQUIRED
    - name: avatar_url
      type: STRING
      mode: REQUIRED
    - name: nick_name
      type: STRING
      mode: REQUIRED

普段全くJava書いてないので、結構変なコードがあるかもしれない・・・というエクスキューズをしておきます :pray:

BQのスキーマ指定なしでもDBからスキーマを自動生成してデータを同期したい

もともとの記事でYAMLを用意していた理由は以下の2つ。

  • 同期したい・したくないカラムを柔軟にカスタマイズできるように
  • 参考にしたSQL to BQのテンプレートがBQのスキーマ情報を要求していた

前者についての要望はもちろんあるものの(emailをコピーしたくないとか)、どちらかというと後者の実装面での制約が大きかったので、ガッと作ることにした。 調べてみると、BigQueryIOは withSchemaFromView というAPIを持っていて、DataflowのPipelineの中で計算したschema情報を指定できるようなので、それを使うことにした。

コードでいうと以下のような変更になる

BigQueryIO.writeTableRows()
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) // テーブルがなければ作成する
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) // 既存のテーブルを消して書き込む
-       .withSchema(schema.getTableSchema())
+       .withSchemaFromView(schema)
        .ignoreUnknownValues()
        .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
        .to(new CustomTableValueProvider(schema.getOutputTableName(), options.getOutputDataset())));

PostgreSQLのInformation Schemaの利用

方針としてはPostgreSQLを見に行って、テーブルのカラム情報と型情報を取得し、それをもとにBQのスキーマを組み立てていけばよい。

基本的にテーブルのカラムの情報は information.schema.columns を見に行けばよい。 table_name で絞り込み data_typecolumn_nameis_nullable といったカラムの情報をもとにテーブル情報を取得できる。注意しなければいけないのはArray型で、以下のドキュメントにも書いてあるがこのテーブル Array までしか分からず Array<int> なのかどうなのかを知るには elemenmt_types まで見に行く必要がある。

35.16. columns

実際に発行するクエリとしては以下のような形になる。

 String schemaQueryTemplate = "SELECT c.column_name as COLUMN_NAME, c.is_nullable AS IS_NULLABLE, c.data_type as DATA_TYPE_NAME, e.data_type AS ELEMENT_TYPE_NAME,\n" +
         "    pg_catalog.col_description(format('%%s.%%s',c.table_schema,c.table_name)::regclass::oid,c.ordinal_position) AS COLUMN_DESCRIPTION\n" +
         "FROM information_schema.columns c LEFT JOIN information_schema.element_types e\n" +
         "     ON ((c.table_catalog, c.table_schema, c.table_name, 'TABLE', c.dtd_identifier)\n" +
         "       = (e.object_catalog, e.object_schema, e.object_name, e.object_type, e.collection_type_identifier))" +
         "where table_name = '%s'";

このクエリ結果をもとに、地道にBQのスキーマを組み立てればよい。

Pipelineで上記のクエリ結果からBQのスキーマ情報を組み立てる

JdbcIOというクラスを使って、上記のクエリ結果を一行一行処理していき、最後にまとめていけば良い。

結果をBigQueryのスキーマ情報に変更する

これはかなり力技なのだけど、DBのTypeをもとにBQのTypeに変換していく。上述したようにArrayだけはelement_typesの型を見に行かないと実際の型が分からないことに注意が必要。

    // 変換用のMap ※一部抜粋
    private static final Map<String, String> postgresqlToBqTypeMap = ImmutableMap.<String, String>builder()
            .put("integer", "INT64")
            .put("character varying", "STRING")
            .put("date", "DATE")
            .build();


.withRowMapper((JdbcIO.RowMapper<Map<String, String>>) resultSet -> {
    String columnName = resultSet.getString("COLUMN_NAME");
    String typeName = resultSet.getString("DATA_TYPE_NAME");
    String elementTypeName = resultSet.getString("ELEMENT_TYPE_NAME");
    String description = resultSet.getString("COLUMN_DESCRIPTION");
    String isNullableString = resultSet.getString("IS_NULLABLE");

    String mode;
    String type;
    if(typeName.equals("ARRAY")) {
        type = postgresqlToBqTypeMap.get(elementTypeName);
        mode = "REPEATED";
    } else {
        type = postgresqlToBqTypeMap.get(typeName);
        mode = isNullableString.equals("YES") ? "NULLABLE" : "REQUIRED";
    }

    if(type == null) {
        throw new IllegalArgumentException(
                "can't detect bigquery schema type from postgres type: " + typeName + " elementType: " + elementTypeName
        );
    }

    Map<String, String> map = new HashMap<String, String>();

    map.put("mode", mode);
    map.put("type", type);
    map.put("name", columnName);
    map.put("description", description != null ? description : "");
    return map;
})

データを結合して一つのスキーマにする

上記の状態から最終的なアウトプットとして { [table名]: <BQ schema json>} な形式に変換して上げる必要がある。ここで使うのが Combine というもの。

beam.apache.org

このCombineの概念をちゃんと理解せずに、 「reduce みたいなもんでしょ」とか舐めてたらめちゃくちゃハマってしまった。サンプルとして以下のようなコードが書かれていたので余計に勘違いしてしまったのだけれど、もうちょっとやっていることは複雑。

public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}

公式ドキュメントにも書いてあるとおり、実際の処理の流れとしては以下のようになっている。

  • Create Accumulator creates
    • 集計用のデータをローカルに作成
  • Add Input
    • それぞれの集計用のデータにinputを追加
  • Merge Accumulators
    • 集計用のデータを一つにまとめていく ※この処理は複数回呼ばれる
  • Extract Output
    • アウトプットのデータ構造を作成

Merge Accumulatorsは複数回呼ばれるのでそれを想定した作りにしておく必要があって、それを理解してなくてかなり時間を食ってしまった。実装でいうと上記の処理に対応した関数をもつクラスを作ってやればよく、素朴に実装すると以下のようになる。

    public static class SchemaInfoToBigQuerySchemaFn extends Combine.CombineFn<Map<String, String>, List<Map<String, String>>, Map<String, String>> {
        public CustomTableValueProvider  tableValueProvider;
        private final Logger LOG = LoggerFactory.getLogger(SchemaInfoToBigQuerySchemaFn.class);

        SchemaInfoToBigQuerySchemaFn(CustomTableValueProvider value) {
            this.tableValueProvider = value;
        }

        @Override
        public List<Map<String, String>> createAccumulator() { return new ArrayList<Map<String, String>>(); }

        @Override
        public List<Map<String, String>> addInput(List<Map<String, String>> accum, Map<String, String> item) {
            accum.add(item);
            return accum;
        }

        @Override
        public List<Map<String, String>> mergeAccumulators(Iterable<List<Map<String, String>>> accums) {
            List<Map<String, String>> merged = createAccumulator();
            for (List<Map<String, String>> accum : accums) {
                merged.addAll(accum);
            }
            return merged;
        }

        @Override
        public Map<String, String> extractOutput(List<Map<String, String>> accum) {
            TableSchema tableSchema = new TableSchema();

            List<TableFieldSchema> fieldSchemaList = new ArrayList();
            for (Map<String, String> item : accum) {
                fieldSchemaList.add(
                        new TableFieldSchema()
                                .setName(item.get("name"))
                                .setType(item.get("type"))
                                .setMode(item.get("mode"))
                                .setDescription(item.get("description"))
                );
            }
            tableSchema.setFields(fieldSchemaList);
            String destinationValue =  tableValueProvider.get();
            TableDestination destination = new TableDestination(destinationValue, "");

            Map<String, String> schemaMapValue = Maps.newHashMap();

            String json = new Gson().toJson(tableSchema);
            schemaMapValue.put(destination.getTableSpec(), json);

            return schemaMapValue;
        }
    }

まとめ

  • PostgreSQLからデータを抜いて、実行時に動的にBQのスキーマを取得することができた
  • 頑張ったしApache Beamの勉強にもなったけど、運用的には最初のJSONを自動生成してカスタマイズできるほうが嬉しかったかもなぁーと今は思っている・・・・・・