注目キーワード

Apache Beam チュートリアル::公式文書を柔らかく煮込んでみた

Apache Beam チュートリアル

前回のエントリでは、Google Cloud Dataflowのテンプレートを使ったパイプライン処理について、チュートリアル的なエントリをアップしました。

Google Cloud Dataflowは、Apache Beamというオープンソースのデータ処理プロダクトと同じルーツであり(Googleの研究から誕生した)、Google Cloud Dataflowを使いこなすには、Apache Beamの習熟が欠かせません。

  • カスタムなパイプライン処理を実現するには、Apache Beam SDK を使ってコードを実装する必要がある!

ということで今回は、Apache Beam のチュートリアル的な内容です。
(おそらく、近い将来レクチャーする機会が来そうなので、今のうちから素材を準備している感MAXですが・・・)

Apache Beam 概要

Apache Beam はどういうものか、公式サイトトップページでは次のように定義されている。

Apache Beam: An advanced unified programming model
Implement batch and streaming data processing jobs that run on any execution engine.

(翻訳) Apache Beam: 高度な統一プログラミングモデル
任意の実行エンジンで実行するバッチおよびストリーミングデータ処理ジョブを実装します。

分散環境上でバッチ処理およびストリーミングデータ処理を実装するためのフレームワークである。
現在、次の言語でパイプライン処理の開発ができる。(SDKが提供されている)

  • Python
  • Java
  • Go

    今回は、Python SDK を使ったチュートリアルコードの実装までたどり着きたい。

(補足) Apache Beam のリリース状況

2017-05-17 にバージョン2.0.0がリリースされ、現時点(2020-10-10)の最新バージョンは 2.24.0(2020-09-18リリース)
比較的新しいプロダクトである。

概念

Apache Beam SDKを用いたコードの実装に先立ち、まず基本的な概念を理解したい。

Runner(実行環境)

Apache Beam SDK で実装されたパイプライン処理を実行する環境のことをRunnerと呼ぶ。
基本的に Runnerは分散環境であるが、開発用にローカルで動く環境 DirectRunnerが用意されている。
Apache Beamが対応しているRunnerをまとめると次の通りである。

ランナーの種類 説明
DirectRunner ローカルマシンで動くものであり、開発・テスト・デバッグに最適である。
DataflowRunner Google Cloud Dataflow上で動くRunnerである。
FlinkRunner Apache Flink上で動くRunnerである。
SparkRunner Apache Spark上で動くRunnerである。
SamzaRunner Apache Samza上で動くRunnerである。
NemoRunner Apache Nemo上で動くRunnerである。
JetRunner Hazelcast Jet上で動くRunnerである。
Twister2Runner Twister2上で動くRunnerである。

それにしても分散処理システムの種類が結構増えたものだ。
HadoopやSparkがメジャーだった時代が懐かしい。

パイプライン処理を構成する概念

Apache Beam では、大規模分散データ処理を実現するための抽象化された概念(abstractions)を用意している。
(それらの概念は、バッチ処理/ストリーミング処理の両方で共通である。)

抽象化された概念 説明
driver program データ処理を実装するプログラムのこと。
Pipeline 実装されたデータ処理を最初から終わりまでカプセル化したもの。
PCollection Apach Beam のパイプラインで取り扱う分散データセットのこと。
PTransform データ処理における操作・工程のこと。
I/O Transforms 外部ストレージに対する読み書き機能を提供するライブラリ。

以下、より具体的な説明:

  • driver program

    • データ処理を実装するプログラムのこと。
    • Apache Beam が提供するSDKを用いて開発する。
    • すべての driver program は、後述する Pipeline を必ず定義しなければならない。
      (というか、Pipelineを定義しないとデータ処理を記述できない)
  • Pipeline

    • 実装されたデータ処理を最初から終わりまでカプセル化したものである。
    • Pipeline は次の機能を持つ
      • 入力データの読み込み
      • データのトランスフォーム(変形)
      • 出力データの書き出し
  • PCollection

    • Apach Beam のパイプラインで取り扱う分散データセットのことを PCollection と呼ぶ。
    • PCollection の形態には次の種類がある。
      1. bounded dataset : ファイルなどの固定ソース(=ファイルサイズなどデータサイズが決まっている)
      2. unbounded dataset : サブスクリプションなどの仕組みで継続的に更新されるソース(=ストリーミングのように受信するデータ量が事前にわからない)
    • 通常は外部ソースからPCollectionを作成するが、メモリ上のデータから作成することも出来る。
      • テストコードを実装する際にはメモリ上のデータが使えると便利。
  • PTransform

    • データ処理における操作・工程のことをPTransformと呼ぶ。
    • 1つ以上のPCollectionを入力として受け取る。
    • 受け取ったPCollectionに対してデータ処理関数を実行した後、何も出力しない、またはPCollectionを返す。
      • 公式の説明を忠実に訳すと「0個以上のPCollectionオブジェクトを生成する」
  • I/O Transforms

    • 外部ストレージに対する読み書き機能を提供するライブラリ。
      • 公式原文では「Beamには多数のIOsが付属しており(comes with)、それらは多様な外部ストレージシステムに対して
        データの読み書きをするPTransformライブラリである」

