wavespeed

WaveSpeedAI Python Client: Official Python SDK for the WaveSpeedAI inference platform

Registry: PyPI (pip)
Version: 1.0.8
Maintainers: 3
WaveSpeedAI Python SDK

Official Python SDK for the WaveSpeedAI inference platform

🌐 Visit wavespeed.ai | 📖 Documentation | 💬 Issues

Installation

pip install wavespeed

API Client

Run WaveSpeed AI models with a simple API:

import wavespeed

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    {"prompt": "Cat"},
)

print(output["outputs"][0])  # Output URL
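
The snippet above indexes the result as output["outputs"][0]. A minimal sketch of a defensive accessor follows, assuming only that shape (a dict with an "outputs" list). The helper name first_output_url is hypothetical and not part of the SDK.

```python
from typing import Any, Mapping, Optional


def first_output_url(output: Mapping[str, Any]) -> Optional[str]:
    """Return the first entry of output["outputs"], or None if it is absent or empty."""
    outputs = output.get("outputs") or []
    return outputs[0] if outputs else None


# Hand-written dict in the same shape as the snippet above (not a real API response).
sample = {"outputs": ["https://example.com/result.png"]}
print(first_output_url(sample))
```

Returning None instead of raising lets calling code decide how to treat a run that produced no outputs.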

Authentication

Set your API key via an environment variable (you can get your API key from https://wavespeed.ai/accesskey):

export WAVESPEED_API_KEY="your-api-key"

Or pass it directly:

from wavespeed import Client

client = Client(api_key="your-api-key")
output = client.run("wavespeed-ai/z-image/turbo", {"prompt": "Cat"})
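
If an application supports both options, it can help to resolve the key in one place. The sketch below assumes an explicit key should win over the environment variable; that precedence is an assumption made for illustration, not documented SDK behavior, and resolve_api_key is a hypothetical helper.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(explicit: Optional[str] = None,
                    env: Mapping[str, str] = os.environ) -> str:
    """Prefer an explicitly passed key, then fall back to WAVESPEED_API_KEY."""
    key = explicit or env.get("WAVESPEED_API_KEY", "")
    if not key:
        raise RuntimeError("No API key: set WAVESPEED_API_KEY or pass api_key=...")
    return key
```

The resolved value could then be passed on, for example as Client(api_key=resolve_api_key()).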

Options

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    {"prompt": "Cat"},
    timeout=36000.0,       # Max wait time in seconds (default: 36000.0)
    poll_interval=1.0,     # Status check interval (default: 1.0)
    enable_sync_mode=False, # Single request mode, no polling (default: False)
)
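
To make the roles of timeout and poll_interval concrete, here is a generic polling loop. It is a sketch of the general technique only, not the SDK's implementation; poll_until_done, check, sleep, and clock are hypothetical names introduced for this example.

```python
import time
from typing import Callable, Optional


def poll_until_done(check: Callable[[], Optional[dict]],
                    timeout: float = 36000.0,
                    poll_interval: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep,
                    clock: Callable[[], float] = time.monotonic) -> dict:
    """Call check() every poll_interval seconds until it returns a result.

    Raises TimeoutError once `timeout` seconds have passed without a result.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result is not None:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        sleep(poll_interval)
```

A shorter poll_interval notices completion sooner at the cost of more status requests, while timeout bounds the total wait.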

Sync Mode

Use enable_sync_mode=True for a single request that waits for the result (no polling).

Note: Not all models support sync mode. Check the model documentation for availability.

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    {"prompt": "Cat"},
    enable_sync_mode=True,
)

Retry Configuration

Configure retries at the client level:

from wavespeed import Client

client = Client(
    api_key="your-api-key",
    max_retries=0,            # Task-level retries (default: 0)
    max_connection_retries=5, # HTTP connection retries (default: 5)
    retry_interval=1.0,       # Base delay between retries in seconds (default: 1.0)
)
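
The comment on retry_interval calls it a base delay, which suggests the wait may grow between attempts. The sketch below shows one common schedule, doubling the delay after each failure. The doubling is an assumption, since the source does not state the schedule, and call_with_retries is a hypothetical helper rather than SDK code.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T],
                      max_connection_retries: int = 5,
                      retry_interval: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(); on ConnectionError, retry up to max_connection_retries times."""
    if max_connection_retries < 0:
        raise ValueError("max_connection_retries must be >= 0")
    attempt = 0
    while True:
        try:
            return fn()
        except ConnectionError:
            if attempt == max_connection_retries:
                raise  # retries exhausted, surface the last error
            # Assumed schedule: base delay doubled after each failed attempt.
            sleep(retry_interval * (2 ** attempt))
            attempt += 1
```

With max_connection_retries=0 the function makes a single attempt and re-raises the first connection error.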

Upload Files

Upload images, videos, or audio files:

import wavespeed

url = wavespeed.upload("/path/to/image.png")
print(url)

Serverless Worker

Build serverless workers for the WaveSpeed platform.

Basic Handler

import wavespeed.serverless as serverless

def handler(job):
    job_input = job["input"]
    result = job_input.get("prompt", "").upper()
    return {"output": result}

serverless.start({"handler": handler})

Async Handler

import wavespeed.serverless as serverless

async def handler(job):
    job_input = job["input"]
    # process_async is a placeholder for your own coroutine; it is not provided by the SDK
    result = await process_async(job_input)
    return {"output": result}

serverless.start({"handler": handler})

Generator Handler (Streaming)

import wavespeed.serverless as serverless

def handler(job):
    for i in range(10):
        yield {"progress": i, "partial": f"chunk-{i}"}

serverless.start({"handler": handler})
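
The three handler styles above (plain function, coroutine, generator) can be exercised locally without starting a worker. The harness below is a minimal sketch that uses only the standard library; invoke is a hypothetical helper and does not reflect how the SDK dispatches handlers internally.

```python
import asyncio
import inspect
from typing import Any, Callable


def invoke(handler: Callable[[dict], Any], job: dict) -> Any:
    """Run a plain, async, or generator handler and return its result(s)."""
    if inspect.isgeneratorfunction(handler):
        return list(handler(job))  # collect every yielded chunk
    if inspect.iscoroutinefunction(handler):
        return asyncio.run(handler(job))  # drive the coroutine to completion
    return handler(job)


def upper_handler(job):
    return {"output": job["input"].get("prompt", "").upper()}


def stream_handler(job):
    for i in range(3):
        yield {"progress": i, "partial": f"chunk-{i}"}


print(invoke(upper_handler, {"input": {"prompt": "hello"}}))
print(len(invoke(stream_handler, {"input": {}})))
```

Collecting a generator into a list is convenient for tests, though it gives up the incremental delivery that streaming is meant to provide.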

Input Validation

from wavespeed.serverless.utils import validate

INPUT_SCHEMA = {
    "prompt": {"type": str, "required": True},
    "max_tokens": {"type": int, "required": False, "default": 100},
    "temperature": {
        "type": float,
        "required": False,
        "default": 0.7,
        "constraints": lambda x: 0 <= x <= 2,
    },
}

def handler(job):
    result = validate(job["input"], INPUT_SCHEMA)
    if "errors" in result:
        return {"error": result["errors"]}

    validated = result["validated_input"]
    # process with validated input...
    return {"output": "done"}
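
To illustrate what the schema keys in this example (type, required, default, constraints) could mean, here is a small stand-alone validator that returns the same two result shapes. It is a sketch for understanding only; validate_sketch is hypothetical, and the library's validate may differ in details such as type coercion or error wording.

```python
from typing import Any, Dict


def validate_sketch(raw: Dict[str, Any],
                    schema: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return {"validated_input": ...} on success or {"errors": [...]} on failure."""
    errors, validated = [], {}
    for name, rule in schema.items():
        if name not in raw:
            if rule.get("required", False):
                errors.append(f"{name} is required")
            elif "default" in rule:
                validated[name] = rule["default"]  # fill in the declared default
            continue
        value = raw[name]
        if not isinstance(value, rule["type"]):
            errors.append(f"{name} must be of type {rule['type'].__name__}")
            continue
        check = rule.get("constraints")
        if check is not None and not check(value):
            errors.append(f"{name} failed its constraint")
            continue
        validated[name] = value
    return {"errors": errors} if errors else {"validated_input": validated}
```

Note that this sketch uses a strict isinstance check, so an integer passed for a float field is rejected here.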

Concurrent Execution

Enable concurrent job processing with concurrency_modifier:

import wavespeed.serverless as serverless

def handler(job):
    return {"output": job["input"]["data"]}

def concurrency_modifier(current_concurrency):
    return 2  # Process 2 jobs concurrently

serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier
})
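
A modifier does not have to return a constant. The sketch below raises the target by one per call up to a cap. The source does not say when the worker calls the modifier or exactly what value it passes, so the ramp-up behavior here is an assumption, and make_concurrency_modifier is a hypothetical helper.

```python
from typing import Callable


def make_concurrency_modifier(max_concurrency: int) -> Callable[[int], int]:
    """Build a modifier that steps concurrency up by one until the cap is reached."""
    def concurrency_modifier(current_concurrency: int) -> int:
        # Never return more than the cap, and treat negative input as zero.
        return min(max(current_concurrency, 0) + 1, max_concurrency)
    return concurrency_modifier


concurrency_modifier = make_concurrency_modifier(max_concurrency=4)
print(concurrency_modifier(1))
```

The resulting function has the same signature as the example above and could be passed under the "concurrency_modifier" key in the same way.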

Local Development

Test with JSON Input

# Using CLI argument
python handler.py --test_input '{"input": {"prompt": "hello"}}'

# Using test_input.json file (auto-detected)
echo '{"input": {"prompt": "hello"}}' > test_input.json
python handler.py

Running Tests

# Run all tests
python -m pytest

# Run a single test file
python -m pytest tests/test_api.py

# Run a specific test
python -m pytest tests/test_api.py::TestClient::test_run_success -v

FastAPI Development Server

python handler.py --waverless_serve_api --waverless_api_port 8000

Then use the interactive Swagger UI at http://localhost:8000/ or make requests:

# Synchronous execution
curl -X POST http://localhost:8000/runsync \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'

# Async execution
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'
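
The same requests can be made from Python with the standard library. This is a sketch that mirrors the curl commands above; build_job_request and send are hypothetical helpers, and decoding the response as JSON is an assumption about the development server.

```python
import json
import urllib.request


def build_job_request(payload: dict, endpoint: str = "runsync",
                      base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the same POST request as the curl commands, without sending it."""
    return urllib.request.Request(
        url=f"{base_url}/{endpoint}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 30.0) -> dict:
    """Send the request and decode the body as JSON (needs the dev server running)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


request = build_job_request({"input": {"prompt": "hello"}})
print(request.get_method(), request.full_url)
```

With the development server running, send(request) performs the call; passing endpoint="run" targets the async route instead.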

CLI Options

Option                       Description
--test_input JSON            Run locally with JSON test input
--waverless_serve_api        Start FastAPI development server
--waverless_api_host HOST    API server host (default: localhost)
--waverless_api_port PORT    API server port (default: 8000)
--waverless_log_level LEVEL  Log level (DEBUG, INFO, WARN, ERROR)

Environment Variables

API Client

Variable           Description
WAVESPEED_API_KEY  WaveSpeed API key

Serverless Worker

Variable                       Description
WAVERLESS_POD_ID               Worker/pod identifier
WAVERLESS_API_KEY              API authentication key
WAVERLESS_WEBHOOK_GET_JOB      Job fetch endpoint
WAVERLESS_WEBHOOK_POST_OUTPUT  Result submission endpoint
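
A worker entry point could report which of these variables are unset before starting. The sketch below uses the variable names from the table. Whether all four are required in every deployment is not stated in the source, so treat the check as an assumption; missing_worker_vars is a hypothetical helper.

```python
import os
from typing import List, Mapping

WORKER_VARS = (
    "WAVERLESS_POD_ID",
    "WAVERLESS_API_KEY",
    "WAVERLESS_WEBHOOK_GET_JOB",
    "WAVERLESS_WEBHOOK_POST_OUTPUT",
)


def missing_worker_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of worker variables that are unset or empty."""
    return [name for name in WORKER_VARS if not env.get(name)]
```

For local runs with --test_input these variables may not be needed, so logging the missing names is likely more useful than exiting.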

License

MIT
