Skip to Content
Steel is in alpha 🎉

WebSockets

Steel provides built-in support for real-time communication through WebSockets, complete with automatic AsyncAPI documentation generation. WebSockets enable full-duplex communication between client and server, perfect for real-time applications like chat, live updates, and collaborative features.

Basic WebSocket Handler

type ChatMessage struct { UserID int `json:"user_id" description:"User ID sending the message"` Message string `json:"message" description:"Chat message content"` Room string `json:"room" description:"Chat room name"` } type ChatResponse struct { MessageID int `json:"message_id" description:"Unique message ID"` UserID int `json:"user_id" description:"User ID who sent the message"` Message string `json:"message" description:"Chat message content"` Room string `json:"room" description:"Chat room name"` Timestamp time.Time `json:"timestamp" description:"Message timestamp"` } r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) { // Validate message if message.Message == "" { return nil, fmt.Errorf("message cannot be empty") } if message.Room == "" { return nil, fmt.Errorf("room is required") } // Process message response := &ChatResponse{ MessageID: generateMessageID(), UserID: message.UserID, Message: message.Message, Room: message.Room, Timestamp: time.Now(), } // Store message in database saveMessage(response) // Broadcast to other users in the same room broadcastToRoom(message.Room, response, conn.ClientID) return response, nil }, router.WithAsyncSummary("Chat WebSocket"), router.WithAsyncDescription("Real-time chat communication"), router.WithAsyncTags("chat", "websocket"))

Connection Management

Access connection metadata and manage user sessions:

r.WebSocket("/ws/live-updates", func(conn *router.WSConnection, message SubscribeMessage) (*SubscriptionResponse, error) { // Store user information in connection metadata conn.SetMetadata("user_id", message.UserID) conn.SetMetadata("subscriptions", message.Topics) conn.SetMetadata("joined_at", time.Now()) // Access request information userAgent := conn.Request().Header.Get("User-Agent") clientIP := conn.Request().RemoteAddr // Get URL parameters roomID := conn.Param("room_id") log.Printf("User %d connected from %s (room: %s)", message.UserID, clientIP, roomID) return &SubscriptionResponse{ Status: "subscribed", Topics: message.Topics, ClientID: conn.ClientID, ServerTime: time.Now(), }, nil })

Broadcasting Messages

Use the connection manager to broadcast messages:

// Get the connection manager cm := r.ConnectionManager() // Broadcast to all WebSocket connections cm.BroadcastWS(router.WSMessage{ Type: "system_announcement", Payload: map[string]interface{}{ "message": "Server maintenance in 5 minutes", "level": "warning", }, }) // Broadcast to specific connections func broadcastToRoom(roomID string, message interface{}, excludeClientID string) { connections := cm.WSConnections() for clientID, conn := range connections { if clientID == excludeClientID { continue // Don't send to sender } // Check if user is in the room if userRoom, ok := conn.GetMetadata("room"); ok && userRoom == roomID { conn.SendMessage(router.WSMessage{ Type: "chat_message", Payload: message, }) } } }

Advanced Example: Real-time Collaboration

Real-time collaborative document editing:

type DocumentEdit struct { DocumentID string `json:"document_id" description:"Document being edited"` UserID int `json:"user_id" description:"User making the edit"` Operation string `json:"operation" description:"Edit operation (insert, delete, format)"` Position int `json:"position" description:"Character position in document"` Content string `json:"content" description:"Content being inserted/modified"` Metadata interface{} `json:"metadata,omitempty" description:"Additional operation metadata"` } type DocumentEditResponse struct { EditID string `json:"edit_id" description:"Unique edit identifier"` DocumentID string `json:"document_id" description:"Document ID"` UserID int `json:"user_id" description:"User who made the edit"` Operation string `json:"operation" description:"Applied operation"` Position int `json:"position" description:"Final position"` Content string `json:"content" description:"Applied content"` Timestamp time.Time `json:"timestamp" description:"Edit timestamp"` Version int `json:"version" description:"Document version after edit"` } r.WebSocket("/ws/documents/:doc_id/edit", func(conn *router.WSConnection, edit DocumentEdit) (*DocumentEditResponse, error) { docID := conn.Param("doc_id") // Validate user has edit permissions if !hasEditPermission(edit.UserID, docID) { return nil, fmt.Errorf("insufficient permissions") } // Apply operational transformation for concurrent edits transformedEdit, newVersion := applyOperationalTransform(edit, docID) // Save edit to database editRecord := saveDocumentEdit(transformedEdit, newVersion) // Broadcast to other collaborators collaborators := getDocumentCollaborators(docID) for _, collaboratorConn := range collaborators { if collaboratorConn.ClientID != conn.ClientID { collaboratorConn.SendMessage(router.WSMessage{ Type: "document_edit", Payload: editRecord, }) } } return &DocumentEditResponse{ EditID: editRecord.ID, DocumentID: docID, UserID: edit.UserID, Operation: transformedEdit.Operation, Position: transformedEdit.Position, Content: transformedEdit.Content, Timestamp: time.Now(), Version: newVersion, }, nil }, router.WithAsyncSummary("Document Collaboration"), router.WithAsyncDescription("Real-time collaborative document editing"), router.WithAsyncTags("documents", "collaboration", "websocket"))

Connection Lifecycle Management

Connection Events

Handle connection lifecycle events:

r.WebSocket("/ws/game/:room_id", func(conn *router.WSConnection, message GameMessage) (*GameResponse, error) { roomID := conn.Param("room_id") // Handle different message types switch message.Type { case "join": return handlePlayerJoin(conn, roomID, message) case "move": return handlePlayerMove(conn, roomID, message) case "leave": return handlePlayerLeave(conn, roomID, message) default: return nil, fmt.Errorf("unknown message type: %s", message.Type) } }) func handlePlayerJoin(conn *router.WSConnection, roomID string, message GameMessage) (*GameResponse, error) { // Add player to room addPlayerToRoom(message.PlayerID, roomID) // Store connection metadata conn.SetMetadata("room_id", roomID) conn.SetMetadata("player_id", message.PlayerID) // Notify other players broadcastToRoom(roomID, GameResponse{ Type: "player_joined", PlayerID: message.PlayerID, Message: fmt.Sprintf("Player %s joined the game", message.PlayerName), }, conn.ClientID) return &GameResponse{ Type: "join_success", PlayerID: message.PlayerID, RoomID: roomID, Message: "Successfully joined the game", }, nil }

Connection Cleanup

Implement proper cleanup when connections close:

// Use a cleanup function when connections are removed func setupConnectionCleanup() { cm := r.ConnectionManager() // Monitor for closed connections (you'd implement this based on your needs) go func() { for { time.Sleep(10 * time.Second) // Check WebSocket connections for clientID, conn := range cm.WSConnections() { if conn.IsClosed() { // Clean up resources if roomID, ok := conn.GetMetadata("room_id"); ok { removePlayerFromRoom(conn.GetMetadata("player_id"), roomID.(string)) } cm.RemoveWSConnection(clientID) } } } }() }

Security & Authentication

Authentication

Implement authentication for WebSocket connections:

// WebSocket authentication middleware func authWebSocketMiddleware(next router.AsyncHandlerOption) router.AsyncHandlerOption { return func(info interface{}) { // Apply to WebSocket handler info if wsInfo, ok := info.(*router.WSHandlerInfo); ok { // Store original handler originalHandler := wsInfo.Handler // Wrap with authentication wsInfo.Handler = func(conn *router.WSConnection, message interface{}) (interface{}, error) { // Check authentication token from query params or headers token := conn.Request().URL.Query().Get("token") if token == "" { token = conn.Request().Header.Get("Authorization") } if !isValidToken(token) { return nil, fmt.Errorf("invalid authentication token") } // Call original handler return originalHandler.(func(*router.WSConnection, interface{}) (interface{}, error))(conn, message) } } next.ApplyToWS(info.(*router.WSHandlerInfo)) } } // Use authentication r.WebSocket("/ws/secure-chat", chatHandler, authWebSocketMiddleware, router.WithAsyncSummary("Authenticated Chat"))

Rate Limiting

Implement rate limiting for WebSocket connections:

type RateLimiter struct { connections map[string]*rate.Limiter mu sync.RWMutex } func (rl *RateLimiter) Allow(clientID string) bool { rl.mu.RLock() limiter, exists := rl.connections[clientID] rl.mu.RUnlock() if !exists { rl.mu.Lock() limiter = rate.NewLimiter(rate.Every(time.Second), 10) // 10 messages per second rl.connections[clientID] = limiter rl.mu.Unlock() } return limiter.Allow() } var rateLimiter = &RateLimiter{ connections: make(map[string]*rate.Limiter), } r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) { // Check rate limit if !rateLimiter.Allow(conn.ClientID) { return nil, fmt.Errorf("rate limit exceeded") } // Process message... return handleChatMessage(message) })

Best Practices

Performance Tip: Use connection pooling and efficient message serialization for high-traffic real-time applications.

1. Message Validation

Always validate incoming messages:

func validateChatMessage(message ChatMessage) error { if message.UserID <= 0 { return fmt.Errorf("invalid user ID") } if len(message.Message) == 0 { return fmt.Errorf("message cannot be empty") } if len(message.Message) > 1000 { return fmt.Errorf("message too long (max 1000 characters)") } if message.Room == "" { return fmt.Errorf("room is required") } return nil }

2. Error Handling

Implement comprehensive error handling:

r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) { // Validate message if err := validateChatMessage(message); err != nil { // Send error back to client conn.SendMessage(router.WSMessage{ Type: "error", Error: &router.WSError{ Code: "VALIDATION_ERROR", Message: err.Error(), Details: map[string]interface{}{ "field": "message", "value": message.Message, }, }, }) return nil, err } // Process message... })

3. Connection Monitoring

Monitor connection health:

func monitorConnections(cm *router.ConnectionManager) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { wsCount := len(cm.WSConnections()) log.Printf("Active WebSocket connections: %d", wsCount) // Send heartbeat to all connections cm.BroadcastWS(router.WSMessage{ Type: "heartbeat", Payload: map[string]interface{}{ "timestamp": time.Now(), "server_id": os.Getenv("SERVER_ID"), }, }) } }

4. Graceful Shutdown

Implement graceful connection closure:

func gracefulShutdown(cm *router.ConnectionManager) { // Notify all connections of impending shutdown shutdownMessage := router.WSMessage{ Type: "server_shutdown", Payload: map[string]interface{}{ "message": "Server is shutting down", "reason": "maintenance", "when": time.Now().Add(30 * time.Second), }, } cm.BroadcastWS(shutdownMessage) // Wait for connections to close gracefully time.Sleep(30 * time.Second) // Force close remaining connections for clientID := range cm.WSConnections() { cm.RemoveWSConnection(clientID) } }

WebSocket support in Steel provides a robust foundation for building real-time interactive applications while maintaining the same high-quality documentation and type safety as your REST APIs.

Last updated on