Passer au contenu principal
L’X API prend en charge les données en temps réel via des endpoints comme l’endpoint de flux filtré (Filtered Stream Endpoint), qui fournit les Publications correspondantes à mesure qu’elles sont créées. Cela nécessite d’établir une connexion HTTP persistante.

Configuration et streaming de base

Synchrone

from xdk import Client
# Initialiser le client
client = Client(bearer_token="your_bearer_token")
# Diffuser les publications en continu (assurez-vous d'avoir configuré les règles au préalable)
for post_response in client.stream.posts():
    data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
    if 'data' in data and data['data']:
        tweet = data['data']
        post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
        print(f"Post: {post_text}")

Asynchrone

import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
    queue = Queue()
    loop = asyncio.get_event_loop()
    stop = threading.Event()
    def run_stream():
        for post in client.stream.posts():
            if stop.is_set():
                break
            asyncio.run_coroutine_threadsafe(queue.put(post), loop)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    threading.Thread(target=run_stream, daemon=True).start()
    while True:
        post = await queue.get()
        if post is None:
            break
        data = post.model_dump()
        if 'data' in data and data['data']:
            print(f"Publication : {data['data'].get('text', '')}")
    stop.set()
async def main():
    client = Client(bearer_token="your_bearer_token")
    await stream_posts_async(client)
asyncio.run(main())

Gestion des règles

Les règles définissent des filtres sur les données spécifiques que vous souhaitez obtenir (par exemple des mots-clés, des utilisateurs, etc.). Vous pouvez en savoir plus sur la création de règles à l’aide de ce guide Ajout de règles :
from xdk.stream.models import UpdateRulesRequest
# Ajouter une règle
add_rules = {
    "add": [
        {"value": "from:xdevelopers", "tag": "official_updates"}
    ]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
Suppression des règles :
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
    "delete": {
        "ids": ["rule_id_1", "rule_id_2"]
    }
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
Liste des règles:
# get_rules returns an Iterator, so iterate over it
for page in client.stream.get_rules():
    if page.data:
        for rule in page.data:
            # Accéder aux attributs de règle - les modèles Pydantic prennent en charge l'accès par attribut et par dictionnaire
            rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
            rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
            rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
            print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
    break  # Remove break to get all pages
Pour connaître la syntaxe complète des règles, consultez la documentation sur les règles de streaming X.

Dépannage

  • 403 Forbidden : authentification non valide ou autorisations insuffisantes.
  • 420 Enhance Your Calm : limitation de débit ; attendez puis réessayez.
  • No Data : vérifiez les règles avec get_rules() ; assurez-vous qu’il existe des Publications correspondantes. Pour des exemples de code détaillés utilisant le XDK Python, consultez notre dépôt GitHub d’exemples de code. Pour plus d’exemples et pour la Référence de l’API, consultez les docstrings intégrés (par exemple help(client.tweets.search_recent)) ou les stubs générés dans le code source. Partagez vos retours via le dépôt GitHub.