Security News
Weekly Downloads Now Available in npm Package Search Results
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
This project/package aims to provide a simple python interface to the wica-http server. Check out the main branch to get the blocking version of the package
Install with pip
pip install asyncwica
Here are some simple examples to get you started:
import asyncio
import time
async def simple_example():
"""A simple example of how to use AsyncWicaStream. Run it in main by uncommenting it! """
wica_stream = WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"])
async def run_stream():
await wica_stream.create()
async for message in wica_stream.subscribe():
print(message)
async def stop_stream():
await asyncio.sleep(10)
print(await wica_stream.destroy())
await asyncio.gather(run_stream(), stop_stream())
async def example_using_with():
""" An example using the compound statement async with and another method to exit the event loop. Run it in main by uncommenting it!"""
async with WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"]) as stream:
i:int = 0
async for message in stream.subscribe():
i+=1
print(message)
if i == 25:
break
async def multistream_example():
""" An example of how to run multiple streams at once using aiostream. Run it in main by uncommenting it! """
from aiostream import stream
streams = []
async def run_streams():
for _ in range(10):
wica_stream = WicaStream(base_url="http://student08/ca/streams", channels=["MMAC3:STR:2"])
streams.append(wica_stream)
await wica_stream.create()
print("Doing someting else before starting the stream...")
await asyncio.sleep(5)
subscribed_streams = []
for wica_stream in streams:
print(f"Subscribing to stream {wica_stream.id}")
subscribed_streams.append(wica_stream.subscribe())
combine = stream.merge(*subscribed_streams)
async with combine.stream() as streamer:
async for item in streamer:
print(item)
continue
async def stop_streams():
await asyncio.sleep(25)
for wica_stream in streams:
print(await wica_stream.destroy())
await asyncio.gather(run_streams(), stop_streams())
async def main():
#await simple_example()
#await example_using_with()
#await multistream_example()
pass
if __name__ == "__main__":
asyncio.run(main())
Current Features:
Check out the wiki for more info!
To contribute, simply clone the project.
You can uses pip -r requirements.txt
or the make file to set up the project.
Currently None
If you have any questions pleas contract 'niklas.laufkoetter@psi.ch'
FAQs
A simple async python API to access wica-http SSE.
We found that asyncwica demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Socket's package search now displays weekly downloads for npm packages, helping developers quickly assess popularity and make more informed decisions.
Security News
A Stanford study reveals 9.5% of engineers contribute almost nothing, costing tech $90B annually, with remote work fueling the rise of "ghost engineers."
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.