
Security News
Deno 2.2 Improves Dependency Management and Expands Node.js Compatibility
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
This project/package aims to provide a simple python interface to the wica-http server. Check out the main branch to get the blocking version of the package
Install with pip
pip install asyncwica
Here are some simple examples to get you started:
import asyncio
import time
async def simple_example():
"""A simple example of how to use AsyncWicaStream. Run it in main by uncommenting it! """
wica_stream = WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"])
async def run_stream():
await wica_stream.create()
async for message in wica_stream.subscribe():
print(message)
async def stop_stream():
await asyncio.sleep(10)
print(await wica_stream.destroy())
await asyncio.gather(run_stream(), stop_stream())
async def example_using_with():
""" An example using the compound statement async with and another method to exit the event loop. Run it in main by uncommenting it!"""
async with WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"]) as stream:
i:int = 0
async for message in stream.subscribe():
i+=1
print(message)
if i == 25:
break
async def multistream_example():
""" An example of how to run multiple streams at once using aiostream. Run it in main by uncommenting it! """
from aiostream import stream
streams = []
async def run_streams():
for _ in range(10):
wica_stream = WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"])
streams.append(wica_stream)
await wica_stream.create()
print("Doing someting else before starting the stream...")
await asyncio.sleep(5)
subscribed_streams = []
for wica_stream in streams:
print(f"Subscribing to stream {wica_stream.id}")
subscribed_streams.append(wica_stream.subscribe())
combine = stream.merge(*subscribed_streams)
async with combine.stream() as streamer:
async for item in streamer:
print(item)
continue
async def stop_streams():
await asyncio.sleep(25)
for wica_stream in streams:
print(await wica_stream.destroy())
await asyncio.gather(run_streams(), stop_streams())
async def main():
#await simple_example()
#await example_using_with()
#await multistream_example()
pass
if __name__ == "__main__":
asyncio.run(main())
Current Features:
Check out the wiki for more info!
To contribute, simply clone the project.
You can uses pip -r requirements.txt
or the make file to set up the project.
Currently None
If you have any questions pleas contract 'niklas.laufkoetter@psi.ch'
FAQs
A simple async python API to access wica-http SSE.
We found that asyncwica demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
Security News
React's CRA deprecation announcement sparked community criticism over framework recommendations, leading to quick updates acknowledging build tools like Vite as valid alternatives.
Security News
Ransomware payment rates hit an all-time low in 2024 as law enforcement crackdowns, stronger defenses, and shifting policies make attacks riskier and less profitable.