Cloud Dataflow で Cloud SQLからBigQueryにサーバーレスにデータ連携する

取引先で、Cloud SQL(Postgres)からBigQueryにアプリケーションのデータを連携させたいという話があって、いろいろ調べたのでまとめる。 最初はEmbulkとか使えばいいんかなと思ったけど、基本サーバーサイドはGAEしか使ってない状況でEmbulkをどこで動かそうか…って感じだったので、GCPのマネージドサービスであるCloud Dataflowを活用することにした。*1

アプリケーションの環境は以下のような感じ。

  • スマホアプリを提供している
  • Firebase Auth / Analyticsを使用している
    • Analyticsの情報はBigQueryに連携させている
  • APIサーバはGAE上にたっている
    • DBはCloud SQL(Postgres)を使用している

GCPのデータ処理関連のマネージドサービスについて

GCP上でデータをBigQueryに投入しようと思ってグーグルで調べるとCloud Composer、Cloud Dataflowという2つのサービスが出てくると思う。最初はこの2つの使い分けがよく理解できなくて混乱したのだけど、以下のようにユースケースが異なっているという理解をした。

  • Cloud Composer
    • マネージドなApache Airflow
    • Apache Airflowの公式ドキュメントAirflow is a platform to programmatically author, schedule and monitor workflows. とあるように、あくまでワークフローを管理するもの。
    • もちろんCloudSQL => BigQuery連携のジョブを書くこともできるけど、単発のジョブのために使うのはちょっとオーバーキルっぽいし、GKE上で常に動かすことになるのでややコストがかかる
  • Cloud Dataflow
    • マネージドなApache Beamの実行環境
    • Apache Beamは公式ドキュメントimplement batch and streaming data processing jobs that run on any execution engine. とあるようにデータのバッチ・ストリーミング処理を行うためのライブラリ・実装

というわけで複数ステップからなる複雑なデータ処理を行うならCloud Composerを使い、そうでなければCloud Dataflowを使えば良さそうという結論に自分はなった。もし今後複数のジョブの依存関係が出てきたら、そのときにCloud Composerを使って、Cloud Dataflowのジョブをいい感じにマネジメントすればよさそう。

Cloud Dataflowについて

自分は最初勘違いしてたんだけどCloud Dataflow自体はあくまで実行環境なので、それ自体にジョブを登録しておいて定期実行するとかそういった機能はない。ただ処理のテンプレートをGCSにアップロードしてそれを使ってジョブを起動させることはできるので、Cloud Scheduler / Cloud Functionsと組み合わせると定期バッチ処理の仕組みがサーバーレスで実現できる。これについては後述する。

Cloud Dataflow のテンプレート

上述したようにCloud Dataflowはテンプレートという機能があり、事前に作成したテンプレートを使用してジョブを実行することができる。 例えばGoogleが提供する jdbc to Bigquery のテンプレートを使いたい場合には以下のように必要なパラメータを渡してあげることで条件を変えて実行させることができる。ちなみにCloud DataflowからCloud SQLにつなぐ場合には、Cloud SQL側でprivate IPを有効にして、同じネットワークから実行してあげればよい。

gcloud dataflow jobs run <your job name> \
    --project=<your project id> \
    --gcs-location gs://dataflow-templates/latest/Jdbc_to_BigQuery \
    --region=asia-northeast1 \
    --parameters \
driverJars=gs://<your-bucket>/postgresql-42.2.6.jar,\
driverClassName=org.postgresql.Driver,\
connectionURL=jdbc:postgresql://10.7.xxx.xxx:5432/xxxx,\
query='select * from users',\
outputTable=<your_project_id>:<your_data_set>.users,\
bigQueryLoadingTemporaryDirectory=gs://<your_bucket_nanme>/bq/tmp_dir/,\
username=user,\
password=password

Cloud SQL => BigQuery連携がこれで事足りる場合にはこれでいいんだけど、このテンプレートには以下のような制限がある。

  • BigQuery側に予めテーブルとスキーマの設定を済ませておく必要がある
  • APPENDしかできないので、アプリケーションのデータのように定期的に更新のかかるデータを連携できない
  • 一つのジョブで1クエリ => 1テーブルしか連携できない

自分は以下のような形でアプリケーションDBからBigQueryへデータを連携させたかったので、自分でカスタムのテンプレートを作って対応することにした。

  • 予めジョブ側に定義したYAMLにクエリ定義とBigQueryのスキーマ定義を書いてデータを連携させたい
  • 更新のあるデータがあるので、毎回データは洗い替えたい
  • 複数のクエリ結果を同時にBigQueryに連携させたい

