はじめましてインターンの学生エンジニアな五段です。
弊社は最近収集されるデータをgoogleのBigQueryに入れるようになりました。 その時に各情報からBigQueryに入れると気に使ったミドルウェアであるEmbulk周りをお手伝いさせていただいた時に得た知見をまとめようと思います。 今回はRubyほぼ未経験な自分がRubyで開発したEmbulkparserの話です。
Embulkとは
バルク処理に特化したデータ転送ミドルウェアです。プラグイン方式でデータ入力先からデータ先を豊富にあり万が一なかったとしても簡単に作ることができます。
BigQueryとは
2012年にGoogleからリリースされたカラム型データストアです。数テラ数ペタでも数秒でデータ抽出を行ってくれるサービスで、かなりの低価格で提供されています。
→ BigQueryで150万円溶かした人の顔
今回作るもの
データは普通のタブ区切りのログテキストなのですが今回は流し込まれたデータの中にJSONデータが一部含まれていたので、そのJSONをパースする機能と不必要なカラムを削る機能を持つパーサープラグインです。
パースしたのは以下のようなデータ タブ区切りですが、中にJSONのデータとBigQueryに送らなくても良いデータが幾つかあります。
YYYY-MM-DD HH:mm:ss {"hoge":"0",foo":"1","is_bar":true} - 0000 000.000.000.000 japanese 1.4.6 /api/hogehoge/foobar/propro/post
EmbulkはYamlで設定ファイルを作成します。今回のプラグインでは下記を追加しました:
- 不必要なカラムをアウトプットしないよう除外できるように
- JSON文字列が入っているカラムをパースできるように
parser: type: parser_col_json delimiter: "\t" input_columns: - {name: date, type: string} - {name: json , type: json} - {name: option, type: string} - {name: ID, type: string} - {name: ip_address, type: string} - {name: country, type: string} - {name: version, type: string} - {name: pass, type: string} get_JSON: - {name: hoge , type: long} - {name: foo , type: long} - {name: bar , type: none}
CSVのパーサーと同じ感じになるようにしてます。type
にJSONカラム指定用のjson
とアウトプットしないカラム用のnone
で指定するようにしました。またJSONの中身はもう一つ get_JSON
配列オプションに設定を書いてもらう感じです。
テンプレートファイル作成
Embulkにはプラグイン作成用のテンプレートを生成してくれるコマンドがあるのでそれを用います。
$ embulk new ruby-parser parser_col_json Creating embulk-parser-parser_col_json/ Creating embulk-parser-parser_col_json/README.md Creating embulk-parser-parser_col_json/LICENSE.txt Creating embulk-parser-parser_col_json/.gitignore ︙ ︙
いろいろと生成されますが今回のパーサープラグインの場合メインでコードを書き込むのは./embulk-parser-parser_col_json/lib/embulk/parser/parser_col_json.rb
の中です。
実装周り
実際に実装したのが下のソースです。
require 'json' module Embulk module Parser class ParserColJson < ParserPlugin Plugin.register_parser("parser_col_json", self) def self.transaction(config, &control) # Yamlの設定受け取り task = { "delimiter" => config.param("delimiter", :string, default: ","), # 区切り文字 "columns" => config.param("input_columns", :array, default: []), # パースした時のカラム "get_json" => config.param("get_JSON", :array, default: []) #Jsonをデコードした時のカラム } #出力カラム配列を作成 columns = [] count = 0 task["columns"].each do |col| #TypeがJSONだったJSONの設定を参照する if col["type"] == "json" && !task["get_json"].empty? then task["get_json"].each do |col| if (col["type"] == "none") then next end columns.push(Column.new(count, col["name"], col['type'].to_sym)) count += 1 end end if (col["type"] == "json" || col["type"] == "none") then next end columns.push(Column.new(count, col["name"], col['type'].to_sym)) count += 1 end yield(task, columns) end def init # initialization code: @delimiter = task["delimiter"] @col = task["columns"] @get_json = task["get_json"] end def run(file_input) while file = file_input.next_file #処理したカラムをカウントしながら Yamlに書かれた情報に則って処理 #もっとスマートな処理を考える text = file.read text.each_line do |row| count = 0 row = row.split(@delimiter) record=[] row.each do |col| if @col[count]["type"] == "json" then value = JSON.parse(col) @get_json.each do |data| if data["type"] == "none" then next end record.push(value[data['name']]) end elsif @col[count]["type"] != "none" then record.push(col) end count += 1 end page_builder.add(record) end end page_builder.finish end end end end
ruby素人感が拭えないコードですが、現状ちゃんと動いてるようです。
def self.transaction
以下が読み込まれたYamlファイルを処理する部分です。今回は、アウトプットする形がJsonによって変わるので読み込まれたものを一行ずつforeachして最終的な物に何をアウトプットするカラムをColumn.new(index,カラム名,型)
で追加していきます。
#出力カラム配列を作成 columns = [] count = 0 task["columns"].each do |col| #TypeがJSONだったJSONの設定を参照する if col["type"] == "json" && !task["get_json"].empty? then task["get_json"].each do |col| if (col["type"] == "none") then next end columns.push(Column.new(count, col["name"], col['type'].to_sym)) count += 1 end end if (col["type"] == "json" || col["type"] == "none") then next end columns.push(Column.new(count, col["name"], col['type'].to_sym)) count += 1 end yield(task, columns) end
def run
以下が実際のファイルを処理する箇所です。
Yamlで指定されたデリミタで分けて、一行ずつrow.each do
でひとつひとつ処理していきます。
def run(file_input) while file = file_input.next_file #処理したカラムをカウントしながら Yamlに書かれた情報に則って処理 text = file.read text.each_line do |row| count = 0 row = row.split(@delimiter) record=[] row.each do |col| if @col[count]["type"] == "json" then value = JSON.parse(col) @get_json.each do |data| if data["type"] == "none" then next end record.push(value[data['name']]) end elsif @col[count]["type"] != "none" then record.push(col) end count += 1 end page_builder.add(record) end end page_builder.finish end
実行してみる
試しに以下のYamlファイルを作ってembulk preview
で動作確認してみます。
in: type: file path_prefix: ./dummy_log parser: type: parser_col_json delimiter: "\t" input_columns: - {name: date, type: string} - {name: json , type: json} - {name: option, type: string} - {name: ID, type: string} - {name: ip_address, type: string} - {name: country, type: string} - {name: version, type: string} - {name: api, type: none } get_JSON: - {name: hoge , type: long} - {name: foo , type: long} - {name: is_bar , type: none}
実際に実行したのが以下の物です。ちゃんと分けれて、削られてますね。
$ embulk preview damy.yml -L ./embulk-parser-parser_col_json/ 2016-04-18 02:59:17.679 +0000 [INFO] (0001:preview): Loading files [damy_log] +---------------------+---------------+-----------+-------------------+----------------+----------------+ | date:string | option:string | ID:string | ip_address:string | country:string | version:string | +---------------------+---------------+-----------+-------------------+----------------+----------------+ | YYYY-MM-DD HH:mm:ss | - | 0000 | 000.000.000.000 | japanese | 1.4.6 | | YYYY-MM-DD HH:mm:ss | - | 0000 | 000.000.000.000 | japanese | 1.4.6 | +---------------------+---------------+-----------+-------------------+----------------+----------------+
まとめ
いかがでしょうか?
まだまだ修正するべき所は多いですが、必要としていた機能の実装はできました。
EmbulkのPlugin開発はまだ日本語ドキュメントが豊富というわけではないですが、なかなか簡単にできます。
EmbulkのプラグインはRuby初心者でも簡単に動くものが作れますので、もし既存のもので不便があったら開発してみるのも手かもしれません。
子育て家族アプリFamm、カップル専用アプリPairyを運営する Timers inc. では、現在エンジニアを積極採用中! 急成長中のサービスの技術の話を少しでも聞いてみたい方、スタートアップで働きたい方など、是非お気軽にご連絡ください! 採用HP : http://timers-inc.com/engineerings