PulseAudio:再生データの書き込み

再生データの書き込み
サーバ上のシンク入力/ソース出力には、再生/録音用のバッファが存在します。
クライアントのストリームが書き込んだオーディオデータは、サーバー上の再生バッファに保持されます。

バッファに関する情報は、ストリームの作成時に、pa_buffer_attr 構造体で指定することができます。
オーディオの再生手順
PulseAudio でオーディオが再生される手順は、以下のようになります。

  1. クライアントが、ストリームに再生用のデータを書き込む。
  2. サーバーにデータが送られ、サーバー内の再生バッファに書き込まれる。
  3. サーバーが、各シンク入力の再生バッファからデータを読み込んで、データの再サンプリングや合成をする。
  4. シンクの再生バッファに、最終的なデータが書き込まれる (シンクのサンプルスペックで)。
  5. シンクの再生バッファから、実際のハードウェアにデータが出力される。

サーバーは、再生バッファに書き込まれたデータを、自動で処理して、再生していくので、オーディオを再生したいクライアントは、単純にデータを書き込んでいくだけで、再生できます。

PulseAudio は、複数のストリームを個別に操作することで、柔軟な音量調整などを行えますが、その分色々な手順を経ているので、書き込んでから実際に再生されるまでの遅延が増えます。
書き込み
ストリームに再生データを書き込むには、pa_stream_write() を使います。

int pa_stream_write(pa_stream *p, const void *data, size_t nbytes,
    pa_free_cb_t free_cb, int64_t offset, pa_seek_mode_t seek);

typedef void (*pa_free_cb_t)(void *p);

サーバー上の再生バッファに、データを書き込みます。
書き込みに成功した場合、再生バッファ上の書き込み位置が、次の位置に移動します (nbytes 分増加する)。

dataデータのバッファ
nbytes書き込むデータのバイト数。
ストリームのサンプルスペックの、フレームサイズの倍数であること
16bit 2ch なら、4 byte 単位
free_cbすべてのデータが書き込まれたときに呼び出される、コールバック関数。
NULL の場合、データは内部バッファにコピーされる。
それ以外の場合、内部で data ポインタが保持され、データはコピーされない。
data が、pa_stream_begin_write() で取得した領域の場合は、NULL にする。
offset書き込む位置のオフセット
seek書き込む位置のシークモード。

PA_SEEK_RELATIVE : 現在の書き込み位置を基準にする。
PA_SEEK_ABSOLUTE : バッファの先頭を基準にする。
PA_SEEK_RELATIVE_ON_READ : 現在の読み込み位置を基準にする。
PA_SEEK_RELATIVE_END : バッファの現在の末尾を基準にする。
戻り値成功時は 0

free_cb を NULL にすると、データは内部バッファにコピーされるので、書き込み後にデータを保持する必要がなくなります。
NULL 以外の場合は、データがすべて書き込まれた時に、コールバック関数が呼ばれるので、そこでバッファを解放するなどの処理を行います(解放が必要なければ、関数内で何もしなくてもいいが、コールバック関数は指定する必要がある)。

バッファに書き込む位置を指定できますが、基本的には、offset = 0, seek = PA_SEEK_RELATIVE にします。
これにより、前回書き込まれた位置の後に、データを追加する形になります。

PulseAudio の再生バッファは、リングバッファなどではなく、表面的には、書き込むほど位置が増加する、フラットなバッファとなっているので、再生開始位置を 0 として、単純にバッファ位置は増えていきます。
書き込みの準備
int pa_stream_begin_write(pa_stream *p, void **data, size_t *nbytes);
int pa_stream_cancel_write(pa_stream *p);

サーバーに書き込むためのバッファを自分で用意するのではなく、PulseAudio 側で用意されたメモリ領域を使いたい場合は、pa_stream_write() の前に、pa_stream_begin_write() を実行します。

これにより、サーバーへのデータのコピーを最適化できるので、write の前に、この関数を使うことが推奨されます。

*nbytes に、書き込みたいバイト数を指定して実行すると、*data に書き込み用のバッファのポインタが返り、*nbytes に、書き込み可能な最大バイト数が返ります。
*nbytes に (size_t)-1 を指定すると、サイズが自動的に選択されます。

手順
再生データを書き込む場合は、まず、pa_stream_begin_write() でポインタとサイズを取得した後、そのバッファに再生データを書き込み、その後、pa_stream_write() に、begin_write で取得したポインタと、実際に書き込んだサイズを指定して、サーバーにデータを書き込みます。
この時、free_cb は NULL にします。

