WaveSpeed 批量生成:自信运行 1000+ 日均图像请求

WaveSpeed 批量生成:自信运行 1000+ 日均图像请求

你好!我是Dora。这一切始于一个小烦恼:我需要几百张变体图像来进行测试,而我通常的单请求循环感觉就像推着一辆轮子卡住的购物车。我一直听说WaveSpeed上的批量生成可以处理大量工作。我不需要什么花哨的功能,我只是想让工作感觉更轻松。

所以在12月下旬和本周的几个会话中,我在WaveSpeed上建立了一个简单的批处理管道,并让它运行超过1,000个图像请求。没有什么英雄壮举,只是稳定的吞吐量、清晰的状态和干净的重试。以下是对我有效的形状、阻碍我的部分,以及在我分散注意力时保持成本和错误不漂移的小选择。

批处理架构概述

生产者 / 队列 / 工作者 / 存储

我故意保持这些部分简单。一个小型生产者脚本收集提示和元数据,一个队列保存作业,无状态工作者调用WaveSpeed图像API,存储获取结果。每个部分都可以失败而不会拖累整个系统。

  • 生产者: 读取包含提示和每个图像设置(模型、尺寸、种子)的CSV文件。它使用幂等性密钥和软截止日期将每行写入队列中的一个作业。
  • 队列: 我曾经使用过Redis Streams一次,RabbitMQ一次。两者都有效。如果你还没有运行任何一个,Redis开始时更轻量级。
  • 工作者: 容器化进程,拉取作业,调用WaveSpeed,将结果写入对象存储(我使用了S3),并更新状态。它们是无状态的,所以扩展是一个旋钮,不是重建。
  • 存储: 一个图像桶,一个JSON元数据桶。按日期和批次ID的简单文件夹保持整洁。

令我惊讶的是,当我从100张图像扩展到1,200张图像时,代码变化很少;大多数问题是关于速度和防止重复,而不是吞吐量。

简单架构图

这是我脑海中保持的图景:

生产者 → 队列 → 工作者 → WaveSpeed API → 存储
          ↓                      ↑
       状态数据库 ←────────→ 指标/警报
  • 状态数据库可以是同一个Redis或轻量级Postgres表。
  • 指标在错误率或成本出现异常时提供警报。

这不聪明。那是关键。当API返回间歇性429/5xx时,队列会吸收。当工作者在运行中死亡时,另一个工作者在可见性超时后接手。

并发策略

安全的并行级别

在我的运行中,我从5个工作者开始,每个工作者执行2个飞行中请求。这给了我每分钟8-10张图像的稳定速率,不会触发限制。将并发请求提高到20次在短期内有效,但我看到重试激增。最好的设置不是最快的峰值,而是最平坦的平均值。

如果你要尝试这个:找到能让你的队列不增长的最小工作者数。然后逐步增加。观察p95延迟和错误率10-15分钟后再碰任何东西。

速率限制意识

WaveSpeed在文档中发布速率限制指南,但限制仍然随模型和账户而变化。我添加了两个防护栏:

  • 客户端令牌桶: 每个工作者在调用API之前获取一个令牌。令牌以计划的有效RPS补充。当我更改模型时,我调整了补充。
  • 回退纪律: 429和5xx触发指数回退与抖动,上限为30秒。这防止了短暂中断后的蜂拥。

我还用模型+尺寸标记每个作业,以便在需要时为每个模型设置单独的并发上限。这不是花哨的,只是一个小的switch语句,但它帮助避免了本地热点。

重试和幂等性

避免重复图像

我的第一个批次有一个隐静的bug:一个工作者在API调用返回后但在写入存储之前崩溃了。队列重试了该作业,我最终为两个相同的生成付款。不好玩。

为了阻止这种情况,我使存储路径从幂等性密钥和输入哈希确定性。如果重试发现图像已经写入,它就会短路并将作业标记为成功。便宜的修复,大的缓解。

幂等性密钥实现

我使用了规范化提示+模型+种子+尺寸+引导的SHA-256。该哈希变成:

  • API幂等性密钥(在标头或有效负载字段中发送,取决于SDK)
  • 存储文件名前缀
  • 作业的数据库主键

如果WaveSpeed的API尊重幂等性密钥(检查你的端点的文档),使用相同密钥的重复调用返回相同结果而不需要额外费用。如果不是,存储优先检查仍然防止你会为之付款两次的重复。

失败的作业恢复

并非每个失败都值得另一次尝试。我的经验法则:

  • 重试: 429、5xx、网络超时、“模型繁忙”或临时存储错误
  • 不重试: 4xx验证错误、缺失参数或明显错误的输入

我限制重试次数为5次,使用指数回退。之后,该作业进入死信队列并带有错误有效负载。每天一次,我对DLQ作业进行分类:一些获得修复的输入并重新排队,其他的被存档并附注。这将我整体1,200图像运行的失败率保持在1.5%以下。

作业状态管理

状态:pending/running/success/failed

