FastAPI 速查表
FastAPI是一个用于构建API的现代、高性能的Web框架,使用Python 3.6+并基于标准的Python,该文档为用户提供FastAPI的速查表,帮助用户快速入门。
入门
安装 FastAPI
$ pip install "fastapi[all]"
可以分开来安装
假如你想将应用程序部署到生产环境,你可能要执行以下操作:
$ pip install fastapi
并且安装 uvicorn 来作为服务器:
$ pip install "uvicorn[standard]"
运行代码
$ uvicorn main:app --reload
Python: 3.9.5 FastAPI: 0.103.1
最小程序
下面代码会直接启动http服务,也可以使用 uvicorn main:app --reload
from fastapi import FastAPI
import uvicorn
app = FastAPI()
添加一个 API 的示例
# http://127.0.0.1:8000/
@app.get("/")
async def root():
    return {"message": "Hello World"}
if __name__ == '__main__':
    uvicorn.run(app='main:app', reload=True)
路径参数
最基本的路径参数
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id):
    return {"item_id": item_id} # item_id自定义
多个路径参数
# http://127.0.0.1:8000/items/1/2
@app.get("/items/{item_id}/{user_id}")
async def read_item(item_id, user_id):
    return {"item_id": item_id, "user_id": user_id}
有类型的路径参数
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}
文件路径参数
# http://127.0.0.1:8000/file//home/my/my.txt
@app.get("/file/{file_path:path}")
async def read_item(file_path):
    return {"file_path": file_path}
查询参数
带默认值的查询参数
# http://127.0.0.1:8000/items/?skip=0&limit=2
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    return fake_items_db[skip: skip + limit]
可选查询参数
# http://127.0.0.1:8000/items/1?q=admin
from typing import Union
@app.get("/items/{item_id}")
async def read_item(item_id: str, q: Union[str, None] = None):
    if q:
        return {"item_id": item_id, "q": q}
    return {"item_id": item_id}
多路径多查询参数
# http://127.0.0.1:8000/users/1/items/2
# or 
# http://127.0.0.1:8000/users/1/items/2?q=query&short=true
@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(
    user_id: int,
    item_id: str,
    q: Union[str, None] = None,
    short: bool = False
):
    item = {"item_id": item_id, "owner_id": user_id}
    if q:
        item.update({"q": q})
    if not short:
        item.update(
            {"description": "这是一个令人惊叹的项目,有很长的描述"}
        )
    return item
必需查询参数
# http://127.0.0.1:8000/items/123?needy=yes
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
    item = {"item_id": item_id, "needy": needy}
    return item
请求体
from pydantic import BaseModel
from typing import Union
class Item(BaseModel):
    name: str = '小明'
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
@app.post("/items/")
async def create_item(item: Item):
    print(item.name)
    return item
调用
curl -X 'POST' \
  'http://127.0.0.1:8000/items/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "小明",
  "description": "string",
  "price": 0,
  "tax": 0
}'
查询参数和字符串校验
from fastapi import Query
@app.get("/items/")
async def read_items(
    q: Union[str, None] = Query(default=None, max_length=50)
):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results
参数列表
| 参数 | 含义 | 类型 | 
|---|---|---|
| default | 默认值 | 任意类型或... | 
| max_length | 最大长度 | int | 
| min_length | 最小长度 | int | 
| pattern | 正则匹配 | string | 
| alias | 别名参数 | string | 
| deprecated | 准备弃用参数 | bool | 
多个相同的查询参数
# http://127.0.0.1:8000/items/?q=foo&q=bar
@app.get("/items/")
async def read_items(
    q: Union[List[str], None] = Query(default=None)
):
    query_items = {"q": q}
    return query_items
