Geração em Lote no WaveSpeed: Execute 1.000+ Solicitações de Imagem Diárias com Confiança

Geração em Lote no WaveSpeed: Execute 1.000+ Solicitações de Imagem Diárias com Confiança

Olá, pessoal! Sou a Dora. Tudo começou com um pequeno incômodo: eu precisava de algumas centenas de imagens variantes para um teste, e meu usual loop de requisição única parecia empurrar um carrinho de compras com uma roda presa. Eu ouvia constantemente que a Batch Generation no WaveSpeed conseguiria lidar com volume. Eu não precisava de fireworks. Eu apenas queria que o trabalho parecesse mais leve.

Então, ao longo de algumas sessões no final de dezembro e novamente esta semana, configurei um pipeline batch simples no WaveSpeed e pedi para executar mais de 1.000 requisições de imagem. Nada de heroísmo, apenas throughput constante, estados claros e retentativas limpas. Abaixo está a forma que funcionou para mim, as partes que atrapalharam e as pequenas escolhas que mantiveram custos e erros sem aumentar enquanto minha atenção estava em outro lugar.

Visão Geral da Arquitetura de Batch

Produtor / Fila / Worker / Armazenamento

Mantive as peças simples de propósito. Um pequeno script produtor coleta prompts e metadados, uma fila mantém os trabalhos, workers stateless chamam a API de imagem do WaveSpeed, e o armazenamento recebe os resultados. Cada parte pode falhar sem derrubar todo o sistema.

  • Produtor: Lê um CSV de prompts e configurações por imagem (modelo, tamanho, seed). Escreve um trabalho por linha na fila com uma chave de idempotência e um prazo flexível.
  • Fila: Usei Redis Streams uma vez e RabbitMQ outra. Ambos funcionaram. Se você já não executa nenhum dos dois, Redis é mais leve para começar.
  • Workers: Processos em container que puxam trabalhos, chamam WaveSpeed, escrevem resultados no armazenamento de objetos (usei S3), e atualizam o status. Eles são stateless, então scaling é apenas um botão, não uma reconstrução.
  • Armazenamento: Um bucket para imagens, outro para metadados JSON. Pastas simples por data e ID de batch mantêm tudo organizado.

O que me surpreendeu foi como pouco código mudou quando escalei de 100 para 1.200 imagens; a maioria dos problemas era sobre ritmo e proteção contra duplicatas, não throughput.

Diagrama de Arquitetura Simples

Aqui está o quadro que mantive na cabeça:

Produtor → Fila → Workers → API WaveSpeed → Armazenamento
             ↓                      ↑
          BD de Estado ←────────→ Métricas/Alertas
  • O BD de Estado pode ser o mesmo Redis ou uma tabela Postgres leve.
  • Métricas alimentam alertas quando taxas de erro ou custos ficam estranhos.

Não é engenhoso. Esse é o ponto. Quando a API retorna intermitentemente 429/5xx, a fila absorve. Quando um worker morre no meio da execução, outro o pega após o timeout de visibilidade.

Estratégia de Concorrência

Níveis de paralelismo seguro

Nas minhas execuções, comecei com 5 workers, cada um fazendo 2 requisições em voo. Isso me deu um constante 8–10 imagens/minuto sem acionarmos limites. Aumentar para 20 requisições concorrentes funcionou brevemente, depois vi retentativas dispararem. A melhor configuração não era o pico mais rápido, era a média mais plana.

Se você estiver tentando isso: encontre o menor número de workers que mantém sua fila sem crescer. Depois aumente gradualmente. Monitore a latência p95 e as taxas de erro por 10–15 minutos antes de mexer em qualquer coisa.

Conscientização de limite de taxa

WaveSpeed publica orientação de limite de taxa na documentação, mas os limites ainda variam com modelo e conta. Adicionei dois amortecedores:

  • Token bucket do lado do cliente: Cada worker adquire um token antes de chamar a API. Tokens se reenchemem no RPS efetivo do plano. Quando mudei de modelo, ajustei o reenchimento.
  • Disciplina de backoff: 429 e 5xx acionam backoff exponencial com jitter, limitado a 30 segundos. Isso evitou stampedes após interrupções breves.

