CoreTweetでストリーミングを処理するためのRx入門
どうも、 CoreTweet の非同期・Rx・Windowsプラットフォーム担当のくせに今まで何も解説記事を書かないでいたことで有名な azyobuzin です。
CoreTweet の紹介
.NET で Twitter にアクセスするアレです。
Yet Another .NET Twitter Library
GitHub - CoreTweet/CoreTweet: Yet Another .NET Twitter Library
作者直々のステマ
http://qiita.com/lambdalice/items/55b1a3d8403ecc603b47
ついでにいうと彼が「LinqToTwitter はストリーミングのパース出来ないからアレ」って言ってるらしいですが、そのパースするライブラリの作者僕です。最近最新の LinqToTwitter に対応させました。
追記: LinqToTwitter が公式に対応したので UserStreamEx はお役御免になりました。
あとクラス一覧はこちら
CoreTweet: Main Page
おことわり
動作確認は現時点での HEAD で行っています。一応クラスに関しては 0.3.4 でのものを例に出しますがもしかしたら忘れてて最新のやつになってるかもしれません。というか次のアップデートで大幅に変わるのでそのうち変更点は説明します。
追記: 0.6.0 向けに書き換えました。 差分
Rxを利用するには、自分で System.Reactive をインストールする必要があります(生の IObservable を触るなら要らないです)。
Reactive Extensions(Rx) とは
Twitter のストリーミング API みたいにポンポンやってくるデータを LINQ の形式に沿って処理できるライブラリです。詳しくはググってください。すごい人たちがすごい記事を書いています。
環境
- .NET Framework 4.5, CoreTweet 0.6.0 を想定
- NuGet から CoreTweet,
Rx-MainSystem.Reactive をインストール - using System;
- using System.Linq;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- using System.Threading;
- using CoreTweet;
- using CoreTweet.Streaming;
- Tokens t = Tokens.Create(アクセストークンとか);
UserStream を受信してみよう
まずはじめは UserStream を受信してコンソールに表示するところからスタートしましょう。ストリーミング API の使い方は wiki に書いてありますので知らないとは言わせません。
とりあえず最初のサンプルコードはこんな感じです。
t.Streaming.UserAsObservable() .Subscribe((StreamingMessage m) => Console.WriteLine(m));
これそのまま実行すると一瞬で終了しちゃうので、 Thread.Sleep
するなりしてください。
さて、これを実行するとクラス名がぽつぽつと表示されていきます。 ToString
をオーバーライドしてないので当然ですね。つまりそのぽつぽつと表示されるクラスのインスタンスが Console.WriteLine
に到達しているわけです。
詳しく見て行きましょう。 UserAsObservable
メソッドは IObservable<StreamingMessage>
を返します。この IObservable
は Subscribe
が呼ばれるとストリーミング API にアクセスして受信を開始します。 Subscribe
メソッドは IObservable
に対して「僕このデータ受け取るよ!」と宣言する意味を持ちます。
Subscribe
メソッドはいくつかオーバーロードがありますが、ここでは引数に Action<StreamingMessage> をひとつとるものを使いました。これは「データを受け取ったらこの Action
の処理を実行するよ!」という意味です。つまりこれでデータを受信したら Console.WriteLine
を実行すると IObservable
に対して宣言したので UserStream のデータが送られてくるようになった、というわけです。
ここまで長かった。日本語崩壊してる気がする。概念自体を知りたい方はこちらを参考にするといいと思います。
Rx入門 (2) - オブザーバーパターン - xin9le.net
切断してみよう
さっきの例では、強制終了させるかエラーが発生するまでずっと受信し続けてしまいます。なので次は切断してみます。
IDisposable disposable =
t.Streaming.UserAsObservable()
.Subscribe((StreamingMessage m) => Console.WriteLine(m));
Thread.Sleep(TimeSpan.FromSeconds(10));
disposable.Dispose();
はい、実は Subscribe
は IDisposable
を返します。なので Dispose
を呼ぶだけで受信を終了できます。 CoreTweet 側では Dispose
されたら通信を切断するように実装してある(ブレークポイントでテスト済み)ので安心してください。
エラー処理をする
今までの例ではうまく動くことを前提にしてきましたが、インターネットに接続するのでもちろんエラーが発生することも想定してコードを書きましょう。 Subscribe
にはエラー処理ができるオーバーロードがあります。これを使ってエラーが発生した場合にメッセージを表示するコードはこんな感じです。
t.Streaming.UserAsObservable()
.Subscribe(
(StreamingMessage m) => Console.WriteLine(m),
(Exception ex) => Console.WriteLine(ex),
() => Console.WriteLine("終点")
);
2 つ目の引数にはエラー時の処理を、 3 つ目の引数には IObservable
が終わりまで到達したときの処理を書きます。ストリーミングでは 3 つ目の引数が呼ばれてしまったら逆に問題なのですが、 Twitter が何しでかすかわからないので一応処理しておくことをお勧めします。
さぁ LINQ だ
接続・切断の基本がわかったら次はデータの処理です。 System.Reactive.Linq
という名前空間が物語っているように、 IEnumerable
を処理するかのような書き方で処理することが出来ます。
このサンプルコードでは新しいツイートを取得して内容を表示します。
t.Streaming.UserAsObservable()
.Where((StreamingMessage m) => m.Type == MessageType.Create)
.Cast<StatusMessage>()
.Select((StatusMessage m) => m.Status.Text)
.Subscribe((string s) => Console.WriteLine(s));
MessageType
の扱いに関しては wiki に書いてあるので読んでください。
Subscribe
を除けば完全に IEnumerable
を処理するのと同じですね。もちろんクエリ式で書くことも出来ますが、僕は好きではありません。
IObservable<string> observable = from m in t.Streaming.UserAsObservable() where m.Type == MessageType.Create select ((StatusMessage)m).Status.Text; observable.Subscribe(s => Console.WriteLine(s));
Where
とか Select
などのメソッドは Observable クラスに定義されている拡張メソッドです。 Take
を使えば数件取ってきて綺麗に切断してくれたりもできます。つまり LINQ とオブザーバーパターンの動きを少し覚えれば簡単に処理できるね!やったねたえちゃん!(やめろ)
ついでに IObservable
を await
すると最後の値が返ってきます。終わりがないストリーミング処理では使わないと思いますが。
エラー時に再接続する
ここからは応用編、かな? Enumerable
クラスにはなかった Rx 特有の便利メソッドを使えばエラーで死んでも痛くも痒くもない状態をつくりあげることができます。
ついでにこのやり方つい最近知りました。元ネタここです。
neue cc - Reactive Extensions + asyncによるC#5.0の非同期処理
var observable = t.Streaming.UserAsObservable();
observable.Catch(
observable.DelaySubscription(TimeSpan.FromSeconds(10)).Retry()
)
.Repeat()
.Subscribe((StreamingMessage m) => Console.WriteLine(m));
これでエラーが発生した場合は 10 秒待って再接続するようになります。なかなか簡単にテスト出来そうになかったので Observable.Range(0, 10)
とかを使って同じようなことやってみたら動いたのでたぶんうまくいきます。
では解説を。 Catch
は購読中にエラーが発生した場合、引数の IObservable
を続きとして使うようになります。
DelaySubscription
は名前の通り Subscribe
が行われるまでを遅らせます。というと実感が湧きにくいですが、ソースを見るとまんまそのまんまです。
Retry
は要するに Catch
を無限に自分自身に適用しているような感じです。エラーが発生しては Subscribe
し直しを繰り返します。引数でリトライの最大回数を指定できます。
ここまでをまとめると、まずはじめに observable
をそのまま実行し、エラーが発生したら、 10 秒待ってリトライし続ける方の IObservable
に切り替える、という形になります。さらにもしも終了してしまってはいけないので念のため Repeat
もしておきました。
Publish で処理を分割
昔の CoreTweet はすでに Publish
した状態で返していましたが僕がやめさせました(予測可能な挙動ではないと判断)。未だサンプルで多く使われているために Publish
が前提みたいな風潮があってわりとつらいので、これについては最後に持ってきました。理解して使ってくださいお願いします。
といいつつまずはサンプルを。ツイートとふぁぼを表示します。
IConnectableObservable<StreamingMessage> connectable = t.Streaming.UserAsObservable().Publish(); IDisposable d1 = connectable.OfType<StatusMessage>() .Select(m => m.Status) .Subscribe(status => Console.WriteLine( "@{0}: {1}", status.User.ScreenName, status.Text )); IDisposable d2 = connectable.OfType<EventMessage>() .Where(m => m.Event == EventCode.Favorite) .Subscribe(m => Console.WriteLine("★ by " + m.Source.ScreenName)); IDisposable disposable = connectable.Connect();
Publish
メソッドでつくられた IConnectableObservable<StreamingMessage>
に対して、 d1
、d2
の 2 つの購読者がいます。
ここでまず、 Publish
を使わなかった時の動作を考えてみましょう。再接続の例からわかるように、 1 つの購読者に対して 1 本のストリーミング通信が発生します。つまりこの場合では 2 本接続されることになります。
ここで Publish
を使うと 1 本の接続で複数の購読者に対してデータを分配することが出来ます(onError
, onComplete
も含めて)。 Connect
を呼ぶと実際に元の IObservable
を購読します。 Connect
後に Subscribe
すると途中から受信することが出来ます(つまりもう流れてしまったデータは拾えない)。
各 Subscribe
が返す IDisposable
は Publish
されたデータの購読をやめるためのもので元の IObservable
を Dispose
するものではありません。元の IObservable
を Dispose
する、つまり通信をキャンセルするには Connect
の戻り値のほうを Dispose
します。
Publish
のオーバーロードはほかに 3 つありますが、ストリーミングの処理で使えそうな例が思いつかなかった。
よくある間違い
Disconnect というメッセージがありますがあれは切断時に必ず受信するものではありません!切断は必ずエラーになる(はず)なので Subscribe
の onError
引数で処理してください!
まとめ
長かった。疲れた。図がない。酷いもんだ。
次回作にご期待ください。では。