路径参数和数值校验
Path 用法基本和 Query 相同,参考:FastAPI官方文档
导入 Path
from fastapi import FastAPI, Path, Query
from typing_extensions import Annotated
声明元数据
@app.get("/items/{item_id}")
async def read_items(
    item_id: Annotated[int, Path(title="要获取的项目的 ID")],
    q: Annotated[str | None, Query(alias="item-query")] = None,
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results
参数列表
| 参数 | 含义 | 类型 | 
|---|---|---|
| ... | 和 Query 具有相同参数 | ... | 
| ge | 大于等于 | int float | 
| gt | 大于 | int float | 
| le | 小于等于 | int float | 
| le | 小于等于 | int float | 
| title | api文档的标题 | string | 
其他参数
都具有 Query 的参数,max_length、min_length 等
Cookie参数
from fastapi import Cookie
@app.get("/items/")
async def read_items(
    ads_id: Annotated[Union[str, None], Cookie()] = None
):
    return {"ads_id": ads_id}
Header 参数
from fastapi import Header
@app.get("/items/")
async def read_items(
    user_agent: Annotated[Union[str, None], Header()] = None,
    items_id: Annotated[Union[int, None], Header(ge=1)] = None
):
    return {"User-Agent": user_agent, "items_id": items_id}
表单数据
接收的不是 JSON,而是表单字段时,要使用 Form。
安装
$ pip install python-multipart
HTML
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
</head>
<body>
<form method="post" action="http://127.0.0.1:8000/login">
    <span>账号:</span><input type="text" name="username">
    <br>
    <span>密码:</span><input type="password" name="password">
    <br>
    <input type="submit" value="登录">
</form>
</body>
</html>
FastAPI
from fastapi import FastAPI, Form
import uvicorn
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    return {"username": username}
if __name__ == '__main__':
    uvicorn.run(app='main:app', reload=True)
文件上传
from fastapi import FastAPI, UploadFile
from fastapi.responses import HTMLResponse
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    print(file.file.read().decode())
    return {"filenames": file.filename, "type": str(type(file.file))}
@app.get("/")
async def main():
    content = """<body>
<form action="/uploadfile/" enctype="multipart/form-data" method="post">
<input name="file" type="file" multiple>
<input type="submit">
</form>
</body>"""
    return HTMLResponse(content=content)
UploadFile 属性
| 属性名 | 含义 | 返回 | 
|---|---|---|
| filename | 文件名 | 上传的文件名 | 
| content_type | 内容类型 | MIME类型 | 
| file | 文件 | SpooledTemporaryFile 具有 read,write方法 | 
UploadFile async 方法
| 方法名 | 含义 | 
|---|---|
| write(data) | 把 data写入文件 | 
| read(size) | 按指定数量的字节读取文件内容 | 
| seek(offset) | 移动至文件 offset(int)字节处的位置 | 
| close() | 关闭文件 | 
依赖项
依赖项使用场景
- 共享业务逻辑(复用相同的代码逻辑)
- 共享数据库连接
- 实现安全、验证、角色权限
- 等……
创建依赖项
from typing import Union
from fastapi import Depends, FastAPI
app = FastAPI()
read_items 和 read_users 方法依赖 common_parameters
                                    白话就是 read_items 和 read_users 都需要 q,skip,limit 查询参数
async def common_parameters(
    q: Union[str, None] = None,
    skip: int = 0,
    limit: int = 100
):
    return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(
    commons: dict = Depends(common_parameters)
):
    return commons
@app.get("/users/")
async def read_users(
    commons: dict = Depends(common_parameters)
):
    return commons
类作为依赖项
from typing import Union
from fastapi import Depends, FastAPI
app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]
class CommonQueryParams:
    def __init__(
        self,
        q: Union[str, None] = None,
        skip: int = 0,
        limit: int = 100
    ):
        self.q = q
        self.skip = skip
        self.limit = limit
read_itemsx 接收一个 commons 参数,类型是 CommonQueryParams
                                    CommonQueryParams 接收三个参数,这三个参数是调用 api 的时候传
@app.get("/items/")
async def read_items(
  commons: CommonQueryParams = Depends(CommonQueryParams)
):
  response = {}
  if commons.q:
      response.update({"q": commons.q})
  items = fake_items_db[commons.skip : commons.skip + commons.limit]
  response.update({"items": items})
  return response
