前言
前陣子因為某接案需求要研究 Medusa 這個開源的電商專案,發現底層設計的 Event Architecture 有參考到 Transactionally Staged Job Drain 這個事務處理機制,因此拜讀了一下這篇 2017 年的文章,當時作者也在 Hacker News 上與網友有不少討論,相信對於研究底層的設計可以讓自己功力提升,因而有了這篇文章的誕生.
問題描述:Event Queue + ACID
聲明:本篇取材自 Transactionally Staged Job Drain 的整理消化,圖片皆來自於此.
文章探討的情境是:
任務佇列(Job Queue)消化的速度太快,導致後台服務(background worker)在該事務 commit 前就嘗試取用,進而無法訪問到預期可用的資料,也就是如何搭配 ACID 與 Event Queue 解決上述事務的問題.
底下是一個顯而易見的例子:
BEGIN TRANSACTION;
/* db_op1 */
INSERT INTO USER(id, name, age, email) VALUES (1, 'Madi', 27, 'example@gamil.com');
/* queue_job */
-- Some enqueuer worker
/* db_op2 */
INSERT INTO USER(id, name, age, email) VALUES (2, 'Kevin', 20, 'example2@gamil.com');
COMMIT;
當 db_op1
插入一筆用戶紀錄,接著 queue_job
將該任務 enqueue 到佇列中,此時後台作業(ex:電子信箱服務)打算將該使用者加入白名單,但該電子信箱服務卻在將任務 dequeue 取出時,發現 DB 查不到該筆用戶紀錄
此外,也會面臨到一個問題:
在事務 commit 後送入佇列的這個階段,如果發生不預期的錯誤,導致該紀錄有被資料庫 persist,但該任務卻其實沒有完成.
解決方案
文中提出一個解法,那就是使用 Transactions as gates 的設計,透過 transactionally-staged job drain(事務階段的任務流失) 設計來優雅的將任務 dequeue.
在該模式下,任務不會立即發送到佇列,而是暫存於 RDBMS 的某個 table 中(ex: staged_jobs),等待事務真的確認準備好且 commit 後才會送入佇列,而事件在真的要被處理前,對於 enqueuer 來說是不可見的,因此不會被 enqueuer 送入佇列中.
對於 Rollback 來說,當某 job 隨著某 transaction 加入到 staged_job 的 table 中,但爾後被 discard,則該 job 也會一併被捨棄,確保資料的一致性.
對於 enqueuer 來說,即使中途因故停止運作,重啟後仍然能夠依據當前 staged_job 的 table 來重新處理 job,降低 job loss 的風險.但我認為即使 job loss 的風險能夠控管,面臨 at least once 的 event,下游服務仍須設計成 idempotent 冪等性,確保有能力應付接收重複事件時,整體系統的資料是一致的.
整體運作流程大致如下:
與 Database as IPC 設計模式比較
與另一個設計模式 Database as IPC(一般來說視為 anti-pattern)相比,transcationally-staged job drain 的設計模式展現的 performance 更好,因爲對於這種暫存資料,瘋狂的 lock 與 unlock 會拖垮 DB 的效能,對於長時間運行的事務來說,更是會造成 job queue 漸漸不受控.
相反的,transcationally-staged job drain 的設計模式透過批量(batch)的方式,將準備好的 jobs 轉移給另一個更適合分散任務的服務(ex: Redis),更適合應付需派送大量事務給具有競爭性的 workers 的情境.
Medusa 的 EventBusService 實作
了解這個機制後,再回到自己研究的 Medusa 專案,可以在 EventBusService 的實作中窺見 transactionally-staged job drain 的蹤跡
// ...
/**
* If we are in an ongoing transaction, we store the jobs in the database
* instead of processing them immediately. We only want to process those
* events, if the transaction successfully commits. This is to avoid jobs
* being processed if the transaction fails.
*
* In case of a failing transaction, kobs stored in the database are removed
* as part of the rollback.
*/
if (this.transactionManager_) {
const stagedJobRepository = this.transactionManager_.getCustomRepository(this.stagedJobRepository_)
const jobToCreate = {
event_name: eventName,
data: data as unknown as Record<string, unknown>,
options: opts,
} as Partial<StagedJob>
const stagedJobInstance = stagedJobRepository.create(jobToCreate)
return await stagedJobRepository.save(stagedJobInstance)
}
this.queue_.add({ eventName, data }, opts)
// ...
// 將任務推入job queue的enqueuer
async enqueuer_(): Promise<void> {
while (this.shouldEnqueuerRun) {
const listConfig = {
relations: [],
skip: 0,
take: 1000,
}
const stagedJobRepo = this.manager_.getCustomRepository(
this.stagedJobRepository_
)
const jobs = await stagedJobRepo.find(listConfig)
await Promise.all(
jobs.map((job) => {
this.queue_
.add(
{ eventName: job.event_name, data: job.data },
job.options ?? { removeOnComplete: true }
)
.then(async () => {
await stagedJobRepo.remove(job)
})
})
)
await sleep(3000) // 每三秒polling一次事件,使用上有發現latency有稍高的問題,這部分尚在研究如何解決中
}
}
可以看到 Medusa 用 repository pattern 實作一個 staged_job 的 repository 來與 DB 內的暫存任務表取用任務,當該 transaction 完成後,才會將任務推入 job queue,確保事件的原子性.
而底下 enqueuer 是每三秒去輪詢(poll)一次,這部分使用上有感受到稍高的 latency,追溯原始碼後才發現原來這三秒是固定不可配置的,未來再想想如何解決這個問題,目前就先簡單紀錄.
總結
這個設計模式的文章是 2017 年公布的,文中滿多在探討 Ruby on Rails 的生態系,和滿多 Ruby 界第三方套件的設計模式做比較,但我沒有寫過 Ruby,所以相對來說較陌生,但感覺這個設計概念應該是 language-agnostic 的,應該不受語言框架的束縛,而 Database as IPC 的設計近年來也漸漸被 Message Queue(ex: RabbitMQ, Kafka…)取代,將不同 process 之間的溝通,透過這類服務來承擔,也構築了當今微服務常見的事件傳遞的設計.