メインコンテンツへスキップ
X API は、Filtered Stream エンドポイント などのエンドポイントを通じてリアルタイムのデータをサポートしており、条件に一致する投稿を投稿され次第配信します。これには HTTP 接続を永続的に確立しておく必要があります。

セットアップと基本的なストリーミング

同期

from xdk import Client
# Clientを初期化
client = Client(bearer_token="your_bearer_token")
# 投稿をストリーミング(最初にルールを設定してください)
for post_response in client.stream.posts():
    data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
    if 'data' in data and data['data']:
        tweet = data['data']
        post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
        print(f"Post: {post_text}")

非同期

import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
    queue = Queue()
    loop = asyncio.get_event_loop()
    stop = threading.Event()
    def run_stream():
        for post in client.stream.posts():
            if stop.is_set():
                break
            asyncio.run_coroutine_threadsafe(queue.put(post), loop)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    threading.Thread(target=run_stream, daemon=True).start()
    while True:
        post = await queue.get()
        if post is None:
            break
        data = post.model_dump()
        if 'data' in data and data['data']:
            print(f"Post: {data['data'].get('text', '')}")
    stop.set()
async def main():
    client = Client(bearer_token="your_bearer_token")
    await stream_posts_async(client)
asyncio.run(main())

ルール管理

ルールは、取得したい特定のデータ(キーワード、ユーザーなど)を絞り込むフィルターを定義します。ルールの構築方法については、このガイドで詳しく学ぶことができます。 ルールの追加:
from xdk.stream.models import UpdateRulesRequest
# ルールを追加
add_rules = {
    "add": [
        {"value": "from:xdevelopers", "tag": "official_updates"}
    ]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
ルールの削除
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
    "delete": {
        "ids": ["rule_id_1", "rule_id_2"]
    }
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
ルールの一覧:
# get_rules returns an Iterator, so iterate over it
for page in client.stream.get_rules():
    if page.data:
        for rule in page.data:
            # ルール属性にアクセス - Pydantic モデルは属性アクセスと辞書アクセスの両方をサポートします
            rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
            rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
            rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
            print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
    break  # Remove break to get all pages
ルール構文の完全な仕様については、X Streaming Rules Docs を参照してください。

トラブルシューティング

  • 403 Forbidden: 認証情報が無効であるか、権限が不足しています。
  • 420 Enhance Your Calm: レート制限が適用されています。しばらく待ってから再試行してください。
  • No Data: get_rules() でルールを確認し、一致する投稿が存在することを確認してください。 Python XDK を使用した詳細なコード例については、コードサンプルの GitHub リポジトリを参照してください。 さらに多くの例や API リファレンスについては、インラインの docstring(例: help(client.tweets.search_recent)) やソース内の生成されたスタブを参照してください。GitHub リポジトリからフィードバックをお寄せください。