Build Worker

Build Serverless Worker

Learn how to write handler code and build a worker image for Waverless.

Overview

A worker is a container that:

  1. Connects to Waverless
  2. Pulls tasks from the queue
  3. Executes your handler function
  4. Returns results

RunPod Compatibility

Waverless is fully compatible with RunPod. If you have existing RunPod workers, you can migrate with zero code changes — just update the environment variables.

Basic Handler

Create a file called handler.py:

import runpod
 
def handler(job):
    """
    Main handler function.
 
    Args:
        job: Dict containing 'id' and 'input'
 
    Returns:
        Dict with results
    """
    job_input = job.get("input", {})
    prompt = job_input.get("prompt", "")
 
    # Your processing logic here
    result = process(prompt)
 
    return {"output": result}
 
def process(prompt):
    # Your AI model inference
    return f"Processed: {prompt}"
 
# Start the worker
runpod.serverless.start({"handler": handler})

Handler Input/Output

Input Structure

job = {
    "id": "task_abc123",
    "input": {
        "prompt": "User input here",
        "other_param": "value"
    }
}

Output Structure

Return a dictionary:

# Success
return {"output": "result here"}
 
# With metadata
return {
    "output": "result",
    "metadata": {"processing_time": 1.5}
}

Error Handling

def handler(job):
    try:
        result = process(job["input"])
        return {"output": result}
    except Exception as e:
        return {"error": str(e)}

Streaming Output

For long-running tasks, use generator functions to stream results:

def handler(job):
    job_input = job.get("input", {})
 
    # Yield intermediate results
    for i in range(5):
        yield {"status": "processing", "step": i}
 
    # Final result
    yield {"output": "completed"}
 
runpod.serverless.start({"handler": handler})

Concurrency Control

Control how many tasks a worker processes simultaneously:

def concurrency_modifier(current_concurrency):
    """Return desired concurrency level."""
    return 2  # Process 2 tasks at once
 
runpod.serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier
})

Dockerfile

Create a Dockerfile for your worker:

FROM python:3.10-slim
 
WORKDIR /app
 
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
# Copy handler
COPY handler.py .
 
# Start worker
CMD ["python", "handler.py"]

requirements.txt:

runpod>=1.3.0
# your other dependencies

Build and Push

# Build image
docker build -t your-registry/your-worker:latest .
 
# Push to registry
docker push your-registry/your-worker:latest

Environment Variables

Waverless automatically sets these environment variables:

VariableDescription
RUNPOD_WEBHOOK_GET_JOBURL to pull tasks
RUNPOD_WEBHOOK_PINGURL for heartbeat
RUNPOD_POD_IDWorker ID

You can also set custom environment variables in your endpoint configuration.

Best Practices

1. Load Models at Startup

import runpod
 
# Load model once at startup
model = load_model()
 
def handler(job):
    # Use pre-loaded model
    return {"output": model.generate(job["input"]["prompt"])}
 
runpod.serverless.start({"handler": handler})

2. Handle Timeouts

import signal
 
def timeout_handler(signum, frame):
    raise TimeoutError("Task timeout")
 
def handler(job):
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(300)  # 5 minute timeout
 
    try:
        result = long_running_process(job["input"])
        return {"output": result}
    finally:
        signal.alarm(0)

3. Validate Input

def handler(job):
    job_input = job.get("input", {})
 
    # Validate required fields
    if "prompt" not in job_input:
        return {"error": "Missing required field: prompt"}
 
    # Validate types
    if not isinstance(job_input["prompt"], str):
        return {"error": "prompt must be a string"}
 
    return {"output": process(job_input["prompt"])}

4. Log Progress

import logging
 
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
def handler(job):
    logger.info(f"Starting job: {job['id']}")
 
    result = process(job["input"])
 
    logger.info(f"Completed job: {job['id']}")
    return {"output": result}

Testing Locally

Test your handler before deploying:

# test_handler.py
from handler import handler
 
test_job = {
    "id": "test_123",
    "input": {
        "prompt": "Test input"
    }
}
 
result = handler(test_job)
print(result)

Run:

python test_handler.py
© 2025 WaveSpeedAI. All rights reserved.