読者です 読者をやめる 読者になる 読者になる

アジョブジ星通信

日常系バンザイ。

非同期処理のデータ伝達を楽にする System.Threading.Tasks.Channels

corefxlab Advent Calendar 12 日目です。頓挫しかしてねえ!!!

今回は System.Threading.Tasks.Channels の紹介です。また無駄に詳しくやっていこうと思います。

(というかこれ紹介でも解説でもなくリファレンスマニュアルってやつでは)

実をいうと、これに関しては README.md にとっても詳しく書いてあるので書く必要ない気がしなくもないですが、日本語でググれない問題を解決したり、僕の承認欲求を満たしたりするなどの副作用がありそうなので頑張っていきましょう。

注意

この記事の最終更新日は 2015/12/12 です。変化の多いリポジトリで、仕様も固まっていないことから、ここで説明する内容はすぐに古くなる可能性があります。

というかこの記事を書いている今現在も issue の変動が見られるので本当に変更が入りまくりそうです。

チャネルってなんぞや

僕の説明が正しい自信がないのでちゃんと理解して使いたい方はググってください。

チャネルは非同期処理においてデータ競合を防ぎながら、送信者から受信者へデータを伝達する方法のひとつです。送信者と受信者は一対一で、ひとつのデータが複数の受信者に渡るということが発生しないようになっているので、うまく使えば安全なキューや非同期処理の戻り値として利用できます。

System.Threading.Tasks.Channels の意義

何より C# には await があります。そこで、データの送受信の待機処理を直線的に書くことができ、読みやすく、流れを理解しやすいコードになります。

非同期処理におけるデータの受け渡しと加工は今までも System.Threading.Tasks.Dataflow パッケージで提供されてきましたが、これは流れを図で表したものを、ブロック単位でつなげていく、というような使い方をするものでした。対して System.Threading.Tasks.Channels では、加工部分には関与せず、データの受け渡しのみをシンプルに行うことができる設計になっています。

インストール

いつも通り MyGet の dotnet-corefxlab リポジトリからインストールしましょう。

また、 System.Threading.Tasks.Extensions パッケージが必要になります。自動でインストールされないので、 MyGet の dotnet-core リポジトリから取得してください。

IChannel<T> インターフェイス

送受信の両方に対応したチャネルオブジェクトは IChannel<T> インターフェイスを実装しています。このインターフェイスは IReadableChannel<T>(受信用)と IWritableChannel<T>(送信用)を継承しています。受信専用、送信専用のオブジェクトはこれらのどちらかを実装しています。

IReadableChannel<T> インターフェイス

ReadAsync メソッド
ValueTask<T> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))

すでにバッファされたデータがある場合には同期的にそれを返し、そうでない場合は送信者が現れるまで待機します。

ValueTask 型*1は await することができるほか、 .AsTask() することで Task を取得することができます。これは同期的に値を取得できた時に無駄な Task を生成しないための工夫と思われます。

待機中にチャネルが閉じられた(TryComplete が呼ばれた)ときは、 InvalidOperationException*2 が発生します。

WaitToReadAsync メソッド
Task<bool> WaitToReadAsync(CancellationToken cancellationToken = default(CancellationToken))

送信者が現れるまで待機します。送信者が現れ、受信可能な状態になった時には true が、チャネルが閉じられた時には false が返ります。

WaitToReadAsync が完了し true が返ってきたからといって、すぐに値が取得可能な状態であるという保証はありません。というのも、複数スレッドから同時に待機し、他のスレッドが先に値を取得していく可能性があるからです。このように扱いが難しいメソッドであるということを意識して使用してください。

TryRead メソッド
bool TryRead(out T item)

すぐに値が取得できる(バッファされたデータがある, 送信者が待機している)ときに true を返し、 item に値をセットします。

Completion プロパティ
Task Completion { get; }

Completion で取得できる Task は、チャネルが閉じられると RanToCompletion, Canceled, Faulted のいずれかの状態になります。具体的には IWritableChannel<T>.TryComplete の error 引数が null だと RanToCompletion、 OperationCanceledException だと Canceled、その他の例外だと Faulted になります。

これは……どう使うことが想定されているのだろう……。

GetAwaiter 拡張メソッド

IReadableChannel<T> を await することは、 ReadAsync を await することと同じです。

GetAsyncEnumerator 拡張メソッド
IAsyncEnumerator<T> GetAsyncEnumerator<T>(
    this IReadableChannel<T> channel, CancellationToken cancellationToken = default(CancellationToken))

