アジョブジ星通信

日常系バンザイ。

CoreTweetでストリーミングを処理するためのRx入門

どうも、 CoreTweet の非同期・Rx・Windowsプラットフォーム担当のくせに今まで何も解説記事を書かないでいたことで有名な azyobuzin です。

CoreTweet の紹介

.NET で Twitter にアクセスするアレです。

Yet Another .NET Twitter Library

GitHub - CoreTweet/CoreTweet: Yet Another .NET Twitter Library

作者直々のステ
http://qiita.com/lambdalice/items/55b1a3d8403ecc603b47
ついでにいうと彼が「LinqToTwitter はストリーミングのパース出来ないからアレ」って言ってるらしいですが、そのパースするライブラリの作者僕です。最近最新の LinqToTwitter に対応させました。

追記: LinqToTwitter が公式に対応したので UserStreamEx はお役御免になりました。

あとクラス一覧はこちら
CoreTweet: Main Page

おことわり

動作確認は現時点での HEAD で行っています。一応クラスに関しては 0.3.4 でのものを例に出しますがもしかしたら忘れてて最新のやつになってるかもしれません。というか次のアップデートで大幅に変わるのでそのうち変更点は説明します。

追記: 0.6.0 向けに書き換えました。 差分

Rxを利用するには、自分で System.Reactive をインストールする必要があります(生の IObservable を触るなら要らないです)。

Reactive Extensions(Rx) とは

Twitter のストリーミング API みたいにポンポンやってくるデータを LINQ の形式に沿って処理できるライブラリです。詳しくはググってください。すごい人たちがすごい記事を書いています。

環境

  • .NET Framework 4.5, CoreTweet 0.6.0 を想定
  • NuGet から CoreTweet, Rx-Main System.Reactive をインストール
  • using System;
  • using System.Linq;
  • using System.Reactive.Linq;
  • using System.Reactive.Subjects;
  • using System.Threading;
  • using CoreTweet;
  • using CoreTweet.Streaming;
  • Tokens t = Tokens.Create(アクセストークンとか);

UserStream を受信してみよう

まずはじめは UserStream を受信してコンソールに表示するところからスタートしましょう。ストリーミング API の使い方は wiki に書いてありますので知らないとは言わせません。

とりあえず最初のサンプルコードはこんな感じです。

t.Streaming.UserAsObservable()
    .Subscribe((StreamingMessage m) => Console.WriteLine(m));

これそのまま実行すると一瞬で終了しちゃうので、 Thread.Sleep するなりしてください。

さて、これを実行するとクラス名がぽつぽつと表示されていきます。 ToString をオーバーライドしてないので当然ですね。つまりそのぽつぽつと表示されるクラスのインスタンスConsole.WriteLine に到達しているわけです。

詳しく見て行きましょう。 UserAsObservable メソッドは IObservable<StreamingMessage> を返します。この IObservableSubscribe が呼ばれるとストリーミング API にアクセスして受信を開始します。 Subscribe メソッドは IObservable に対して「僕このデータ受け取るよ!」と宣言する意味を持ちます。

Subscribe メソッドはいくつかオーバーロードがありますが、ここでは引数に Action<StreamingMessage> をひとつとるものを使いました。これは「データを受け取ったらこの Action の処理を実行するよ!」という意味です。つまりこれでデータを受信したら Console.WriteLine を実行すると IObservable に対して宣言したので UserStream のデータが送られてくるようになった、というわけです。

ここまで長かった。日本語崩壊してる気がする。概念自体を知りたい方はこちらを参考にするといいと思います。
Rx入門 (2) - オブザーバーパターン - xin9le.net

切断してみよう

さっきの例では、強制終了させるかエラーが発生するまでずっと受信し続けてしまいます。なので次は切断してみます。

IDisposable disposable =
    t.Streaming.UserAsObservable()
    .Subscribe((StreamingMessage m) => Console.WriteLine(m));

Thread.Sleep(TimeSpan.FromSeconds(10));

disposable.Dispose();

はい、実は SubscribeIDisposable を返します。なので Dispose を呼ぶだけで受信を終了できます。 CoreTweet 側では Dispose されたら通信を切断するように実装してある(ブレークポイントでテスト済み)ので安心してください。

エラー処理をする

今までの例ではうまく動くことを前提にしてきましたが、インターネットに接続するのでもちろんエラーが発生することも想定してコードを書きましょう。 Subscribe にはエラー処理ができるオーバーロードがあります。これを使ってエラーが発生した場合にメッセージを表示するコードはこんな感じです。

t.Streaming.UserAsObservable()
    .Subscribe(
        (StreamingMessage m) => Console.WriteLine(m),
        (Exception ex) => Console.WriteLine(ex),
        () => Console.WriteLine("終点")
    );

2 つ目の引数にはエラー時の処理を、 3 つ目の引数には IObservable が終わりまで到達したときの処理を書きます。ストリーミングでは 3 つ目の引数が呼ばれてしまったら逆に問題なのですが、 Twitter が何しでかすかわからないので一応処理しておくことをお勧めします。

さぁ LINQ

