Obey Your MATHEMATICS.

機械学習関連の純粋数学や実験など

BigDL + Apache Spark on EMR 5.8 でお手軽分散学習

こんにちは。例のごとく久しぶりの投稿になってしまいました。

前回投稿からいろいろな事がありました。

db analytics showcase Sapporo 2017 で講演してきたり

www.slideshare.net



雑誌に激エモポエムを寄稿したり

YANSに参加してきたり

gunosiru.gunosy.co.jp


テキストアナリティクス・シンポジウム で発表してきたり

data.gunosy.io


もう少しで新卒で入社して半年が過ぎるわけですが、今振り返ると本当に刺激的な日々でした。近いうちに書籍の執筆もするっぽいです。

この調子で突っ走りたいと思います。



さて、前置きはこれくらいにして今日の題材は

BigDL + Spark on EMR でお手軽分散学習

です。

§1. 背景

最近仕事でSparkで大規模にデータ整形から学習までを行うバッチサーバーをAmazon EMR上に作っています。

最初は右も左もわからず苦しんでいたのですが、慣れると結構楽しくて、S3にとりあえずログを流してHive meta-storeテーブルを用意するだけで簡単にデータの分散処理が出来るわけです。メモリとか気にしなくて良い*1し、なんだこの天国はという感じ。

そんなこんなでデータ整形も一通り慣れてきて、さあ学習させようとなったときに登場するのが Spark MLlib。Scikit-learnのSparkにおけるカウンターパートです。

そんでもって一通りごにょごにょしてみるわけですが、まあそんな良いモデルが出来るわけもなく*2、案の定

「ああ〜〜〜〜〜テンソーフロー(ぐらい柔軟に計算グラフを構築できる自動微分ライブラリでSpark上で分散処理できるかつScalaで書けるライブラリ)欲しい〜〜〜〜〜」

となりました。


そこで出会ったのが BigDL でした. *3

github.com

§2. BigDLとは

BigDLとはSpark上で動く分散深層学習ライブラリです。TensorFlowやCaffeのモデルwrapperも内蔵しています。余計な設定は一切なしで既存のSparkクラスタ上ですぐに分散学習が実行できるのと、kerasそっくりなインターフェースが特徴でしょうか。

BigDL以外にも "それっぽい" ライブラリはいくつかあるのですが、

どれも開発が止まっている様子であったり、発展途上であったり、Spark 2系で動かなかったりで、ちょうどよいライブラリが他にありませんでした。*4

分散深層学習といえばPFNのChainer MN(Multi Node)が有名ですが、BigDLと共通しているのはSynchronousでData Parallelな学習方式です。この辺の話は以下の記事が詳しいです

engineering.skymind.io


しかし、BigDLのほうはまだmultiGPUクラスタには対応していない様子です.

github.com


ですが、仕事の問題設定的にGPUを使いたい状況ではないので、気にしないことにします。

Extremely high performance. To achieve high performance, BigDL uses Intel MKL and multi-threaded programming in each Spark task. Consequently, it is orders of magnitude faster than out-of-box open source Caffe, Torch or TensorFlow on a single-node Xeon (i.e., comparable with mainstream GPU).


とにかく、Spark上でKerasっぽいインターフェースで計算グラフ書けて分散学習出来るだけで最高!採用!

§3. Spark2.2上でBuildする

仕事の関係上、EMR5.8を使いたいのでSparkのバージョンは2.2で固定されます。一方BigDLが公式にサポートしているのは2.1までです。

しかしどうしても使いたかったので無理やりビルドします。
無理やりと言ってもただ mvnにバージョンのパラメータを渡す & build時のテストを無視するだけです。

[Backport 1321][Branch-0.2] Fix compile error on Spark 2.2 by yiheng · Pull Request #1410 · intel-analytics/BigDL · GitHub

↑このPRで "Jenkins pass. Merge" らしいのでテスト無視でも大丈夫だよ。大丈夫大丈夫。


以下雑な手順(参考リンク):

1. EMR5.8のクラスタをポチポチして建てます
2. Maven3系をインストール & mvnのパスを通す
3. git clone https://github.com/intel-analytics/BigDL && cd BigDL
4. mvn -Pspark2.x -Dspark.version=2.2.0 -DskipTests clean package 
5. 以下の2つの.jar, .confファイルが生成されている事を確認 (docsが言ってる場所と全然違うので注意…)
  - BigDL/spark/dl/target/bigdl-0.3.0-SNAPSHOT-jar-with-dependencies.jar
  - BigDL/spark/dl/target/classes/spark-bigdl.conf

2つの生成物を適当なOptionで渡してspark-shellを起動してみましょう。

問題なく起動出来たらビルド成功です。

§4. とりあえず動かす

まず最初に、Engineオブジェクトのinitメソッドを実行します。

scala> import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.utils.Engine

scala> Engine.init

これによりsparkクラスタの設定をBigDL側が良しなに解釈して分散処理が可能な状態にしてくれます。


次にBigDLにおけるnumpyのカウンターパートであるTensorクラスを使ってみます

scala> import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.tensor.Tensor

scala> Tensor[Double](2,2).fill(1.0)
res9: com.intel.analytics.bigdl.tensor.Tensor[Double] =
1.0     1.0
1.0     1.0
[com.intel.analytics.bigdl.tensor.DenseTensor of size 2x2]

こんな感じです。

§5. Sequential / Functional API

さて、BigDLではモデルを組むためのAPIが2種類用意されています。

名前は例のごとく Sequential APIFunctional API.