Também etiquetei cada trabalho com modelo + tamanho para que pudesse definir limites de concorrência separados por modelo quando necessário. Não é fancy, apenas um pequeno switch statement, mas ajudou a evitar hot spots localizados.

Retry & Idempotência

Evitando imagens duplicadas

Meu primeiro batch tinha um bug silencioso: um worker caiu após a chamada da API retornar, antes de escrever no armazenamento. A fila retentou o trabalho e acabei pagando por duas gerações idênticas. Não foi divertido.

Para parar isso, tornei o caminho de armazenamento determinístico a partir da chave de idempotência e do hash de entrada. Se uma retentativa encontrar a imagem já escrita, ela faz short-circuit e apenas marca o trabalho como sucesso. Correção barata, grande alívio.

Implementação de chave de idempotência

Usei um SHA-256 do prompt normalizado + modelo + seed + tamanho + guidance. Esse hash se torna:

  • a chave de idempotência da API (enviada em um cabeçalho ou campo de payload, dependendo do SDK)
  • o prefixo do nome do arquivo de armazenamento
  • a chave primária do banco de dados para o trabalho

Se a API do WaveSpeed respeita chaves de idempotência (verifique a documentação do seu endpoint), chamadas repetidas com a mesma chave retornam o mesmo resultado sem cobranças extras. Se não, a verificação de armazenamento primeiro ainda evita duplicatas que você pagaria duas vezes.

Recuperação de trabalho falhado

Nem toda falha merece outra tentativa. Minha regra de ouro:

  • Retente: 429, 5xx, timeouts de rede, “model busy”, ou erros transitórios de armazenamento
  • Não retente: 4xx com erros de validação, parâmetros ausentes, ou entrada obviamente ruim

Limito retentativas a 5 com backoff exponencial. Depois disso, o trabalho cai em uma fila de letra morta com o payload de erro. Uma vez por dia, faço triage dos trabalhos DLQ: alguns ganham entrada corrigida e são refilados, outros são arquivados com uma nota. Isso manteve minha taxa de falha geral abaixo de 1,5% para uma execução de 1.200 imagens.

Gerenciamento de Estado do Trabalho

Status: pending/running/success/failed

Tentei algumas formas de estado. A mais simples permaneceu:

  • pending: na fila, ainda não alugada por um worker
  • running: alugada por um worker com expiração de aluguel
  • success: imagem e metadados escritos, verificações aprovadas
  • failed: terminal, com código de erro e timestamp da última tentativa

Adicionei dois campos opcionais que compensaram: attempt_count e last_response_code. Eles tornaram os dashboards mais legíveis e a depuração menos especulativa.

Tratamento de timeout do trabalho

Dois timeouts importam:

  • Timeout de aluguel: Se um worker morre no meio da execução, o trabalho deve retornar a pending após N segundos. Usei 120s.
  • Timeout da API: Se WaveSpeed não responder em N segundos, aborte e retente com backoff. Usei 60s por chamada.

Quando a API é lenta, esses dois podem conflitar. Para evitar trabalho duplicado, só marco running → pending após a expiração do aluguel e o heartbeat do worker parar. Heartbeats eram apenas uma atualização de hash Redis a cada 10 segundos. Se o heartbeat é fresco, estendo o aluguel.

Monitoramento & Alertas

Rastreamento da taxa de erro

Observei três números durante as execuções:

  • error_rate_5m: proporção móvel de 5 minutos de tentativas falhadas
  • p95_latency: por modelo, por tamanho
  • retry_depth: quantos trabalhos estão na tentativa ≥ 2

Se error_rate_5m > 5% por 10 minutos, eu cortava a concorrência pela metade automaticamente e me enviava uma nota. A maioria dos picos se acalmava dentro de cinco minutos sem mexidas manuais.

Alertas de pico de custo

