Skip to Content
Steel is in alpha 🎉
Streaming (WS & SSE) › Server-Sent Events (SSE)

Server-Sent Events (SSE)

Steel provides built-in support for Server-Sent Events (SSE) with automatic AsyncAPI documentation generation. SSE provides one-way real-time communication from server to client, ideal for live feeds, notifications, and streaming updates.

Basic SSE Handler

type NotificationParams struct { UserID int `path:"user_id" description:"User ID to receive notifications"` } r.SSE("/sse/notifications/:user_id", func(conn *router.SSEConnection, params NotificationParams) error { // Store user information conn.SetMetadata("user_id", params.UserID) conn.SetMetadata("connected_at", time.Now()) // Send welcome message conn.SendMessage(router.SSEMessage{ ID: generateMessageID(), Event: "connected", Data: map[string]interface{}{ "message": "Connected to notification stream", "user_id": params.UserID, "time": time.Now(), }, }) // Set up notification listener go listenForUserNotifications(params.UserID, conn) // Keep connection alive for !conn.IsClosed() { time.Sleep(30 * time.Second) // Send heartbeat conn.SendMessage(router.SSEMessage{ Event: "heartbeat", Data: map[string]interface{}{"time": time.Now()}, }) } return nil }, router.WithAsyncSummary("User Notifications"), router.WithAsyncDescription("Real-time notification stream for authenticated users"), router.WithAsyncTags("notifications", "sse"))

Live Data Streaming

Stream real-time data updates:

type LiveDataParams struct { StreamType string `query:"type" description:"Type of data stream (metrics, logs, events)"` Interval int `query:"interval" description:"Update interval in seconds (default: 1)"` } r.SSE("/sse/live-data", func(conn *router.SSEConnection, params LiveDataParams) error { if params.Interval == 0 { params.Interval = 1 // Default to 1 second } ticker := time.NewTicker(time.Duration(params.Interval) * time.Second) defer ticker.Stop() for { select { case <-ticker.C: var data interface{} switch params.StreamType { case "metrics": data = getCurrentMetrics() case "logs": data = getRecentLogs() case "events": data = getRecentEvents() default: data = map[string]interface{}{ "error": "Unknown stream type", "type": params.StreamType, } } err := conn.SendMessage(router.SSEMessage{ ID: fmt.Sprintf("%d", time.Now().Unix()), Event: params.StreamType, Data: data, }) if err != nil { return err // Connection closed } case <-time.After(5 * time.Minute): // Timeout after 5 minutes of inactivity return fmt.Errorf("connection timeout") } } }, router.WithAsyncSummary("Live Data Stream"), router.WithAsyncDescription("Stream real-time application data"), router.WithAsyncTags("streaming", "data", "sse"))

SSE Message Format

SSE messages follow the standard format with optional fields:

type SSEMessage struct { ID string `json:"id,omitempty"` // Message ID for client tracking Event string `json:"event,omitempty"` // Event type Data interface{} `json:"data"` // Message payload Retry int `json:"retry,omitempty"` // Retry interval in milliseconds } // Example usage conn.SendMessage(router.SSEMessage{ ID: "msg-123", Event: "user_update", Data: map[string]interface{}{ "user_id": 456, "name": "John Doe", "status": "online", }, Retry: 3000, // Retry after 3 seconds if connection drops })

Broadcasting to SSE Connections

Use the connection manager to broadcast to multiple clients:

// Get the connection manager cm := r.ConnectionManager() // Broadcast to all SSE connections cm.BroadcastSSE(router.SSEMessage{ Event: "system_update", Data: map[string]interface{}{ "message": "New feature deployed", "version": "1.2.3", "time": time.Now(), }, }) // Broadcast to specific users func notifyUsers(userIDs []int, event string, data interface{}) { connections := cm.SSEConnections() for _, conn := range connections { if userIDMeta, ok := conn.GetMetadata("user_id"); ok { if userID, ok := userIDMeta.(int); ok { for _, targetUserID := range userIDs { if userID == targetUserID { conn.SendMessage(router.SSEMessage{ Event: event, Data: data, }) break } } } } } }

Advanced Examples

Progress Tracking

Stream progress updates for long-running operations:

type ProgressParams struct { TaskID string `path:"task_id" description:"Task ID to track"` } r.SSE("/sse/progress/:task_id", func(conn *router.SSEConnection, params ProgressParams) error { taskID := params.TaskID // Validate task exists and user has access task, err := getTask(taskID) if err != nil { return fmt.Errorf("task not found: %s", taskID) } // Store task information in connection metadata conn.SetMetadata("task_id", taskID) conn.SetMetadata("user_id", task.UserID) // Send initial status conn.SendMessage(router.SSEMessage{ Event: "progress_start", Data: map[string]interface{}{ "task_id": taskID, "status": task.Status, "progress": task.Progress, "started_at": task.StartedAt, }, }) // Monitor task progress for !conn.IsClosed() { // Get current task status currentTask, err := getTask(taskID) if err != nil { conn.SendMessage(router.SSEMessage{ Event: "error", Data: map[string]interface{}{ "message": "Failed to get task status", "error": err.Error(), }, }) return err } // Send progress update conn.SendMessage(router.SSEMessage{ Event: "progress_update", Data: map[string]interface{}{ "task_id": taskID, "status": currentTask.Status, "progress": currentTask.Progress, "message": currentTask.StatusMessage, "updated_at": time.Now(), }, }) // Check if task is complete if currentTask.Status == "completed" || currentTask.Status == "failed" { conn.SendMessage(router.SSEMessage{ Event: "progress_complete", Data: map[string]interface{}{ "task_id": taskID, "status": currentTask.Status, "result": currentTask.Result, "completed_at": currentTask.CompletedAt, }, }) return nil // Close connection } // Wait before next update time.Sleep(1 * time.Second) } return nil }, router.WithAsyncSummary("Task Progress"), router.WithAsyncDescription("Real-time progress updates for long-running tasks"), router.WithAsyncTags("tasks", "progress", "sse"))

Log Streaming

Stream application logs in real-time:

type LogStreamParams struct { Level string `query:"level" description:"Minimum log level (debug, info, warn, error)"` Service string `query:"service" description:"Filter by service name"` Follow bool `query:"follow" description:"Continue streaming new logs"` } r.SSE("/sse/logs", func(conn *router.SSEConnection, params LogStreamParams) error { // Set default log level if params.Level == "" { params.Level = "info" } // Store filtering criteria conn.SetMetadata("log_level", params.Level) conn.SetMetadata("service_filter", params.Service) // Send recent logs recentLogs := getRecentLogs(params.Level, params.Service, 50) for _, logEntry := range recentLogs { conn.SendMessage(router.SSEMessage{ Event: "log_entry", Data: logEntry, }) } if !params.Follow { return nil // Don't continue streaming } // Subscribe to new logs logChannel := subscribeToLogs(params.Level, params.Service) defer unsubscribeFromLogs(logChannel) for !conn.IsClosed() { select { case logEntry := <-logChannel: err := conn.SendMessage(router.SSEMessage{ Event: "log_entry", Data: logEntry, }) if err != nil { return err } case <-time.After(30 * time.Second): // Send keepalive conn.SendMessage(router.SSEMessage{ Event: "keepalive", Data: map[string]interface{}{"time": time.Now()}, }) } } return nil }, router.WithAsyncSummary("Log Stream"), router.WithAsyncDescription("Stream application logs in real-time"), router.WithAsyncTags("logs", "monitoring", "sse"))

Connection Management

Connection Metadata

Store and retrieve connection-specific data:

// Personalized dashboard stream. The handler records the user's preferences
// as connection metadata, sends the current data of every dashboard listed in
// those preferences, and then hands the connection to streamDashboardUpdates.
r.SSE("/sse/dashboard/:user_id", func(conn *router.SSEConnection, params struct {
	UserID int `path:"user_id"`
}) error {
	// Store user preferences
	userPrefs := getUserPreferences(params.UserID)
	conn.SetMetadata("user_id", params.UserID)
	conn.SetMetadata("preferences", userPrefs)
	conn.SetMetadata("dashboards", userPrefs.DashboardIDs)

	// Send personalized data based on preferences
	for _, dashboardID := range userPrefs.DashboardIDs {
		dashboardData := getDashboardData(dashboardID)
		err := conn.SendMessage(router.SSEMessage{
			Event: "dashboard_data",
			Data: map[string]interface{}{
				"dashboard_id": dashboardID,
				"data":         dashboardData,
			},
		})
		if err != nil {
			// Stop loading further dashboards for a connection that can no
			// longer receive them.
			return err
		}
	}

	// Continue streaming updates...
	return streamDashboardUpdates(conn)
})

Connection Monitoring

Monitor SSE connection health:

func monitorSSEConnections(cm *router.ConnectionManager) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { sseCount := len(cm.SSEConnections()) log.Printf("Active SSE connections: %d", sseCount) // Send heartbeat to all connections cm.BroadcastSSE(router.SSEMessage{ Event: "heartbeat", Data: map[string]interface{}{ "timestamp": time.Now(), "server_id": os.Getenv("SERVER_ID"), }, }) // Check for stale connections for clientID, conn := range cm.SSEConnections() { if connectedAt, ok := conn.GetMetadata("connected_at"); ok { if time.Since(connectedAt.(time.Time)) > 10*time.Minute { log.Printf("Closing stale SSE connection: %s", clientID) conn.Close() cm.RemoveSSEConnection(clientID) } } } } }

Security & Authentication

Authentication Middleware

Secure SSE endpoints with authentication:

func authSSEMiddleware(handler router.SSEHandler) router.SSEHandler { return func(conn *router.SSEConnection, params interface{}) error { // Check authentication from query parameters or headers token := conn.Request().URL.Query().Get("token") if token == "" { token = conn.Request().Header.Get("Authorization") } user, err := validateToken(token) if err != nil { return fmt.Errorf("authentication failed: %v", err) } // Store authenticated user info conn.SetMetadata("authenticated_user", user) conn.SetMetadata("user_id", user.ID) // Call original handler return handler(conn, params) } } // Apply authentication to SSE handlers r.SSE("/sse/secure/notifications/:user_id", authSSEMiddleware(notificationHandler), router.WithAsyncSummary("Secure Notifications"))

Access Control

Implement fine-grained access control:

// Admin-only system log stream. The handler expects an earlier authentication
// step to have stored a *User under the "authenticated_user" metadata key and
// refuses the connection unless that user has IsAdmin set.
r.SSE("/sse/admin/system-logs", func(conn *router.SSEConnection, params struct{}) error {
	// Check if user has admin privileges
	meta, ok := conn.GetMetadata("authenticated_user")
	if !ok {
		return fmt.Errorf("authentication required")
	}

	// A checked assertion avoids a panic when the metadata holds a different
	// type or a nil *User; both cases are treated as unauthenticated.
	user, ok := meta.(*User)
	if !ok || user == nil {
		return fmt.Errorf("authentication required")
	}

	if !user.IsAdmin {
		return fmt.Errorf("admin privileges required")
	}

	// Stream admin-only data...
	return streamSystemLogs(conn)
},
	router.WithAsyncSummary("Admin System Logs"),
	router.WithAsyncDescription("Stream system logs (admin only)"),
	router.WithAsyncTags("admin", "logs", "sse"))

Best Practices

Efficiency Tip: Use appropriate retry intervals and implement connection timeouts to prevent resource exhaustion.

1. Connection Timeouts

Implement proper timeout handling:

// Data stream with a hard 10-minute lifetime. The handler sends a data_update
// event on every tick and returns when a send fails, when the 10 minutes are
// up (after a final timeout event), or when the request context is cancelled.
r.SSE("/sse/data", func(conn *router.SSEConnection, params DataParams) error {
	// time.NewTicker panics on a non-positive duration, so an omitted or
	// invalid interval falls back to one second.
	interval := params.Interval
	if interval <= 0 {
		interval = 1
	}

	// A stoppable timer (rather than time.After) is released as soon as the
	// handler returns instead of lingering for the full 10 minutes.
	timeout := time.NewTimer(10 * time.Minute) // 10-minute timeout
	defer timeout.Stop()

	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Send data update
			err := conn.SendMessage(router.SSEMessage{
				Event: "data_update",
				Data:  getCurrentData(),
			})
			if err != nil {
				return err
			}

		case <-timeout.C:
			// Send timeout notification and close. The notification is
			// best-effort; the timeout error is returned either way.
			conn.SendMessage(router.SSEMessage{
				Event: "timeout",
				Data:  map[string]interface{}{"reason": "connection timeout"},
			})
			return fmt.Errorf("connection timeout")

		case <-conn.Request().Context().Done():
			// Client disconnected
			return nil
		}
	}
})

2. Error Handling

Implement comprehensive error handling:

// Resilient stream. Data-fetch failures are reported to the client as error
// events and retried after 5 seconds; a panic inside the handler is turned
// into an error event for the client and an error return for the router.
r.SSE("/sse/stream", func(conn *router.SSEConnection, params StreamParams) (err error) {
	defer func() {
		// The recovered value gets its own name so it does not shadow the
		// router variable r.
		if rec := recover(); rec != nil {
			conn.SendMessage(router.SSEMessage{
				Event: "error",
				Data: map[string]interface{}{
					"message": "Internal server error",
					"code":    "INTERNAL_ERROR",
				},
			})
			// Without this assignment the handler would return nil and the
			// panic would look like a clean shutdown to the caller.
			err = fmt.Errorf("sse stream handler panicked: %v", rec)
		}
	}()

	for !conn.IsClosed() {
		data, fetchErr := getStreamData()
		if fetchErr != nil {
			// Send error to client but continue streaming
			conn.SendMessage(router.SSEMessage{
				Event: "error",
				Data: map[string]interface{}{
					"message": "Failed to get data",
					"error":   fetchErr.Error(),
					"retry":   true,
				},
			})

			time.Sleep(5 * time.Second) // Wait before retrying
			continue
		}

		conn.SendMessage(router.SSEMessage{
			Event: "data",
			Data:  data,
		})

		time.Sleep(1 * time.Second)
	}

	return nil
})

3. Resource Cleanup

Ensure proper resource cleanup:

func setupSSECleanup(cm *router.ConnectionManager) { go func() { for { time.Sleep(1 * time.Minute) for clientID, conn := range cm.SSEConnections() { if conn.IsClosed() { // Clean up resources associated with this connection if userID, ok := conn.GetMetadata("user_id"); ok { cleanupUserResources(userID.(int)) } if subscriptions, ok := conn.GetMetadata("subscriptions"); ok { unsubscribeFromAll(subscriptions.([]string)) } cm.RemoveSSEConnection(clientID) } } } }() }

4. Client Reconnection

Handle client reconnection gracefully:

// Event stream with replay. When the client sends a Last-Event-ID header the
// handler first re-sends every event returned by getEventsSince for that ID,
// then continues with the normal stream.
r.SSE("/sse/events", func(conn *router.SSEConnection, params struct {
	LastEventID string `header:"Last-Event-ID"` // Standard SSE header
}) error {
	// Send missed events if client is reconnecting
	if params.LastEventID != "" {
		missedEvents := getEventsSince(params.LastEventID)
		for _, event := range missedEvents {
			err := conn.SendMessage(router.SSEMessage{
				ID:    event.ID,
				Event: event.Type,
				Data:  event.Data,
			})
			if err != nil {
				// Abort rather than skip: continuing would leave a silent
				// gap in the replayed sequence.
				return err
			}
		}
	}

	// Continue with normal event streaming
	return streamEvents(conn)
})

Server-Sent Events in Steel provide an efficient way to stream real-time data to clients while maintaining excellent performance and comprehensive documentation through AsyncAPI generation.

Last updated on