接続・切断の基本がわかったら次はデータの処理です。 System.Reactive.Linq という名前空間が物語っているように、 IEnumerable を処理するかのような書き方で処理することが出来ます。

このサンプルコードでは新しいツイートを取得して内容を表示します。

t.Streaming.UserAsObservable()
    .Where((StreamingMessage m) => m.Type == MessageType.Create)
    .Cast<StatusMessage>()
    .Select((StatusMessage m) => m.Status.Text)
    .Subscribe((string s) => Console.WriteLine(s));

MessageType の扱いに関しては wiki に書いてあるので読んでください。

Subscribe を除けば完全に IEnumerable を処理するのと同じですね。もちろんクエリ式で書くことも出来ますが、僕は好きではありません。

IObservable<string> observable =
    from m in t.Streaming.UserAsObservable()
    where m.Type == MessageType.Create
    select ((StatusMessage)m).Status.Text;
observable.Subscribe(s => Console.WriteLine(s));

Where とか Select などのメソッドは Observable クラスに定義されている拡張メソッドです。 Take を使えば数件取ってきて綺麗に切断してくれたりもできます。つまり LINQ とオブザーバーパターンの動きを少し覚えれば簡単に処理できるね!やったねたえちゃん!(やめろ)

ついでに IObservableawait すると最後の値が返ってきます。終わりがないストリーミング処理では使わないと思いますが。

エラー時に再接続する

ここからは応用編、かな? Enumerable クラスにはなかった Rx 特有の便利メソッドを使えばエラーで死んでも痛くも痒くもない状態をつくりあげることができます。

ついでにこのやり方つい最近知りました。元ネタここです。
neue cc - Reactive Extensions + asyncによるC#5.0の非同期処理

var observable = t.Streaming.UserAsObservable();
observable.Catch(
    observable.DelaySubscription(TimeSpan.FromSeconds(10)).Retry()
)
.Repeat()
.Subscribe((StreamingMessage m) => Console.WriteLine(m));

これでエラーが発生した場合は 10 秒待って再接続するようになります。なかなか簡単にテスト出来そうになかったので Observable.Range(0, 10) とかを使って同じようなことやってみたら動いたのでたぶんうまくいきます。

では解説を。 Catch は購読中にエラーが発生した場合、引数の IObservable を続きとして使うようになります。

DelaySubscription は名前の通り Subscribe が行われるまでを遅らせます。というと実感が湧きにくいですが、ソースを見るとまんまそのまんまです。

Retry は要するに Catch を無限に自分自身に適用しているような感じです。エラーが発生しては Subscribe し直しを繰り返します。引数でリトライの最大回数を指定できます。

ここまでをまとめると、まずはじめに observable をそのまま実行し、エラーが発生したら、 10 秒待ってリトライし続ける方の IObservable に切り替える、という形になります。さらにもしも終了してしまってはいけないので念のため Repeat もしておきました。

Publish で処理を分割

昔の CoreTweet はすでに Publish した状態で返していましたが僕がやめさせました(予測可能な挙動ではないと判断)。未だサンプルで多く使われているために Publish が前提みたいな風潮があってわりとつらいので、これについては最後に持ってきました。理解して使ってくださいお願いします。

といいつつまずはサンプルを。ツイートとふぁぼを表示します。

IConnectableObservable<StreamingMessage> connectable =
    t.Streaming.UserAsObservable().Publish();

IDisposable d1 = connectable.OfType<StatusMessage>()
    .Select(m => m.Status)
    .Subscribe(status => Console.WriteLine(
        "@{0}: {1}",
        status.User.ScreenName,
        status.Text
    ));

IDisposable d2 = connectable.OfType<EventMessage>()
    .Where(m => m.Event == EventCode.Favorite)
    .Subscribe(m => Console.WriteLine("★ by " + m.Source.ScreenName));

IDisposable disposable = connectable.Connect();

Publish メソッドでつくられた IConnectableObservable<StreamingMessage> に対して、 d1d2 の 2 つの購読者がいます。

ここでまず、 Publish を使わなかった時の動作を考えてみましょう。再接続の例からわかるように、 1 つの購読者に対して 1 本のストリーミング通信が発生します。つまりこの場合では 2 本接続されることになります。

ここで Publish を使うと 1 本の接続で複数の購読者に対してデータを分配することが出来ます(onError, onComplete も含めて)。 Connect を呼ぶと実際に元の IObservable を購読します。 Connect 後に Subscribe すると途中から受信することが出来ます(つまりもう流れてしまったデータは拾えない)。

Subscribe が返す IDisposablePublish されたデータの購読をやめるためのもので元の IObservableDispose するものではありません。元の IObservableDispose する、つまり通信をキャンセルするには Connect の戻り値のほうを Dispose します。

Publishオーバーロードはほかに 3 つありますが、ストリーミングの処理で使えそうな例が思いつかなかった。

よくある間違い

Disconnect というメッセージがありますがあれは切断時に必ず受信するものではありません!切断は必ずエラーになる(はず)なので SubscribeonError 引数で処理してください!

まとめ

長かった。疲れた。図がない。酷いもんだ。

次回作にご期待ください。では。