頭おかしいやつです。チャネルが閉じられるまで ReadAsync をループで回し続けるのと同じ効果があります。

async Task WriteToConsole<T>(IReadableChannel<T> chan)
{
    var enumerator = chan.GetAsyncEnumerator();
    while (await enumerator.MoveNextAsync())
        Console.WriteLine(enumerator.Current);

    Console.WriteLine("end");
}

async Task Run()
{
    var c = Channel.Create<int>();

    var t1 = Task.Run(async () =>
    {
        await Task.Delay(1000);
        c.TryWrite(1);
        await Task.Delay(1000);
        c.TryWrite(2);
        await Task.Delay(1000);
        c.TryComplete();
    });

    var t2 = WriteToConsole(c);

    await Task.WhenAll(t1, t2);

    // 1, 2, end が 1 秒遅れずつ表示される
}
AsObservable 拡張メソッド
IObservable<T> AsObservable<T>(this IReadableChannel<T> source)

IReadableChannel<T> をラップして IObservable<T> として扱えるようにします。

つまりチャネルを Rx で処理できるということなんでしょうけれど、 System.Threading.Tasks.Channels のターゲットフレームワークが dotnet5.3 で、 Rx-Main のターゲットフレームワークとマッチしないので Rx で遊べないよぉ……。

IWritableChannel<T> インターフェイス

IReadableChannel<T> と注意事項は同じなので、定義だけ書いておきますね。

Task WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken));
Task<bool> WaitToWriteAsync(CancellationToken cancellationToken = default(CancellationToken));
bool TryWrite(T item);
TryComplete メソッド
bool TryComplete(Exception error = null)

チャネルを閉じ、待機中の送受信者に例外を投げ終了通知を行います。

すでにチャネルが閉じられている時には false が返ります。

Complete 拡張メソッド
void Complete<T>(this IWritableChannel<T> channel, Exception error = null)

TryComplete を実行し、戻り値が false なら InvalidOperationException を投げます。

AsObserver 拡張メソッド
IObserver<T> AsObserver<T>(this IWritableChannel<T> target)

IWritableChannel<T> をラップして IObserver<T> として扱えるようにします。

OnNext メソッドは WriteAsync を呼び出しますが、同期的に待機するので、 UI スレッドで動いている IObservable にこれをオブザーバーとして突っ込むとフリーズする可能性があるので注意してください。

例: チャネル → 加工 → チャネル

async Task Run()
{
    var input = Channel.Create<int>();
    var output = PlusOneRx(input);

    var t1 = Task.Run(async () =>
    {
        await Task.Delay(1000);
        input.TryWrite(1);
        await Task.Delay(1000);
        input.TryWrite(2);
        await Task.Delay(1000);
        input.TryComplete();
    });

    // GetAsyncEnumerator のやつ
    var t2 = WriteToConsole(output);

    await Task.WhenAll(t1, t2);

    // 2, 3, end
}

IReadableChannel<int> PlusOneChan(IReadableChannel<int> source)
{
    var c = Channel.CreateUnbuffered<int>();

    Task.Run(async () =>
    {
        try
        {
            while (await source.WaitToReadAsync())
            {
                int item;
                if (source.TryRead(out item))
                    await c.WriteAsync(item);
            }

            c.TryComplete();
        }
        catch (Exception ex)
        {
            c.TryComplete(ex);
        }
    });

    return c;
}

IReadableChannel<int> PlusOneRx(IReadableChannel<int> source)
{
    var c = Channel.CreateUnbuffered<int>();

    source.AsObservable()
        .Select(x => x + 1)
        .Subscribe(c.AsObserver());

    return c;
}

(Rx 使えないから自分で Select 実装した……)

チャネルの種類

Channel.Createなんちゃらというメソッドでチャネルを作成します。ここでは用意されている Create メソッドを紹介します。

Create / CreateUnbuffered

IChannel<T> Create<T>(int bufferedCapacity = Unbounded, bool singleReaderWriter = false)
IChannel<T> CreateUnbuffered<T>()

これらは送受信に対応した、つまり普通のチャネルを作成します。

bufferedCapacity でどれだけバッファを持つか(バッファがいっぱいのときは送信者は待機することになります)を指定できます。これには 0 より大きい値か、 Channel.Unbounded(-1) を指定します。バッファなしのチャネルを作成するには CreateUnbuffered を使用します。 bufferedCapacity に 0 を与えても ArgumentOutOfRangeException を食らうだけなので注意してください。

