Generación en lote en WaveSpeed: Ejecuta 1.000+ solicitudes de imágenes diarias con confianza

Generación en lote en WaveSpeed: Ejecuta 1.000+ solicitudes de imágenes diarias con confianza

¡Hola, gente! Soy Dora. Esto comenzó con una pequeña molestia: necesitaba algunos cientos de imágenes variantes para una prueba, y mi bucle habitual de una sola solicitud se sentía como empujar un carrito de compras con una rueda atascada. Seguía escuchando que la Generación por lotes en WaveSpeed podría manejar volumen. No necesitaba fuegos artificiales. Solo quería que el trabajo se sintiera más ligero.

Entonces, durante algunas sesiones a finales de diciembre y nuevamente esta semana, configuré un simple pipeline de lotes en WaveSpeed y le pedí que ejecutara más de 1,000 solicitudes de imagen. Sin heroicidades, solo rendimiento constante, estados claros y reintentos limpios. A continuación se encuentra la forma que funcionó para mí, las partes que se interpusieron en el camino y las pequeñas opciones que evitaron que los costos y errores se dispararan mientras mi atención estaba en otro lugar.

Descripción general de la arquitectura de lotes

Productor / Cola / Trabajador / Almacenamiento

Mantuve las piezas aburridas a propósito. Un pequeño script productor recopila indicaciones y metadatos, una cola mantiene trabajos, trabajadores sin estado llaman a la API de imágenes de WaveSpeed, y el almacenamiento recibe los resultados. Cada parte puede fallar sin derribar todo el sistema.

  • Productor: Lee un CSV de indicaciones y configuraciones por imagen (modelo, tamaño, semilla). Escribe un trabajo por fila en la cola con una clave de idempotencia y un límite suave.
  • Cola: Usé Redis Streams una vez y RabbitMQ otra. Ambos funcionaron. Si aún no ejecutas ninguno de los dos, Redis es más ligero para comenzar.
  • Trabajadores: Procesos en contenedores que extraen trabajos, llaman a WaveSpeed, escriben resultados en almacenamiento de objetos (usé S3) y actualizan el estado. Son sin estado, por lo que el escalado es un control, no una reconstrucción.
  • Almacenamiento: Un bucket para imágenes, uno para metadatos JSON. Carpetas simples por fecha e ID de lote mantienen todo organizado.

Lo que me sorprendió fue cuán poco código cambió cuando escalé de 100 a 1,200 imágenes; la mayoría de los problemas eran sobre ritmo y protección contra duplicados, no rendimiento.

Diagrama de arquitectura simple

Aquí está el cuadro que mantuve en mi cabeza:

Productor → Cola → Trabajadores → API de WaveSpeed → Almacenamiento
             ↓                           ↑
          BD de Estado ←────────→ Métricas/Alertas
  • La BD de Estado puede ser el mismo Redis o una tabla Postgres ligera.
  • Las Métricas alimentan alertas cuando las tasas de error o costos se vuelven extraños.

No es inteligente. Ese es el punto. Cuando la API devuelve 429/5xx intermitentes, la cola lo absorbe. Cuando un trabajador muere a mitad de la ejecución, otro lo recoge después del tiempo de visibilidad.

Estrategia de concurrencia

Niveles de paralelismo seguro

En mis ejecuciones, comencé con 5 trabajadores, cada uno haciendo 2 solicitudes simultáneas. Eso me dio un rendimiento constante de 8–10 imágenes/minuto sin exceder límites. Aumentar a 20 solicitudes concurrentes funcionó brevemente, luego vi que los reintentos se disparaban. La mejor configuración no era el pico más rápido, sino el promedio más plano.

Si lo intentas: encuentra el número más pequeño de trabajadores que evite que tu cola crezca. Luego avanza poco a poco. Observa la latencia p95 y las tasas de error durante 10–15 minutos antes de tocar nada.

Conciencia del límite de velocidad

WaveSpeed publica orientación sobre límites de velocidad en los documentos, pero los límites aún varían con el modelo y la cuenta. Agregué dos salvaguardias:

  • Cubo de tokens del lado del cliente: Cada trabajador adquiere un token antes de llamar a la API. Los tokens se recargan a las RPS efectivas del plan. Cuando cambié de modelo, ajusté la recarga.
  • Disciplina de retroceso: 429 y 5xx desencadenan un retroceso exponencial con jitter, limitado a 30 segundos. Esto evitó avalanchas después de cortes breves.

