메인 콘텐츠로 건너뛰기
X API는 Filtered Stream Endpoint와 같은 엔드포인트를 통해 실시간 데이터를 지원하며, 조건에 맞는 포스트를 실시간으로 전송합니다. 이를 위해서는 지속적인 HTTP 연결을 유지해야 합니다.

설정 및 기본 스트리밍

동기식

from xdk import Client
# Client 초기화
client = Client(bearer_token="your_bearer_token")
# 포스트 스트리밍 (먼저 규칙을 설정해야 합니다)
for post_response in client.stream.posts():
    data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
    if 'data' in data and data['data']:
        tweet = data['data']
        post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
        print(f"Post: {post_text}")

비동기

import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
    queue = Queue()
    loop = asyncio.get_event_loop()
    stop = threading.Event()
    def run_stream():
        for post in client.stream.posts():
            if stop.is_set():
                break
            asyncio.run_coroutine_threadsafe(queue.put(post), loop)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    threading.Thread(target=run_stream, daemon=True).start()
    while True:
        post = await queue.get()
        if post is None:
            break
        data = post.model_dump()
        if 'data' in data and data['data']:
            print(f"Post: {data['data'].get('text', '')}")
    stop.set()
async def main():
    client = Client(bearer_token="your_bearer_token")
    await stream_posts_async(client)
asyncio.run(main())

규칙 관리

규칙은 어떤 데이터를 찾을지에 대한 필터를 정의합니다 (예: 키워드, 사용자 등). 규칙을 만드는 방법에 대해서는 이 가이드를 참고하세요. 규칙 추가:
from xdk.stream.models import UpdateRulesRequest
# 규칙 추가
add_rules = {
    "add": [
        {"value": "from:xdevelopers", "tag": "official_updates"}
    ]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
규칙 삭제하기:
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
    "delete": {
        "ids": ["rule_id_1", "rule_id_2"]
    }
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
규칙 조회:
# get_rules returns an Iterator, so iterate over it
for page in client.stream.get_rules():
    if page.data:
        for rule in page.data:
            # 규칙 속성 접근 - Pydantic 모델은 속성 및 딕셔너리 접근을 모두 지원합니다
            rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
            rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
            rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
            print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
    break  # Remove break to get all pages
전체 규칙 구문은 X Streaming Rules 문서를 참고하세요.

문제 해결

  • 403 Forbidden: 잘못된 인증 정보이거나 권한이 부족합니다.
  • 420 Enhance Your Calm: 요청 한도에 도달했습니다. 잠시 기다린 후 다시 시도하세요.
  • No Data: get_rules()로 규칙을 확인하고, 일치하는 포스트가 존재하는지 확인하세요. Python XDK를 사용하는 자세한 코드 예제는 code samples GitHub repo를 참고하세요. 추가 예제와 API 참조 문서는 인라인 도크스트링(예: help(client.tweets.search_recent))이나 소스 코드에 생성된 스텁을 참고하세요. GitHub repo를 통해 피드백을 보내주세요.