基本的なパイプラインの開発の流れ

基本的なパイプライン開発(=driver programの開発)では、以下の処理を記述してゆく。

順番 記述する処理
1 Pipeline オブジェクトの作成
2 初期 PCollection の作成
3 PCollectionPTransform を適用する処理
4 I/O Transforms を使い、最終結果 PCollection をストレージに書き出す処理

パイプラインの処理を記述したら、パイプライン処理を走らせる。

  • Pipeline オブジェクトは、run() メソッドを持っている。これを使う。

パイプライン処理内の各工程の詳細

  • 初期 PCollection の作成について

    • 外部ストレージシステムから作成することもできるし、メモリ上のデータから作成することもできる。
  • PTransformPCollection の関係

    • PCollection は変数であり、PTransform は、変数に適用される関数のようなもの。
  • パイプライン処理の形状について

    • 任意の複雑な処理グラフ構造(an arbitrarily complex processing graph)にすることができる。
    • パイプライン処理は、直線的な構造に縛る必要はない。
  • パイプライン処理の実行について

    • 実装した driver program を実行すると、パイプライン処理のワークフローグラフを作成する。
      • 指定した分散処理バックエンド(=Runnerで表される)の上で、非同期ジョブ(または同等のもの)として実行される。

動作環境構築

今回は、ローカル環境でApache Beamパイプラインを動かすことを念頭に、動作環境を構築する。

  • 使用する言語は Python3.7.x
  • Anaconda3 でPython環境 apache_beam を作成し、そこに Apache Beam開発環境を作る。
    • 他のツールでPython環境を準備しても全然問題ない(念のため)

Python環境を構築する

以下は、Windows環境のAnaconda3で環境を作った場合の手順である。

(1) Python環境の構築

  $ conda create -n apache_beam python=3.7

(2) Apache Beam SDK のインストール

後で、Google Cloud Dataflow 上で動くパイプライン処理を実装する予定があるので、GCP対応版SDKもインストールしておく。

  $ pip install apache-beam apache-beam[gcp]

  Collecting apache-beam
    Downloading apache_beam-2.24.0-cp37-cp37m-win_amd64.whl (3.3 MB)
       |████████████████████████████████| 3.3 MB 6.4 MB/s
  Collecting pydot<2,>=1.2.0
    Downloading pydot-1.4.1-py2.py3-none-any.whl (19 kB)
  Collecting future<1.0.0,>=0.18.2
    Downloading future-0.18.2.tar.gz (829 kB)
       |████████████████████████████████| 829 kB ...
  Collecting python-dateutil<3,>=2.8.0
    Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
  Collecting mock<3.0.0,>=1.0.1
    Downloading mock-2.0.0-py2.py3-none-any.whl (56 kB)
       |████████████████████████████████| 56 kB 1.4 MB/s
  Collecting pytz>=2018.3
    Using cached pytz-2020.1-py2.py3-none-any.whl (510 kB)
  Collecting numpy<2,>=1.14.3
    Using cached numpy-1.19.2-cp37-cp37m-win_amd64.whl (12.9 MB)
  Collecting avro-python3!=1.9.2,<1.10.0,>=1.8.1; python_version >= "3.0"
    Downloading avro-python3-1.9.2.1.tar.gz (37 kB)

  (・・・中略・・・)

  Successfully built future avro-python3 crcmod hdfs oauth2client dill docopt
  Installing collected packages: pyparsing, pydot, future, six, python-dateutil, pbr, mock, pytz, numpy, avro-python3, pyarrow, crcmod, typing-extensions, docopt, chardet, idna, urllib3, requests, hdfs, fastavro, grpcio, protobuf, httplib2, pyasn1, pyasn1-modules, rsa, oauth2client, pymongo, dill, apache-beam
  Successfully installed apache-beam-2.24.0 avro-python3-1.9.2.1 chardet-3.0.4 crcmod-1.7 dill-0.3.1.1 docopt-0.6.2 fastavro-0.23.6 future-0.18.2 grpcio-1.32.0 hdfs-2.5.8 httplib2-0.17.4 idna-2.10 mock-2.0.0 numpy-1.19.2 oauth2client-3.0.0 pbr-5.5.0 protobuf-3.13.0 pyarrow-0.17.1 pyasn1-0.4.8 pyasn1-modules-0.2.8 pydot-1.4.1 pymongo-3.11.0 pyparsing-2.4.7 python-dateutil-2.8.1 pytz-2020.1 requests-2.24.0 rsa-4.6 six-1.15.0 typing-extensions-3.7.4.3 urllib3-1.25.10

チュートリアル:文字列長の計算と出力

非常にシンプルな処理で、Apache Beam パイプラインの動作を確認する。
読み込んだテキストファイルの各行の長さを測定して、結果を書き出すだけのシンプルなパイプライン処理を実装する。