Cloud Dataflowのテンプレートを作る

カスタムのテンプレートを作成するにあたっては以下のドキュメントが参考になる。

テンプレートの作成  |  Cloud Dataflow  |  Google Cloud

しかしApache Beamを理解して1から処理を実装するのは結構ハードルが高いので、Googleが提供するテンプレートをカスタマイズしていくのがよさそう。Cloud Dataflowのコンソールから選択できるテンプレートは以下のGitHubのRepositoryで管理されている。

github.com

例えば、今回やりたいことの参考になりそうな JdbcToBigQueryのコードは以下。 DataflowTemplates/JdbcToBigQuery.java at master · GoogleCloudPlatform/DataflowTemplates · GitHub

JdbcToBigQuery.java を参考に1から処理を書いてもよかったのだけど、このクラス自体がリポジトリ内の他のクラスにかなり依存していたので、ちょと横着だけどリポジトリ自体をフォークして以下のようなYamlを用意しておけばクエリ結果をBigQueryに書きだせるようなクラスを新たに作成した。

- query: "select * from users"
  output_table_name: users
  fields:
    - name: id
      type: INT64
      mode: REQUIRED
    - name: avatar_url
      type: STRING
      mode: REQUIRED
    - name: nick_name
      type: STRING
      mode: REQUIRED
    // パイプラインを作るところの抜粋 
    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
    mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);

    List<BigQuerySchema> bigQuerySchemas = mapper.readValue(
            new File(Resources.getResource("schema/bigquery/app_schema.yaml").getPath()),
            new TypeReference<List<BigQuerySchema>>(){}
    );

    bigQuerySchemas.stream().forEach(schema -> {
      pipeline
              .apply(
                      "Read '" +  schema.getQuery() + "' from JdbcIO",
                      DynamicJdbcIO.<TableRow>read()
                              .withDataSourceConfiguration(
                                      DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                                              ValueProvider.StaticValueProvider.of("org.postgresql.Driver"),
                                              options.getConnectionURL())
                                              .withUsername(options.getUsername())
                                              .withPassword(options.getPassword())
                                              .withConnectionProperties(options.getConnectionProperties()))
                              .withQuery(ValueProvider.StaticValueProvider.of(schema.getQuery()))
                              .withCoder(TableRowJsonCoder.of())
                              .withRowMapper(JdbcConverters.getResultSetToTableRow()))
              .apply(
                      "Write to BigQuery: " + schema.getOutputTableName(),
                      BigQueryIO.writeTableRows()
                              .withoutValidation()
                              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) // テーブルがなければ作成する
                              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) // 既存のテーブルを消して書き込む
                              .withSchema(schema.getTableSchema())
                              .ignoreUnknownValues()
                              .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                              .to(new CustomTableValueProvider(schema.getOutputTableName(), options.getOutputDataset())));
    });

    // Execute the pipeline and return the result.
    return pipeline.run();

ValueProviderというものの概念がなかなか難しかったのだけど、Apache Beamでは RuntimeValueProviderStaticValueProviderNestedValueProvider の3つのValueProviderがあり、パイプライン作成時(テンプレート作成時)とパイプライン実行時に参照できる値が区別されていて、パイプライン作成時にエラーにできるものはエラーにするような設計思想になっているようだった。

ちなみに、テンプレートはPythonで記述することもできるのだけれど、RDBの読み込みに現時点では対応していない(公式でないライブラリで提供しているものはある)、SDKPythonの2.7しか対応していないということで、Javaを選択した。

beam.apache.org

テンプレートは以下のようなコマンドでGCSにアップロードできる。自分はCloud Buildを使って、masterにマージされたら自動でアップロードされるようにCIを設定した。

mvn compile exec:java \
     -Dexec.mainClass=com.github.suusan2go.dataflow.templates.AppDbToBigQuery \
     -Dexec.args="--runner=DataflowRunner --serviceAccount=dataflow-jobs@<project_id>.iam.gserviceaccount.com \
                  --project=<project_id> \
                  --stagingLocation=gs://<your_bucket>/staging \
                  --templateLocation=gs://<your_bucket>/templates/AppDbToBigQuery"

Cloud Scheduler / Cloud Functions を用いたDataflowジョブ定期実行の仕組み

前置きがだいぶ長くなったけど、上記で準備したテンプレートとGCPのサービスを使えば、連携ジョブを定期実行させることができる。