Custos podem aumentar. Registrei:

  • cost_per_image: relatado pelo WaveSpeed se disponível, senão estimado a partir do plano
  • duplicate_prevented: contagem de short-circuits de armazenamento
  • total_estimated_cost: cumulativo

Quando cost_per_image pulava mais de 30% contra a média da última hora, eu pausava a entrada de novos trabalhos e deixava a fila drenar. Duas vezes, isso detectou mudanças de parâmetro não intencionais (tamanhos maiores, modelo diferente) antes da conta aumentar. Amortecedores silenciosos como este valem suas poucas linhas de código.


Implementação de Referência

Pseudo código Python
Abaixo está a forma que usei. Não é código completo, apenas os ossos:

# producer.py
for row in csv_rows:
    key = hash_inputs(row)
    job = { "id": key, "inputs": row, "deadline": now+6*3600 }
    queue.push(job)

# worker.py
while True:
    job = queue.lease(timeout=120)
    if not job:
        sleep(1)
        continue
    try:
        record_heartbeat(job.id)
        resp = wavespeed.generate_image(inputs=job.inputs, idempotency_key=job.id, timeout=60)
        path = storage_path(job.id, job.inputs)
        if not storage.exists(path):
            storage.write(path, resp.image)
            storage.write(path+'.json', resp.metadata)
        mark_success(job.id)
    except Retryable as e:
        mark_retry(job.id, e)
        backoff_sleep(job.attempt)
    except Fatal as e:
        mark_failed(job.id, e)
    finally:
        queue.release(job)

Pseudo código Node.js

// producer.mjs
for (const row of rows) {
    const key = hashInputs(row);
    queue.push({ id: key, inputs: row, deadline: Date.now() + 6 * 3600e3 }); // 6 hours
}


// worker.mjs
while (true) {
    const job = await queue.lease(120); // lease timeout in seconds

    if (!job) { // 原来的 ".job" 改为 "!job"
        await delay(1000);
        continue;
    }

    try {
        await heartbeat(job.id);

        const resp = await wavespeed.generateImage(
            { ...job.inputs, idempotencyKey: job.id },
            { timeout: 60000 } // 60 seconds
        );

        const path = makePath(job.id, job.inputs);

        if (!(await storage.exists(path))) { // 原来的 ".(await ...)" 修正
            await storage.write(path, resp.image);
            await storage.write(path + '.json', resp.metadata);
        }

        await markSuccess(job.id);
    } catch (e) {
        if (isRetryable(e)) {
            await markRetry(job.id, e);
        } else {
            await markFailed(job.id, e);
        }
    } finally {
        await queue.release(job);
    }
}

Recomendações de Configuração

  • Comece pequeno: 5–10 requisições concorrentes, depois aumente gradualmente. Monitore p95 e error_rate_5m, não apenas throughput.
  • Configurações separadas por modelo: concorrência, timeout e expectativas de custo mudam com modelo e tamanho.
  • Idempotência em todo lugar: chave na requisição, caminho de armazenamento determinístico e uma tabela de trabalho com a mesma chave.
  • Heartbeats e aluguéis: parecem chatos, mas economizam você de duplicatas fantasmagóricas.
  • Dashboards simples: 6–8 painéis são suficientes — comprimento da fila, sucessos/min, erros/min, p95, profundidade de retry e custo.

Se você já está executando batch jobs em outro lugar, isso parecerá familiar. WaveSpeed não exigiu uma reformulação, apenas alguns amortecedores cuidadosos. Isso é o que eu queria.

Uma última nota das minhas execuções: os batches mais suaves eram aqueles que eu mal observava. Não porque era “set and forget”, mas porque o sistema me dizia quando precisava de atenção e ficava quieto quando não precisava. Isso parece ser o tipo certo de velocidade.

E você? Você tem processado imagens em lote com WaveSpeed ultimamente? Qual é seu ponto ideal para concorrência (eu estou consistentemente em torno de 8–10 agora)? Ou você encontrou algum bug sorrateiro (como cobranças duplicadas)? Sinta-se à vontade para compartilhar sua configuração, armadilhas ou dicas nos comentários!