WaveSpeed 批量生成:自信地每天運行 1,000+ 張圖像請求

WaveSpeed 批量生成:自信地每天運行 1,000+ 張圖像請求

你好,各位!我是 Dora。這一切始於一個小煩惱:我需要為測試準備幾百個變體圖像,而我常用的單次請求迴圈就像推著一輛輪子卡住的購物車。我一直聽說 WaveSpeed 上的批次生成可以處理大量工作。我不需要花俏的功能,只是想讓工作輕鬆一點。

所以在十二月末和本週的幾次會議中,我在 WaveSpeed 上設置了一個簡單的批次管道,並要求它運行超過 1,000 個圖像請求。沒有英雄氣概,只是穩定的輸送量、清晰的狀態和乾淨的重試。以下是對我有效的方式、遇到的障礙,以及讓成本和錯誤不在我不注意時增加的小選擇。

批次架構概述

生成者/隊列/工作程序/存儲

我故意讓各部分保持簡單。一個小型生成者腳本收集提示詞和元數據,隊列保存作業,無狀態工作程序調用 WaveSpeed 圖像 API,存儲保存結果。每一部分都可以故障,而不會導致整個系統癱瘓。

  • 生成者: 讀取包含提示詞和每個圖像設置(模型、尺寸、種子)的 CSV,將每行作為一個帶有冪等性鍵和軟截止時間的作業寫入隊列。
  • 隊列: 我曾經使用過 Redis Streams,也用過 RabbitMQ。兩者都有效。如果你還沒有運行其中任何一個,Redis 的起點更輕量。
  • 工作程序: 容器化進程,拉取作業、調用 WaveSpeed、將結果寫入對象存儲(我使用了 S3)並更新狀態。它們是無狀態的,所以擴展是一個開關,而不是重建。
  • 存儲: 一個用於圖像的桶,一個用於 JSON 元數據的桶。按日期和批次 ID 的簡單資料夾保持整潔。

讓我驚訝的是,當我從 100 張擴展到 1,200 張圖像時,代碼改動很少;大多數問題都是關於節奏和防止重複,而不是輸送量。

簡單架構圖

這是我心裡的圖:

生成者 → 隊列 → 工作程序 → WaveSpeed API → 存儲
             ↓                      ↑
          狀態數據庫 ←────────→ 指標/警報
  • 狀態數據庫可以是相同的 Redis 或輕量級 Postgres 表。
  • 指標在錯誤率或成本變得奇怪時觸發警報。

這不聰明。這就是重點。當 API 返回間歇性的 429/5xx 時,隊列會吸收它。當工作程序在運行中死亡時,另一個工作程序會在可見性超時後接管。

並發策略

安全的並行級別

在我的運行中,我從 5 個工作程序開始,每個都有 2 個進行中的請求。這給了我穩定的 8-10 張圖像/分鐘,而不會觸發限制。提升到 20 個並發請求短暫有效,但之後我看到重試激增。最好的設置不是最快的峰值,而是最平穩的平均值。

如果你想試試:找到使隊列不增長的最小工作程序數。然後逐漸增加。在改動任何東西之前,觀察 p95 延遲和錯誤率 10-15 分鐘。

速率限制意識

WaveSpeed 在文檔中發布速率限制指南,但限制因模型和帳戶而異。我添加了兩個防護措施:

  • 客戶端令牌桶: 每個工作程序在調用 API 前獲取令牌。令牌按計劃的有效 RPS 補充。當我更改模型時,我調整了補充率。
  • 退避紀律: 429 和 5xx 觸發指數退避,帶 抖動,上限為 30 秒。這防止了短暫停機後的衝擊。

我還用模型 + 尺寸標記每個作業,以便在需要時為每個模型設置單獨的並發上限。這不是花哨的,只是一個小的 switch 語句,但它幫助避免了局部熱點。

重試和冪等性

避免重複圖像

我的第一個批次有一個隱患:工作程序在 API 調用返回後、寫入存儲前崩潰。隊列重試作業,我最終為兩個相同的生成付費。不太好。

為了阻止這種情況,我從冪等性鍵和輸入哈希值確定了存儲路徑。如果重試發現圖像已經寫入,它會短路並只將作業標記為成功。便宜的修復,大大的寬心。

冪等性鍵實現

我使用了規範化提示詞 + 模型 + 種子 + 尺寸 + 指導的 SHA-256:

  • API 冪等性鍵(在請求頭或負載欄位中發送,取決於 SDK)
  • 存儲文件名前綴
  • 作業的數據庫主鍵

如果 WaveSpeed 的 API 尊重冪等性鍵(檢查你的端點的文檔),用相同鍵的重複調用會返回相同結果而不產生額外費用。如果沒有,存儲優先檢查仍然防止你要付費兩次的重複。

失敗作業恢復

並非每個失敗都應該重試。我的經驗法則:

  • 重試: 429、5xx、網絡超時、「模型忙」或暫時性存儲錯誤
  • 不重試: 4xx 驗證錯誤、缺少參數或明顯的錯誤輸入

我限制重試次數為 5,採用指數退避。之後,作業進入死信隊列,帶有錯誤負載。每天一次,我對 DLQ 作業進行分類:有些獲得修復的輸入並重新進入隊列,其他的被存檔並帶有注釋。這讓我在 1,200 圖像運行中的整體失敗率保持在 1.5% 以下。

