前言
FastAPI 是近幾年繼 Flask 之後更火紅的 Python web server,不僅能自動生成 OpenAPI-Swagger 的 API 文件,也增加了類似 Node.js 的 Async 異步機制,大幅的提升效能.儼然已成為 Python 開 API 的不二人選.但使用上有踩到 def 與 async def 的陷阱,所以有了這篇文章來解釋遇到的問題
Concurrency vs Parallelism
開始以前先複習一下,這兩個名詞是不等價的,描述的是不同的併發狀況
- Concurrency
- 單一時間點,單一 worker 一次只做一件事情,但會交錯切換來完成任務,旨在快速處理任務
- ex: 一個廚師同時切換工作項目來料理,如: 切菜, 煮飯, 燒水
- 多以 Thread 或 Async IO 來達成,因為 IO bound 的 task 更適合,例如: network 的連線等待或 file 的讀取
- Parallelism
- 單一時間點,有多個 worker 會同時做多件事情,所以並行的執行任務,旨在處理更多的任務
- ex: 銀行櫃檯有多個窗口,並行的處理任務
- 多以 Multiple Process 來達成,因為 CPU bound 的 task 更適合,例如: 大量耗時的 Machine Learning 矩陣計算
圖片來源: Async IO in Python: A Complete Walkthrough – Real Python
Coroutine vs Thread
對於 OS 來說,他只認得 Process 和 Thread,而 Coroutine 只是一個存在於單一 Thread 的 特化 function
- Coroutine
- 是一個可以暫停再繼續的 function
- 由運行程式的環境(ex: Python runtime)來管理,所以 OS 其實並不知道什麼是 coroutine,他只認得 Process 和 thread
- Python 中 Coroutine 是特化的 generator (generator 只能透過 yield 來取數據,但 coroutine 可允許使用者對他發送數據)
- 單一 Thread 中可以有多個 coroutine,所以可視為輕量的 thread,但本質上不是 thread
- 特性是 Cooperative multitasking (協同式多工),多個 coroutine 可隨著 user 的操作進行暫停與等待,接力運行任務
- Thread
- 由 OS 來管理,透過 context switching 來控制
- 一個 Process 中可有多個 Thread
- OS 調度的最小單元
- 特性是 Pre-emptive multitasking (搶佔式多工),會需要 lock 機制來避免 race-condition(但 Python 背後的 CPython 透過 GIL 鎖已經確保不會有競爭發生)
FastAPI
How Async IO?
FastAPI 是使用 Starlette 這個輕量的 ASGI framework 來搭建非同步的 web service,底層是用 AnyIO 這個 基於 asyncio 和 trio 之上的 library 作為 Network 和 concurency 的處理.並使用 async/await 這個語法糖來書寫非同步的程式碼.這個語法在 Node.js 也會看到,但其實底層設計截然不同
- Node.js
- Single Thread
- 採用 Google 用 C++開發的 V8 引擎作為內建的 event loop,再搭配 callback function 來達到異步
- 使用 C 語言的 libuv 函式庫來完成跨平台的 Async IO
- Node.js 初始運行就有預設的 event loop 在背後運行了
- Python
- Multiple Thread
- 透過 Coroutine 完成 Async IO
- 程式啟動時預設不會有 event loop,需要手動開啟才能使用(透過 asyncio 這個 library)
- FastAPI 底層就是靠 asyncio 來達到異步,透過一個 single thread 的 event loop 這種 concurrency model 來執行 coroutines
要用 async def 還是 def?
剛開始用 FastAPI 的時候,沒有看清楚官方文件,所以沒有搞懂這兩種差異,以為就跟 Node.js 的 async 一樣就好,結果 concurrent 運行後才發現 CPU-bound 的任務會阻塞其他 async 的 route
async def
- 丟到 event loop 來運行,完成 non-blocking IO
- 一旦函式內運行的是 CPU bound 的 task 就會造成阻塞,這時候其他用 async def 定義的 route path 也將會阻塞,等待 event queue 內的任務完成後才會被執行
- 對於 IO bound 的任務來說速度快
- 可以使用 await 語法來等待,但就會阻塞(不過對於像是 fetch 第三方 API 這種耗時的 Network 操作來說,用 await 阻塞是不可避免的)
def
- 丟到 Thread pool 來運行
- FastAPI 底層會自己幫我們 handle,額外開一個 Thread 處理函式定義的任務,不會阻塞與拖延當前的 main Thread
- 適合處理 CPU bound 的 task,因為不會阻塞,會馬上回 response
- 能開多少個 Thread 取決於 CPU 數量,預設每個 CPU 會開 5 個 Thread,所以對 8 core 的 CPU 來說,預設會開到最多 40 個 Thread(一旦超過,就會等待任務完成後,沒事做的 worker 就會繼續接任務做)
官方建議說,如果不知道該怎麼選擇,預設用 def
就好
async + sync 的 Experiment
建立四個 FastAPI 的 route,來看一下 async 和 sync 在以下情境的 concurrency 反應,每個都併發 100 個 requests 去請求,看最終耗時多久
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import asyncio
import time
import uvicorn
app = FastAPI() # 建立後,會有一個worker thread處理event loop
@app.get("/async_slowest")
async def async_slowest():
# Mimic CPU bound task
time.sleep(5)
return JSONResponse(
status_code=200,
content="async with sync time sleep"
)
@app.get("/async_with_event_loop")
async def async_with_event_loop():
# Mimic CPU bound task, but run in event loop
loop = asyncio.get_event_loop()
# None can replaced with ThreadPoolExecutor
loop.run_in_executor(None, time.sleep, 1)
# Be aware that if use `await` here, it will block the IO
return JSONResponse(
status_code=200,
content="async run time sleep in event loop"
)
@app.get("/async_with_async_sleep")
async def async_with_async_sleep():
# Event loop will run other tasks while your await statement finishes its execution
await asyncio.sleep(1)
return JSONResponse(
status_code=200,
content="async with asyncio time sleep"
)
@app.get("/sync")
def sync_func():
# Mimic CPU bound task
time.sleep(1)
return JSONResponse(
status_code=200,
content="sync"
)
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
- 第一個
- 在 async 函式內跑 CPU-bound 的 task,
- 最慢,整體耗時 5*100=500 秒
- 第二個
- 為了解決上述 block 的問題,將 async 函式從 event loop 取出,把 CPU-bound 的 task 改丟到 thread pool 內運行
loop.run_in_executor(None, time.sleep, 1)
這行預設會開一個 Thread 來執行time.sleep(1)
- 如果想要開 multiple threads,也可以用
ThreadPoolExecutor
,詳細見官方文件 - 速度快,1 秒內回傳
- 第三個
- async 函式內不用 sync 的 time.sleep 做等待,因為這個 sync 等待會讓整個 execution flow 停擺,所以改使用 async 的 sleep,這樣就不會阻塞其他 async 的 route
- 速度快,等待一秒後回傳
- 第四個
- sync 的函式內用 sync 的 time.sleep,原本以為主程式會會被 block 住,沒想到並不會,原因出在剛剛提到的,def 的 route 背後是丟到 Thread pool 內,所以會額外開一個 Thread,不會阻塞 main thread
- 速度快,等待 3 秒後回傳 (因為併發 100 個 requests,但一次最多只有 40 個 thread 處理,所以 40+40+20 就會需要 3 秒來處理)
結論
以上就是 FastAPI 的 async def 與 sync def 的差異,了解兩個實作機制之後,再去看要處理的任務是 CPU-bound 還是 IO-bound,就會知道當 concurrency 處理後會不會被阻塞.
額外一提,CPU-bound 的任務或許架構上可以改為 sync request but async response
比如說有個情境是上傳一份圖片給後端當場做影像辨識,這時候 user 上傳後可能會需要在前端等待任務完成,如果此時有大量的請求,server 可能需要運行較久(不考慮水平擴展的話),前端 user 發的 requst 就會等非常久才拿到 response,但如果改為 async response,透過 Kafka 這種 message queue 來把完成事件透過 websocket 讓前端得知或通知 user 任務已完成,就可以免去讓 user 在窗口邊等待的不便.而 user 可以在 request 後拿到的 response 是這個 job 的 id,接下來在任務完成前都可以隨時呼叫另一隻 API 來查詢當前 job 的 status