package server import ( "bytes" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" ) const ( maxmsgSizeBytes = 1_024 pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 writeWait = 10 * time.Second ) var ( space = []byte{' '} newline = []byte{'\n'} ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1_024, WriteBufferSize: 1_024, // TODO: configure CORS CheckOrigin: func(_ *http.Request) bool { return true }, } // lient represents a client in a single browser session. type Client struct { conn *websocket.Conn send chan []byte } func newSink(conn *websocket.Conn) *Client { return &Client{ conn: conn, send: make(chan []byte, 1), } } func (s *Client) wait() error { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() s.readPump() }() go func() { defer wg.Done() s.writePump() }() wg.Wait() return nil } func (s *Client) readPump() { defer s.conn.Close() s.conn.SetReadLimit(maxmsgSizeBytes) s.conn.SetReadDeadline(time.Now().Add(pongWait)) s.conn.SetPongHandler(func(string) error { s.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, msg, err := s.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("websocket read error: %v", err) } break } msg = bytes.TrimSpace(bytes.ReplaceAll(msg, newline, space)) log.Printf("websocket message rcvd: %s", msg) } } func (s *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() s.conn.Close() }() for { select { case message, ok := <-s.send: s.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { s.conn.WriteMessage(websocket.CloseMessage, nil) return } w, err := s.conn.NextWriter(websocket.TextMessage) if err != nil { log.Printf("error getting writer: %v", err) return } w.Write(message) // Add queued chat messages to the current websocket message. n := len(s.send) for i := 0; i < n; i++ { w.Write(newline) w.Write(<-s.send) } if err := w.Close(); err != nil { log.Println("error closing writer") return } case <-ticker.C: s.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil { log.Println("websocket pong timeout") return } } } } func (h *handler) upgradeWebsocket(c echo.Context) error { conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) if err != nil { return err } sink := newSink(conn) // blocks until the connection has closed: sink.wait() return nil }