También etiqueté cada trabajo con modelo + tamaño para poder establecer límites de concurrencia separados por modelo si era necesario. No es sofisticado, solo una pequeña declaración switch, pero ayudó a evitar puntos calientes localizados.

Reintento e idempotencia

Evitar imágenes duplicadas

Mi primer lote tenía un error silencioso: un trabajador falló después de que la llamada de la API regresó, antes de escribir en el almacenamiento. La cola reintentó el trabajo y terminé pagando por dos generaciones idénticas. No fue divertido.

Para evitar esto, hice que la ruta de almacenamiento fuera determinista a partir de la clave de idempotencia y el hash de entrada. Si un reintento encuentra la imagen ya escrita, cortocircuita y simplemente marca el trabajo como exitoso. Una solución económica, un gran alivio.

Implementación de clave de idempotencia

Usé un SHA-256 de la indicación normalizada + modelo + semilla + tamaño + guía. Ese hash se convierte en:

  • la clave de idempotencia de la API (enviada en un encabezado o campo de carga útil, según el SDK)
  • el prefijo del nombre del archivo de almacenamiento
  • la clave principal de la base de datos para el trabajo

Si la API de WaveSpeed respeta las claves de idempotencia (verifica los documentos para tu punto final), las llamadas repetidas con la misma clave devuelven el mismo resultado sin cargos adicionales. Si no, la verificación de almacenamiento primero aún previene duplicados que pagarías dos veces.

Recuperación de trabajo fallido

No todo fracaso merece otro intento. Mi regla de oro:

  • Reintentar: 429, 5xx, tiempos de espera de red, “modelo ocupado” o errores transitorios de almacenamiento
  • No reintentar: 4xx con errores de validación, parámetros faltantes o entrada obvio mala

Limito los reintentos a 5 con retroceso exponencial. Después de eso, el trabajo cae en una cola de letras muertas con la carga útil de error. Una vez al día, reviso trabajos de DLQ: algunos obtienen entrada fija y se reintentaban, otros se archivan con una nota. Esto mantuvo mi tasa de fallo general por debajo del 1.5% para una ejecución de 1,200 imágenes.

Gestión del estado del trabajo

Estado: pendiente/ejecutándose/exitoso/fallido

Intenté algunas formas de estado. La más simple se mantuvo:

  • pendiente: en cola, aún no arrendado por un trabajador
  • ejecutándose: arrendado por un trabajador con expiración del arrendamiento
  • exitoso: imagen y metadatos escritos, verificaciones pasadas
  • fallido: terminal, con código de error e marca de tiempo del último intento

Agregué dos campos opcionales que valieron la pena: attempt_count y last_response_code. Hicieron los paneles más legibles y la depuración menos adivinatoria.

Manejo del tiempo de espera del trabajo

Dos tiempos de espera importan:

  • Tiempo de espera de arrendamiento: Si un trabajador falla a mitad de la ejecución, el trabajo debe volver a pendiente después de N segundos. Usé 120s.
  • Tiempo de espera de la API: Si WaveSpeed no responde en N segundos, aborta y reintenta con retroceso. Usé 60s por llamada.

Cuando la API es lenta, estos dos pueden pelear. Para evitar trabajo duplicado, solo marco ejecutándose → pendiente después de que expire el arrendamiento y se detenga el latido del trabajador. Los latidos eran solo una actualización de hash Redis cada 10 segundos. Si el latido es fresco, extiendo el arrendamiento.

Monitoreo y alertas

Seguimiento de la tasa de error

Observé tres números durante las ejecuciones:

  • error_rate_5m: proporción rodante de 5 minutos de intentos fallidos
  • p95_latency: por modelo, por tamaño
  • retry_depth: cuántos trabajos están en intento ≥ 2

Si error_rate_5m > 5% durante 10 minutos, reduzco automáticamente la concurrencia a la mitad y me envío una nota. La mayoría de los picos se resolvieron dentro de cinco minutos sin ajustes manuales.

Alertas de picos de costos