我尝试了几种状态形状。最简单的粘住了:

  • pending: 排队,还没有被工作者租赁
  • running: 由具有租赁过期时间的工作者租赁
  • success: 图像和元数据已写入,检查通过
  • failed: 终端,带有错误代码和最后一次尝试时间戳

我添加了两个可选字段,值得注意:attempt_countlast_response_code。它们使仪表盘更易读,调试更少猜测。

作业超时处理

两个超时很重要:

  • 租赁超时: 如果工作者在运行中死亡,作业应该在N秒后返回pending。我使用了120秒。
  • API超时: 如果WaveSpeed在N秒内没有响应,中止并使用回退重试。我每次调用使用60秒。

当API很慢时,这两个可能会冲突。为了避免重复工作,我仅在租赁过期且工作者的心跳停止后才标记running→pending。心跳只是每10秒更新一次Redis哈希。如果心跳是新的,我延长租赁。

监控和警报

错误率跟踪

我在运行期间关注三个数字:

  • error_rate_5m:失败尝试的滚动5分钟比例
  • p95_latency:按模型、按大小
  • retry_depth:有多少作业在attempt ≥ 2上

如果error_rate_5m > 5%持续10分钟,我自动将并发减半并给自己发送一条笔记。大多数峰值在5分钟内没有手动调整就平复了。

成本激增警报

成本可能会悄悄增加。我记录了:

  • cost_per_image:WaveSpeed报告(如果可用),否则从计划估计
  • duplicate_prevented: 存储短路的计数
  • total_estimated_cost: 累计

cost_per_image相对于上一小时的平均值跳跃超过30%时,我暂停新作业摄入并让队列排干。有两次,这在账单漂移之前捕获了无意的参数更改(更大的尺寸、不同的模型)。像这样的安静防护栏值得它们的几行代码。


参考实现

Python伪代码
以下是我使用的形状。这不是完整的代码,只是框架:

# producer.py
for row in csv_rows:
    key = hash_inputs(row)
    job = { "id": key, "inputs": row, "deadline": now+6*3600 }
    queue.push(job)

# worker.py
while True:
    job = queue.lease(timeout=120)
    if not job:
        sleep(1)
        continue
    try:
        record_heartbeat(job.id)
        resp = wavespeed.generate_image(inputs=job.inputs, idempotency_key=job.id, timeout=60)
        path = storage_path(job.id, job.inputs)
        if not storage.exists(path):
            storage.write(path, resp.image)
            storage.write(path+'.json', resp.metadata)
        mark_success(job.id)
    except Retryable as e:
        mark_retry(job.id, e)
        backoff_sleep(job.attempt)
    except Fatal as e:
        mark_failed(job.id, e)
    finally:
        queue.release(job)

Node.js伪代码

// producer.mjs
for (const row of rows) {
    const key = hashInputs(row);
    queue.push({ id: key, inputs: row, deadline: Date.now() + 6 * 3600e3 }); // 6 hours
}


// worker.mjs
while (true) {
    const job = await queue.lease(120); // lease timeout in seconds

    if (!job) { // 原来的 ".job" 改为 "!job"
        await delay(1000);
        continue;
    }

    try {
        await heartbeat(job.id);

        const resp = await wavespeed.generateImage(
            { ...job.inputs, idempotencyKey: job.id },
            { timeout: 60000 } // 60 seconds
        );

        const path = makePath(job.id, job.inputs);

        if (!(await storage.exists(path))) { // 原来的 ".(await ...)" 修正
            await storage.write(path, resp.image);
            await storage.write(path + '.json', resp.metadata);
        }

        await markSuccess(job.id);
    } catch (e) {
        if (isRetryable(e)) {
            await markRetry(job.id, e);
        } else {
            await markFailed(job.id, e);
        }
    } finally {
        await queue.release(job);
    }
}

配置建议

  • 从小开始: 5-10个并发请求,然后逐步增加。观察p95error_rate_5m,不仅仅是吞吐量。
  • 每个模型的单独配置: 并发、超时和成本预期随模型和大小变化。
  • 到处都有幂等性: 请求中的密钥、确定性存储路径和用同一值键入的作业表。
  • 心跳和租赁: 它们听起来很讨厌,但它们可以保护你免受幻影重复。
  • 简单的仪表盘: 6-8个面板就足够了——队列长度、成功/分钟、错误/分钟、p95、重试深度和成本。

如果你已经在其他地方运行批处理作业,这会感觉很熟悉。WaveSpeed不需要重新思考,只需要一些仔细的防护栏。那就是我想要的。

我运行中的最后一条笔记:最平顺的批次是我几乎没有观察的那些。不是因为它是”设置后忘记”,而是因为系统在需要注意时告诉我,在它不需要时保持安静。那感觉就像是正确的速度类型。

你呢?你最近是否一直在用WaveSpeed批量处理图像?你的并发甜蜜点是什么(我目前一直稳定在8-10左右)?或者你遇到过任何狡猾的错误(比如重复收费)?欢迎在评论中分享你的设置、陷阱或提示!