Official Python SDK for Stream Chat
Official Python API client for Stream Chat, a service for building chat applications.
Explore the docs »
Code Samples
·
Report Bug
·
Request Feature
:bulb: Major update in v4.0 <
The returned response objects are instances of StreamResponse
class. It inherits from dict
, so it's fully backward compatible. Additionally, it provides other benefits such as rate limit information (resp.rate_limit()
), response headers (resp.headers()
) or status code (resp.status_code()
).
📝 About Stream
You can sign up for a Stream account at our Get Started page.
You can use this library to access chat API endpoints server-side.
For the client-side integrations (web and mobile) have a look at the JavaScript, iOS and Android SDK libraries (docs).
⚙️ Installation
$ pip install stream-chat
✨ Getting started
:bulb: The library is almost 100% typed. Feel free to enable mypy for our library. We will introduce more improvements in the future in this area.
from stream_chat import StreamChat
chat = StreamChat(api_key="STREAM_KEY", api_secret="STREAM_SECRET")
chat.upsert_user({"id": "chuck", "name": "Chuck"})
channel = chat.channel("messaging", "kung-fu")
channel.create("chuck")
channel.send_message({"text": "AMA about kung-fu"}, "chuck")
resp = chat.deactivate_user("bruce_lee")
print(type(resp))
print(resp["user"]["id"])
rate_limit = resp.rate_limit()
print(f"{rate_limit.limit} / {rate_limit.remaining} / {rate_limit.reset}")
headers = resp.headers()
print(headers)
status_code = resp.status_code()
print(status_code)
Async
import asyncio
from stream_chat import StreamChatAsync
async def main():
async with StreamChatAsync(api_key="STREAM_KEY", api_secret="STREAM_SECRET") as chat:
await chat.upsert_user({"id": "chuck", "name": "Chuck"})
channel = chat.channel("messaging", "kung-fu")
await channel.create("chuck")
await channel.send_message({"text": "AMA about kung-fu"}, "chuck")
resp = await chat.deactivate_user("bruce_lee")
print(type(resp))
print(resp["user"]["id"])
rate_limit = resp.rate_limit()
print(f"{rate_limit.limit} / {rate_limit.remaining} / {rate_limit.reset}")
headers = resp.headers()
print(headers)
status_code = resp.status_code()
print(status_code)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
✍️ Contributing
We welcome code changes that improve this library or fix a problem, please make sure to follow all best practices and add tests if applicable before submitting a Pull Request on Github. We are very happy to merge your code in the official repository. Make sure to sign our Contributor License Agreement (CLA) first. See our license file for more details.
Head over to CONTRIBUTING.md for some development tips.
🧑💻 We are hiring!
We've recently closed a $38 million Series B funding round and we keep actively growing.
Our APIs are used by more than a billion end-users, and you'll have a chance to make a huge impact on the product within a team of the strongest engineers all over the world.
Check out our current openings and apply via Stream's website.