mirror of
https://github.com/swissmakers/fail2ban-ui.git
synced 2026-04-11 13:47:05 +02:00
305 lines
6.8 KiB
Go
305 lines
6.8 KiB
Go
// Fail2ban UI - A Swiss made, management interface for Fail2ban.
|
|
//
|
|
// Copyright (C) 2025 Swissmakers GmbH (https://swissmakers.ch)
|
|
//
|
|
// Licensed under the GNU General Public License, Version 3 (GPL-3.0)
|
|
// You may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// https://www.gnu.org/licenses/gpl-3.0.en.html
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package web
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/swissmakers/fail2ban-ui/internal/storage"
|
|
)
|
|
|
|
// Timing parameters for the WebSocket keep-alive protocol used by the
// client read and write pumps.
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the
	// peer; the read deadline is pushed out by this much on every pong.
	pongWait = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer. It is
	// 90% of pongWait (54s) and must stay below pongWait so that a pong can
	// arrive before the read deadline expires.
	pingPeriod = (pongWait * 9) / 10
)
|
|
|
|
// upgrader upgrades incoming HTTP requests to WebSocket connections, using
// 1 KiB read and write buffers.
//
// NOTE(review): CheckOrigin returns true for every request, so no Origin
// validation is performed at this layer. If the endpoint is reachable with
// ambient browser credentials (cookies), a third-party page could open a
// socket and receive the broadcast stream. Confirm that authentication is
// enforced elsewhere, or restrict the accepted origins here.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		// Allow all origins for now - can be restricted in production
		return true
	},
}
|
|
|
|
// Client represents a single WebSocket connection registered with a Hub.
type Client struct {
	// hub is the Hub this client registers with and unregisters from.
	hub *Hub
	// conn is the underlying WebSocket connection.
	conn *websocket.Conn
	// send queues outbound payloads. The hub writes to it and closes it when
	// the client is removed; writePump drains it onto the connection.
	send chan []byte
}
|
|
|
|
// Hub maintains the set of active clients and broadcasts messages to them.
type Hub struct {
	// clients is the set of registered clients.
	clients map[*Client]bool

	// broadcast carries encoded messages waiting to be fanned out to every
	// registered client. It is fed by the Broadcast* methods and drained by
	// Run.
	broadcast chan []byte

	// register receives clients that should be added to the set.
	register chan *Client

	// unregister receives clients that should be removed from the set.
	unregister chan *Client

	// mu guards clients.
	mu sync.RWMutex
}
|
|
|
|
// BroadcastConsoleLog broadcasts a console log message to all connected clients
|
|
func (h *Hub) BroadcastConsoleLog(message string) {
|
|
logMsg := map[string]interface{}{
|
|
"type": "console_log",
|
|
"message": message,
|
|
"time": time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
data, err := json.Marshal(logMsg)
|
|
if err != nil {
|
|
log.Printf("Error marshaling console log: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- data:
|
|
default:
|
|
// Channel full, drop message
|
|
}
|
|
}
|
|
|
|
// NewHub creates a new WebSocket hub
|
|
func NewHub() *Hub {
|
|
return &Hub{
|
|
clients: make(map[*Client]bool),
|
|
broadcast: make(chan []byte, 256),
|
|
register: make(chan *Client),
|
|
unregister: make(chan *Client),
|
|
}
|
|
}
|
|
|
|
// Run starts the hub's main loop
|
|
func (h *Hub) Run() {
|
|
// Start heartbeat ticker
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case client := <-h.register:
|
|
h.mu.Lock()
|
|
h.clients[client] = true
|
|
h.mu.Unlock()
|
|
log.Printf("WebSocket client connected. Total clients: %d", len(h.clients))
|
|
|
|
case client := <-h.unregister:
|
|
h.mu.Lock()
|
|
if _, ok := h.clients[client]; ok {
|
|
delete(h.clients, client)
|
|
close(client.send)
|
|
}
|
|
h.mu.Unlock()
|
|
log.Printf("WebSocket client disconnected. Total clients: %d", len(h.clients))
|
|
|
|
case message := <-h.broadcast:
|
|
h.mu.RLock()
|
|
for client := range h.clients {
|
|
select {
|
|
case client.send <- message:
|
|
default:
|
|
close(client.send)
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
|
|
case <-ticker.C:
|
|
// Send heartbeat to all clients
|
|
h.sendHeartbeat()
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendHeartbeat sends a heartbeat message to all connected clients
|
|
func (h *Hub) sendHeartbeat() {
|
|
message := map[string]interface{}{
|
|
"type": "heartbeat",
|
|
"time": time.Now().UTC().Unix(),
|
|
"status": "healthy",
|
|
}
|
|
data, err := json.Marshal(message)
|
|
if err != nil {
|
|
log.Printf("Error marshaling heartbeat: %v", err)
|
|
return
|
|
}
|
|
|
|
h.mu.RLock()
|
|
for client := range h.clients {
|
|
select {
|
|
case client.send <- data:
|
|
default:
|
|
close(client.send)
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
}
|
|
|
|
// BroadcastBanEvent broadcasts a ban event to all connected clients
|
|
func (h *Hub) BroadcastBanEvent(event storage.BanEventRecord) {
|
|
message := map[string]interface{}{
|
|
"type": "ban_event",
|
|
"data": event,
|
|
}
|
|
data, err := json.Marshal(message)
|
|
if err != nil {
|
|
log.Printf("Error marshaling ban event: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- data:
|
|
default:
|
|
log.Printf("Broadcast channel full, dropping ban event")
|
|
}
|
|
}
|
|
|
|
// BroadcastUnbanEvent broadcasts an unban event to all connected clients
|
|
func (h *Hub) BroadcastUnbanEvent(event storage.BanEventRecord) {
|
|
message := map[string]interface{}{
|
|
"type": "unban_event",
|
|
"data": event,
|
|
}
|
|
data, err := json.Marshal(message)
|
|
if err != nil {
|
|
log.Printf("Error marshaling unban event: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- data:
|
|
default:
|
|
log.Printf("Broadcast channel full, dropping unban event")
|
|
}
|
|
}
|
|
|
|
// readPump pumps messages from the WebSocket connection to the hub
|
|
func (c *Client) readPump() {
|
|
defer func() {
|
|
c.hub.unregister <- c
|
|
c.conn.Close()
|
|
}()
|
|
|
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
c.conn.SetPongHandler(func(string) error {
|
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, _, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
log.Printf("WebSocket error: %v", err)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// writePump pumps messages from the hub to the WebSocket connection
|
|
func (c *Client) writePump() {
|
|
ticker := time.NewTicker(pingPeriod)
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.conn.Close()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-c.send:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if !ok {
|
|
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
return
|
|
}
|
|
|
|
w, err := c.conn.NextWriter(websocket.TextMessage)
|
|
if err != nil {
|
|
return
|
|
}
|
|
w.Write(message)
|
|
|
|
// Add queued messages to the current websocket message
|
|
n := len(c.send)
|
|
for i := 0; i < n; i++ {
|
|
w.Write([]byte{'\n'})
|
|
w.Write(<-c.send)
|
|
}
|
|
|
|
if err := w.Close(); err != nil {
|
|
return
|
|
}
|
|
|
|
case <-ticker.C:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// serveWS handles WebSocket requests from clients
|
|
func serveWS(hub *Hub, c *gin.Context) {
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
log.Printf("WebSocket upgrade error: %v", err)
|
|
return
|
|
}
|
|
|
|
client := &Client{
|
|
hub: hub,
|
|
conn: conn,
|
|
send: make(chan []byte, 256),
|
|
}
|
|
|
|
client.hub.register <- client
|
|
|
|
// Allow collection of memory referenced by the caller by doing all work in new goroutines
|
|
go client.writePump()
|
|
go client.readPump()
|
|
}
|
|
|
|
// WebSocketHandler is the Gin handler for WebSocket connections
|
|
func WebSocketHandler(hub *Hub) gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
serveWS(hub, c)
|
|
}
|
|
}
|