Groovy で JBoss Drools を使う2 - CEP機能
前回 id:fits:20120104 に引き続き、今回は Drools の CEP 機能(Drools Fusion)*1を Groovy で簡単に試してみました。
- Drools 5.4.0 beta1
- Groovy 1.8.5 (java 1.7.0_01 64bit)
サンプルソースは http://github.com/fits/try_samples/tree/master/blog/20120105/
CEP用ルール定義(.drl ファイル)
それでは CEP 用のルール定義を行います。
基本的な記法はルールエンジン用の .drl ファイルと変わりませんが、declare を使ってデータタイプを CEP 用に Event 宣言(@role(event) を付与)する点が異なります。
Event 宣言によって Sliding Window *2などが使えるようになります。
今回のサンプルでは以下のような処理を実装してみました。
- 5千円以上の Order が 2秒以内で続いた場合(厳密には終了時間の差が 2秒以内)に出力
- over window:time(3s) で 3秒間に発生した Order の totalPrice() 結果を sum と from accumulate を使って合計し、1万5千円以上だった場合に合計金額を出力
order_check.drl
package fits.sample //Event宣言 declare Order @role(event) end rule "2秒間に注文金額 5千円以上の処理が終了した場合に出力" salience 10 when $beforeOrder : Order($beforeName : name, totalPrice >= 5000) Order($name : name, totalPrice >= 5000, this finishes[2s] $beforeOrder) then System.out.printf("注文(5千円以上) : %s, before = %s \n", $name, $beforeName); end rule "直近の3秒間に発生した注文の合計金額が 1万5千円以上の場合に出力" when Number($res : intValue, intValue >= 15000) from accumulate( Order($total : totalPrice) over window:time(3s), sum($total) ) then System.out.printf("3秒間の合計金額(1万5千円以上) : %d \n", $res); end
なお、then ブロック内の処理でセミコロン(;)が必須な点に注意して下さい。(セミコロンが無いと Rule Compilation error が発生します)
CEP 処理の実装
実装内容はルールエンジンのものと特に変わりありません。(実装の説明に関しては id:fits:20120104 参照)
order_check.groovy
package fits.sample @Grab("org.drools:drools-core:5.4.0.Beta1") @Grab("org.drools:drools-compiler:5.4.0.Beta1") @Grab("com.sun.xml.bind:jaxb-xjc:2.2.5-b09") import org.drools.KnowledgeBaseFactory import org.drools.builder.KnowledgeBuilderFactory import org.drools.builder.ResourceType import org.drools.io.ResourceFactory class Order { String name BigDecimal subTotalPrice = BigDecimal.ZERO BigDecimal discountPrice = BigDecimal.ZERO BigDecimal totalPrice() { subTotalPrice.subtract(discountPrice) } } def builder = KnowledgeBuilderFactory.newKnowledgeBuilder() builder.add(ResourceFactory.newClassPathResource("order_check.drl", getClass()), ResourceType.DRL) if (builder.hasErrors()) { println builder.errors return } def base = KnowledgeBaseFactory.newKnowledgeBase() base.addKnowledgePackages(builder.getKnowledgePackages()) def session = base.newStatefulKnowledgeSession() (0..<10).each { def order = new Order(name: "order${it}", subTotalPrice: new BigDecimal((int)Math.random() * 10000)) println("order : ${order.name}, ${order.totalPrice()}") session.insert(order) //CEP処理の実施 session.fireAllRules() Thread.sleep(1000) } //セッションの終了 session.dispose()
実行例
> groovy order_check.groovy order : order0, 2588 order : order1, 4988 order : order2, 1266 order : order3, 3617 order : order4, 9771 order : order5, 5338 注文(5千円以上) : order5, before = order4 3秒間の合計金額(1万5千円以上) : 18726 order : order6, 7270 注文(5千円以上) : order6, before = order5 3秒間の合計金額(1万5千円以上) : 22379 order : order7, 6086 注文(5千円以上) : order7, before = order6 3秒間の合計金額(1万5千円以上) : 18694 order : order8, 9529 注文(5千円以上) : order8, before = order7 3秒間の合計金額(1万5千円以上) : 22885 order : order9, 1833 3秒間の合計金額(1万5千円以上) : 17448
ストリームを使った CEP 処理
Drools には entry point というストリームを扱うための仕組みが用意されています。
下記サンプルでは StatefulKnowledgeSession から取得した WorkingMemoryEntryPoint に対してデータを insert しています。
order_check_stream.groovy(ストリーム使用)
・・・ builder.add(ResourceFactory.newClassPathResource("order_check_stream.drl", getClass()), ResourceType.DRL) ・・・ def session = base.newStatefulKnowledgeSession() //entry point 取得(order stream は .drl ファイル側で定義する) def stream = session.getWorkingMemoryEntryPoint("order stream") (0..<10).each { def order = new Order(name: "order${it}", subTotalPrice: new BigDecimal((int)Math.random() * 10000)) println("order : ${order.name}, ${order.totalPrice()}") //entry point に対して insert する stream.insert(order) //CEP処理の実施 session.fireAllRules() Thread.sleep(1000) } //セッションの終了 session.dispose()
.drl ファイルで entry point を扱うには from entry-point "ストリーム名" とします。(ストリーム名は getWorkingMemoryEntryPoint メソッドの呼び出しで使用します)
order_check_stream.drl(ストリーム使用)
・・・ rule "2秒間に注文金額 5千円以上の処理が終了した場合に出力" salience 10 when $beforeOrder : Order($beforeName : name, totalPrice >= 5000) from entry-point "order stream" Order($name : name, totalPrice >= 5000, this finishes[2s] $beforeOrder) from entry-point "order stream" then System.out.printf("注文(5千円以上) : %s, before = %s \n", $name, $beforeName); end rule "直近の3秒間に発生した注文の合計金額が 1万5千円以上の場合に出力" when Number($res : intValue, intValue >= 15000) from accumulate( Order($total : totalPrice) over window:time(3s) from entry-point "order stream", sum($total) ) then System.out.printf("3秒間の合計金額(1万5千円以上) : %d \n", $res); end