Saltar al contenido principal
La X API admite datos en tiempo real mediante endpoints como el Filtered Stream Endpoint, que entrega las Publicaciones coincidentes a medida que se van produciendo. Esto requiere establecer una conexión HTTP persistente.

Configuración y streaming básico

Síncrono

from xdk import Client
# Inicializar el cliente
client = Client(bearer_token="your_bearer_token")
# Transmitir publicaciones (asegúrate de configurar las reglas primero)
for post_response in client.stream.posts():
    data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
    if 'data' in data and data['data']:
        tweet = data['data']
        post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
        print(f"Post: {post_text}")

Asíncrono

import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
    queue = Queue()
    loop = asyncio.get_event_loop()
    stop = threading.Event()
    def run_stream():
        for post in client.stream.posts():
            if stop.is_set():
                break
            asyncio.run_coroutine_threadsafe(queue.put(post), loop)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    threading.Thread(target=run_stream, daemon=True).start()
    while True:
        post = await queue.get()
        if post is None:
            break
        data = post.model_dump()
        if 'data' in data and data['data']:
            print(f"Publicación: {data['data'].get('text', '')}")
    stop.set()
async def main():
    client = Client(bearer_token="your_bearer_token")
    await stream_posts_async(client)
asyncio.run(main())

Gestión de reglas

Las reglas definen filtros sobre los datos específicos que estás buscando (por ejemplo, palabras clave, usuarios, etc.). Puedes obtener más información sobre cómo crear reglas en esta guía. Agregar reglas:
from xdk.stream.models import UpdateRulesRequest
# Añadir una regla
add_rules = {
    "add": [
        {"value": "from:xdevelopers", "tag": "official_updates"}
    ]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
Eliminar reglas:
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
    "delete": {
        "ids": ["rule_id_1", "rule_id_2"]
    }
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
Enumerar reglas:
# get_rules returns an Iterator, so iterate over it
for page in client.stream.get_rules():
    if page.data:
        for rule in page.data:
            # Accede a los atributos de la regla - Los modelos Pydantic admiten acceso tanto por atributo como por diccionario
            rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
            rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
            rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
            print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
    break  # Remove break to get all pages
Para obtener la sintaxis completa de las reglas, consulta la documentación de reglas de streaming de X.

Solución de problemas

  • 403 Forbidden: Autenticación no válida o permisos insuficientes.
  • 420 Enhance Your Calm: Límite de solicitudes alcanzado; espera y vuelve a intentarlo.
  • No Data: Verifica las reglas con get_rules(); asegúrate de que existan Publicaciones coincidentes. Para ver ejemplos de código detallados con el XDK de Python, consulta nuestro repositorio de ejemplos de código en GitHub. Para más ejemplos y la Referencia de la API, revisa los docstrings incluidos en el código (por ejemplo, help(client.tweets.search_recent)) o los stubs generados en el código fuente. Envía tus comentarios a través del repositorio de GitHub.