実装する処理

  • ローカルマシン上で実行する。
  • 入力ファイルから1行ずつテキストを読み込み、その文字列長を出力ファイルに書き出す。
  • 入力ファイル、出力ファイルは引数で指定可能とする。
    • 引数定義
      • --input : 入力ファイル名
      • --output : 出力ファイル名
  • ワードカウント処理の動作を確認する処理(=簡易テスト)も実装。
    • 自動化テスト実装は、また別の機会に。

実装上のポイント

  • ローカルマシンで実行するので、 DirectRunner を用いてパイプライン処理を実行する。
    • オプション runner に、DirectRunner を指定する。
  • ファイルの読み書きには次の IO Transformers を使う。
    • beam.io.ReadFromText() : 読み込み
    • beam.io.WriteToText() : 書き込み
  • ParDo Transform 処理を実装するにあたっては、 beam.DoFn クラスを継承した処理クラスを実装する。
    • 上記処理クラスの process() メソッドが呼び出される。
      • 読み込まれたテキストが1行ずつ process() に渡される。
  • コマンドラインからカスタム引数を受け取ることができるように設定する。
    • PipelineOptions クラスを継承した引数定義クラスを実装する。

Pythonコード

text_length.py

  """
      Apache Beam :: 文字列長の測定

  """

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.pipeline_options import StandardOptions

  class MyOptions(PipelineOptions):
      """
      カスタムオプションを受け取るためのクラス。
      argparse の機能を使って、コマンドライン引数を追加することができる。
      """
      @classmethod
      def _add_argparse_args(cls, parser):
          parser.add_argument(
              '--input',
              default='./input.txt',
              help='Input path for the pipeline')

          parser.add_argument(
              '--output',
              default='./output.txt',
              help='Output path for the pipeline')

  class ComputeWordLength(beam.DoFn):
      """
      受け取った文字列の文字列長を計算する。
      ParDo Transform を用いて、並列処理で受け取った文字列の長さを計算して返す。
      """

      def __init__(self):
          pass

      def process(self, element):
          yield len(element)

  def test():
      # 動作試験
      test_sentences = ['いろはにほへと', 'ちりぬるを']
      for _s in test_sentences:
          _length = ComputeWordLength().process(_s)
          print(f'input="{_s}"   length={list(_length)}')

  def run():
      options = MyOptions()

      # Runner を指定する。
      # ローカルマシンで動かしたいので、DirectRunner を指定する。
      options.view_as(StandardOptions).runner = 'DirectRunner'

      # Beam パイプラインを初期化する
      p = beam.Pipeline(options=options)

      # Beam パイプライン処理の定義
      # この時点ではまだパイプライン処理は実行されない(遅延実行)
      results = (p | 'ReadFromText' >> beam.io.ReadFromText(options.input)  # I/O Transform を使ってテキストファイルをを読み込む
                   | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())  # ParDo Transform を使い、並列で文字列長を計算する
                   | 'WriteToText' >> beam.io.WriteToText(options.output)# I/O Transformを適用して、オプションで指定したパスにデータを書き込む
                   )  
      # Beamパイプライン処理を実行する
      p.run()

  if __name__ == '__main__':
      test()
      run()

入力に使用するデータはこちら:

input.txt

吾輩は猫である。まだ名前は無い。
トンネルを抜けるとそこは雪国だった。
To be, or not to be, that is the question.

実行結果

実行すると、画面に簡易テスト結果が表示され、出力ファイルoutput.txt-00000-of-00001に処理の結果が書き出される。

  $ python text_length.py

  input="いろはにほへと"   length=[7]
  input="ちりぬるを"   length=[5]

出力ファイルoutput.txt-00000-of-00001 の内容は次の通りであった。

16
18
42

(補足)再実行時に発生するエラー:出力ファイルが既に存在することを警告する例外

オプションを指定せず、text_length.py を再実行すると、次の例外が発生する。

Exception: Encountered exceptions in finalize_write: [OSError(FileExistsError(17, '既に存在するファイルを作成することはできません。'))] [while running 'WriteToText/Write/WriteImpl/FinalizeWrite']

すでに出力ファイルで指定したパスに、同名のファイルが存在すると上書きはせずに例外を発生させる。
ただし、処理が中断するわけではなく、一時フォルダを作成してその中に書き出している。

例えば次のような一時フォルダが作成され、その中にパイプライン処理の結果が出力されている。

  例) 
  /beam-temp-output.txt-9d234e880abf11eba72aa4c3f0d22a66
        |
        +--- d126546d-d48d-4c9c-a592-c2eb3131b25f.output.txt # ← 出力ファイル

もっとも、例外が発生しているので、正常な処理ではない。
デバッグ情報などの用途として使うのが良いだろう。

まとめ

Apache Beamの公式文書を参考にしながら、柔らかく煮込んでエントリ化をしてみました。
ただ、Apache Beamの公式文書ですが、もともとガチガチの堅くて網羅的すぎて利便性に乏しいような文書ではなく、それどころか読みやすさ・理解しやすさを考慮した構成になっていました。(これでは、柔らかく煮込みすぎて流動食になってしまう・・・!)

非常に良質な公式文書だと思うので、引き続き読み込んでエントリをアップしていこうかと思います。

最新情報をチェックしよう!