
技术分享1 分钟阅读
fastapi 实现流式数据传输
import requests
from fastapi import FastAPI
from starlette.responses import StreamingResponse
app = FastAPI()
@app.post('/claude/append_message')
def appendMessage():
response = requests.request("POST", "", json="payload", headers={},stream=True)
if response.status_code in [200, 201]:
async def stream_content():
for chunk in response.iter_content(1024):
yield chunk
return StreamingResponse(stream_content())async def stream_content(): 定义了一个异步函数stream_content。
for chunk in r.iter_content(1024): 遍历请求响应体r的iter_content生成器,每次获取1KB的内容块。不修改就是原始数据
yield chunk 将每一块内容chunk yield 出来。
这样,这个函数实现了一个异步生成器,可以按流式方式生成响应内容块。
有关使用上的问题,欢迎您在底部评论区留言,一起交流~
读者评论
评论会同步写入该文在 Notion 中的页面底部(与正文同页,便于管理)。
暂无评论,欢迎抢沙发。