还可以简写
@app.get("/items/")
async def read_items(
  # 这里的 Depends 没有传参,FastAPI 会自动使用 CommonQueryParams
  commons: CommonQueryParams = Depends()
):
  response = {}
  if commons.q:
      response.update({"q": commons.q})
  items = fake_items_db[commons.skip : commons.skip + commons.limit]
  response.update({"items": items})
  return response
子依赖项
from typing import Union
from fastapi import Cookie, Depends, FastAPI
app = FastAPI()
def query_extractor(q: Union[str, None] = None):
    return q
def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Union[str, None] = Cookie(default=None),
):
    if not q:
        return last_query
    return q
# read_query函数依赖query_or_cookie_extractor函数
# query_or_cookie_extractor函数又依赖query_extractor函数
# 就是说依赖项可以依赖其他依赖项,只要你不晕,可以无数次套娃
@app.get("/items/")
async def read_query(
  query_or_default: str = Depends(query_or_cookie_extractor)
):
    return {"q_or_cookie": query_or_default}
不使用缓存
使用 use_cache = False 参数不使用缓存数据,不使用 use_cache = False,value 和 value1 是一样的
def result_value():
    value = randint(1, 99)
    return value
def get_value(
  value: int = Depends(result_value, use_cache=False),
  value1: int = Depends(result_value, use_cache=False)
):
    return value, value1
@app.get('/value/')
async def needy_dependency(value: tuple = Depends(get_value)):
    return {"value": value}
全局依赖项
from fastapi import Depends, FastAPI, Header, HTTPException
async def verify_token(x_token: str = Header()):
  if x_token != "fake-super-secret-token":
    raise HTTPException(status_code=400, detail="X-Token 标头无效")
async def verify_key(x_key: str = Header()):
  if x_key != "fake-super-secret-key":
    raise HTTPException(status_code=400, detail="X-Key 标头无效")
  return x_key
全局依赖项很有用,后面的安全性就可以使用全局依赖项
app = FastAPI(
  dependencies=[Depends(verify_token), Depends(verify_key)]
)
@app.get("/items/")
async def read_items():
    return [{"item": "Portal Gun"}, {"item": "Plumbus"}]
@app.get("/users/")
async def read_users():
    return [{"username": "Rick"}, {"username": "Morty"}]
安全性
基于 Token 的认证
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
使用 OAuth2PasswordBearer 创建一个 token 依赖
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
假设这是你的用户数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    }
}
创建一个用户模型
class User(BaseModel):
    username: str
    email: str
    full_name: str
    disabled: bool
创建一个简单的认证函数
def fake_hash_password(password: str):
    return "fakehashed" + password
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return User(**user_dict)
def fake_decode_token(token: str):
    # 这个函数应该验证 token 并返回用户信息
    # 这里我们只是简单地返回了用户名
    return get_user(fake_users_db, token)
创建一个依赖,用于从请求中获取 token 并验证用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = get_user(fake_users_db, form_data.username)
    if not user or user.hashed_password != fake_hash_password(form_data.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
使用 OAuth2PasswordBearer 来创建一个简单的 token 认证流程。
HTTPS 和证书
from fastapi import FastAPI
app = FastAPI()
在生产环境中,你应该使用一个真正的证书和私钥,你可以从像 Let's Encrypt 这样的证书颁发机构获得免费的证书,或者使用 OpenSSL 生成自签名证书
@app.get("/https")
async def read_https():
    return {"message": "Hello, HTTPS!"}
启动服务器时,使用以下命令来指定证书和私钥:
uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile /path/to/your/key.pem --ssl-certfile /path/to/your/cert.pem
FastAPI 默认支持 HTTPS,你只需要提供证书和私钥即可。
待更新
参考
- Python 速查表 (jaywcjlove.github.io)
- FastAPI 官方文档 (fastapi.tiangolo.com)
评论
欢迎提交文档错误或者建议。提交成功后自己可见,其他用户待审核通过后才可见。