pa_stream_begin_write() 後、データを書き込んだら、あまり時間を掛けずに pa_stream_write() を実行すること。

pa_stream_write() 後は、取得したポインタのメモリ領域は無効になるので、アクセスしないようにしてください。
また、取得したポインタは、明示的に解放する必要はありません。

キャンセル
pa_stream_begin_write() 後に、データの書き込みをキャンセルしたい場合は、pa_stream_cancel_write() を実行します。
バッファにセット済みのデータは削除され、取得したメモリ領域は無効になります。
書き込み可能なサイズの取得
size_t pa_stream_writable_size(const pa_stream *p);

サーバーに書き込むことができるバイト数を返します。
エラー時は (size_t)-1 が返ります。

フレームサイズ単位の値になるので、この関数で取得したサイズを使って書き込む場合、値を調整する必要はありません。

なお、後述しますが、この関数で返る値は、実際に使用可能な、バッファの最大サイズとは一致しません。
また、実際には一部のサイズが書き込み可能でも、ある程度まとめて書き込めるように、しばらくは 0 が返ってくる場合があります。
コールバック
void pa_stream_set_write_callback(pa_stream *p, pa_stream_request_cb_t cb, void *userdata);

//コールバック関数
typedef void (*pa_stream_request_cb_t)(pa_stream *p, size_t nbytes, void *userdata);

サーバーに新しいデータが書き込めるようになった時に実行される、コールバック関数をセットします。

渡される nbytes 引数は、現在書き込み可能なサイズです。

このコールバックも、現在のバッファの状況によって、ある程度サイズが調整された上で、実行されます。
その他のストリーム操作
再生を停止する
pa_operation *pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata);

//現在停止中か (負の値でエラー)
int pa_stream_is_corked(const pa_stream *s);

このストリームの再生を、一時停止または再開します。
b が 0 で再開、それ以外で一時停止します。

ストリームの connect 時に、flags で PA_STREAM_START_CORKED を ON にすると、最初に一時停止状態で接続されます。

なお、ストリームの再生を停止するということは、サーバーが、このシンク入力の再生バッファからデータを読み込まず、再生をさせない、ということなので、ストリームへのデータの書き込みなどは行えます。
再生されるまで待つ
サーバー上の再生バッファに書き込まれているデータが、すべて再生されるまで待ちたい場合は、pa_stream_drain() を使います。

pa_operation *pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);

//コールバック関数
typedef void (*pa_stream_success_cb_t)(pa_stream *s, int success, void *userdata);

すべてのデータが再生され、再生バッファが空(読み込み位置が書き込み位置と同じ)になったときに、コールバック関数が実行されます。

pa_operation * が返るので、このオブジェクトを使って、操作が完了するまで待ちます。
バッファをフラッシュする
pa_operation *pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata);

再生/録音バッファをフラッシュし、バッファ上のすべてのデータを破棄します。

同じストリームで、別のオーディオを新しく再生したい時は、まだ再生されずに、バッファに残っているデータを消す必要があります。
プログラム
1秒分のバッファを確保し、そこにノコギリ波のデータを書き込んだ後、そのデータをサーバーに書き込みつつ、再生するプログラムです。
ドレミの音を各1秒、合計3秒間、再生します。

$ cc -o 12-write 12-write.c util.c -lpulse

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pulse/pulseaudio.h>
#include "util.h"

#define SAMPRATE 48000

PulseData *pulse;

//書き込み可能になった時

static void _cb_write(pa_stream *p,size_t nbytes,void *userdata)
{
    printf("- writeable: %d\n", (int)nbytes);
    
    pa_threaded_mainloop_signal(pulse->mainloop, 0);
}

//再生しつつ、バッファのすべてのデータを書き込み

