Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pyzeebe

Package Overview
Dependencies
Maintainers
1
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pyzeebe

Zeebe client api

  • 4.2.0
  • PyPI
  • Socket score

Maintainers
1

Compatible with: Camunda Platform 8

Coverage Status Test pyzeebe Integration test pyzeebe

GitHub tag (latest by date) PyPI PyPI - Python Version

Pyzeebe

pyzeebe is a python grpc client for Zeebe.

Zeebe version support:

Pyzeebe versionTested Zeebe versions
4.x.x8.2, 8.3, 8.4, 8.5
3.x.x1.0.0
2.x.x0.23, 0.24, 0.25, 0.26
1.x.x0.23, 0.24

Getting Started

To install:

pip install pyzeebe

For full documentation please visit: https://camunda-community-hub.github.io/pyzeebe/

Usage

Worker

The ZeebeWorker class gets jobs from the gateway and runs them.

import asyncio

from pyzeebe import ZeebeWorker, Job, JobController, create_insecure_channel


channel = create_insecure_channel(grpc_address="localhost:26500") # Create grpc channel
worker = ZeebeWorker(channel) # Create a zeebe worker


async def on_error(exception: Exception, job: Job, job_controller: JobController):
    """
    on_error will be called when the task fails
    """
    print(exception)
    await job_controller.set_error_status(job, f"Failed to handle job {job}. Error: {str(exception)}")


@worker.task(task_type="example", exception_handler=on_error)
def example_task(input: str) -> dict:
    return {"output": f"Hello world, {input}!"}


@worker.task(task_type="example2", exception_handler=on_error)
async def another_example_task(name: str) -> dict: # Tasks can also be async
    return {"output": f"Hello world, {name} from async task!"}

loop = asyncio.get_running_loop()
loop.run_until_complete(worker.work()) # Now every time that a task with type `example` or `example2` is called, the corresponding function will be called

Stop a worker:

await zeebe_worker.stop() # Stops worker after all running jobs have been completed

Client

from pyzeebe import ZeebeClient, create_insecure_channel

# Create a zeebe client
channel = create_insecure_channel(grpc_address="localhost:26500")
zeebe_client = ZeebeClient(channel)

# Run a Zeebe process instance
process_instance_key = await zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={})

# Run a process and receive the result
process_instance_key, process_result = await zeebe_client.run_process_with_result(
    bpmn_process_id="My zeebe process",
    timeout=10000
)

# Deploy a BPMN process definition
await zeebe_client.deploy_resource("process.bpmn")

# Cancel a running process
await zeebe_client.cancel_process_instance(process_instance_key=12345)

# Publish message
await zeebe_client.publish_message(name="message_name", correlation_key="some_id")

Tests

Use the package manager pip to install pyzeebe

pytest tests/unit

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

License

We use the MIT license, see LICENSE.md for details

Keywords

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc