Notion Blog
技术分享1 分钟阅读

fastapi 实现流式数据传输

import requests
from fastapi import FastAPI
from starlette.responses import StreamingResponse

app = FastAPI()
@app.post('/claude/append_message')
def appendMessage():
    response = requests.request("POST", "", json="payload", headers={},stream=True)
    if response.status_code in [200, 201]:
        async def stream_content():
            for chunk in response.iter_content(1024):
                yield chunk
        return StreamingResponse(stream_content())

async def stream_content(): 定义了一个异步函数stream_content。 for chunk in r.iter_content(1024): 遍历请求响应体r的iter_content生成器,每次获取1KB的内容块。不修改就是原始数据 yield chunk 将每一块内容chunk yield 出来。 这样,这个函数实现了一个异步生成器,可以按流式方式生成响应内容块。

有关使用上的问题,欢迎您在底部评论区留言,一起交流~

读者评论

评论会同步写入该文在 Notion 中的页面底部(与正文同页,便于管理)。

0/1500

暂无评论,欢迎抢沙发。