Cloud Dataflow で Cloud SQLからBigQueryにサーバーレスにデータ連携する
取引先で、Cloud SQL(Postgres)からBigQueryにアプリケーションのデータを連携させたいという話があって、いろいろ調べたのでまとめる。 最初はEmbulkとか使えばいいんかなと思ったけど、基本サーバーサイドはGAEしか使ってない状況でEmbulkをどこで動かそうか…って感じだったので、GCPのマネージドサービスであるCloud Dataflowを活用することにした。*1
アプリケーションの環境は以下のような感じ。
- スマホアプリを提供している
- Firebase Auth / Analyticsを使用している
- Analyticsの情報はBigQueryに連携させている
- APIサーバはGAE上にたっている
- DBはCloud SQL(Postgres)を使用している
GCPのデータ処理関連のマネージドサービスについて
GCP上でデータをBigQueryに投入しようと思ってグーグルで調べるとCloud Composer、Cloud Dataflowという2つのサービスが出てくると思う。最初はこの2つの使い分けがよく理解できなくて混乱したのだけど、以下のようにユースケースが異なっているという理解をした。
- Cloud Composer
- Cloud Dataflow
というわけで複数ステップからなる複雑なデータ処理を行うならCloud Composerを使い、そうでなければCloud Dataflowを使えば良さそうという結論に自分はなった。もし今後複数のジョブの依存関係が出てきたら、そのときにCloud Composerを使って、Cloud Dataflowのジョブをいい感じにマネジメントすればよさそう。
Cloud Dataflowについて
自分は最初勘違いしてたんだけどCloud Dataflow自体はあくまで実行環境なので、それ自体にジョブを登録しておいて定期実行するとかそういった機能はない。ただ処理のテンプレートをGCSにアップロードしてそれを使ってジョブを起動させることはできるので、Cloud Scheduler / Cloud Functionsと組み合わせると定期バッチ処理の仕組みがサーバーレスで実現できる。これについては後述する。
Cloud Dataflow のテンプレート
上述したようにCloud Dataflowはテンプレートという機能があり、事前に作成したテンプレートを使用してジョブを実行することができる。 例えばGoogleが提供する jdbc to Bigquery
のテンプレートを使いたい場合には以下のように必要なパラメータを渡してあげることで条件を変えて実行させることができる。ちなみにCloud DataflowからCloud SQLにつなぐ場合には、Cloud SQL側でprivate IPを有効にして、同じネットワークから実行してあげればよい。
gcloud dataflow jobs run <your job name> \ --project=<your project id> \ --gcs-location gs://dataflow-templates/latest/Jdbc_to_BigQuery \ --region=asia-northeast1 \ --parameters \ driverJars=gs://<your-bucket>/postgresql-42.2.6.jar,\ driverClassName=org.postgresql.Driver,\ connectionURL=jdbc:postgresql://10.7.xxx.xxx:5432/xxxx,\ query='select * from users',\ outputTable=<your_project_id>:<your_data_set>.users,\ bigQueryLoadingTemporaryDirectory=gs://<your_bucket_nanme>/bq/tmp_dir/,\ username=user,\ password=password
Cloud SQL => BigQuery連携がこれで事足りる場合にはこれでいいんだけど、このテンプレートには以下のような制限がある。
- BigQuery側に予めテーブルとスキーマの設定を済ませておく必要がある
- APPENDしかできないので、アプリケーションのデータのように定期的に更新のかかるデータを連携できない
- 一つのジョブで1クエリ => 1テーブルしか連携できない
自分は以下のような形でアプリケーションDBからBigQueryへデータを連携させたかったので、自分でカスタムのテンプレートを作って対応することにした。
- 予めジョブ側に定義したYAMLにクエリ定義とBigQueryのスキーマ定義を書いてデータを連携させたい
- 更新のあるデータがあるので、毎回データは洗い替えたい
- 複数のクエリ結果を同時にBigQueryに連携させたい
Cloud Dataflowのテンプレートを作る
カスタムのテンプレートを作成するにあたっては以下のドキュメントが参考になる。
テンプレートの作成 | Cloud Dataflow | Google Cloud
しかしApache Beamを理解して1から処理を実装するのは結構ハードルが高いので、Googleが提供するテンプレートをカスタマイズしていくのがよさそう。Cloud Dataflowのコンソールから選択できるテンプレートは以下のGitHubのRepositoryで管理されている。
例えば、今回やりたいことの参考になりそうな JdbcToBigQuery
のコードは以下。
DataflowTemplates/JdbcToBigQuery.java at master · GoogleCloudPlatform/DataflowTemplates · GitHub
JdbcToBigQuery.java
を参考に1から処理を書いてもよかったのだけど、このクラス自体がリポジトリ内の他のクラスにかなり依存していたので、ちょと横着だけどリポジトリ自体をフォークして以下のようなYamlを用意しておけばクエリ結果をBigQueryに書きだせるようなクラスを新たに作成した。
- query: "select * from users" output_table_name: users fields: - name: id type: INT64 mode: REQUIRED - name: avatar_url type: STRING mode: REQUIRED - name: nick_name type: STRING mode: REQUIRED
// パイプラインを作るところの抜粋 ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); List<BigQuerySchema> bigQuerySchemas = mapper.readValue( new File(Resources.getResource("schema/bigquery/app_schema.yaml").getPath()), new TypeReference<List<BigQuerySchema>>(){} ); bigQuerySchemas.stream().forEach(schema -> { pipeline .apply( "Read '" + schema.getQuery() + "' from JdbcIO", DynamicJdbcIO.<TableRow>read() .withDataSourceConfiguration( DynamicJdbcIO.DynamicDataSourceConfiguration.create( ValueProvider.StaticValueProvider.of("org.postgresql.Driver"), options.getConnectionURL()) .withUsername(options.getUsername()) .withPassword(options.getPassword()) .withConnectionProperties(options.getConnectionProperties())) .withQuery(ValueProvider.StaticValueProvider.of(schema.getQuery())) .withCoder(TableRowJsonCoder.of()) .withRowMapper(JdbcConverters.getResultSetToTableRow())) .apply( "Write to BigQuery: " + schema.getOutputTableName(), BigQueryIO.writeTableRows() .withoutValidation() .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) // テーブルがなければ作成する .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) // 既存のテーブルを消して書き込む .withSchema(schema.getTableSchema()) .ignoreUnknownValues() .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory()) .to(new CustomTableValueProvider(schema.getOutputTableName(), options.getOutputDataset()))); }); // Execute the pipeline and return the result. return pipeline.run();
ValueProviderというものの概念がなかなか難しかったのだけど、Apache Beamでは RuntimeValueProvider
、StaticValueProvider
、 NestedValueProvider
の3つのValueProviderがあり、パイプライン作成時(テンプレート作成時)とパイプライン実行時に参照できる値が区別されていて、パイプライン作成時にエラーにできるものはエラーにするような設計思想になっているようだった。
ちなみに、テンプレートはPythonで記述することもできるのだけれど、RDBの読み込みに現時点では対応していない(公式でないライブラリで提供しているものはある)、SDKがPythonの2.7しか対応していないということで、Javaを選択した。
テンプレートは以下のようなコマンドでGCSにアップロードできる。自分はCloud Buildを使って、masterにマージされたら自動でアップロードされるようにCIを設定した。
mvn compile exec:java \ -Dexec.mainClass=com.github.suusan2go.dataflow.templates.AppDbToBigQuery \ -Dexec.args="--runner=DataflowRunner --serviceAccount=dataflow-jobs@<project_id>.iam.gserviceaccount.com \ --project=<project_id> \ --stagingLocation=gs://<your_bucket>/staging \ --templateLocation=gs://<your_bucket>/templates/AppDbToBigQuery"
Cloud Scheduler / Cloud Functions を用いたDataflowジョブ定期実行の仕組み
前置きがだいぶ長くなったけど、上記で準備したテンプレートとGCPのサービスを使えば、連携ジョブを定期実行させることができる。
- 事前準備
- Cloud Dataflowのテンプレートを作成する
- 処理
- Cloud SchedulerがCRONの時間に従いトピックにメッセージを発行
- pubsubをトリガーに設定したCloud FunctionsよりCloud Dataflowのジョブを API経由で実行
- ここで事前準備で作成したCloud DataflowテンプレートのGCSパスを指定する
- Cloud Dataflowでテンプレートを元にジョブを実行
- ジョブが失敗した場合にはStackdriver Alertingで設定したポリシーにより、Slackにアラートを通知
Cloud Shceduler からCloud Functionsを実行する
Cloud Scheduler からCRONを設定すればよい。HTTPでCloud FunctionsのURLを叩くようにしてもいいし、pubsubを使っても良い。自分はpubsubを使った。
Cloud FunctionsからCloud Dataflowのジョブを実行する
なかなかCloud Dataflow をキックするAPIが見つからなかったのだけど、以下のブログが参考になった。 medium.com
const { google } = require("googleapis"); const dataflow = google.dataflow("v1b3"); const BUCKET_NAME = process.env.BUCKET_NAME; const DB_URL = process.env.DB_URL; const DB_USERNAME = process.env.DB_USERNAME; const DB_PASSWORD = process.env.DB_PASSWORD; const PROJECT_ID = process.env.GCLOUD_PROJECT; module.exports.appDBtoBQ = async (event, context) => { const jobName = `appdb-to-bigquery` ; // Dataflowでは同名のジョブは同時実行できないので、常に同じジョブ名にしておけば意図しない重複実行を防げる const tmpLocation = `gs://${BUCKET_NAME}/tmp`; const templatePath = `gs://${BUCKET_NAME}/templates/AppDbToBigQuery`; const request = { projectId: PROJECT_ID, requestBody: { jobName, parameters: { connectionURL: `jdbc:postgresql://${DB_URL}`, outputDataset: `${PROJECT_ID}:app_db`, bigQueryLoadingTemporaryDirectory: `gs://${BUCKET_NAME}/bq/tmp_dir/`, username: DB_USERNAME, password: DB_PASSWORD }, environment: { tempLocation: tmpLocation } }, location: "asia-northeast1", gcsPath: templatePath }; return google.auth .getClient({ scopes: ["https://www.googleapis.com/auth/cloud-platform"] }) .then(auth => { request.auth = auth; // 元のブログではdataflow.projects.templates.launchになっていたが、regionを指定するときには // dataflow.projects.locations.templates.launchにする必要があった return dataflow.projects.locations.templates.launch(request); }) .catch(error => { console.error(error); throw error; }); };
Cloud Dataflow のジョブ失敗をStackdriver Monitoringで監視する
ジョブの失敗をどう検知するかというところで自分は結構困っていたのだけど、 Stackdriver Monitoring を使えばかなり簡単に実現できた。
具体的には以下のように dataflow.googleapis.com/job/is_failed
というズバリなメトリクスがあるのでそれをStackdriver Monitoringで監視してあげればよい。
EmailやSlack等への通知も設定できて、Slack通知は以下のようになる。
まとめ
GCPのマネージドサービスを組み合わせることで、CloudSQL から BigQueryへのデータ連携をサーバーレスで実現することができました。自分はこれまでこういった分析基盤の利用側だったのですが、いざ自分で作ってみると知らないことや分かってなかったことが多くて(特にBigQueryの型とか更新制限とか…)、これまで基盤を作ってくれた方は頑張ってたんだなぁという気持ちになりました(こなみかん)。
参考にしたブログ
Cloud Dataflow がテンプレートにより気軽に使えるサーバーレスのサービスに進化した話 - google-cloud-jp - Medium
サーバレスにバッチ処理をしよう!@GCP | キャスレーコンサルティング株式会社
nouhau/dataflow/example/multi-source-multi-sink at master · gcpug/nouhau · GitHub
How to kick off a Dataflow pipeline via Cloud Functions
Apache Airflow で作る GCP のデータパイプライン @ 酔いどれGCPUG 2017/11/28
リレーショナル データベースから BigQuery に ETL を実行する | ソリューション | Google Cloud
*1:GKEとか立ってるんだったらKubernetesのCronJobとかでEmbulkを動かすのが楽そうな気がしている。