Using in Python SDK

Using in Python SDK

Use the official Python SDK to run WaveSpeedAI models with just a few lines of code.

Install

pip install wavespeed

Set your API key

Get your key from https://wavespeed.ai/accesskey (see Authentication).

export WAVESPEED_API_KEY="your-api-key"

Or pass it directly via the client.

Run a model (one-liner)

import wavespeed
 
output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "Cat"},
)
 
print(output["outputs"][0])  # Output URL

Use the Client explicitly

from wavespeed import Client
 
client = Client(api_key="your-api-key")  # or rely on WAVESPEED_API_KEY env
result = client.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "A cinematic cat, dramatic rim light"},
)
print(result["outputs"][0])

Options

import wavespeed
 
output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "Cat"},
    timeout=36000.0,   # Max wait time in seconds (default: 36000.0)
    poll_interval=1.0, # Status check interval (default: 1.0)
)

Upload files

import wavespeed
 
url = wavespeed.upload("/path/to/image.png")
print(url)  # Use this URL as an input to models that accept image/video/audio

Serverless worker (optional)

Run logic on the WaveSpeed platform using the serverless helper.

Basic handler:

import wavespeed.serverless as serverless
 
def handler(job):
    job_input = job["input"]
    result = job_input.get("prompt", "").upper()
    return {"output": result}
 
serverless.start({"handler": handler})

Async handler:

import wavespeed.serverless as serverless
 
async def handler(job):
    job_input = job["input"]
    # await your async work here...
    return {"output": job_input}
 
serverless.start({"handler": handler})

Generator handler (streaming):

import wavespeed.serverless as serverless
 
def handler(job):
    for i in range(10):
        yield {"progress": i, "partial": f"chunk-{i}"}
 
serverless.start({"handler": handler})

Concurrency control:

import wavespeed.serverless as serverless
 
def handler(job):
    return {"output": job["input"]["data"]}
 
def concurrency_modifier(current_concurrency):
    return 2  # Process 2 jobs concurrently
 
serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier
})

Local development helpers (optional)

Run locally with a JSON test input:

python handler.py --test_input '{"input": {"prompt": "hello"}}'

Or place test_input.json in the same directory:

echo '{"input": {"prompt": "hello"}}' > test_input.json
python handler.py

Start a FastAPI dev server:

python handler.py --waverless_serve_api --waverless_api_port 8000

Test it:

# Synchronous
curl -X POST http://localhost:8000/runsync \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'
 
# Async
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'

Environment variables

API client:

VariableDescription
WAVESPEED_API_KEYWaveSpeed API key

Serverless worker:

VariableDescription
WAVERLESS_POD_IDWorker/pod identifier
WAVERLESS_API_KEYAPI authentication key
WAVERLESS_WEBHOOK_GET_JOBJob fetch endpoint
WAVERLESS_WEBHOOK_POST_OUTPUTResult submission endpoint

References

  • Python SDK repository: https://github.com/WaveSpeedAI/wavespeed-python
  • Desktop App: see Using in Desktop App
© 2025 WaveSpeedAI. All rights reserved.