static void _write_data(void *buf,int sendsize)
{
    pa_stream *strm = pulse->stream;
    size_t size;
    void *writebuf;

    pa_threaded_mainloop_lock(pulse->mainloop);

    while(sendsize)
    {
        //書き込み可能なサイズ取得

        while(1)
        {
            size = pa_stream_writable_size(strm);
            if(size == (size_t)-1) goto END;

            if(size) break;

            //書き込み可能になるまで待つ
            pa_threaded_mainloop_wait(pulse->mainloop);
        }

        //準備

        printf("(size:%d, remain:%d)\n", (int)size, (int)sendsize);

        if(size > sendsize) size = sendsize;
        
        if(pa_stream_begin_write(strm, &writebuf, &size)) break;

        //コピー

        memcpy(writebuf, buf, size);

        //書き込み

        printf("* write: %d\n", (int)size);

        pa_stream_write(strm, writebuf, size, NULL, 0, PA_SEEK_RELATIVE);

        //次へ

        buf += size;
        sendsize -= size;
    }

END:
    pa_threaded_mainloop_unlock(pulse->mainloop);
}

static void _cb_drain_success(pa_stream *s,int success,void *data)
{
    pa_threaded_mainloop_signal(pulse->mainloop, 0);
}

static pa_operation *_wait_end(PulseData *p,void *data)
{
    return pa_stream_drain(p->stream, _cb_drain_success, NULL);
}

int main(void)
{
    uint8_t *buf;
    int i,bufsize;
    double freq[3] = {261.626, 293.665, 329.628};

    pulse = pulse_connect(0);
    if(!pulse) return 1;

    if(pulse_create_stream_play(pulse, PA_SAMPLE_S16LE, SAMPRATE, 1))
        pulse_enderr(pulse);

    //書き込み可能になった時のコールバックセット

    pa_threaded_mainloop_lock(pulse->mainloop);
    pa_stream_set_write_callback(pulse->stream, _cb_write, NULL);
    pa_threaded_mainloop_unlock(pulse->mainloop);

    //再生

    bufsize = 2 * SAMPRATE;

    buf = (uint8_t *)malloc(bufsize);

    for(i = 0; i < 3; i++)
    {
        pulse_write_buf_sawtooth(buf, bufsize, SAMPRATE, freq[i]);
        _write_data(buf, bufsize);
    }

    free(buf);

    //再生が終わるまで待つ

    printf("# wait\n");

    pulse_wait_operation(pulse, _wait_end, NULL);

    //

    pulse_free(pulse);

    return 0;
}
解説
出力結果は、以下のようになります。

# stream connected
(size:24000, remain:96000) <- 'ド'
* write: 24000
- writeable: 20048
(size:20048, remain:72000)
* write: 20048
- writeable: 3952
- writeable: 18416
(size:18416, remain:51952)
* write: 18416
- writeable: 18400
(size:18400, remain:33536)
* write: 18400
- writeable: 18384
(size:18384, remain:15136)
* write: 15136
(size:3248, remain:96000) <- 'レ'
* write: 3248
- writeable: 18400
(size:18400, remain:92752)
* write: 18400
- writeable: 18368
(size:18368, remain:74352)
* write: 18368
- writeable: 18352
(size:18352, remain:55984)
* write: 18352
- writeable: 18368
(size:18368, remain:37632)
* write: 18368
- writeable: 18400
(size:18400, remain:19264)
* write: 18400
- writeable: 8464
- writeable: 18368
(size:18368, remain:864)
* write: 864
(size:17504, remain:96000) <- 'ミ'
* write: 17504
- writeable: 18352
(size:18352, remain:78496)
* write: 18352
- writeable: 18384
(size:18384, remain:60144)
* write: 18384
- writeable: 18368
(size:18368, remain:41760)
* write: 18368
- writeable: 18368
(size:18368, remain:23392)
* write: 18368
- writeable: 18368
(size:18368, remain:5024)
* write: 5024
# wait
- writeable: 24000

確保したバッファに、1秒分のデータを作成した後、まず、pa_stream_writable_size() で、現在書き込み可能なサイズを取得します。
上記の場合は、24000 byte (1/4 秒分) です。

書き込み可能なサイズの分だけ、サーバーに書き込んだ後、ループを繰り返します。

最初の書き込みの後、すぐには次のデータを書き込めないので、新しいデータが書き込み可能になった時のコールバックが来るまで待ちます。
この間、サーバー上で順次再生が行われます。

最初に来たコールバック関数で渡された nbytes 引数は、20032 byte です。
シグナルを送り、wait を抜けた後、pa_stream_writable_size() で返った値も同じになります。

このように、データを書き込んだ後、バッファが空くまで再生して待つ、という処理を繰り返します。

1秒分のデータをすべて書き込めたら、次のデータを用意して、また書き込みます。

最後に、書き込み済みのデータがすべて再生されるまで待った後、終了します。