作業狀態管理

狀態:pending/running/success/failed

我嘗試了幾種狀態形式。最簡單的堅持了下來:

  • pending: 已排隊,還未被工作程序租用
  • running: 被工作程序租用,帶有租用過期時間
  • success: 圖像和元數據已寫入,檢查通過
  • failed: 終端狀態,帶有錯誤代碼和最後嘗試時間戳

我添加了兩個可選欄位,收益匪淺:attempt_countlast_response_code。它們使儀表板更易於閱讀,除錯不那麼猜測性。

作業超時處理

兩個超時重要:

  • 租用超時: 如果工作程序在運行中死亡,作業應在 N 秒後返回待機。我使用了 120 秒。
  • API 超時: 如果 WaveSpeed 在 N 秒內沒有響應,中止並用退避重試。我每次調用使用了 60 秒。

當 API 緩慢時,這兩個可能會衝突。為了避免重複工作,我只在租用過期且工作程序的心跳停止後將 running 標記為 pending。心跳只是 Redis 哈希每 10 秒更新一次。如果心跳是新鮮的,我會延長租用。

監控和警報

錯誤率追蹤

我在運行期間觀察了三個數字:

  • error_rate_5m:失敗嘗試的 5 分鐘滾動比例
  • p95_latency:每個模型、每個尺寸
  • retry_depth:有多少作業處於嘗試 ≥ 2 的狀態

如果 error_rate_5m > 5% 持續 10 分鐘,我自動將並發減半並向自己發送一條消息。大多數尖峰在 5 分鐘內穩定下來,無需手動調整。

成本尖峰警報

成本可能會增加。我記錄了:

  • cost_per_image: WaveSpeed 報告的(如果可用),否則從計劃估計
  • duplicate_prevented: 存儲短路計數
  • total_estimated_cost: 累計

cost_per_image 相比上一小時的平均值跳升超過 30% 時,我暫停新作業進入並讓隊列清空。兩次,這在賬單漂移之前捕捉到了無意中的參數更改(更大的尺寸、不同的模型)。像這樣的安靜防護欄值得花代碼。


參考實現

Python 偽代碼
以下是我使用的形式。這不是完整的代碼,只是骨架:

# producer.py
for row in csv_rows:
    key = hash_inputs(row)
    job = { "id": key, "inputs": row, "deadline": now+6*3600 }
    queue.push(job)

# worker.py
while True:
    job = queue.lease(timeout=120)
    if not job:
        sleep(1)
        continue
    try:
        record_heartbeat(job.id)
        resp = wavespeed.generate_image(inputs=job.inputs, idempotency_key=job.id, timeout=60)
        path = storage_path(job.id, job.inputs)
        if not storage.exists(path):
            storage.write(path, resp.image)
            storage.write(path+'.json', resp.metadata)
        mark_success(job.id)
    except Retryable as e:
        mark_retry(job.id, e)
        backoff_sleep(job.attempt)
    except Fatal as e:
        mark_failed(job.id, e)
    finally:
        queue.release(job)

Node.js 偽代碼

// producer.mjs
for (const row of rows) {
    const key = hashInputs(row);
    queue.push({ id: key, inputs: row, deadline: Date.now() + 6 * 3600e3 }); // 6 hours
}


// worker.mjs
while (true) {
    const job = await queue.lease(120); // lease timeout in seconds

    if (!job) { // 原來的 ".job" 改為 "!job"
        await delay(1000);
        continue;
    }

    try {
        await heartbeat(job.id);

        const resp = await wavespeed.generateImage(
            { ...job.inputs, idempotencyKey: job.id },
            { timeout: 60000 } // 60 seconds
        );

        const path = makePath(job.id, job.inputs);

        if (!(await storage.exists(path))) { // 原來的 ".(await ...)" 修正
            await storage.write(path, resp.image);
            await storage.write(path + '.json', resp.metadata);
        }

        await markSuccess(job.id);
    } catch (e) {
        if (isRetryable(e)) {
            await markRetry(job.id, e);
        } else {
            await markFailed(job.id, e);
        }
    } finally {
        await queue.release(job);
    }
}

配置建議

  • 小規模開始: 5-10 個並發請求,然後逐漸增加。觀察 p95error_rate_5m,而不只是輸送量。
  • 每個模型單獨配置: 並發、超時和成本預期隨著模型和尺寸而改變。
  • 冪等性無處不在: 請求中的鍵、確定性存儲路徑和用相同值作為鍵的作業表。
  • 心跳和租用: 它們聽起來很麻煩,但它們可以保護你免受虛幻重複。
  • 簡單儀表板: 6-8 個面板就夠了 — 隊列長度、成功/分鐘、錯誤/分鐘、p95、重試深度和成本。

如果你已經在其他地方運行批次作業,這將感到熟悉。WaveSpeed 不需要重新思考,只需要幾個仔細的防護措施。那就是我想要的。

我的運行中的最後一個說明:最順利的批次是我幾乎沒有看管的。不是因為它是「設置後忘記」,而是因為系統告訴我什麼時候需要注意,什麼時候保持安靜。那感覺像是正確的速度。

你呢?你最近在用 WaveSpeed 批量處理圖像嗎?你的並發甜蜜點是多少(我現在一直在 8-10 左右)?或者你遇到過任何隱蔽的漏洞(例如重複收費)?請在評論中分享你的設置、陷阱或提示!