Los costos pueden aumentarse gradualmente. Registré:

  • cost_per_image: informado por WaveSpeed si está disponible, sino estimado del plan
  • duplicate_prevented: conteo de cortocircuitos de almacenamiento
  • total_estimated_cost: acumulativo

Cuando cost_per_image saltó más del 30% contra el promedio de la última hora, pausé la ingesta de nuevos trabajos y dejé que la cola se drenara. Dos veces, esto capturó cambios de parámetros no intencionales (tamaños más grandes, modelo diferente) antes de que la factura se desviara. Las salvaguardias silenciosas como esta valen por sus pocas líneas de código.


Implementación de referencia

Seudocódigo Python
A continuación se presenta la forma que usé. No es código completo, solo la estructura:

# producer.py
for row in csv_rows:
    key = hash_inputs(row)
    job = { "id": key, "inputs": row, "deadline": now+6*3600 }
    queue.push(job)

# worker.py
while True:
    job = queue.lease(timeout=120)
    if not job:
        sleep(1)
        continue
    try:
        record_heartbeat(job.id)
        resp = wavespeed.generate_image(inputs=job.inputs, idempotency_key=job.id, timeout=60)
        path = storage_path(job.id, job.inputs)
        if not storage.exists(path):
            storage.write(path, resp.image)
            storage.write(path+'.json', resp.metadata)
        mark_success(job.id)
    except Retryable as e:
        mark_retry(job.id, e)
        backoff_sleep(job.attempt)
    except Fatal as e:
        mark_failed(job.id, e)
    finally:
        queue.release(job)

Seudocódigo Node.js

// producer.mjs
for (const row of rows) {
    const key = hashInputs(row);
    queue.push({ id: key, inputs: row, deadline: Date.now() + 6 * 3600e3 }); // 6 hours
}


// worker.mjs
while (true) {
    const job = await queue.lease(120); // lease timeout in seconds

    if (!job) { // 原来的 ".job" 改为 "!job"
        await delay(1000);
        continue;
    }

    try {
        await heartbeat(job.id);

        const resp = await wavespeed.generateImage(
            { ...job.inputs, idempotencyKey: job.id },
            { timeout: 60000 } // 60 seconds
        );

        const path = makePath(job.id, job.inputs);

        if (!(await storage.exists(path))) { // 原来的 ".(await ...)" 修正
            await storage.write(path, resp.image);
            await storage.write(path + '.json', resp.metadata);
        }

        await markSuccess(job.id);
    } catch (e) {
        if (isRetryable(e)) {
            await markRetry(job.id, e);
        } else {
            await markFailed(job.id, e);
        }
    } finally {
        await queue.release(job);
    }
}

Recomendaciones de configuración

  • Comienza pequeño: 5–10 solicitudes concurrentes, luego avanza poco a poco. Observa p95 y error_rate_5m, no solo rendimiento.
  • Configuraciones separadas por modelo: la concurrencia, el tiempo de espera y las expectativas de costos cambian con el modelo y el tamaño.
  • Idempotencia en todas partes: clave en la solicitud, ruta de almacenamiento determinista y una tabla de trabajos con clave en el mismo valor.
  • Latidos y arrendamientos: suenan exigentes, pero te ahorran de duplicados fantasma.
  • Paneles simples: 6–8 paneles son suficientes — longitud de cola, éxitos/min, errores/min, p95, profundidad de reintento y costo.

Si ya estás ejecutando trabajos por lotes en otro lugar, esto te resultará familiar. WaveSpeed no requirió un replanteamiento, solo algunas salvaguardias cuidadosas. Eso es lo que quería.

Una última nota de mis ejecuciones: los lotes más suave eran aquellos que apenas miraba. No porque fuera “configura y olvida”, sino porque el sistema me decía cuándo necesitaba atención y se mantenía callado cuando no. Eso se siente como el tipo correcto de velocidad.

¿Y tú? ¿Has procesado imágenes por lotes con WaveSpeed recientemente? ¿Cuál es tu punto óptimo para la concurrencia (estoy constantemente alrededor de 8–10 en este momento)? ¿O has encontrado algún error astuto (como cargos duplicados)? ¡Siéntete libre de compartir tu configuración, obstáculos o consejos en los comentarios!