概要
Google Cloud Platform のプロダクトを使ってビッグなデータで機械学習する機会が増えそうです。
ということで、公式チュートリアルの中からテンプレートを使ったものを選び、柔らかく煮込んでみました。(レクチャーする機会もありそうなので、内容の整理も兼ねて)
今回は、公式マニュアルで紹介されていた Pub/Sub Topic to BigQuery に挑戦します。
テンプレートからジョブを作成してみる
(1) Google Cloud Console から、Dataflowを選び、Dataflowのトップページへ移動する。
画面上部にジョブ作成方法が二つ提示されている。
テンプレートからジョブを作成 と SQLからジョブを作成 である。
今回はテンプレートからジョブを作成を選ぶ。
(2) すると テンプレートからジョブを作成 という画面に移動する。
ジョブ名、リージョンエンドポイント、テンプレートを入力するフォームがある。
記入するフォームについての説明
- ジョブ名には、ジョブを識別する一意の名前を記入する。
リージョンエンドポイントだが、Google Compute Engine のリージョンに準拠するリージョンを選ぶ。- Dataflowが提供されているリージョンの中から選ぶ。
- デフォルトリージョンは
us-central1 - 東京リージョン(
asia-northeast1)も利用可能である(2020/10/03時点) - ジョブの入出力ファイルを配備する場所は、同じリージョンであることが望ましい。
- ネットワーク遅延・転送コストを最小化できるからである。
- ちなみに今回使用する
Pub/Subのトピックは、グローバルリソースなのでリージョンを考慮する必要はない模様。公式サイトより
テンプレートの種類が多い。需要や使用頻度の高いジョブをテンプレート化している印象である。
テンプレートの種類には以下のものがある(2020/10/03時点)
- Get Started
- Process Data Continuously(stream) : ストリーミング処理
- Process Data in Bulk(batch) : バッチ処理
- Utilities
- カスタム
今回挑戦する Pub/Sub Topic to BigQuery は、ストリーミング処理であり、Process Data Continuously(stream)に属している。
(3) Pub/Sub Topic to BigQuery テンプレートの設定
まず、以下のようにして、プルダウンメニューの中からPub/Sub Topic to BigQueryテンプレートを選ぶ。
テンプレートを選ぶと、英語でテンプレートの概要が表示される。
Streaming pipeline. Ingests JSON-encoded messages from a Pub/Sub topic, transforms them using a JavaScript user-defined function (UDF), and writes them to a pre-existing BigQuery table as BigQuery elements.
(翻訳) ストリーミング パイプライン。Pub/Sub トピックから JSON エンコードされたメッセージをインジェストし、JavaScript のユーザー定義関数 (UDF) を使用して変換し、BigQuery 要素として既存の BigQuery テーブルに書き込みます。
JavaScriptでユーザー定義関数を実装して使うことができるという。
また、処理の結果は、既存のBigQueryテーブルに書き込まれるという。
※今回は使わない。
画面右側にチュートリアルウィンドウが開き、選択しているテンプレートのチュートリアル情報が表示されている。
- テンプレートを作成するにあたっての条件(=準備)が、
パイプラインの要件として記載されている。 - Pub/SubメッセージはJSON形式であること。
- Google Cloud Storage 上に一時的な出力場所を確保しておく必要がある。
- BigQueryテーブルを用意しておくこと
設定すべき必須パラメータは次の通りであり、パイプラインの要件に準拠している。
Input Pub/Sub topic- Pub/Sub のトピックを作成しておき、そのパスを記述する。
BigQuery output table- 出力結果を保存するBigQueryのテーブル。作成済みのテーブルのパスを記述する。
一時的なロケーション- Google Cloud Storageのパスを指定する。
テンプレート実行に先立って、上記の準備を進めておく必要がある。
テンプレートジョブ実行のための準備
Pub/Sub トピックの作成
(1) Google Cloud Console を別ウィンドウ or タブで開いて、画面左側のサービス一覧から ビッグデータ カテゴリにあるPub/Sub を選択してクリックする。
(2) Pub/Sub のトップページが表示されたら、画面上部にある トピックを作成 をクリックする。
(3) トピックを作成 のダイアログが開くので、トピックIDを記入して トピックを作成 をクリックする。
ここでは test_topic という値をトピックIDに入力してトピックを作成する。
(4) すると、トピック一覧に新しく作成したトピックが追加されていることが確認できる。
(5) トピックをクリックすると詳細画面に移動するので、そこでトピックの名前をコピーしておく。
このトピックの名前をテンプレートのフォームに記入する。
projects/optical-aviary-280311/topics/test_topic
個別トピック詳細画面には、Dataflowジョブへの連携するための案内が記載されている。
BigQueryテーブルの作成
(1) Google Cloud Console を別ウィンドウ or タブで開き、BigQuery の画面へ移動する。
(2) 任意のデータセットを選び、テーブルを新規に作成する。
テーブルを作成する際には、Pub/Sub で入力されるデータ構造に準拠したテーブル定義をする。
- テーブル名は
dataflow_testとする。 - テーブル作成の際に、ソースは
空のテーブルとする。
今回は以下のテーブル構造とする(Pub/Subのメッセージもこの構造に合わせるとする)
| カラム名 | データ型 |
|---|---|
| id | INTEGER |
| datetime | DATETIME |
| message | STRING |
スキーマ定義は、テキスト形式で指定することもできる。今回の構造の場合は以下のようになる。
id:INTEGER,datetime:DATETIME,message:STRING
テーブル定義を入力したら、テーブルを作成 ボタンをクリックする。
(3) プロジェクト-データセット-テーブルのツリー表示にあるテーブルdataflow_testをクリックし、詳細情報を表示する。
そして、表IDをコピーしておく。これがテンプレートに入力するパラメータになる。
今回の場合は、以下が表IDとなる。
optical-aviary-280311:gcp_training.dataflow_test
Google Cloud Storage の準備
(1) Google Cloud Console から Storage を選択肢、Cloud Storage の画面へ移動する。
(2) 画面上部にあるバケットを作成をクリックする。
(3) バケットの作成画面に移動したら、以下の項目を記入してゆく。
バケットに名前を付ける- バケット名は一般公開される。
- 今回は
gcp-training-ytとする。
- バケット名を入力したら
続行をクリックする。 データの保存場所の選択Multi-regionを指定する。
データのデフォルトのストレージクラスStandardを指定する。- 今回のテンプレートはストリーミング処理であり、頻繁な入出力を想定している。
オブジェクトへのアクセスを制御する方法きめ細かい管理(デフォルト) を指定する。
詳細設定- デフォルトの設定を採用するので、省略。
作成をクリックして、バケットを作成する。
(4) バケット詳細画面に移動し、構成タブをクリックし、gsutilのリンクをコピーしておく。
今回、コピーする値は次のようになる。
gs://gcp-training-yt
テンプレートパラメータの入力
準備が整ったところで、Dataflowテンプレートパラメータを入力してゆく。
まずはジョブ名を入力する。
今回は、dataflow-tutrial-pubsub-bigquery とする。
続いて以下を入力する。
- Input Pub/Sub topic
projects/optical-aviary-280311/topics/test_topic
- BigQuery output table
optical-aviary-280311:gcp_training.dataflow_test
- 一時的なロケーション
- Cloud Storage の gsutil リンクに、一時ファイルの接頭辞
dataflow-tempを付与しておく(識別しやすくするため) gs://gcp-training-yt/dataflow-temp
- Cloud Storage の gsutil リンクに、一時ファイルの接頭辞
補足事項
- JavaScriptで実装したユーザー定義関数は、
オプションパラメータのセクションで設定することができる。- 今回は使わないのでスキップする。
ジョブを実行
設定が済んだら ジョブを実行 をクリックする。
回転するアイコンが表示され続ける。少し時間がかかるのだろうか。
しばらくすると、起動したジョブの詳細画面に移動した。
- ジョブグラフ(データ処理のフロー図)が画面中央に表示されている。
- 画面右側にはジョブ情報が表示されている。
- ジョブ名やジョブID、ジョブステータス、ジョブタイプ等が表示されている。
画面下部にはログというタブがある。
これをクリックすると、ジョブグラフが後退し、ログウィンドウが開く。
各ログの行をクリックすると、別ウィンドウが開き、ログビューワが起動する。
ログはJSON形式で記録されている。
動作確認
Pub/Sub のトピックにメッセージを投げて、BigQueryテーブルに書き込まれるのか確認する。
メッセージを送る
(1) Pub/Sub の画面に移動し、先ほど作成したトピック test_topic を選択する。
(2) 画面上部にある メッセージをパブリッシュ を選択し、1つのメッセージを公開をクリックする。
(3) メッセージのパブリッシュ の画面が開くので、メッセージ本文を記入する。
入力するメッセージはJSON形式で記述し、テンプレートに登録したBigQueryテーブル optical-aviary-280311:gcp_training.dataflow_test のカラムと同じ構造にする。
以下のJSONをメッセージとして公開する。
{"id" : 202010031815,
"datetime" : "2020-10-03 18:15:30",
"message" : "このメッセージがBigQueryテーブルに届きますように。"
}
(4) 入力を終えたら 公開 ボタンを押す。
BigQueryテーブルを確認する
(1) BigQuery の画面に移動し、テンプレートに登録したテーブルを選択する。
(2) テーブル内容をクエリを使って確認する。
今回の例では、以下のクエリを使って確認する。
SELECT * FROM `optical-aviary-280311.gcp_training.dataflow_test` LIMIT 1000
すると無事にテーブルに書き込まれていることが確認できた。
ジョブを停止する
Dataflowの動作を確認できたので、ジョブを停止する。(起動している時間に比例して料金が掛かるため止めておく)
(1) Google Cloud Console から Dataflow の画面に移動し、稼働しているジョブの詳細画面を開く。
(2) 画面上部の縦方向3点リーダアイコンをクリックしメニューを開き、停止をクリックする。
(3) ジョブの停止 のダイアログが開くので、キャンセル を選択して、STOP JOB をクリックする。
今回は、動作確認用のジョブなので、即座にジョブを停止して差し支えない。
※商用サービスなどの場合は、ドレインを選択して、順番待ちをしているデータの処理が終わるのを待ってから停止するのが良いだろう。
(4) ジョブ情報のセクションで、ジョブステータス が更新される。
終わりに
さて、一般的に鈍器と称される公式ドキュメント(=公式ドキュメントで殴るとい慣用表現より)、煮込むことでどこまで柔らかくなったでしょうか?
自分が後でド忘れした時の助けにもなると思うので、実際の作業の流れに沿って丁寧に各工程をSSを交えて説明を試みています。
やはりこうやって整理してアウトプットすると、その過程で理解と定着が一段と進むのを感じるので、今後も公式ドキュメントを柔らかく煮込むタスクに取り組んでみようかと思います。
こちらのエントリも併せてどうぞ!
目次 1 概要2 テンプレートからジョブを作成してみる2.1 テンプレートジョブ実行のための準備2.1.1 Pub/Sub トピックの作成2.1.2 BigQueryテーブルの作成2.1.3 Google Cloud Storage の準備2.2 テンプレートパラメータの入力2.2.3.1 補足事項 […]
目次 1 Apache Beam チュートリアル2 Apache Beam 概要2.0.1 (補足) Apache Beam のリリース状況3 概念3.1 Runner(実行環境)3.2 パイプライン処理を構成する概念4 基本的なパイプラインの開発の流れ4.1 パイプライン処理内の各工程の詳細5 動 […]