WebSockets
Steel provides built-in support for real-time communication through WebSockets, complete with automatic AsyncAPI documentation generation. WebSockets enable full-duplex communication between client and server, perfect for real-time applications like chat, live updates, and collaborative features.
Basic WebSocket Handler
type ChatMessage struct {
UserID int `json:"user_id" description:"User ID sending the message"`
Message string `json:"message" description:"Chat message content"`
Room string `json:"room" description:"Chat room name"`
}
type ChatResponse struct {
MessageID int `json:"message_id" description:"Unique message ID"`
UserID int `json:"user_id" description:"User ID who sent the message"`
Message string `json:"message" description:"Chat message content"`
Room string `json:"room" description:"Chat room name"`
Timestamp time.Time `json:"timestamp" description:"Message timestamp"`
}
r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) {
// Validate message
if message.Message == "" {
return nil, fmt.Errorf("message cannot be empty")
}
if message.Room == "" {
return nil, fmt.Errorf("room is required")
}
// Process message
response := &ChatResponse{
MessageID: generateMessageID(),
UserID: message.UserID,
Message: message.Message,
Room: message.Room,
Timestamp: time.Now(),
}
// Store message in database
saveMessage(response)
// Broadcast to other users in the same room
broadcastToRoom(message.Room, response, conn.ClientID)
return response, nil
}, router.WithAsyncSummary("Chat WebSocket"),
router.WithAsyncDescription("Real-time chat communication"),
router.WithAsyncTags("chat", "websocket"))Connection Management
Access connection metadata and manage user sessions:
r.WebSocket("/ws/live-updates", func(conn *router.WSConnection, message SubscribeMessage) (*SubscriptionResponse, error) {
// Store user information in connection metadata
conn.SetMetadata("user_id", message.UserID)
conn.SetMetadata("subscriptions", message.Topics)
conn.SetMetadata("joined_at", time.Now())
// Access request information
userAgent := conn.Request().Header.Get("User-Agent")
clientIP := conn.Request().RemoteAddr
// Get URL parameters
roomID := conn.Param("room_id")
log.Printf("User %d connected from %s (room: %s)", message.UserID, clientIP, roomID)
return &SubscriptionResponse{
Status: "subscribed",
Topics: message.Topics,
ClientID: conn.ClientID,
ServerTime: time.Now(),
}, nil
})Broadcasting Messages
Use the connection manager to broadcast messages:
// Get the connection manager
cm := r.ConnectionManager()
// Broadcast to all WebSocket connections
cm.BroadcastWS(router.WSMessage{
Type: "system_announcement",
Payload: map[string]interface{}{
"message": "Server maintenance in 5 minutes",
"level": "warning",
},
})
// Broadcast to specific connections
func broadcastToRoom(roomID string, message interface{}, excludeClientID string) {
connections := cm.WSConnections()
for clientID, conn := range connections {
if clientID == excludeClientID {
continue // Don't send to sender
}
// Check if user is in the room
if userRoom, ok := conn.GetMetadata("room"); ok && userRoom == roomID {
conn.SendMessage(router.WSMessage{
Type: "chat_message",
Payload: message,
})
}
}
}Advanced Example: Real-time Collaboration
Real-time collaborative document editing:
type DocumentEdit struct {
DocumentID string `json:"document_id" description:"Document being edited"`
UserID int `json:"user_id" description:"User making the edit"`
Operation string `json:"operation" description:"Edit operation (insert, delete, format)"`
Position int `json:"position" description:"Character position in document"`
Content string `json:"content" description:"Content being inserted/modified"`
Metadata interface{} `json:"metadata,omitempty" description:"Additional operation metadata"`
}
type DocumentEditResponse struct {
EditID string `json:"edit_id" description:"Unique edit identifier"`
DocumentID string `json:"document_id" description:"Document ID"`
UserID int `json:"user_id" description:"User who made the edit"`
Operation string `json:"operation" description:"Applied operation"`
Position int `json:"position" description:"Final position"`
Content string `json:"content" description:"Applied content"`
Timestamp time.Time `json:"timestamp" description:"Edit timestamp"`
Version int `json:"version" description:"Document version after edit"`
}
r.WebSocket("/ws/documents/:doc_id/edit", func(conn *router.WSConnection, edit DocumentEdit) (*DocumentEditResponse, error) {
docID := conn.Param("doc_id")
// Validate user has edit permissions
if !hasEditPermission(edit.UserID, docID) {
return nil, fmt.Errorf("insufficient permissions")
}
// Apply operational transformation for concurrent edits
transformedEdit, newVersion := applyOperationalTransform(edit, docID)
// Save edit to database
editRecord := saveDocumentEdit(transformedEdit, newVersion)
// Broadcast to other collaborators
collaborators := getDocumentCollaborators(docID)
for _, collaboratorConn := range collaborators {
if collaboratorConn.ClientID != conn.ClientID {
collaboratorConn.SendMessage(router.WSMessage{
Type: "document_edit",
Payload: editRecord,
})
}
}
return &DocumentEditResponse{
EditID: editRecord.ID,
DocumentID: docID,
UserID: edit.UserID,
Operation: transformedEdit.Operation,
Position: transformedEdit.Position,
Content: transformedEdit.Content,
Timestamp: time.Now(),
Version: newVersion,
}, nil
}, router.WithAsyncSummary("Document Collaboration"),
router.WithAsyncDescription("Real-time collaborative document editing"),
router.WithAsyncTags("documents", "collaboration", "websocket"))Connection Lifecycle Management
Connection Events
Handle connection lifecycle events:
r.WebSocket("/ws/game/:room_id", func(conn *router.WSConnection, message GameMessage) (*GameResponse, error) {
roomID := conn.Param("room_id")
// Handle different message types
switch message.Type {
case "join":
return handlePlayerJoin(conn, roomID, message)
case "move":
return handlePlayerMove(conn, roomID, message)
case "leave":
return handlePlayerLeave(conn, roomID, message)
default:
return nil, fmt.Errorf("unknown message type: %s", message.Type)
}
})
func handlePlayerJoin(conn *router.WSConnection, roomID string, message GameMessage) (*GameResponse, error) {
// Add player to room
addPlayerToRoom(message.PlayerID, roomID)
// Store connection metadata
conn.SetMetadata("room_id", roomID)
conn.SetMetadata("player_id", message.PlayerID)
// Notify other players
broadcastToRoom(roomID, GameResponse{
Type: "player_joined",
PlayerID: message.PlayerID,
Message: fmt.Sprintf("Player %s joined the game", message.PlayerName),
}, conn.ClientID)
return &GameResponse{
Type: "join_success",
PlayerID: message.PlayerID,
RoomID: roomID,
Message: "Successfully joined the game",
}, nil
}Connection Cleanup
Implement proper cleanup when connections close:
// Use a cleanup function when connections are removed
func setupConnectionCleanup() {
cm := r.ConnectionManager()
// Monitor for closed connections (you'd implement this based on your needs)
go func() {
for {
time.Sleep(10 * time.Second)
// Check WebSocket connections
for clientID, conn := range cm.WSConnections() {
if conn.IsClosed() {
// Clean up resources
if roomID, ok := conn.GetMetadata("room_id"); ok {
removePlayerFromRoom(conn.GetMetadata("player_id"), roomID.(string))
}
cm.RemoveWSConnection(clientID)
}
}
}
}()
}Security & Authentication
Authentication
Implement authentication for WebSocket connections:
// WebSocket authentication middleware
func authWebSocketMiddleware(next router.AsyncHandlerOption) router.AsyncHandlerOption {
return func(info interface{}) {
// Apply to WebSocket handler info
if wsInfo, ok := info.(*router.WSHandlerInfo); ok {
// Store original handler
originalHandler := wsInfo.Handler
// Wrap with authentication
wsInfo.Handler = func(conn *router.WSConnection, message interface{}) (interface{}, error) {
// Check authentication token from query params or headers
token := conn.Request().URL.Query().Get("token")
if token == "" {
token = conn.Request().Header.Get("Authorization")
}
if !isValidToken(token) {
return nil, fmt.Errorf("invalid authentication token")
}
// Call original handler
return originalHandler.(func(*router.WSConnection, interface{}) (interface{}, error))(conn, message)
}
}
next.ApplyToWS(info.(*router.WSHandlerInfo))
}
}
// Use authentication
r.WebSocket("/ws/secure-chat", chatHandler,
authWebSocketMiddleware,
router.WithAsyncSummary("Authenticated Chat"))Rate Limiting
Implement rate limiting for WebSocket connections:
type RateLimiter struct {
connections map[string]*rate.Limiter
mu sync.RWMutex
}
func (rl *RateLimiter) Allow(clientID string) bool {
rl.mu.RLock()
limiter, exists := rl.connections[clientID]
rl.mu.RUnlock()
if !exists {
rl.mu.Lock()
limiter = rate.NewLimiter(rate.Every(time.Second), 10) // 10 messages per second
rl.connections[clientID] = limiter
rl.mu.Unlock()
}
return limiter.Allow()
}
var rateLimiter = &RateLimiter{
connections: make(map[string]*rate.Limiter),
}
r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) {
// Check rate limit
if !rateLimiter.Allow(conn.ClientID) {
return nil, fmt.Errorf("rate limit exceeded")
}
// Process message...
return handleChatMessage(message)
})Best Practices
Performance Tip: Use connection pooling and efficient message serialization for high-traffic real-time applications.
1. Message Validation
Always validate incoming messages:
func validateChatMessage(message ChatMessage) error {
if message.UserID <= 0 {
return fmt.Errorf("invalid user ID")
}
if len(message.Message) == 0 {
return fmt.Errorf("message cannot be empty")
}
if len(message.Message) > 1000 {
return fmt.Errorf("message too long (max 1000 characters)")
}
if message.Room == "" {
return fmt.Errorf("room is required")
}
return nil
}2. Error Handling
Implement comprehensive error handling:
r.WebSocket("/ws/chat", func(conn *router.WSConnection, message ChatMessage) (*ChatResponse, error) {
// Validate message
if err := validateChatMessage(message); err != nil {
// Send error back to client
conn.SendMessage(router.WSMessage{
Type: "error",
Error: &router.WSError{
Code: "VALIDATION_ERROR",
Message: err.Error(),
Details: map[string]interface{}{
"field": "message",
"value": message.Message,
},
},
})
return nil, err
}
// Process message...
})3. Connection Monitoring
Monitor connection health:
func monitorConnections(cm *router.ConnectionManager) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
wsCount := len(cm.WSConnections())
log.Printf("Active WebSocket connections: %d", wsCount)
// Send heartbeat to all connections
cm.BroadcastWS(router.WSMessage{
Type: "heartbeat",
Payload: map[string]interface{}{
"timestamp": time.Now(),
"server_id": os.Getenv("SERVER_ID"),
},
})
}
}4. Graceful Shutdown
Implement graceful connection closure:
func gracefulShutdown(cm *router.ConnectionManager) {
// Notify all connections of impending shutdown
shutdownMessage := router.WSMessage{
Type: "server_shutdown",
Payload: map[string]interface{}{
"message": "Server is shutting down",
"reason": "maintenance",
"when": time.Now().Add(30 * time.Second),
},
}
cm.BroadcastWS(shutdownMessage)
// Wait for connections to close gracefully
time.Sleep(30 * time.Second)
// Force close remaining connections
for clientID := range cm.WSConnections() {
cm.RemoveWSConnection(clientID)
}
}WebSocket support in Steel provides a robust foundation for building real-time interactive applications while maintaining the same high-quality documentation and type safety as your REST APIs.