django实现sse接口

up-k / 2024-10-16 / 原文

基于django来实现sse
最近在实现通过post调用三方的接口得到sse流数据,并且自己需要用拿到的数据用sse流在返回给前端

from rest_framework.views import APIView
from django.http import StreamingHttpResponse, JsonResponse

class ChatMessageViewSet(APIView):

    def post(self, request):
        """
        获取列表
        :param request:
        :return:
        """
        # 发送POST请求到第三方API,并开启流式响应
        try:
            response = requests.post(url, headers=headers, json=payload, stream=True)
            if response.status_code == 200:
                # 创建一个StreamingHttpResponse来逐块传递数据
                def stream_response(ai_answer_str: str):
                    for chunk in response.iter_lines():  # 使用iter_lines处理行
                        try:
                            yield f"{chunk}\n\n"  # 格式化为SSE事件
                        except json.JSONDecodeError as e:
                            print(2222, e)
                            # 如果解析 JSON 失败,直接发送原始数据
                            yield f"{chunk}\n\n"  # 格式化为SSE事件

                # 返回StreamingHttpResponse
                return StreamingHttpResponse(stream_response(), content_type='text/event-stream')

            else:
                # 如果请求失败,返回错误信息
                return JsonResponse({"code": 400, "message": response.json(), "data": {}}, status=200)
        except Exception as e:
            # 处理请求异常
            return JsonResponse({"code": 400, "message": str(e), "data": {}}, status=200)

这里必须注意的是 sse返回的格式,一定要是 (data: \n\n),开头是data冒号和两个空格,最后以两个换行结尾