
Security News
Open Source Maintainers Feeling the Weight of the EU’s Cyber Resilience Act
The EU Cyber Resilience Act is prompting compliance requests that open source maintainers may not be obligated or equipped to handle.
This should be considered EXPERIMENTAL at the moment. At the moment, all I can say is that the test cases currently pass. I have not tested this for any real world use cases yet.
pip install temporal-python-sdk
Sample code for using this library can be found in Workflows in Python Using Temporal.
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloActivity-python-tq"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=1000))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str):
return greeting + " " + name + "!"
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
pass
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello", name)
async def client_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
result = await greeting_workflow.get_greeting("Python")
print(result)
print("Stopping workers.....")
await worker.stop()
print("Workers stopped......")
if __name__ == '__main__':
asyncio.run(client_main())
1.0
1.1
1.2
1.3
2.0
Post 2.0:
FAQs
Unofficial Python SDK for the Temporal Workflow Engine
We found that temporal-python-sdk demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
The EU Cyber Resilience Act is prompting compliance requests that open source maintainers may not be obligated or equipped to handle.
Security News
Crates.io adds Trusted Publishing support, enabling secure GitHub Actions-based crate releases without long-lived API tokens.
Research
/Security News
Undocumented protestware found in 28 npm packages disrupts UI for Russian-language users visiting Russian and Belarusian domains.