मुख्य सामग्री पर जाएं
X API, फ़िल्टर्ड स्ट्रीम एंडपॉइंट जैसे एंडपॉइंट्स के ज़रिए रीयल-टाइम डेटा का समर्थन करता है और मिलान करने वाले पोस्ट्स को उनके होते ही उपलब्ध कराता है। इसके लिए एक स्थायी http कनेक्शन बनाए रखना आवश्यक है।

सेटअप और बुनियादी स्ट्रीमिंग

सिंक्रोनस

from xdk import Client
# Client प्रारंभ करें
client = Client(bearer_token="your_bearer_token")
# पोस्ट्स स्ट्रीम करें (सुनिश्चित करें कि पहले नियम सेट अप हों)
for post_response in client.stream.posts():
    data = post_response.model_dump() if hasattr(post_response, 'model_dump') else dict(post_response)
    if 'data' in data and data['data']:
        tweet = data['data']
        post_text = tweet.get('text', '') if isinstance(tweet, dict) else (tweet.text if hasattr(tweet, 'text') else '')
        print(f"Post: {post_text}")

असिंक्रोनस

import asyncio
from asyncio import Queue
import threading
from xdk import Client
async def stream_posts_async(client: Client):
    queue = Queue()
    loop = asyncio.get_event_loop()
    stop = threading.Event()
    def run_stream():
        for post in client.stream.posts():
            if stop.is_set():
                break
            asyncio.run_coroutine_threadsafe(queue.put(post), loop)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)
    threading.Thread(target=run_stream, daemon=True).start()
    while True:
        post = await queue.get()
        if post is None:
            break
        data = post.model_dump()
        if 'data' in data and data['data']:
            print(f"Post: {data['data'].get('text', '')}")
    stop.set()
async def main():
    client = Client(bearer_token="your_bearer_token")
    await stream_posts_async(client)
asyncio.run(main())

नियम प्रबंधन

नियम इस बात के लिए फ़िल्टर तय करते हैं कि आप कौन-सा विशिष्ट डेटा ढूंढ़ रहे हैं (जैसे, कीवर्ड, उपयोगकर्ता आदि)। नियम कैसे बनाएँ, इसके बारे में अधिक जानने के लिए इस गाइड को देखें। नियम जोड़ना:
from xdk.stream.models import UpdateRulesRequest
# एक नियम जोड़ें
add_rules = {
    "add": [
        {"value": "from:xdevelopers", "tag": "official_updates"}
    ]
}
request_body = UpdateRulesRequest(**add_rules)
response = client.stream.update_rules(body=request_body)
नियम हटाना:
from xdk.stream.models import UpdateRulesRequest
delete_rules = {
    "delete": {
        "ids": ["rule_id_1", "rule_id_2"]
    }
}
request_body = UpdateRulesRequest(**delete_rules)
response = client.stream.update_rules(body=request_body)
नियमों की सूची:
# get_rules एक Iterator लौटाता है, इसलिए इस पर iterate करें
for page in client.stream.get_rules():
    if page.data:
        for rule in page.data:
            # rule attributes तक पहुँचें - Pydantic models attribute और dict दोनों प्रकार की access को support करते हैं
            rule_id = rule.id if hasattr(rule, 'id') else rule.get('id', '')
            rule_value = rule.value if hasattr(rule, 'value') else rule.get('value', '')
            rule_tag = rule.tag if hasattr(rule, 'tag') else rule.get('tag', '')
            print(f"ID: {rule_id}, Value: {rule_value}, Tag: {rule_tag}")
    break  # सभी pages प्राप्त करने के लिए break हटाएँ
नियमों के पूर्ण सिंटैक्स के लिए, X स्ट्रीमिंग नियम दस्तावेज़ देखें।

समस्या निवारण

  • 403 Forbidden: अमान्य प्रमाणीकरण या अपर्याप्त अनुमतियाँ।
  • 420 Enhance Your Calm: रेट लिमिट लागू है; प्रतीक्षा करें और फिर से प्रयास करें।
  • No Data: get_rules() से नियम जाँचें; सुनिश्चित करें कि मेल खाने वाली पोस्ट्स मौजूद हैं। Python XDK का उपयोग करने वाले विस्तृत कोड उदाहरणों के लिए, हमारा code samples GitHub repo देखें। अधिक उदाहरणों और API संदर्भ के लिए, इनलाइन docstrings (जैसे, help(client.tweets.search_recent)) या स्रोत में जनरेट किए गए stubs देखें। GitHub repo के माध्यम से अपनी प्रतिक्रिया साझा करें।