C#でTwitterアプリを作る 第3回 Streaming
第2回ではタイムラインの取得やツイートの投稿について紹介しました。
今回はツイッタークライアントを作るならもはや必須のStreamingAPIをCoreTweetから利用してみます。
ところで、第0回でも触れましたが、CoreTweetにはStreamingAPIの非同期処理をReactiveExtentionsで記述できる、CoreTweet.Streaming.Rxという拡張ライブラリが存在します。
もちろんCoreTweet.Streaming.Rxを用いなくてもLINQを使って同期的にStreamingAPIを使えるようになっているので、どちらも紹介します。
CoreTweetのみでStreaming
CoreTweet標準のStreamingApiクラスは、StreamingApi.StartStream
メソッドで接続します。
var stream = tokens.Streaming.StartStream(CoreTweet.Streaming.StreamingType.User, new StreamingParameters(replies => "all"));
StartStream
の引数は
- StreamingType( User, Public, Sample etc.)
- StreamingParameters
の2つですが、第2引数は省略可能です。
StreamingParameters
も第1回に紹介したパラメータと同様に、複数の方法でパラメータを指定できます。
ところで、StartStream
の戻り値の型は何かというと、IEnumerable<StreamingMessage>
です。つまり、ここから先はLINQで記述できるということです。嬉しいですね!
UserStreamに流れてきたツイートとイベントをコンソールに出力するサンプルはこちら
foreach(var message in stream) { if(message is StatusMessage) { var status = (message as StatusMessage).Status; Console.WriteLine(string.Format("{0}:{1}", status.User.ScreenName, status.Text)); } else if(message is EventMessage) { var ev = message as EventMessage; Console.WriteLine(string.Format("{0}:{1}->{2}", ev.Event, ev.Source.ScreenName, ev.Target.ScreenName)); } }
IE<Status>
で返ってくるREST APIのタイムラインとほとんど同じように扱えるため、Streamingであるということをあまり意識せずにプログラミングできます。
しかしこのコードでは同期処理のため、このforeach
から先に進むことはできません。非同期処理を行うには自前でTask
を使うなど、非同期用のコードを書く必要があります。
次に紹介するCoreTweet.Streaming.Rxを使えばRxの力を使って、非同期処理を簡単に記述できます
CoreTweet.Streaming.Rxを使う
*CoreTweet.Streaming.Rxを使うにはまず以下のimportを増やす必要があります
using System.Reactive.Linq; using CoreTweet.Streaming; using CoreTweet.Streaming.Reactive; using System.Threading;
CoreTweet単体の時はIEnumerable<StreamingMessage>
を生成しました。
Rxを使うので、今回はIObservable<StreamingMessage>
を発行します。
//publish stream
var streamRx = tokens.Streaming.StartObservableStream(StreamingType.User).Publish();
StartObservableStream
に渡す引数はStartStream
と同じです。Publish()
でIO<StreamingStatus>
が発行されます。
発行の次は非同期処理の内容を記述していきます。この後はCoreTweetというよりもほとんどRxの機能なので、詳しく知りたい方はReactive Extentions
でググって見てください
//action Action<StatusMessage> printStatus = (message) => { var status = (message as StatusMessage).Status; Console.WriteLine(string.Format("{0}:{1}", status.User.ScreenName, status.Text)); }; //subscribe actions for event streamRx.OfType<StatusMessage>().Subscribe(printStatus); streamRx.OfType<StatusMessage>().Subscribe(printStatus, onCompleted: () => Console.WriteLine("completed")); streamRx.OfType<StatusMessage>().Subscribe(printStatus, onError: exception => Console.WriteLine(exception.ToString()));
同じようなSubscribeを何度もしてますが、オーバーロードを紹介しているだけで、本当は1つです。
OfType<StatusMessage>
で受理するStreamingMessageの型を選んで、Subscribe
で非同期に行う処理を記述します。
処理を書き終わったら接続します
//connect stream
var connection = streamRx.Connect();
Connect()
の戻り値はIDisposable
です。このconnection
オブジェクトがDisposeされると、Streaming接続も自動的に切断されます。
次のコードでは、接続後30秒経つと切断されます。
//connect stream var connection = streamRx.Connect(); //wait Thread.Sleep(30000); //close connection connection.Dispose();
以上でCoreTweetのStreamingAPI関連の機能の紹介が終わりました。
次回はリツイートやお気に入り追加など、各種APIの使い方をまとめて紹介します。