// Fail2ban UI - A Swiss made, management interface for Fail2ban. // // Copyright (C) 2025 Swissmakers GmbH (https://swissmakers.ch) // // Licensed under the GNU General Public License, Version 3 (GPL-3.0) // You may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.gnu.org/licenses/gpl-3.0.en.html // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package web import ( "encoding/json" "log" "net/http" "sync" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/swissmakers/fail2ban-ui/internal/storage" ) const ( // Time allowed to write a message to the peer writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer pongWait = 60 * time.Second // Send pings to peer with this period (must be less than pongWait) pingPeriod = (pongWait * 9) / 10 ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { // Allow all origins for now - can be restricted in production return true }, } // Client represents a WebSocket connection type Client struct { hub *Hub conn *websocket.Conn send chan []byte } // Hub maintains the set of active clients and broadcasts messages to them type Hub struct { // Registered clients clients map[*Client]bool // Inbound messages from clients broadcast chan []byte // Register requests from clients register chan *Client // Unregister requests from clients unregister chan *Client // Mutex for thread-safe operations mu sync.RWMutex } // NewHub creates a new WebSocket hub func NewHub() *Hub { return &Hub{ clients: make(map[*Client]bool), broadcast: make(chan []byte, 256), register: make(chan *Client), unregister: make(chan *Client), } } // Run starts the hub's main loop func (h *Hub) Run() { // Start heartbeat ticker ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case client := <-h.register: h.mu.Lock() h.clients[client] = true h.mu.Unlock() log.Printf("WebSocket client connected. Total clients: %d", len(h.clients)) case client := <-h.unregister: h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) } h.mu.Unlock() log.Printf("WebSocket client disconnected. Total clients: %d", len(h.clients)) case message := <-h.broadcast: h.mu.RLock() for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } h.mu.RUnlock() case <-ticker.C: // Send heartbeat to all clients h.sendHeartbeat() } } } // sendHeartbeat sends a heartbeat message to all connected clients func (h *Hub) sendHeartbeat() { message := map[string]interface{}{ "type": "heartbeat", "time": time.Now().UTC().Unix(), "status": "healthy", } data, err := json.Marshal(message) if err != nil { log.Printf("Error marshaling heartbeat: %v", err) return } h.mu.RLock() for client := range h.clients { select { case client.send <- data: default: close(client.send) delete(h.clients, client) } } h.mu.RUnlock() } // BroadcastBanEvent broadcasts a ban event to all connected clients func (h *Hub) BroadcastBanEvent(event storage.BanEventRecord) { message := map[string]interface{}{ "type": "ban_event", "data": event, } data, err := json.Marshal(message) if err != nil { log.Printf("Error marshaling ban event: %v", err) return } select { case h.broadcast <- data: default: log.Printf("Broadcast channel full, dropping ban event") } } // readPump pumps messages from the WebSocket connection to the hub func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { _, _, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket error: %v", err) } break } } } // writePump pumps messages from the hub to the WebSocket connection func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) // Add queued messages to the current websocket message n := len(c.send) for i := 0; i < n; i++ { w.Write([]byte{'\n'}) w.Write(<-c.send) } if err := w.Close(); err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // serveWS handles WebSocket requests from clients func serveWS(hub *Hub, c *gin.Context) { conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } client := &Client{ hub: hub, conn: conn, send: make(chan []byte, 256), } client.hub.register <- client // Allow collection of memory referenced by the caller by doing all work in new goroutines go client.writePump() go client.readPump() } // WebSocketHandler is the Gin handler for WebSocket connections func WebSocketHandler(hub *Hub) gin.HandlerFunc { return func(c *gin.Context) { serveWS(hub, c) } }