App Engineで動く並列処理フレームワーク ElShard
App Engineで大量のデータを並列処理するフレームワーク ElShard を作っています。batch addとdeleteのサンプルができたので、とりあえずまとめてみます。
ElShardは、入力リストを分割して処理して集約する考え方に基づいています。並列処理はApp EngineのTaskQueueで実現しています。タスク間では10kBのペイロードしかやり取りできないため、データをやり取りする用途には適していません。そのため、実際のデータはDatastoreに格納しておき、キーをタスク間でやり取りすることになります。
現段階では集約をどうやって実現するか未定ですが、キーでソートされるというDatastoreの性質をうまく利用できる気がします。Matcher APIが使えるといいなぁ。
リストを処理する
ElShardでは、InputとTaskの2種類のタスクを組み合わせてジョブネットのようなものを構成します。
Inputは、WebブラウザのHTTPリクエストを受けてリスト(List
分割されたリストはそれぞれTaskに渡されます。Taskがリストを返した場合はSplitterで分割されて後続のTaskに渡されます。何も返さなければ終了です。
Input, Task はインタフェースが定義されています。
public interface Input { public abstract List<String> input(Context context) throws Exception; }
public interface Task { public abstract List<String> run(List<String> input, Context context) throws Exception; }
タスクは TaskChainController をextendsして定義します。下記はフォームに入力されたテキストを1行ずつDatastoreに登録するサンプルです。まあ、こういうのは一気に登録した方が速い気がしますが(汗
public class AddController extends TaskChainController { @Override protected void configure(Configuration configuration) { configuration.setSplitter(new SizeSplitter(50)); } @Override public List<String> input(Context context) throws Exception { String userId = authenticationService.getUserId(); context.addParameter("user", userId); return Arrays.asList(asString("books").split("\n")); } @Override public List<String> run(List<String> input, Context context) { String user = asString("user"); for(String line : input) { String[] parts = line.split("\t"); if(parts.length < 1) { continue; } Book book = new Book(); book.setTitle(parts[0]); if(parts.length > 1) { book.setAuthor(parts[1]); } book.setUser(user); bookService.add(book); } return null; } }https://github.com/int128/elshard/blob/master/src/demo/org/hidetake/elshard/demo/controller/demo/internal/AddController.java
Datastoreのエンティティを処理する
エンティティを並列処理するには、Keyを取得してリストをばらまく方法と、カーソルで逐次処理する方法があると思います。後者は AppEngineで大量のエンティティを処理するパターン - GeekFactory で説明した方法です。
QueryとProcessorのインタフェースは下記のように定義されています。
public abstract ModelQuery<M> query(Context context); public abstract void process(List<M> input, Context context);
下記はbatch deleteを行うサンプルです。まあ、この例も一気に削除した方が速いですが(汗 エンティティの中身を見て操作する場合は重宝すると思います。
public class DeleteController extends QueryProcessorController<Book> { @Override protected void configure(QueryProcessorConfiguration configuration) { } @Override public ModelQuery<Book> query(Context context) { return Datastore.query(Book.class); } @Override public void process(List<Book> input, Context context) { List<Key> keys = new ArrayList<Key>(input.size()); for(Book entity : input) { keys.add(entity.getKey()); } Datastore.delete(keys); } }https://github.com/int128/elshard/blob/master/src/demo/org/hidetake/elshard/demo/controller/demo/internal/DeleteController.java