メインコンテンツへスキップ

ストリーミングデータを処理する Client の構築

ストリーミングエンドポイントを使用する際には、効率的に活用するために押さえておくべき一般的なベストプラクティスがいくつかあります。    

クライアント設計

filter stream エンドポイントを使ってソリューションを構築する場合、次のことができるクライアントが必要です。
  1. filter stream エンドポイントへの HTTPS ストリーミング接続を確立する。
  2. ストリームからルールを追加・削除するために、filter stream rules エンドポイントへ POST リクエストを非同期で送信する。
  3. 低トラフィック時の処理 – ストリーミング接続を維持し、ポストオブジェクトとキープアライブシグナルを検出する。
  4. 高トラフィック時の処理 – 非同期処理を用いてストリームの取り込みと追加処理を分離し、クライアント側バッファが定期的にフラッシュされるようにする。
  5. クライアント側で利用量の追跡を管理する。
  6. ストリーム切断を検出し、状況を評価して自動的にストリームへ再接続する。  

ストリーミングエンドポイントへの接続

X API v2 のストリーミングエンドポイントへの接続を確立するということは、非常に長時間維持される HTTP リクエストを送信し、そのレスポンスを逐次的にパースしていくことを意味します。概念的には、HTTP 経由で終わりのないファイルをダウンロードしているようなものだと考えられます。接続が一度確立されると、その接続が開いている限り、X のサーバーはポストイベントをその接続を通じて送り続けます。  

データの取得

JSON オブジェクトの各フィールドには順序がなく、あらゆる状況で必ずしもすべてのフィールドが存在するとは限りません。同様に、個々のアクティビティはソートされた順序で配信されるとは限らず、重複したメッセージが届く場合もあります。時間の経過とともに、新しいメッセージタイプが追加され、ストリーム経由で送信される可能性があることも念頭に置いてください。 したがって、クライアントは次の点に対応できる必要があります。
  • 任意の順序で出現するフィールド
  • 予期しないフィールドまたは欠落しているフィールド
  • ソートされていない投稿
  • 重複したメッセージ
  • 任意のタイミングでストリームに流れてくる、新しい任意のメッセージタイプ
関連するポストデータと要求されたフィールドパラメータに加えて、ストリーム接続では次の種類のメッセージが配信される可能性があります。この一覧は網羅的ではなく、ストリームに追加のオブジェクトが導入される可能性がある点に注意してください。パーサーが予期しないメッセージ形式にも対応できるようにしてください。  

バッファリング

ストリーミングエンドポイントは、データが利用可能になり次第すぐに送信するため、多くの場合、大量のデータが流れます。X のサーバーがすぐに新しいデータをストリームに書き込めない場合 (たとえば、クライアントの読み取りが十分に速くない場合。詳しくは 切断処理 を参照してください) 、クライアントが処理に追いつけるように、サーバー側でコンテンツがバッファリングされます。しかし、このバッファがいっぱいになると、接続を切断するための強制切断が行われ、その時点までにバッファリングされていた投稿は破棄され、再送信されません。詳細は以下を参照してください。 アプリが処理に追いついていないタイミングを特定する 1 つの方法は、受信している投稿のタイムスタンプを現在時刻と比較し、その差を継続的に追跡することです。 パブリックインターネット上でのレイテンシーや一時的な不具合により、ストリームのバックアップを完全に排除することはできませんが、アプリを適切に構成することで、その多くを抑制できます。バックアップの発生を最小限に抑えるには、次の点に注意してください。
  • クライアントがストリームを十分な速度で読み取っていることを確認してください。通常、ストリームを読み取りながら本格的な処理を行うべきではありません。ストリームは読み取りに専念し、アクティビティは別のスレッド/プロセス/データストアに渡し、処理は非同期で行ってください。
  • データセンターが、大きな持続的データ量に加え、それを大きく上回るスパイク (例: 通常の 5〜10 倍のボリューム) にも対応できる十分な受信帯域幅を備えていることを確認してください。filtered stream の場合、必要となるデータ量およびそれに対応する帯域幅は、ルールによってマッチしている投稿に完全に依存します。  