今回はSequential APIを使って行こうと思います。

以下に例を載せますが、どこかで見た感じです。

import com.intel.analytics.bigdl.nn._

// Linear(10 dim -> 2 dim) -> Sigmoid -> Sum (2 dim -> 1 dim)
val model = Sequential[Double]()
model.add(Linear[Double]())
model.add(Sigmoid[Double]())
model.add(Sum[Double](dimension = 1, nInputDims = 1))

§6. Sequential / Functional API

データの渡し方が気になる所ですが、公式ドキュメントが詳しいのでそちらを読んだほうが早いかと思います → https://bigdl-project.github.io/master/#APIGuide/Data/


ざっくりと言うと、データセット内のサンプル毎に所望のデータをTensor型のインスタンスに変換してcom.intel.analytics.bigdl.dataset.Sampleクラスのインスタンスを生成し、それら全体から成るRDDを生成するだけです。

§7. Example

少し複雑な計算グラフでデータ生成から学習までやってみましょう。

今回モデリングする真のモデルは以下のようなものとします。入力が2種類あり、それぞれで演算したのち和を取るようなものです。

(x_1 x_2) -> x = x1 * 0.5 + x_2 * 1   ↘
(y_1, y_2) -> y = sigmoid(y_1 + y_2) → y + x


まずは準備 :

import org.apache.spark.SparkContext
val sc = new SparkContext

import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.dataset.Sample
import com.intel.analytics.bigdl.utils.T
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils.Engine

Engine.init


次にデータセットを作ります。


先程説明したようにSampleのインスタンスを生成しSeqにappendしたのち、SparkContexを用いてRDDに変換しています。Sampleのインスタンスを生成する際の引数の渡し方によってモデルの入力の数やラベルの有無、ターゲットの数が自動的に解釈されます (この辺を参照: BigDL/Sample.scala at master · intel-analytics/BigDL · GitHub )

parallelizeの引数のintはクラスタのコアの数に合わせて変更してください。

var rawData: Seq[Sample[Double]] = Seq.empty[Sample[Double]]

// Generate ToyData.
for (_ <- 0 until 10000){
  val (x_1: Double, x_2: Double) = (math.random, math.random)
  val x: Double = x_1 * 0.5 + x_2 * 1 + 0 
  val (y_1: Double, y_2: Double) = (math.random, math.random)
  val y: Double = 1 / (1 + math.exp( - (y_1 + y_2)))
  val output: Double = x + y
  rawData = rawData :+ Sample[Double](Array[Tensor[Double]](Tensor[Double](T(x_1, x_2)), Tensor[Double](T(y_1, y_2))), Tensor[Double](T(output)))
}

// convert Data to RDD.
val trainData = sc.parallelize(rawData, 1)

最後にモデル生成 & Optimizerを設定して学習させます:

// Define computational graph
val model = Sequential[Double]()
val branches = ParallelTable[Double]()
val branch1 = Sequential[Double]().add(Linear[Double](2,1))
val branch2 = Sequential[Double]().add(Sum[Double](dimension = 1, nInputDims = 1)).add(Sigmoid[Double]())
branches.add(branch1).add(branch2)
model.add(branches).add(CAddTable[Double]())

// Optimizer
val optimizer = Optimizer(model, trainData, MSECriterion[Double](), 1000)
optimizer.setEndWhen(Trigger.maxEpoch(500))
optimizer.setOptimMethod(new Adam())
optimizer.optimize()

生成したモデルのグラフがどうなっているかは簡単に確認出来ます :

scala> model
res11: com.intel.analytics.bigdl.nn.Sequential[Double] =
Sequential[1dc81520]{
  [input -> (1) -> (2) -> output]
  (1): nn.ParallelTable {
	input
	  |`-> (1): Sequential[42800512]{
	  |      [input -> (1) -> output]
	  |      (1): Linear[73f1aed4](2 -> 1)
	  |    }
	   `-> (2): Sequential[430fc0ba]{
	         [input -> (1) -> (2) -> output]
	         (1): nn.Sum
	         (2): Sigmoid[12e87dac]
	       }
	   ... -> output
}
  (2): CAddTable[758dff98]
}


学習された重みも以下のように確認できます :

scala> model.getParametersTable
res12: com.intel.analytics.bigdl.utils.Table =
{
  Linear73f1aed4:  {
    weight: 0.5007541670100257	1.0077963847969507
    [com.intel.analytics.bigdl.tensor.DenseTensor of size 1x2]
    bias: -0.10326187716322073
    [com.intel.analytics.bigdl.tensor.DenseTensor of size 1]
    gradBias: 0.0
    [com.intel.analytics.bigdl.tensor.DenseTensor of size 1]
    gradWeight: 0.0	0.0
    [com.intel.analytics.bigdl.tensor.DenseTensor of size 1x2]
  }
}


新のモデルのパラメータが学習されている様子が確認出来ました。

§8. 終わりに

今回は、日本語はさておき英語でも情報があまりないBigDLをEMR上でビルドして動かしてみました。

Spark上の深層学習フレームワーク界隈はどこが覇権を握るかまだまだ分かりませんが、僕はしばらくBigDLを愛用していこうかと思います。


EMR上でポチポチするだけなので是非お試しを。

*1:全く気にしないと言う事はないですが…

*2:というよりモデルをサービスに落とすことを考えた柔軟なモデルを作れない. これはscikit-learnも同じ

*3:なんてダサい名前なんだ…と思ったのは内緒です

*4:全部試したわけではないので、嘘つけ!みたいなマサカリは受け付けます