服务端代码

func (a *DeviceApi) GetRunDeviceAttribute(c *gin.Context) {  
   c.Writer.Header().Set("Content-Type", "text/event-stream")  
   c.Writer.Header().Set("Cache-Control", "no-cache")  
   c.Writer.Header().Set("Connection", "keep-alive")  
  
   var once sync.Once  
   deviceId := c.Query("deviceId")  
   employeeId := utils2.GetEmployeeId(c)  
   ident := fmt.Sprintf("sse:device:state:%s:%d", deviceId, employeeId)  
  
   currentChan := make(chan []byte)  
  
   cleanUp := func() {  
      once.Do(func() {  
         close(currentChan)  
         delete(global.SSE, ident)  
         fmt.Println("清理资源,关闭通道")  
      })  
   }  
   defer cleanUp()  
  
   // 确保上一个通道被关闭  
   if prevChan, ok := global.SSE[ident]; ok {  
      close(prevChan)  
      delete(global.SSE, ident)  
   }  
  
   // 首次从redis获取数据  
   prefix := fmt.Sprintf("sse:device:state:%s", deviceId)  
   if global.GvaRedis != nil {  
      msg := global.GvaRedis.Get(context.Background(), prefix).Val()  
      if msg != "" {  
         msgStr := fmt.Sprintf("data:%s\n\n", msg)  
         c.Writer.Write([]byte(msgStr))  
         c.Writer.Flush()  
      } else {  
         c.Writer.Write([]byte("data:\n\n"))  
         c.Writer.Flush()  
      }  
   }  
  
   global.SSE[ident] = currentChan  
   timeoutDuration := 120 * time.Second  
   timer := time.NewTimer(timeoutDuration)  
  
   for {  
      select {  
      case data := <-currentChan: // 从通道中读取数据  
         msgStr := fmt.Sprintf("data:%s\n\n", data)  
         c.Writer.Write([]byte(msgStr))  
         c.Writer.Flush()  
  
         if !timer.Stop() {  
            <-timer.C  
         }  
         timer.Reset(timeoutDuration)  
      case <-timer.C:  
         global.GvaLog.Error("sse连接超时")  
         cleanUp()  
         return  
      case <-c.Request.Context().Done():  
         global.GvaLog.Error("sse客户端断开连接")  
         cleanUp()  
         return  
      }  
   }  
}

客户端代码:

function createEventSource(url, options, success, fail) {  
    // 中间件逻辑,例如检查 token 或添加全局参数  
    const params = new URLSearchParams(options).toString();  
    const token = localStorage.getItem('token')  
    const fullUrl = process.env.REACT_APP_BASE_URL + url + `?x-token=${token}&${params}`;  
  
    const eventSource = new EventSource(fullUrl);  
    eventSource.onmessage = (event) => {  
        console.log('Message from server:', event.data);  
        if (success) {  
            success(event.data)  
        }  
    };  
  
    // 监听自定义事件  
    eventSource.addEventListener("customEvent", function (event) {  
        console.log("接收到自定义事件:", event);  
    });  
  
    // 监听打开事件  
    eventSource.onopen = function () {  
        console.log("SSE 连接已打开");  
    };  
  
    eventSource.onerror = (error) => {  
        console.error('SSE Error:', error);  
        eventSource.close(); // 发生错误时关闭连接  
        if (fail) {  
            fail(error)  
        }  
    };  
  
    return eventSource;  
}  
  
export const getRunDeviceAttribute = (deviceId, callback, errorCallback) => {  
    // 创建 EventSource 连接  
    return createEventSource('/device/getRunDeviceAttribute', {  
        deviceId: deviceId,  
    }, callback, errorCallback);  
}

nginx配置

location / {
    proxy_pass   http://127.0.0.1:8611;

    # 超时设置,适配长连接
    proxy_connect_timeout 600;
    proxy_send_timeout 600;
    proxy_read_timeout 600;
    send_timeout 600;

    # 关键配置
    proxy_http_version 1.1;          # 强制使用 HTTP/1.1 支持持久连接
    proxy_buffering off;             # 禁用 Nginx 缓冲,实时传输数据
    proxy_cache off;                 # 禁用缓存
    chunked_transfer_encoding on;    # 支持分块传输
    add_header Cache-Control 'no-cache';  # 禁止客户端缓存
    proxy_set_header Connection '';  # 清空 Connection 头部,保持长连接
}