Embulkの使い方について

はじめに

目的

現在の会社ではEmbulk関連の問い合わせを受けることが多い。この記事は自身の備忘録を兼ねて書いている。

Embulkって何?

EmbulkはFluentdやMessage Packで有名な古橋さんが開発した、バルク処理用のOSS。Fluetdだと細かくデータを転送するのには向いているが(いわゆるストリーミング処理)、データベースのテーブルまるごとや巨大なログ・ファイルの転送には向いておらず、Embulkはその課題を解決するためのもの。(バルク処理)

Embulkを使うメリット

  • 自動的にInput fileの形式を推測してくれる機能(あくまで推測する機能)
  • ビックデータを並行処理・分散処理してくれる
  • トランザクション処理があり、中途半端にデータが投入されている危険性をなくしてくれる。(All-or-Nothing)
  • 途中から再開できる。

Embulkの使い方について

  • 前提条件 : java8の環境が必要。

細かい動作検証はしていないが、java8であればはopenjdkでもoracle jdkでも基本的には

問題ない。

1. embulkをインスールする。

このページをみて各自の環境に合わせてバイナリーをインスールしてください。

github.com

 

2. seed.ymlを作る。

まず基礎となるseed.ymlを作る。まずは気軽に以下のようなイメージで作ってもらえば問題ない。

 

in:
  type: file
  path_prefix: ./mydata/csv/
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost

 

Embulkのファイルには項目が以下の4つ存在する。

  • in : input plugin optionsについてのconfを記載する箇所。
  • out : output plugin optionについて記載する箇所
  • filters : filter plugin optionについて記載する箇所。

上記のseed.ymlに関していえば、inはlocalのファイルを指定しており、outではoutput先のelasticsearchのpluginを指定している。

type以降の書き方に関しては各pluginに依存しているため、それぞれのpluginのドキュメントを参照しながら必要な情報について記載をする必要がある。

 

基本はinとoutがわかっていれば問題なのだが、実際の運用ではfilterは非常に重要となる。例えば、Treasure Dataの仕様としてHeaderが大文字のcsvやテーブルの取り込みは推奨されていない。これを行うと、同じカラム名のはずなのに別カラムとして判断されて、小文字のheader名_xなどのカラムができてしまう。そういった場合にはぜひfilter pluginを使用してほしい。

 

in:
  type: file
  path_prefix: ./mydata/csv/
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost
filters:
- type: rename rules: - rule: upper_to_lower - rule: character_types pass_types: [ "a-z", "0-9" ] pass_characters: "_" replace: "_"
3. load.ymlを作る。
seedができたらload.ymlを作ろう。

embulk guess ./mydata/seed.yml -o config.yml

このコマンドを実行するとconfig.ymlが自動生成される。こんな感じである。
in:
  type: file
  path_prefix: ./mydata/csv/
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    null_string: 'NULL'
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
  - {host: localhost}

4. Previewする。(option)
dry runとして以下のコマンドを実行すると、どんなデータが取り込まれるのかがわかる。

embulk preview config.yml

5. embulkを実行する。
embulk run config.yml -c diff.yml

-c diff.ymlって何かというと、これは増分バルクインポート用のファイルであり、一回だけのインポートであればこのオプションは不要である。
ではdiff.ymlになにが入るかというと、

in: {last_path: mydata/csv/sample_01.csv.gz} out: {}
という内容が記録される。

重要なのはlast_pathに入っている値である。このlast_pathは前回取り込み時のファイル名を記録している。次回実行時にはこのlast_pathより新しいファイルが取り込まれるようになる。ぜひ役立ててほしい。



最後に

これらの内容はembulkのドキュメントからも確認できるので、困ったことがあったらそっちを確認してほしい。

 

www.embulk.org