Documentation Index
Fetch the complete documentation index at: https://generaltranslation.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
La X API admite datos en tiempo real mediante endpoints como el Filtered Stream Endpoint, que entrega las Publicaciones coincidentes a medida que se van produciendo. Esto requiere establecer una conexión HTTP persistente.
Configuración y streaming básico
from xdk import Client
# Inicializar el cliente
client = Client(bearer_token="your_bearer_token")
# Transmitir publicaciones (asegúrate de configurar las reglas primero)
for post_response in client.stream.posts():
data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
if 'data' in data and data['data']:
tweet = data['data']
post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
print(f"Post: {post_text}")
import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
queue = Queue()
loop = asyncio.get_event_loop()
stop = threading.Event()
def run_stream():
for post in client.stream.posts():
if stop.is_set():
break
asyncio.run_coroutine_threadsafe(queue.put(post), loop)
asyncio.run_coroutine_threadsafe(queue.put(None), loop)
threading.Thread(target=run_stream, daemon=True).start()
while True:
post = await queue.get()
if post is None:
break
data = post.model_dump()
if 'data' in data and data['data']:
print(f"Publicación: {data['data'].get('text', '')}")
stop.set()
async def main():
client = Client(bearer_token="your_bearer_token")
await stream_posts_async(client)
asyncio.run(main())
Las reglas definen filtros sobre los datos específicos que estás buscando (por ejemplo, palabras clave, usuarios, etc.). Puedes obtener más información sobre cómo crear reglas en esta guía.
Agregar reglas:
from xdk.stream.models import UpdateRulesRequest
# Añadir una regla
add_rules = {
"add": [
{"value": "from:xdevelopers", "tag": "official_updates"}
]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
Eliminar reglas:
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
"delete": {
"ids": ["rule_id_1", "rule_id_2"]
}
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
Enumerar reglas:
# get_rules returns an Iterator, so iterate over it
for page in client.stream.get_rules():
if page.data:
for rule in page.data:
# Accede a los atributos de la regla - Los modelos Pydantic admiten acceso tanto por atributo como por diccionario
rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
break # Remove break to get all pages
Para obtener la sintaxis completa de las reglas, consulta la documentación de reglas de streaming de X.
- 403 Forbidden: Autenticación no válida o permisos insuficientes.
- 420 Enhance Your Calm: Límite de solicitudes alcanzado; espera y vuelve a intentarlo.
- No Data: Verifica las reglas con
get_rules(); asegúrate de que existan Publicaciones coincidentes.
Para ver ejemplos de código detallados con el XDK de Python, consulta nuestro repositorio de ejemplos de código en GitHub.
Para más ejemplos y la Referencia de la API, revisa los docstrings incluidos en el código (por ejemplo, help(client.tweets.search_recent)) o los stubs generados en el código fuente. Envía tus comentarios a través del repositorio de GitHub.