利用状況の追跡とルール管理

各開発者が自分のストリームに対して想定する「通常」のデータ量は異なるため、特定の割合の増減や期間について一般的な推奨値は用意していません。  ストリームのデータ量について、予期しない変動がないか監視することを検討してください。データ量の減少は、ストリームの切断とは異なる種類の問題を示している場合があります。このような状況では、ストリームは引き続きキープアライブ信号や新しいアクティビティデータを受信している可能性があります。ただし、投稿数が大きく減少している場合は、アプリケーションやネットワークへの流入データ量が減少する原因がないかを調査し、関連する通知がないかステータスページも確認してください。 このような監視を行うには、一定時間内に新しい投稿がどの程度発生すると想定しているかを追跡するとよいでしょう。ストリームのデータボリュームが指定したしきい値を大きく下回り、かつ設定した期間内に回復しない場合は、アラートや通知をトリガーするべきです。また、特にフィルタリングされたストリームのルールを変更している最中や、投稿アクティビティが急増するイベントが発生した場合には、データ量の大きな増加についても監視したほうがよいでしょう。 フィルタリングされたストリーム経由で配信される投稿は、月間の合計投稿ボリュームにカウントされることに注意してください。最適化のために、消費状況を追跡し調整する必要があります。ボリュームが高い場合は、必要に応じて各ルールに sample: オペレーターを追加し、マッチ率を 100% から sample:50sample:25 に下げることを検討してください。  さらに、事前に設定したしきい値をボリュームが超えた場合にチームへ通知する仕組みをアプリケーション内に実装することを推奨します。また、過剰なデータを取り込んでいるルールを自動的に削除する、あるいは極端な状況ではストリームから完全に切断するなどの対策を導入することも検討してください。  

システムメッセージへの応答

Keep-alive シグナル 少なくとも 20 秒ごとに、ストリームは確立済みの接続を通じて、\r\n のキャリッジリターンという形式の keep-alive シグナル (ハートビート) を送信し、クライアントのタイムアウトを防ぎます。クライアントアプリケーションは、ストリーム内に含まれる \r\n 文字を許容できるようにしておく必要があります。 クライアントが HTTP ライブラリで読み取りタイムアウトを正しく実装していれば、データがこの期間内に読み取られない場合に、HTTP プロトコルおよび HTTP ライブラリがイベントを発生させてくれるため、\r\n 文字を明示的に監視する必要はありません。 このイベントは、通常は例外のスロー、または使用している HTTP ライブラリに応じた別のイベントとして発生します。これらのタイムアウトを検出するために、HTTP メソッドをエラー/イベントハンドラーでラップすることを強く推奨します。タイムアウトが発生した場合、アプリケーションは再接続を試みる必要があります。 エラーメッセージ v2 ストリーミングエンドポイントは、ストリーム内でエラーメッセージを配信することもあります。以下に、これらのメッセージの基本的な形式といくつかの例を示します。配信されるメッセージは変更される可能性があり、新しいメッセージが追加される場合があることに注意してください。クライアントアプリケーションは、変化するシステムメッセージのペイロードを許容できるようにしておく必要があります。 エラーメッセージには、問題の解決方法を説明するドキュメントへのリンクが含まれている点に注意してください。 メッセージ形式:
	"errors": [{
		"title": "operational-disconnect",
		"disconnect_type": "UpstreamOperationalDisconnect",
		"detail": "This stream has been disconnected upstream for operational reasons.",
		"type": "https://api.x.com/2/problems/operational-disconnect"
	}]
}
バッファがフルになったことによる強制切断を示すエラーメッセージは、その強制切断を引き起こした滞留によってメッセージ自体が配送されず、クライアントに届かない可能性があります。そのため、再接続のトリガーとしてこれらのメッセージに依存しないでください。