mirror of
https://github.com/swissmakers/fail2ban-ui.git
synced 2026-04-15 05:03:14 +02:00
Implementing WebSocked Support for immediately ban-messages
This commit is contained in:
268
pkg/web/websocket.go
Normal file
268
pkg/web/websocket.go
Normal file
@@ -0,0 +1,268 @@
|
||||
// Fail2ban UI - A Swiss made, management interface for Fail2ban.
|
||||
//
|
||||
// Copyright (C) 2025 Swissmakers GmbH (https://swissmakers.ch)
|
||||
//
|
||||
// Licensed under the GNU General Public License, Version 3 (GPL-3.0)
|
||||
// You may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// https://www.gnu.org/licenses/gpl-3.0.en.html
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package web
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/swissmakers/fail2ban-ui/internal/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
// Time allowed to write a message to the peer
|
||||
writeWait = 10 * time.Second
|
||||
|
||||
// Time allowed to read the next pong message from the peer
|
||||
pongWait = 60 * time.Second
|
||||
|
||||
// Send pings to peer with this period (must be less than pongWait)
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
|
||||
// Maximum message size allowed from peer
|
||||
maxMessageSize = 512
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
// Allow all origins for now - can be restricted in production
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// Client represents a WebSocket connection
|
||||
type Client struct {
|
||||
hub *Hub
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
// Hub maintains the set of active clients and broadcasts messages to them
|
||||
type Hub struct {
|
||||
// Registered clients
|
||||
clients map[*Client]bool
|
||||
|
||||
// Inbound messages from clients
|
||||
broadcast chan []byte
|
||||
|
||||
// Register requests from clients
|
||||
register chan *Client
|
||||
|
||||
// Unregister requests from clients
|
||||
unregister chan *Client
|
||||
|
||||
// Mutex for thread-safe operations
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewHub creates a new WebSocket hub
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
broadcast: make(chan []byte, 256),
|
||||
register: make(chan *Client),
|
||||
unregister: make(chan *Client),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the hub's main loop
|
||||
func (h *Hub) Run() {
|
||||
// Start heartbeat ticker
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case client := <-h.register:
|
||||
h.mu.Lock()
|
||||
h.clients[client] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("WebSocket client connected. Total clients: %d", len(h.clients))
|
||||
|
||||
case client := <-h.unregister:
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[client]; ok {
|
||||
delete(h.clients, client)
|
||||
close(client.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("WebSocket client disconnected. Total clients: %d", len(h.clients))
|
||||
|
||||
case message := <-h.broadcast:
|
||||
h.mu.RLock()
|
||||
for client := range h.clients {
|
||||
select {
|
||||
case client.send <- message:
|
||||
default:
|
||||
close(client.send)
|
||||
delete(h.clients, client)
|
||||
}
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
|
||||
case <-ticker.C:
|
||||
// Send heartbeat to all clients
|
||||
h.sendHeartbeat()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendHeartbeat sends a heartbeat message to all connected clients
|
||||
func (h *Hub) sendHeartbeat() {
|
||||
message := map[string]interface{}{
|
||||
"type": "heartbeat",
|
||||
"time": time.Now().UTC().Unix(),
|
||||
"status": "healthy",
|
||||
}
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling heartbeat: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
h.mu.RLock()
|
||||
for client := range h.clients {
|
||||
select {
|
||||
case client.send <- data:
|
||||
default:
|
||||
close(client.send)
|
||||
delete(h.clients, client)
|
||||
}
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
}
|
||||
|
||||
// BroadcastBanEvent broadcasts a ban event to all connected clients
|
||||
func (h *Hub) BroadcastBanEvent(event storage.BanEventRecord) {
|
||||
message := map[string]interface{}{
|
||||
"type": "ban_event",
|
||||
"data": event,
|
||||
}
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
log.Printf("Error marshaling ban event: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case h.broadcast <- data:
|
||||
default:
|
||||
log.Printf("Broadcast channel full, dropping ban event")
|
||||
}
|
||||
}
|
||||
|
||||
// readPump pumps messages from the WebSocket connection to the hub
|
||||
func (c *Client) readPump() {
|
||||
defer func() {
|
||||
c.hub.unregister <- c
|
||||
c.conn.Close()
|
||||
}()
|
||||
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
log.Printf("WebSocket error: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writePump pumps messages from the hub to the WebSocket connection
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
|
||||
w, err := c.conn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write(message)
|
||||
|
||||
// Add queued messages to the current websocket message
|
||||
n := len(c.send)
|
||||
for i := 0; i < n; i++ {
|
||||
w.Write([]byte{'\n'})
|
||||
w.Write(<-c.send)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serveWS handles WebSocket requests from clients
|
||||
func serveWS(hub *Hub, c *gin.Context) {
|
||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
log.Printf("WebSocket upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
hub: hub,
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
client.hub.register <- client
|
||||
|
||||
// Allow collection of memory referenced by the caller by doing all work in new goroutines
|
||||
go client.writePump()
|
||||
go client.readPump()
|
||||
}
|
||||
|
||||
// WebSocketHandler is the Gin handler for WebSocket connections
|
||||
func WebSocketHandler(hub *Hub) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
serveWS(hub, c)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user