Comments
Description
Transcript
ストリームデータ処理の分散並列化実行における マージ処理コスト削減
DEIM Forum 2010 B3-3 ストリームデータ処理の分散並列化実行における マージ処理コスト削減方式 勝沼 聡† 今木 常之† 西澤 格† 藤原 真二† †株式会社日立製作所 中央研究所 〒185-8601 東京都国分寺市東恋ヶ窪一丁目 280 番地 E-mail: †{satoshi.katsunuma.hb, tsuneyuki.imaki.nn, itaru.nishizawa.cw, shinji.fujiwara.yc}@hitachi.com あらまし 時々刻々と到着するデータをリアルタイムに処理するストリームデータ処理技術が注目されている. ストリームデータ処理では,大量データを処理するために分散並列化が必要であるが,単純にデータを分割して並 列に処理するデータ並列方式では,並列処理結果を時刻順に整列し単一のストリームにマージする処理が性能劣化 の要因となる.そこで本論文では,マージ処理において時刻順整列処理を削減する P-SORT を提案する.提案方式 を実装し評価した結果,性能劣化を回避でき,16 CPU コア利用時に単一 CPU コア実行比で 13 倍の性能向上を確認 した. キーワード リアルタイム,CEP,ストリーム,分散,並列 Distributed and Parallel Stream Data Processing for Reducing Merge Operation Overhead Satoshi KATSUNUMA† Tsuneyuki IMAKI† Itaru NISHIZAWA† and Shinji FUJIWARA† †Central Research Lab., Hitachi Ltd., 1-280, Higashi-koigakubo, Kokubunji-shi, Tokyo, 185-8601, Japan E-mail: †{satoshi.katsunuma.hb, tsuneyuki.imaki.nn, itaru.nishizawa.cw, shinji.fujiwara.yc}@hitachi.com Abstract Stream Data Processing has been widely accepted to process time series data in real-time fashion. Distributed and parallel stream data processing is required to process large volumes of data. However, the merge operation in traditional parallel data processing techniques degrades system performance. In this paper, we propose a distributed and parallel stream data processing method called P-SORT to reduce merge operation overhead. We implement P-SORT and evaluate its performance. Experimental result shows that the performance of P-SORT with 16 CPU-core execution is 13 times faster than that of single core execution. Keyword Real Time,CEP,Stream,Distributed,Parallel 1. は じ め に 株自動取引,電子マネー,車両位置,携帯操作,セ ンサデータなど,時々刻々と到着するデータのリアル タイム処理を必要とするアプリケーションが増加して エ リ 言 語 を 用 い る .CQL は ,デ ー タ ベ ー ス 管 理 シ ス テ ム で 用 い ら れ る 標 準 言 語 SQL に 似 た 言 語 で あ り ,複 雑 なデータ処理を簡潔に記述することが可能である. センサデータや株取引データなどのリアルタイム いる.そしてこのようなアプリケーションを効率的に 処理対象のデータ量は急激な増加傾向にあり,スト 処 理 す る 技 術 で あ る CEP (Complex Event Processing), リームデータ処理を適用しても単一の計算機では十分 及 び CEP を 実 現 す る ミ ド ル ウ ェ ア で あ る ス ト リ ー ム な性能が得らないケースが現れてきている.ストリー データ処理が注目されている.ストリームデータ処理 ム デ ー タ 処 理 で は 処 理 件 数 に 比 例 し て CPU 使 用 量 が は Stanford 大 , MIT な ど に よ り 研 究 さ れ [9][11], 製 品 増加するため,大量データを処理するにはマルチコア 化 も 活 発 化 し て い る [6][7][8]. 環 境 1や マ ル チ ノ ー ド 環 境 2に よ る 分 散 並 列 化 が 必 須 と ストリームデータ処理では,センサデータ,株価情 なる. 報などの実世界から時々刻々と到来する時系列データ ストリームデータ処理の分散並列化は,処理内容を であるストリームデータを入力する.そして処理対象 直 列 に 分 割 し て 実 行 す る 方 式 (パ イ プ ラ イ ン 並 列 方 式 ) のストリームデータを切り取り,メモリ上のウィンド [1][2][3]と , 処 理 対 象 デ ー タ を 分 割 し て 実 行 す る 方 式 ウに保持し,新たなデータが到着する度に保持した (デ ー タ 並 列 方 式 ) [4][5]に 分 類 す る こ と が で き ,そ れ ぞ データを更新し出力することで高速処理を実現する. またストリームデータ処理では,データ処理の定義に CQL (Continuous Query Language) [10]等 の 宣 言 的 な ク 1 単 一 計 算 機 に 複 数 CPU コ ア (以 下 , 単 に コ ア と 書 く )を 搭 載 し た 環 境 2 複数の計算機から構成される環境 れ利害得失がある.実世界を対象とした時系列データ 列 さ れ た デ ー タ と し て 出 力 す る .こ れ に よ り 図 2 に 示 処理を考えた場合,例えばセンサごとに閾値を超えた す よ う に ,処 理 結 果 を 受 信 す る ア プ リ ケ ー シ ョ ン で は , データ数を独立して集計するなど,対象データを分割 あるセンサにおいて異常が検出されると同時に,各セ できることが多い.このような処理では,データ並列 ンサの異常値の前後関係を解析することが可能となる. 方式では各データを単一コアに割当てて処理可能であ 障害原因分析アプリケーションでは,あらかじめデー るのに対し,パイプライン並列方式では複数コアで タベースに登録された障害原因パターンとマッチング データを送受信し処理する必要がある.このため通信 を取り,障害の原因を分析する.ストリームデータ処 量はパイプライン並列方式ではコア数に比例して増加 理 で は 時 刻 順 の 逆 転 を 許 容 す る 実 行 方 式 [12]も 研 究 さ し,性能劣化に繋がる.一方,データ並列方式では通 れているが,正確な処理結果を必要とするアプリケー 信量はコア数に依存せず一定であり,性能への影響は ションでは,本例で示したように時刻順に整列された 小さい. データが対象となる. しかしながら,従来のデータ並列方式では処理デー タを単一のストリームにマージする際に,時刻順に全 データを整列する処理が発生する.マージ処理では データ量に比例した処理が必要なため,本整列処理は 性 能 上 の ボ ト ル ネ ッ ク と な る .そ こ で 本 論 文 で は ,マ ー ジ 処 理 に よ る 性 能 劣 化 を 軽 減 す る 方 式 と し て P-SORT (Partial SOrt method in Real-Time merge operation) を 提 案 す る . P-SORT で は , 時 刻 順 整 列 の 範 囲 を 絞 る ことでマージ処理コストを削減する. 本論文の構成は以下の通りである.まず 2 節でスト リームデータ処理のデータ並列方式について述べる. そ し て , 次 に 3 節 に お い て 本 論 文 で 提 案 す る P-SORT を 説 明 し ,4 節 で そ の 評 価 に つ い て 述 べ る .5 節 で 関 連 研究をまとめ,最後に 6 節で今後の課題を述べる. ストリーム センサデータ (センサ String, 測定値 double, タイムスタンプ:測定時刻 Timestamp); クエリ q1 SELECT センサ,CNT(*) AS 分データ数 FROM センサデータ [range 1 minute] GROUP BY センサ; クエリ q2 SELECT センサ,CNT(*) AS 異常データ数 FROM センサデータ [range 1 minute] WHERE 測定値 > 閾値α GROUP BY センサ; クエリ q3 istream( SELECT センサ, q2.異常データ数 / q1.分データ数 AS 異常率 FROM q1, q2 WHERE q1.センサ = q2.センサ); 図 1: :クエリ及びストリームの記述例 2. ス ト リ ー ム デ ー タ 処 理 の デ ー タ 並 列 方 式 2.1. ストリームデータ ストリームデータ処 処 理 の時 刻 順 出 力 以下では,発電所で各機器に設置したセンサによっ 発電所 センサデータ 処理結果 アプリケーション て取得されるデータに,ストリームデータ処理を適用 して異常を検出する処理を説明する. 図 1 に CQL で 記 述 さ れ た ク エ リ 及 び ス ト リ ー ム を 示す. 「 セ ン サ デ ー タ 」ス ト リ ー ム は「 セ ン サ 」, 「測定 値」カラム,及びタイムスタンプ「測定時刻」から構 8:19 8:18 8:17 温度V 温度U 温度V 4% 6% 10% 10% 4% 6% 8:19 8:18 8:17 温度V 温度U 温度V 97℃ 101℃ 99℃ ストリーム処理 エンジン 温度U 温度V 10% 6% 10% 1分 6% 成 さ れ る .ク エ リ q1 で は セ ン サ デ ー タ ス ト リ ー ム を 入 力 と し , セ ン サ ご と の 最 近 一 分 間 の 合 計 デ ー タ 数 (分 温度U 測定値 マッチング デ ー タ 数 )を 算 出 す る . ま た ク エ リ q2 で は , 同 じ く セ ンサデータストリームを入力とし,センサごとの最近 一 分 間 の 測 定 値 が 閾 値 α を 超 え た デ ー タ 数 (異 常 デ ー 閾値α 8:18 8:19 時刻 各センサの異常値判定 タ 数 )を 算 出 す る .そ し て ク エ リ q3 で は ,ク エ リ q1 及 び q2 で 算 出 し た 分 デ ー タ 数 と 異 常 デ ー タ 数 か ら ,異 常 障害原因 パターン リアルタイム 障害原因分析 図 2: :ストリームデータ処理の動作例 率を算出する. 図 2 に,図 1 で定義したストリーム,クエリの処 理動作を示す.まず,発電所の機器に取り付けた各セ 2.2. データ並 データ 並 列 方 式 の 動 作 と 効 果 ストリームデータ処理では,クエリの処理対象デー ンサのデータをセンサデータストリームとして入力し, タが,ストリームの特定カラムの値ごとに処理が独立 ク エ リ q1~ q3 に 従 っ て ス ト リ ー ム デ ー タ 処 理 エ ン ジ し て い る こ と が 多 い .例 え ば 図 1 の ク エ リ q1~ q3 は , ンで逐次処理する.そしてこれらのクエリによりセン センサデータストリームにおけるセンサカラムの値ご サの値が異常か否か判定した処理結果を,時刻順に整 とに処理が独立している.このような場合,データ並 列方式では,入力されるストリームデータを特定カラ び複数コアを用いる二つの方式に大別される.以下で ム 値 に 従 っ て 複 数 の CPU コ ア に 振 り 分 け る . そ し て , はこれらの方式の動作を説明する. 各コアで同一クエリの処理を実行し,その処理結果を マージする. まず単一コアによるマージ処理では,複数コアで処 理されたデータをマージ処理するコア上のキューに転 データ並列方式でのデータの割当て方法を説明す 送し,該コアにおいて各キューのデータを時刻順に整 る.本論文においては,クエリ処理対象のデータが特 列 し て 出 力 す る . 例 え ば 図 4 で は CPU コ ア #5 に お い 定のカラムで分割可能であること,及びこのカラムが て ,CPU コ ア #1~ #4 の デ ー タ を 対 応 す る キ ュ ー に 格 納 ユーザによって指定されることを仮定する.指定され し,時刻順に整列した後に出力する. たカラムは,その値ごとにクエリを独立して処理可能 けテーブルを導出する.振分けテーブルはデータのカ ラム値をキーとしてコア名を参照するテーブルである. 例 え ば 図 1 の ク エ リ q1~ q3 は 2.1 節 で 述 べ た よ う に , センサごとに異常値か否かを判定する処理である.こ CPU#1 8:20 8:13 CPU#2 8:18 8:10 CPU#3 8:19 8:15 CPU#4 8:17 8:16 の場合,センサカラムの値ごとに処理が独立している ため,クエリ作成者がこのカラムを指定することで, マージ処理 CPU#5 であることを示し,このカラムを用いてデータの振分 キュー 整列 処理 ・・・ 8:15 8:13 8:10 全データ数分 の比較処理 図 3 のようにセンサをキーとする振分けテーブルが 導出される. ク エ リ 実 行 時 に は ,図 3 に 示 す よ う に 導 出 し た 振 分 け テ ー ブ ル に 従 っ て デ ー タ を 複 数 の CPU コ ア (本 例 の 全データ数分 全データ数分 の通信 のメモリ 場 合 CPU コ ア #1~ #4)に 振 り 分 け , 各 コ ア で ク エ リ q1 図 4: :単一コアによるマージ処理 ~ q3 に 従 っ て セ ン サ の 異 常 値 判 定 処 理 を 実 行 す る .そ して,処理結果をマージする処理では,データの整合 次に,複数コアによるマージ処理では,まず複数コ 性 を 確 保 す る た め ,各 CPU コ ア で 処 理 し た デ ー タ を 時 アで部分的に整列して別コアに転送する.そして転送 刻順に整列してアプリケーションに出力する. 先のコアで部分的に整列されたデータを再整列するこ 振分け処理 CPU#0 データ並列処理 マージ処理 CPU#1 CPU#5 アプリケーション とで全データを時刻順に整列する.例えば図 5 では CPU コ ア #1, #2 に お い て 処 理 さ れ た デ ー タ を CPU コ ア #5 で , CPU コ ア #3, #4 に お い て 処 理 さ れ た デ ー タ を CPU コ ア #6 で 整 列 し ,そ れ ら の 結 果 を CPU コ ア #7 CPU#2 振分けテーブル センサ名 センサ名 振分け 振分け 先 CPU#3 温度V 入圧 電力Y CPU#1 温度U 出圧 温度W CPU#2 CPU#3 電力X 電力Z 流量 回転 CPU#4 CPU#4 8:12 8:10 8:15 電力X 温度U 温度V で再整列することで,全データを時刻順に整列する. マージ処理 CPU#5 8:15 8:12 8:10 温度V 電力X 温度U 各センサの 異常値判定 時刻順整列 キュー CPU#1 8:20 8:13 CPU#2 8:18 8:10 整列 処理 CPU#7 8:20 8:18 8:13 8:10 キュー 時刻順整列 図 3: :ストリームデータ処理のデータ並列方式 CPU#6 CPU#3 8:19 8:15 キュー CPU#4 8:17 8:16 整列 処理 8:13 8:10 8:19 8:17 8:16 8:15 整列 処理 デ ー タ 並 列 方 式 の 効 果 と し て は ,(1)各 コ ア で 処 理 す 全データ数分 の比較処理 全データ数分 る デ ー タ 数 や 送 受 信 す る デ ー タ 数 が 減 る こ と で CPU 全データ数分 使 用 量 が 減 少 す る ,(2)コ ア 毎 の 処 理 デ ー タ 数 が 減 る こ の通信 とによりクエリ処理で必要となるメモリ量が減少する, ・・・ のメモリ 図 5: :複数コアによるマージ処理 という点が挙げられる.しかし従来のデータ並列方式 ではマージ処理が性能上のボトルネックとなる可能性 単一コア及び複数コアによる方式のいずれにおい が あ る た め , 2.3 節 で 詳 細 に 検 討 す る . ても,最終的には一つのコアにおいて全データ数分の 2.3. データ並 データ 並 列 方 式 に おけるマージ おける マージ処 マージ 処 理 処 理 で は , 図 4 に 示 す よ う に CPU コ ア #5 の 4 個 の 整列処理が必要になる.例えば単一コアによるマージ データ並列方式におけるマージ処理は,単一コア及 キ ュ ー の そ れ ぞ れ が 全 デ ー タ の 平 均 1/4 ず つ の デ ー タ を受信し,各キューのデータを整列するため,全デー 求に従って,整列が必要なデータを同じコアに割当て タ数分の処理が必要になる.また複数コアによるマー 処理することで整列処理を削減する. ジ 処 理 で も , 図 5 に 示 す よ う に CPU コ ア #7 の 2 個 の 例えば前述のアプリケーションでは,時刻順整列要 キ ュ ー が 全 デ ー タ の 平 均 1/2 の デ ー タ を 受 信 し , 各 求を「同一機器のセンサ間で整列が必要」と指定する キューのデータを整列するため,同様に全データ数分 こ と が で き る .そ し て 図 6(b)に 示 す よ う に ,同 じ 機 器 の処理が必要になる.このようにマージ処理では単一 のセンサデータを単一コアに割り当てる.このことで コアで全データを送受信し,時刻を比較するために, 図 6(a) の 従 来 方 式 に 示 さ れ る よ う な 全 デ ー タ に 対 す 全 デ ー タ 数 分 の CPU 処 理 量 が 必 要 に な る . ま た , 全 る整列処理を削減できる. データを単一コアのキューに格納するため,全データ 数分のメモリ量が必要になる.このためマージ処理が 3.2.2. 時 刻 順 整 列 要 求 の 指 定 性能上のボトルネックとなり,コア数を増やしても一 定以上性能が向上しない. 基 本 P-SORT で は , ユ ー ザ が デ ー タ 処 理 分 割 キ ー (Operator Partition Key,以 下 OPK と 略 す )を 指 定 す る . OPK は ス ト リ ー ム デ ー タ の カ ラ ム で あ り ,OPK で 指 定 3. マ ー ジ 処 理 コ ス ト 削 減 方 式 の 提 案 3.1. 時 刻 順 整 列 範 囲 の 限 定 したカラムの値が異なるデータは,互いに独立して処 理 可 能 で あ る こ と を 示 す .例 え ば ,図 1 の ク エ リ は 2.2 2 節で述べたように,ストリームデータ処理では時 節で述べたようにセンサカラムごとに独立して処理可 刻順にデータを処理する必要があるが,一般の業務に 能 で あ る の で , 図 7 に 示 す よ う に OPK と し て セ ン サ おいて全データに対する時刻順整列が必要な場合は少 を指定する. ない.例えば図 3 の障害原因を分析するアプリケー 次 に , 基 本 P-SORT が 時 刻 順 整 列 範 囲 を 算 出 す る た ションでは,一般的に発電所内の特定の障害が関連す め に , ユ ー ザ は 時 刻 順 整 列 分 割 キ ー (Sorting Partition るセンサは同じ機器,ある機器のグループなどの一定 Key,以 下 SPK と 略 す )を 指 定 す る .SPK で 指 定 さ れ た の範囲内に絞られることが多く,その範囲内のセンサ カラムで値が同じデータは,時刻順を整列する必要が を対象として障害を分析する.したがって,各センサ あ る こ と を 示 す .最 後 に ,OPK と SPK の 対 応 関 係 を 示 データの時刻順もその範囲内で守られていれば十分で す表として,ユーザは属性対応表を指定する.例えば ある.そこで本論文では,マージ処理による性能の劣 障害原因分析アプリケーションでは「同一機器のセン 化を軽減するために,出力データの時刻順整列を限定 サ に お い て 時 刻 順 整 列 が 必 要 」で あ る た め ,図 7 に 示 す る P-SORT を 提 案 す る . 以 下 で は , P-SORT と し て , す よ う に SPK と し て 機 器 が 指 定 さ れ る .そ し て 属 性 対 基 本 P-SORT,最 適 化 P-SORT の 二 つ の 方 式 を 説 明 す る . 応 表 と し て ,SPK 及 び OPK と し て 指 定 さ れ た 機 器 及 び センサの対応関係が指定される. 3.2. 基 本 P-SORT: : 時 刻 順 整 列 要 求 による割 による 割 当 て 3.2.1. 基 本 P-SORT の 方 針 3.2.3. デ ー タ 割 当 て と 整 列 処 理 の 省 略 データを時刻順に整列する範囲は,同じ処理でも結 次 に ユ ー ザ か ら 指 定 さ れ た OPK, SPK, 属 性 対 応 表 果を利用するアプリケーションによって異なるため, を用いて振分けテーブルを導出する.振分けテーブル 基 本 P-SORT で は 時 刻 順 整 列 の 要 求 を ユ ー ザ が 指 定 可 は , 2.2 節 で 述 べ た よ う に デ ー タ の 振 分 け 時 に 参 照 す 能なインタフェースを提供する.そして時刻順整列要 るテーブルであり,振り分けテーブルを参照すること CPU#1 CPU#5 機器A 電力X 電力Y 温度U 温度V 入圧 電力Y CPU#2 温度U 出圧 温度W CPU#3 電力X 電力Z CPU#4 CPU#1 電力X 温度V 温度W CPU#5 CPU#1 CPU#2 4コア間 整列 機器B 温度V温度W 電力Z 3センサ分 処理 機器C 機器A 電力Y 温度U 出圧 入圧 CPU#3 6センサ分 処理 回転 機器D 整列なし (a)従来方式:全データ整列 機器B 出圧 入圧 機器C 機器D 流量 (b)基本P-SORT: 時刻順整列要求による割当て 2コア間 整列 電力Z CPU#3 CPU#4 CPU#4 流量 回転 機器A CPU#2 3センサ分 処理 流量 回転 (c)最適化P-SORT: データ割当て最適化適用 図 6: :マージ処理コスト削減に向けたアプローチ でデータのカラム値をキーとしてそのデータを処理す を セ ン サ ご と に コ ア #1 , #2 に 振 分 け , 処 理 後 コ ア #5 る CPU コ ア 名 を 取 得 す る こ と が で き る . 基 本 P-SORT で整列する.これにより,各コアの処理データ数を最 で は ,SPK が 同 じ デ ー タ は 同 一 の CPU コ ア に 割 当 て る . 大 3 センサ分に抑えつつ,整列処理も高々2 コア間に そ し て ,属 性 対 応 表 を 参 照 し ,各 CPU コ ア に 割 り 当 て 留めることができる. た 各 SPK に 対 応 す る OPK を 抽 出 し ,抽 出 し た OPK の 値 を 振 分 け テ ー ブ ル の キ ー と す る .例 え ば ,図 7 で は , 3.3.2. デ ー タ 割 当 て の 最 適 化 SPK が 機 器 で あ る た め , 機 器 の と り う る 値 で あ る {機 最 適 化 P-SORT の デ ー タ 割 当 て に は , 基 本 P-SORT 器 A, 機 器 B, 機 器 C, 機 器 D}を CPU コ ア #1~ #4 に と 同 様 に OPK, SPK, 及 び 属 性 対 応 表 を 用 い る . そ し 割 当 て る .ま た OPK が セ ン サ で あ る た め ,属 性 対 応 表 て,前述したようにコアの処理データ数を一定以内に から機器に対応するセンサを求め,センサをキーとす 抑 え る た め に ,各 コ ア に 割 り 当 て る 最 大 OPK 数 を 見 積 る振分けテーブルを生成する. も り ,一 つ の SPK に 対 応 す る OPK の 数 が 最 大 OPK 数 そして実行時には,まず振分け処理において,デー を超える場合には複数コアに割当て,処理後に時刻順 タ 振 分 け テ ー ブ ル に 従 っ て 各 CPU コ ア に 入 力 デ ー タ 整列処理を実行する.また,マージ処理によるボトル を 送 信 す る . 例 え ば , 図 7 で は CPU コ ア #1~ #4 に 入 ネ ッ ク の 影 響 を 最 小 化 す る た め に ,対 応 す る OPK の 数 力 デ ー タ を 振 り 分 け る .そ し て 各 CPU コ ア で 処 理 し た が 多 い SPK か ら 優 先 し て 割 り 当 て る . 結 果 は ,別 CPU コ ア の 処 理 結 果 と 時 刻 順 整 列 す る こ と なく,その処理結果を利用する他のストリームデータ 処理や,アプリケーションに渡す.なお,この処理結 果を利用するストリームデータ処理についても同様に 複数コアで実行することができる. データ並列処理 データ 割当て センサ データ処理 分割キー (OPK) CPU#1 アプリ 機器A センサ 機器 時刻順整列 分割キー (SPK) 機器 センサ 機器A 温度U 温度V 温度W 電力X 電力Y 電力Z 機器B 入圧 出圧 機器C 回転 機器D 流量 振分け 振分け 先 CPU#2 温度U 温度V CPU#1 温度W 電力X 電力Y 電力Z 入圧 出圧 CPU#2 回転 CPU#3 流量 CPU#4 機器B 振分けテーブル 機器D CPU#3 機器C CPU#4 各センサの 異常値判定 属性対応表 センサ 機器 基 本 P-SORT で は , 処 理 内 容 に よ っ て は 各 コ ア に 処 理データを均等に振分けられず,性能が向上しない データ 割当て SPK SPK OPK 機器A 温度U 温度V 温度W 電力X 電力Y 電力Z 機器B 入圧 出圧 機器C 回転 機器D 流量 属性対応表 振分け 振分け 先 温度V 温度W CPU#1 電力X 電力Y 温度U CPU#2 電力Z 入圧 出圧 CPU#3 回転 流量 CPU#4 振分けテーブル センサ 再振分 け先 温度V 温度W CPU#5 電力X 電力Y 温度U 電力Z 整列処理 振分けテーブル 最大OPK数 3センサ(センサ数 10 / コア数 4) (ⅰ)機器A割当て CPU#1 電力X 温度V温度W CPU#2 電力Y温度U電力Z CPU#3 CPU#4 (ⅱ)機器B割当て 電力X 温度V温度W 電力Y温度U電力Z 入圧 出圧 (ⅲ)機器C、機器D割当て 電力X 温度V温度W 電力Y温度U電力Z 入圧 出圧 回転 流量 図 8: : 最 適 化 P-SORT に お け る デ ー タ 割 当 て 処 理 データ割当ての手順は以下のようになる. ① OPK 数 を コ ア 数 で 割 る こ と に よ り 最 大 OPK 数 ② 各 SPK に お い て ,対 応 す る OPK 数 を 導 出 す る . ③ 対 応 す る OPK 数 が 多 い SPK か ら 順 に 各 コ ア に 図 7: : 基 本 P-SORT の 動 作 3.3. 最 適 化 P-SORT: : データ割 データ 割 当 て 最 適 化 適 用 3.3.1. 最 適 化 P-SORT の 方 針 センサ OPK を導出する. 割 当 て る .対 応 す る OPK 数 が 最 大 OPK 数 よ り も 大 き い 場 合 に は ,OPK ご と に 複 数 コ ア に 分 け て割当てる. ケ ー ス が 想 定 さ れ る . そ こ で 最 適 化 P-SORT で は , 時 例 え ば 図 8 に 示 す 例 で は , セ ン サ 数 が 10, コ ア 数 刻順整列要求に従ったデータ振分けでコア間の処理 が 4 であることから,センサ数をコア数で割り,値を データ数に不均衡が生じる場合には,複数コアにデー 切 り 上 げ る こ と で 最 大 OPK 数 を 3 と 求 め る .そ し て (i) タを割当てる.そして処理後に時刻順整列要求に従っ 機 器 A の OPK 数 が 6 と 最 も 多 い た め , 最 初 に 機 器 A て整列処理を行う.データ割り当て時には,コアの処 の セ ン サ を 複 数 コ ア (#1,#2)に 割 当 て る .次 に ,(ii)OPK 理データ数の最大値を一定以内に抑えつつ,マージ処 数 が 二 番 目 に 多 い 機 器 B の セ ン サ を 単 一 コ ア (#3)に 割 理によるボトルネックの影響を最小化するようにデー 当 て る . さ ら に (iii)機 器 C, 及 び 機 器 D の セ ン サ を コ タ を 割 り 当 て る .例 え ば 図 6(b)で は コ ア #1 の 処 理 デ ー ア #4 に 割 当 て る . タ 数 が 6 セ ン サ 分 で あ り ,他 の コ ア #2~ #4 よ り も 多 い そして,このように決定したデータの割当て方法に た め ,図 6(c)に 示 す よ う に 機 器 が 機 器 A で あ る デ ー タ 従 っ て , 基 本 P-SORT と 同 様 に 振 分 け テ ー ブ ル を 生 成 す る . ま た 最 適 化 P-SORT で は 整 列 処 理 振 分 け テ ー ブ ルを生成する.整列処理振分けテーブルは,整列処理 が占める割合を導出する. ③ 処 理 デ ー タ 数 の 占 め る 割 合 が 大 き い SPK か ら 順 するコアにデータを振り分けるために用いるテーブル に 割 当 て る . SPK の 処 理 デ ー タ 数 が 最 大 処 理 で あ り ,OPK を キ ー と し て 整 列 処 理 す る コ ア 名 を 参 照 デ ー タ 数 よ り も 大 き い 場 合 に は , OPK ご と に 複 す る .例 え ば 図 8 で は ,前 述 の よ う に 機 器 A が 割 り 当 数コアに割当てる. て ら れ た コ ア #1,#2 の デ ー タ を 整 列 す る .し た が っ て そして,このように決定したデータ割当て方法に コ ア #1,#2 で 処 理 さ れ る セ ン サ を キ ー と し て 整 列 処 理 従 っ て ,3.3.2 節 で 述 べ た よ う に ,振 分 け テ ー ブ ル 及 び を す る コ ア #5 を 参 照 す る 整 列 処 理 振 分 け テ ー ブ ル を 整列処理振分けテーブルを生成し,これらのテーブル 生成する. を用いて部分的な時刻順整列処理を実行する. な お 各 OPK の 処 理 デ ー タ 数 の 比 が 実 行 時 に 変 化 す る 場 合 に は ,OPK ご と の デ ー タ 数 の 分 布 情 報 を 実 行 時 3.3.3. 部 分 的 な 時 刻 順 整 列 処 理 最 適 化 P-SORT で は , 従 来 の デ ー タ 並 列 方 式 と 同 様 に取得し,その情報に従って割当て方法を変更する方 に振分けテーブルに従って各コアに入力データを振り 法が考えられる.実行時の変更方法の検討については 分ける.そして処理されたデータを,整列処理振分け 今後の課題とする. テーブルに従って再振分けし,部分的な時刻順整列を 実 行 す る .図 9 は ,図 8 で 生 成 し た 振 分 け テ ー ブ ル 及 び,整列処理振分けテーブルを用いた部分的な時刻順 4. マ ー ジ 処 理 コ ス ト 削 減 方 式 の 評 価 4.1. 評 価 方 法 及 び 環 境 整 列 処 理 の 動 作 を 示 す .図 9 で は ,振 分 け テ ー ブ ル に ストリームデータ処理のデータ並列方式として,全 従 っ て コ ア #1,#2 に 割 り 当 て ら れ た デ ー タ を ,整 列 処 デ ー タ を 時 刻 順 に 整 列 す る 従 来 方 式 及 び ,基 本 P-SORT, 理 振 分 け テ ー ブ ル に 従 っ て コ ア #5 に 送 信 す る .そ し て 及 び 最 適 化 P-SORT を プ ロ ト タ イ プ 実 装 し た . そ し て コ ア #5 で ,#1,#2 で 処 理 さ れ た デ ー タ 間 で 時 刻 順 整 列 プ ロ ト タ イ プ を 1 ~ 4 台 の 計 算 機 (4 コ ア 搭 載 CPU: し ,ア プ リ ケ ー シ ョ ン に 出 力 す る .コ ア #3,#4 に 振 分 Intel® Core™2 Quad, メ モ リ 4GB)上 で 動 作 さ せ て 処 理 けられたデータは整列処理されることなくアプリケー 性 能 を 測 定 し た . 各 マ シ ン に は OS と し て Fedora Core ションに出力される. Linux 8, Java 仮 想 マ シ ン と し て Sun JVM v1.5 を 用 い CPU#0 データ並列処理 CPU#1 た . Java 仮 想 マ シ ン に は 1 コ ア 当 た り 768MB の メ モ 部分整列処理 CPU#5 アプリ 機器A センサ 振分け 振分け 先 温度V 温度W CPU#1 電力X 電力Y 温度U CPU#2 電力Z 入圧 出圧 CPU#3 回転 流量 CPU#4 振分けテーブル 電力X 温度V 温度W CPU#2 センサ 再振分 け先 電力Y 温度U 電力Z 温度V 温度W CPU#5 電力X 電力Y CPU#3 温度U 電力Z 出圧 入圧 CPU#4 評価に用いたデータは発電機器に取り付けられた 各 セ ン サ か ら の 測 定 値 1 日 分 700 万 件 で あ る . 評 価 に 機器A 機器B リ領域を割り当てた. 整列処理 振分けテーブル 用いたクエリではこれらのセンサデータを入力し,セ ンサ毎に設定された異常値判定条件に従って,異常値 か否か判定する.クエリの処理は,センサごとに独立 しているため,従来方式ではセンサごとに複数コアに 振 分 け て 処 理 し た .ま た 基 本 P-SORT,最 適 化 P-SORT で は OPK を セ ン サ と し , 結 果 を 取 得 す る ア プ リ ケ ー 機器C 機器D 回転 流量 図 9: : 最 適 化 P-SORT の 部 分 的 な 時 刻 順 整 列 処 理 シ ョ ン で は 機 器 ご と に 分 析 す る と 仮 定 し ,SPK を 機 器 とした. 異常値 判定条件 ストリーム 異常値判定 クエリ 3.3.4. OPK の デ ー タ 数 に 偏 り が あ る 場 合 各 OPK の 処 理 デ ー タ 数 に 偏 り が あ る 場 合 に は ,デ ー タ割当ての最適化において,コアの処理データ数や マ ー ジ 処 理 コ ス ト を OPK 数 で 見 積 も る こ と が で き な 出力用整形 クエリ センサ ストリーム 判定結果 ストリーム 図 10: :評価に用いたクエリ い .そ こ で ,OPK ご と の デ ー タ 数 の 分 布 情 報 を ,OPK, SPK,属 性 対 応 表 と 共 に ユ ー ザ に 指 定 さ せ る .そ し て , データ数の分布情報を用いて,以下のようにデータを 割当てる. 4.2. 評 価 結 果 と 考 察 4.1 節 で 述 べ た 環 境 で 計 算 機 数 ,コ ア 数 を 変 化 さ せ , そ れ ぞ れ の 実 行 環 境 で 700 万 件 の 処 理 時 間 を 測 定 し , ① 最 大 処 理 デ ー タ 数 を 1/(コ ア 数 )と す る . そ の 処 理 時 間 か ら 処 理 件 数 (件 /秒 )を 求 め た . 図 11 は ② 全 体 の 処 理 デ ー タ 数 に ,各 SPK の 処 理 デ ー タ 数 各コア数で実行した場合の,単一コア実行と比べた処 理件数比を表す. 従 来 の デ ー タ 並 列 方 式 で は , ス ル ー プ ッ ト 比 が 12 コ ア で 6.0 倍 , 16 コ ア で 6.1 倍 の 性 能 向 上 に 留 ま り , 表 1: :コ ア の 処 理 デ ー タ 数 と マ ー ジ 処 理 コ ス ト 比 較 ほ ぼ 16 コ ア 迄 に 性 能 向 上 が 止 ま る こ と を 確 認 し た .ま 従来 方式 た,従来方式と同様にセンサごとにデータを振分け, 時 刻 順 に 整 列 せ ず に 出 力 し た 場 合 に は , 12 コ ア で 11 倍 , 16 コ ア で 14 倍 と , 16 コ ア に お い て も 性 能 向 上 を 確認した.これにより,2 節で述べた時刻順整列処理 による性能のボトルネックが確認できた. 一 方 , 基 本 P-SORT で は 8 コ ア 使 用 時 で 6.1 倍 と 性 コア当たりの 最大処理データ数 (全 デ ー タ 数 比 ) 整列対象データ数 (全 デ ー タ 数 比 ) 8.5% 100% 基本 P-SORT 最適化 P-SOR T 17% 6.9% 0% 17% 能 が 向 上 し , 従 来 の デ ー タ 並 列 の 8 コ ア で の 3.9 倍 に 対し,整列処理を省いたことによる性能向上を確認し なお本評価では,全てのセンサが等間隔でデータを た.しかしながら,8 コア以上で実行させた場合には 発生すると仮定した.しかし,実際にはデータの種類 処 理 性 能 は 向 上 せ ず , 16 コ ア で 6.5 倍 の ス ル ー プ ッ ト に よ っ て 偏 る 場 合 が あ る . こ の よ う な 場 合 に は , 3.3.4 向上に留まった.これは,入力データにおいて単一の 節で述べたデータ数の分布情報を用いたデータの割当 機器に属するセンサ数が全体のセンサ数の多くを占め, てが必要になる.このような場合の評価は今後の課題 その機器のセンサを割り当てたコアが性能上のボトル とする. ネ ッ ク と な る た め で あ る . こ の よ う に 基 本 P-SORT で また,本論文ではデータ処理のレイテンシについて は,時刻順整列が必要なデータセットのデータ数の偏 は 評 価 し て い な い .時 刻 順 整 列 す る デ ー タ 数 が 増 加 し , りが大きい場合に,性能が頭打ちになってしまうこと データを処理するコア数が増加すると各コアの処理時 が確認できた. 間差が大きくなるためレイテンシが大きくなる.した が っ て 従 来 方 式 は , P-SORT と 比 較 し て レ イ テ ン シ も 全データ整列 P-SORT:時刻順整列要求による割当て P-SORT:データ割当て最適化 整列なし(参考) 処理性能(1コア比) 14 14 13 12 12 11 10 9 8 7 6 6 5 4 3 2 2 1 増大すると考えられる.レイテンシの評価についても 今後の課題とする. 5. 関 連 研 究 ストリームデータ処理の分散並列化の研究は,パイ プライン並列方式及びデータ並列方式に大別できる. パイプライン並列方式としては,各クエリのコアへの 割 当 て 方 法 を 実 行 時 に 変 更 す る 方 式 [1] や , 入 力 ス ト 11 2 3 44 5 6 7 88 9 コア数 10 11 12 12 13 14 15 16 16 図 11: :分散並列化方式の性能評価結果 リームが異なるクエリを各コアに最適に振り分ける方 式 [2][3] が あ る . し か し パ イ プ ラ イ ン 並 列 方 式 で は , 2 節で述べたように複数コアでデータを送受信し処理 することから,通信による性能劣化が大きい. 一 方 ,最 適 化 P-SORT で は 12 コ ア で 9.8 倍 ,16 コ ア 一方,データ並列方式としては,動的にデータを再 で 13 倍 と , 16 コ ア に お い て も 処 理 性 能 の 向 上 を 確 認 振 り 分 け す る 方 式 [5]が 検 討 さ れ て い る .し か し な が ら , し た . こ れ に よ り 最 適 化 P-SORT で , コ ア の 最 大 処 理 [5]で は デ ー タ 振 分 け 後 の マ ー ジ 処 理 に お い て ,時 刻 順 データ数を小さくしつつ,整列処理のボトルネックを 整列の範囲を狭めることを考慮しておらず,マージ処 最 小 化 す る 効 果 が 確 認 で き た .実 際 に 16 コ ア 実 行 に お 理のオーバヘッドを削減できないと推測する. い て は ,表 1 に 示 す よ う に ,コ ア の 最 大 処 理 デ ー タ 数 データ並列方式において,処理分割キーを自動的に が 全 処 理 デ ー タ 数 の 6.9% と な り , 従 来 方 式 や 基 本 導 出 す る 方 式 [4]が あ る .[4]で は 単 一 ク エ リ の 処 理 分 割 P-SORT よ り も 小 さ い こ と を 確 認 し た . 最 大 処 理 デ ー キーを抽出し,その処理分割キーから複数クエリの処 タ数が従来方式よりも小さくなるのは,クエリの処理 理 分 割 キ ー を 導 出 す る が , P-SORT と は 異 な り 各 コ ア を す る コ ア 数 が 従 来 方 式 で は 16 コ ア 中 12 コ ア で あ る にデータを均等に振り分けられない場合を考慮してい の に 対 し , 最 適 化 P-SORT は マ ー ジ 処 理 コ ス ト を 削 減 な い .そ の た め ,コ ア 間 の 処 理 デ ー タ 数 の 偏 り に よ り , す る こ と に よ り , 16 コ ア 中 15 コ ア に 増 加 し た た め で 性 能 向 上 し な い ケ ー ス が 想 定 さ れ る .ま た ,[4]は マ ー ある.また整列対象のデータ数も全処理データ数の ジ処理において時刻順整列の範囲を狭めることを考慮 17%に 留 ま っ た . していないため,マージ処理によるオーバヘッドが大 きいと推測する. 6. お わ り に 本論文では,ストリームデータ処理の分散並列化に おいて,マージ処理によるボトルネックを回避する P-SORT を 提 案 し た .P-SORT で は 時 刻 順 整 列 の 範 囲 を 絞 る こ と で マ ー ジ 処 理 コ ス ト を 削 減 す る . P-SORT を 実装し性能評価した結果,従来のデータ並列方式では 16 コ ア で 6.1 倍 の 性 能 向 上 に 留 ま る の に 対 し ,P-SORT で は 16 コ ア で 13 倍 と な る こ と を 確 認 し た . 今 後 は 実 行時のデータ再割当てに対応するなど,方式の拡張を 検討する.また,レイテンシの評価などさらに詳細な 評価を進める予定である. 参 考 文 献 [1] M. Cherniack, H. Balakrishnan, M. Balazinska, D. Carney, U. Cetintemel, Y. Xing, and S. Zdonik, “Scalable distributed stream processing”, Proc. of CIDR 2003. [2] Y. Xing, S. Zdonik, and J. Hwang, “Dynamic load distribution in the Borealis stream processor”, Proc. of ICDE 2005. [3] Y. Xing, J. Hwang, U. Cetintemel, and S. Zdonik, “Providing resiliency to load variations in distributed stream processing”, Proc. of VLDB 2006. [4] T. Johnson, M. S. Muthukrishnan, V. Shkapenyuk, and O. Spatscheck, “Query-aware partitioning for monitoring massive network data streams”, Proc. of SIGMOD 2008. [5] M. A. Shah, J. M. Hellerstein, S. Chandrasekaran, and M. J. Franklin, “Flux: an adaptive partitioning operator for continuous query systems”, Proc. of ICDE 2003. [6] Aleri, “Coral8 Technology Overview”, http://www.aleri.com/ [7] B. Gedik, H. Andrade, K. Wu, P. S. Yu, and M. Doo, “SPADE: The System S Declarative Stream Processing Engine”, Proc. of SIGMOD 2008. [8] StreamBase, “StreamBase 6.5.3 Documentation”, http://www.streambase.com/developers-home.htm [9] R. Motwani, J. Widom, A. Arasu, B. Babcock, S. Babu, M. Datar, G. Manku, C. Olston, J. Rosenstein, and R. Varma, “Query Processing, Resource Management, and Approximation in a Data Stream Management System”, Proc. of CIDR 2003. [10] A. Arasu, S. Babu, and J. Widom, “The CQL continuous query language: semantic foundations and query execution”, The VLDB Journal, Vol. 15, 2006. [11] D. J. Abadi, D. Carney, U. Cetintemel, M. Cherniack, C. Convey, S. Lee, M. Stonebraker, N. Tatbul, and S. Zdonik, “Aurora: a new model and architecture for data stream management”, The VLDB Journal, Vol. 12, 2003. [12] J. Li, K. Tufte, V. Shkapenyuk, V. Papadimos, T. Johnson, and D. Maie, “Out-of-order processing: a new architecture for high-performance stream systems”, Proc. of VLDB 2008.