
技术分享1 分钟阅读
curl_cffi中content_callback回调处理,并流式响应前端
第一步安装库
fastapi
curl_cffi
uvicorn
sse_starlette代码示例
from sse_starlette.sse import EventSourceResponse
import uvicorn
from curl_cffi import requests
from fastapi import FastAPI
import queue
import threading
class ResponseCallbackHandler:
queue = queue.Queue(maxsize=1000)
flat = True
async def yield_data(self):
while self.flat:
data = self.queue.get()
if str(data) == "End":
break
else:
yield data
def put(self, e: bytes):
self.queue.put(e)
def end(self):
self.flat = False
self.queue.put(b"End")
requst
def request_send(callback):
def stream_callback(data):
callback.put(data)
requests.request("get", "htpp://www.test.com", impersonate="chrome110",content_callback=stream_callback)
callback.end()
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
@app.get('/')
async def root():
callback= ResponseCallbackHandler()
threading.Thread(target=request_send, args=(callback)).start()
return EventSourceResponse(callback.yield_data())
if __name__ == '__main__':
import uvicorn
uvicorn.run(app)
代码解释
这段代码做一个较详细的解释:
•
ResponseCallbackHandler 是一个自定义的类,用于管理一个队列和一个流式传输的端点。
queue 是一个大小限制为1000的队列,用于缓存流式传输的数据。
flat 是一个布尔标志,用于控制队列的打开/关闭。
yield_data 是一个异步生成器,从队列中不断取数据,直到 flat 被设置为False才停止。
•
put 方法用于往队列中放入数据。
•
end 方法用于关闭队列,将 flat 设置为 False,并向队列中放入一个 "End" 标志用来通知 yield_data 停止。
•
request_send函数用于发起请求获取数据,并通过 stream_callback 实时将响应数据流式传输到回调的队列中,请求结束后调用 end 方法关闭队列。
•
调用request_send函数,要使用threading新启线程,或者无法实现获取到callback就可以响应前端。
•
FastAPI 通过 EventSourceResponse(需要安装) 创建一个 SSE 连接,并异步生成队列中的数据,实现了流式传输。
•
客户端连接到该端点就可以通过 SSE 接收到实时的流式数据。
•
通过线程避免阻塞 ASGI 服务器,并使用队列和异步生成器实现了高效的流式传输。
概括来说,这段代码实现了一个基于 FastAPI、SSE 和队列的高效流式传输服务。让我知道如果哪里还不清楚!
有关使用上的问题,欢迎您在底部评论区留言,一起交流~
读者评论
评论会同步写入该文在 Notion 中的页面底部(与正文同页,便于管理)。
暂无评论,欢迎抢沙发。