singleReaderWriter が true かつ、 bufferCapacity が Unbounded のときは受信待ち・送信待ちをそれぞれ 1 件だけ抱えられるチャネルが作成されます。例えば 2 スレッドから同時に ReadAsync を呼び出した場合、 InvalidOperationException が発生します。

ReadFromStream / WriteToStream

(あっ……これ Create から始まってないじゃん……)

IReadableChannel<T> ReadFromStream<T>(Stream source)
IWritableChannel<T> WriteToStream<T>(Stream destination)

ストリームからチャネルを作成します。(は?)

BinaryReader / BinaryWriter を用いて T 型の値を取得したり書き込んだりするやつです。 T は bool, byte, char, decimal, double, float, int, long, sbyte, short, string, uint, ulong, ushort に対応しています。

README には「PipeStream や NetworkStream でクロスプロセス・クロスマシンな通信をするときに使います。設計的には他の型にも対応できるからそのうち?」みたいなことが書いてあります。正直こんな微妙な仕様なら自分で場合に応じて作ったほうがよくないか……。

CreateFromTask

IReadableChannel<T> CreateFromTask<T>(Task<T> source)

Task<T> からチャネルを作成します。つまり ReadAsync や WaitToReadAsync を呼ぶと、そのタスクの終了を待つという動作をします。

このチャネルは一度 ReadAsync / TryRead されると閉じられるので、二度目以降の ReadAsync は InvalidOperationException が発生します。

CreateFromEnumerable

IReadableChannel<T> CreateFromEnumerable<T>(IEnumerable<T> source)

IEnumerable<T> からチャネルを作成します。しかし、非同期で列挙されるわけではないことに注意してください。 ReadAsync を呼んだところで同期的に MoveNext を呼び出すので、 MoveNext に長時間かかるようなものを簡単に非同期化できるわけではありません。

Select

Channel.CaseRead / CaseWrite を使うと、一番先に読み書き可能になったチャネルに対して操作を行う、ということができます。

例を考えるのが面倒だったので、 README にあるサンプルを見てみましょう。

IChannel<int>    c1 = Channel.Create<int>();
IChannel<string> c2 = Channel.CreateUnbuffered<string>();
IChannel<double> c3 = Channel.Create<double>();
...
await Channel
    .CaseRead(c1, i => HandleInt32(i))
    .CaseRead(c2, s => HandleString(s))
    .CaseRead(c3, d => HandleDouble(d))
    .SelectAsync();

c1, c2, c3 のうち最初に受信可能になったものから第2引数のデリゲートが実行されます。

詳しい使い方です。最初の Channel.CaseRead / CaseWrite を呼び出すと CaseBuilder クラスのインスタンスが返って来ます。そのあとはメソッドチェーンでつなげることができます。 CaseBuilder のメンバーは以下のとおりです。

CaseBuilder CaseRead<T>(IReadableChannel<T> channel, Action<T> action)
CaseBuilder CaseRead<T>(IReadableChannel<T> channel, Func<T, Task> func)
CaseBuilder CaseWrite<T>(IWritableChannel<T> channel, T item, Action action)
CaseBuilder CaseWrite<T>(IWritableChannel<T> channel, T item, Func<Task> func)
CaseBuilder CaseDefault(Action action)
CaseBuilder CaseDefault(Func<Task> func)
Task<int> SelectUntilAsync(Func<int, bool> conditionFunc, CancellationToken cancellationToken = default(CancellationToken))
Task<bool> SelectAsync(CancellationToken cancellationToken = default(CancellationToken))

Caseなんとかをつなげて、最後に SelectAsync を呼び出す、というのが基本的な使い方です。戻り値の bool はいずれかの Case を実行したかを表します。

SelectUntilAsync では conditionFunc が false を返すまで Select を繰り返します。 conditionFunc の第1引数の int は何回実行されたかを表します。初回(1回目の Select の前)は 0 です。

CaseDefault を指定すると、即座に送受信可能なチャネルがなかった場合、待機せず CaseDefault に指定したデリゲートが実行されます。

Go 言語の select では送受信可能なチャネルが 2 つ以上あるときにはランダムで選択されるという謎仕様がありましたが、 System.Threading.Tasks.Channels では Case を指定した順番通り(先着順)になります。

終わり

実を言うとあんまり使いみちが思いついていなくてサンプルの書きようがありませんでした。その結果全メンバーの説明になってしまいました。。上限 1 のセマフォを使って制御していたところをチャネルに置き換えてみると便利かもしれないなぁと思いました。んー最後まで具体的な使いみち思いつかなかったぞ。

*1:System.Threading.Tasks.Extensions パッケージで定義

*2:private class ClosedChannelException : InvalidOperationException