f:id:suzan2go:20190704152555p:plain

  • 事前準備
    • Cloud Dataflowのテンプレートを作成する
  • 処理
    1. Cloud SchedulerがCRONの時間に従いトピックにメッセージを発行
    2. pubsubをトリガーに設定したCloud FunctionsよりCloud Dataflowのジョブを API経由で実行
    3. ここで事前準備で作成したCloud DataflowテンプレートのGCSパスを指定する
    4. Cloud Dataflowでテンプレートを元にジョブを実行
    5. ジョブが失敗した場合にはStackdriver Alertingで設定したポリシーにより、Slackにアラートを通知

Cloud Shceduler からCloud Functionsを実行する

Cloud Scheduler からCRONを設定すればよい。HTTPでCloud FunctionsのURLを叩くようにしてもいいし、pubsubを使っても良い。自分はpubsubを使った。 f:id:suzan2go:20190704171533p:plain

Cloud FunctionsからCloud Dataflowのジョブを実行する

なかなかCloud Dataflow をキックするAPIが見つからなかったのだけど、以下のブログが参考になった。 medium.com

const { google } = require("googleapis");
const dataflow = google.dataflow("v1b3");

const BUCKET_NAME = process.env.BUCKET_NAME;
const DB_URL = process.env.DB_URL;
const DB_USERNAME = process.env.DB_USERNAME;
const DB_PASSWORD = process.env.DB_PASSWORD;
const PROJECT_ID = process.env.GCLOUD_PROJECT;

module.exports.appDBtoBQ = async (event, context) => {
  const jobName = `appdb-to-bigquery` ; // Dataflowでは同名のジョブは同時実行できないので、常に同じジョブ名にしておけば意図しない重複実行を防げる
  const tmpLocation = `gs://${BUCKET_NAME}/tmp`;
  const templatePath = `gs://${BUCKET_NAME}/templates/AppDbToBigQuery`;
  const request = {
    projectId: PROJECT_ID,
    requestBody: {
      jobName,
      parameters: {
        connectionURL: `jdbc:postgresql://${DB_URL}`,
        outputDataset: `${PROJECT_ID}:app_db`,
        bigQueryLoadingTemporaryDirectory: `gs://${BUCKET_NAME}/bq/tmp_dir/`,
        username: DB_USERNAME,
        password: DB_PASSWORD
      },
      environment: {
        tempLocation: tmpLocation
      }
    },
    location: "asia-northeast1",
    gcsPath: templatePath
  };
  return google.auth
    .getClient({
      scopes: ["https://www.googleapis.com/auth/cloud-platform"]
    })
    .then(auth => {
      request.auth = auth;
      // 元のブログではdataflow.projects.templates.launchになっていたが、regionを指定するときには 
     //  dataflow.projects.locations.templates.launchにする必要があった
      return dataflow.projects.locations.templates.launch(request);
    })
    .catch(error => {
      console.error(error);
      throw error;
    });
};

Cloud Dataflow のジョブ失敗をStackdriver Monitoringで監視する

ジョブの失敗をどう検知するかというところで自分は結構困っていたのだけど、 Stackdriver Monitoring を使えばかなり簡単に実現できた。 具体的には以下のように dataflow.googleapis.com/job/is_failedというズバリなメトリクスがあるのでそれをStackdriver Monitoringで監視してあげればよい。

f:id:suzan2go:20190704172433p:plain

EmailやSlack等への通知も設定できて、Slack通知は以下のようになる。

f:id:suzan2go:20190704172818p:plain

まとめ

GCPのマネージドサービスを組み合わせることで、CloudSQL から BigQueryへのデータ連携をサーバーレスで実現することができました。自分はこれまでこういった分析基盤の利用側だったのですが、いざ自分で作ってみると知らないことや分かってなかったことが多くて(特にBigQueryの型とか更新制限とか…)、これまで基盤を作ってくれた方は頑張ってたんだなぁという気持ちになりました(こなみかん)。

参考にしたブログ

Cloud Dataflow がテンプレートにより気軽に使えるサーバーレスのサービスに進化した話 - google-cloud-jp - Medium

サーバレスにバッチ処理をしよう!@GCP | キャスレーコンサルティング株式会社

nouhau/dataflow/example/multi-source-multi-sink at master · gcpug/nouhau · GitHub

How to kick off a Dataflow pipeline via Cloud Functions

Apache Airflow で作る GCP のデータパイプライン @ 酔いどれGCPUG 2017/11/28

リレーショナル データベースから BigQuery に ETL を実行する  |  ソリューション  |  Google Cloud

*1:GKEとか立ってるんだったらKubernetesのCronJobとかでEmbulkを動かすのが楽そうな気がしている。