Server-Sent Events (SSE)
Steel provides built-in support for Server-Sent Events (SSE) with automatic AsyncAPI documentation generation. SSE provides one-way real-time communication from server to client, ideal for live feeds, notifications, and streaming updates.
Basic SSE Handler
// NotificationParams holds the parameters of the user notification stream.
type NotificationParams struct {
	// UserID is tagged as the "user_id" path parameter.
	UserID int `path:"user_id" description:"User ID to receive notifications"`
}
// Notification stream: greets the client, starts the per-user listener, then
// keeps the connection open with a heartbeat every 30 seconds. The handler
// returns as soon as the request context is cancelled or a send fails.
r.SSE("/sse/notifications/:user_id", func(conn *router.SSEConnection, params NotificationParams) error {
	// Store user information
	conn.SetMetadata("user_id", params.UserID)
	conn.SetMetadata("connected_at", time.Now())

	// Send welcome message. If the very first write fails there is no point
	// in starting the listener or the heartbeat loop.
	err := conn.SendMessage(router.SSEMessage{
		ID:    generateMessageID(),
		Event: "connected",
		Data: map[string]interface{}{
			"message": "Connected to notification stream",
			"user_id": params.UserID,
			"time":    time.Now(),
		},
	})
	if err != nil {
		return err
	}

	// Set up notification listener
	// NOTE(review): the listener is expected to stop on its own once conn is
	// closed; confirm, otherwise this goroutine outlives the connection.
	go listenForUserNotifications(params.UserID, conn)

	// Keep connection alive. A ticker inside a select lets the handler react
	// to a client disconnect immediately instead of after a full sleep.
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()
	clientGone := conn.Request().Context().Done()

	for !conn.IsClosed() {
		select {
		case <-clientGone:
			return nil
		case <-heartbeat.C:
			// Send heartbeat
			err := conn.SendMessage(router.SSEMessage{
				Event: "heartbeat",
				Data:  map[string]interface{}{"time": time.Now()},
			})
			if err != nil {
				return err
			}
		}
	}
	return nil
}, router.WithAsyncSummary("User Notifications"),
	router.WithAsyncDescription("Real-time notification stream for authenticated users"),
	router.WithAsyncTags("notifications", "sse"))
Live Data Streaming
Stream real-time data updates:
// LiveDataParams holds the query parameters of the live data stream.
type LiveDataParams struct {
	// StreamType is tagged as the "type" query parameter.
	StreamType string `query:"type" description:"Type of data stream (metrics, logs, events)"`
	// Interval is tagged as the "interval" query parameter, in seconds.
	Interval int `query:"interval" description:"Update interval in seconds (default: 1)"`
}
// Live data stream: pushes one message per tick, choosing the payload by
// params.StreamType. The stream ends when a send fails, when the client
// disconnects, or five minutes after it was opened.
r.SSE("/sse/live-data", func(conn *router.SSEConnection, params LiveDataParams) error {
	// time.NewTicker panics on a non-positive duration, so negative values
	// fall back to the default together with the zero value.
	if params.Interval <= 0 {
		params.Interval = 1 // Default to 1 second
	}

	ticker := time.NewTicker(time.Duration(params.Interval) * time.Second)
	defer ticker.Stop()

	// Armed once, outside the loop. A time.After inside the select would be
	// recreated on every pass and could only fire if the tick interval itself
	// exceeded five minutes.
	deadline := time.NewTimer(5 * time.Minute)
	defer deadline.Stop()

	clientGone := conn.Request().Context().Done()

	for {
		select {
		case <-ticker.C:
			var data interface{}
			switch params.StreamType {
			case "metrics":
				data = getCurrentMetrics()
			case "logs":
				data = getRecentLogs()
			case "events":
				data = getRecentEvents()
			default:
				data = map[string]interface{}{
					"error": "Unknown stream type",
					"type":  params.StreamType,
				}
			}

			err := conn.SendMessage(router.SSEMessage{
				ID:    fmt.Sprintf("%d", time.Now().Unix()),
				Event: params.StreamType,
				Data:  data,
			})
			if err != nil {
				return err // Connection closed
			}

		case <-deadline.C:
			// Stream has been open for 5 minutes
			return fmt.Errorf("connection timeout")

		case <-clientGone:
			// Client disconnected
			return nil
		}
	}
}, router.WithAsyncSummary("Live Data Stream"),
	router.WithAsyncDescription("Stream real-time application data"),
	router.WithAsyncTags("streaming", "data", "sse"))
SSE Message Format
SSE messages follow the standard format with optional fields:
// SSEMessage is a single server-sent event. Only Data is always serialized;
// the other fields are omitted from JSON when empty.
type SSEMessage struct {
	// ID is the message ID for client tracking.
	ID string `json:"id,omitempty"`
	// Event is the event type.
	Event string `json:"event,omitempty"`
	// Data is the message payload.
	Data interface{} `json:"data"`
	// Retry is the retry interval in milliseconds.
	Retry int `json:"retry,omitempty"`
}
// Example usage
conn.SendMessage(router.SSEMessage{
ID: "msg-123",
Event: "user_update",
Data: map[string]interface{}{
"user_id": 456,
"name": "John Doe",
"status": "online",
},
Retry: 3000, // Retry after 3 seconds if connection drops
})Broadcasting to SSE Connections
Use the connection manager to broadcast to multiple clients:
// Get the connection manager
cm := r.ConnectionManager()

// Describe the announcement once, then broadcast it to all SSE connections.
announcement := router.SSEMessage{
	Event: "system_update",
	Data: map[string]interface{}{
		"message": "New feature deployed",
		"version": "1.2.3",
		"time":    time.Now(),
	},
}
cm.BroadcastSSE(announcement)
// Broadcast to specific users
func notifyUsers(userIDs []int, event string, data interface{}) {
connections := cm.SSEConnections()
for _, conn := range connections {
if userIDMeta, ok := conn.GetMetadata("user_id"); ok {
if userID, ok := userIDMeta.(int); ok {
for _, targetUserID := range userIDs {
if userID == targetUserID {
conn.SendMessage(router.SSEMessage{
Event: event,
Data: data,
})
break
}
}
}
}
}
}Advanced Examples
Progress Tracking
Stream progress updates for long-running operations:
// ProgressParams holds the parameters of the task progress stream.
type ProgressParams struct {
	// TaskID is tagged as the "task_id" path parameter.
	TaskID string `path:"task_id" description:"Task ID to track"`
}
// Task progress stream: reports the task's state once per second until the
// task reaches "completed" or "failed", the client disconnects, or a send
// fails.
r.SSE("/sse/progress/:task_id", func(conn *router.SSEConnection, params ProgressParams) error {
	taskID := params.TaskID

	// Validate task exists.
	// NOTE(review): nothing here verifies that the caller is allowed to see
	// this task; add an ownership check if the endpoint is user-facing.
	task, err := getTask(taskID)
	if err != nil {
		// Keep the underlying cause available to errors.Is / errors.As.
		return fmt.Errorf("task not found: %s: %w", taskID, err)
	}

	// Store task information in connection metadata
	conn.SetMetadata("task_id", taskID)
	conn.SetMetadata("user_id", task.UserID)

	// Send initial status
	err = conn.SendMessage(router.SSEMessage{
		Event: "progress_start",
		Data: map[string]interface{}{
			"task_id":    taskID,
			"status":     task.Status,
			"progress":   task.Progress,
			"started_at": task.StartedAt,
		},
	})
	if err != nil {
		return err
	}

	poll := time.NewTicker(1 * time.Second)
	defer poll.Stop()
	clientGone := conn.Request().Context().Done()

	// Monitor task progress
	for !conn.IsClosed() {
		// Get current task status
		currentTask, err := getTask(taskID)
		if err != nil {
			// Best-effort notification; the lookup failure is the error that
			// matters, so a failed send here is intentionally not reported.
			_ = conn.SendMessage(router.SSEMessage{
				Event: "error",
				Data: map[string]interface{}{
					"message": "Failed to get task status",
					"error":   err.Error(),
				},
			})
			return err
		}

		// Send progress update
		err = conn.SendMessage(router.SSEMessage{
			Event: "progress_update",
			Data: map[string]interface{}{
				"task_id":    taskID,
				"status":     currentTask.Status,
				"progress":   currentTask.Progress,
				"message":    currentTask.StatusMessage,
				"updated_at": time.Now(),
			},
		})
		if err != nil {
			return err
		}

		// Check if task is complete
		if currentTask.Status == "completed" || currentTask.Status == "failed" {
			// Returning closes the connection; surface a failed final send.
			return conn.SendMessage(router.SSEMessage{
				Event: "progress_complete",
				Data: map[string]interface{}{
					"task_id":      taskID,
					"status":       currentTask.Status,
					"result":       currentTask.Result,
					"completed_at": currentTask.CompletedAt,
				},
			})
		}

		// Wait before next update, but stop at once if the client leaves.
		select {
		case <-clientGone:
			return nil
		case <-poll.C:
		}
	}
	return nil
}, router.WithAsyncSummary("Task Progress"),
	router.WithAsyncDescription("Real-time progress updates for long-running tasks"),
	router.WithAsyncTags("tasks", "progress", "sse"))
Log Streaming
Stream application logs in real-time:
// LogStreamParams holds the query parameters of the log stream.
type LogStreamParams struct {
	// Level is tagged as the "level" query parameter.
	Level string `query:"level" description:"Minimum log level (debug, info, warn, error)"`
	// Service is tagged as the "service" query parameter.
	Service string `query:"service" description:"Filter by service name"`
	// Follow is tagged as the "follow" query parameter.
	Follow bool `query:"follow" description:"Continue streaming new logs"`
}
// Log stream: replays the 50 most recent matching entries and, when
// params.Follow is set, keeps streaming new entries. A keepalive is sent
// after 30 seconds without a log entry.
r.SSE("/sse/logs", func(conn *router.SSEConnection, params LogStreamParams) error {
	// Set default log level
	if params.Level == "" {
		params.Level = "info"
	}

	// Store filtering criteria
	conn.SetMetadata("log_level", params.Level)
	conn.SetMetadata("service_filter", params.Service)

	// Send recent logs; stop replaying as soon as the client is unreachable.
	recentLogs := getRecentLogs(params.Level, params.Service, 50)
	for _, logEntry := range recentLogs {
		err := conn.SendMessage(router.SSEMessage{
			Event: "log_entry",
			Data:  logEntry,
		})
		if err != nil {
			return err
		}
	}

	if !params.Follow {
		return nil // Don't continue streaming
	}

	// Subscribe to new logs
	logChannel := subscribeToLogs(params.Level, params.Service)
	defer unsubscribeFromLogs(logChannel)

	clientGone := conn.Request().Context().Done()

	for !conn.IsClosed() {
		select {
		case logEntry, ok := <-logChannel:
			if !ok {
				// A closed channel yields zero values forever; treat it as
				// the end of the subscription instead of spinning.
				return nil
			}
			err := conn.SendMessage(router.SSEMessage{
				Event: "log_entry",
				Data:  logEntry,
			})
			if err != nil {
				return err
			}

		case <-time.After(30 * time.Second):
			// Send keepalive. The timer is intentionally re-armed on every
			// pass so it measures idle time since the last log entry.
			err := conn.SendMessage(router.SSEMessage{
				Event: "keepalive",
				Data:  map[string]interface{}{"time": time.Now()},
			})
			if err != nil {
				return err
			}

		case <-clientGone:
			// Client disconnected
			return nil
		}
	}
	return nil
}, router.WithAsyncSummary("Log Stream"),
	router.WithAsyncDescription("Stream application logs in real-time"),
	router.WithAsyncTags("logs", "monitoring", "sse"))
Connection Management
Connection Metadata
Store and retrieve connection-specific data:
// Dashboard stream: records the user's preferences as connection metadata,
// pushes the current data of every preferred dashboard, then hands the
// connection to streamDashboardUpdates.
r.SSE("/sse/dashboard/:user_id", func(conn *router.SSEConnection, params struct {
	UserID int `path:"user_id"`
}) error {
	// Store user preferences
	userPrefs := getUserPreferences(params.UserID)
	conn.SetMetadata("user_id", params.UserID)
	conn.SetMetadata("preferences", userPrefs)
	conn.SetMetadata("dashboards", userPrefs.DashboardIDs)

	// Send personalized data based on preferences
	for _, dashboardID := range userPrefs.DashboardIDs {
		dashboardData := getDashboardData(dashboardID)
		err := conn.SendMessage(router.SSEMessage{
			Event: "dashboard_data",
			Data: map[string]interface{}{
				"dashboard_id": dashboardID,
				"data":         dashboardData,
			},
		})
		if err != nil {
			// The client is unreachable: skip loading the remaining
			// dashboards and do not start the update stream.
			return err
		}
	}

	// Continue streaming updates...
	return streamDashboardUpdates(conn)
})
Connection Monitoring
Monitor SSE connection health:
func monitorSSEConnections(cm *router.ConnectionManager) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
sseCount := len(cm.SSEConnections())
log.Printf("Active SSE connections: %d", sseCount)
// Send heartbeat to all connections
cm.BroadcastSSE(router.SSEMessage{
Event: "heartbeat",
Data: map[string]interface{}{
"timestamp": time.Now(),
"server_id": os.Getenv("SERVER_ID"),
},
})
// Check for stale connections
for clientID, conn := range cm.SSEConnections() {
if connectedAt, ok := conn.GetMetadata("connected_at"); ok {
if time.Since(connectedAt.(time.Time)) > 10*time.Minute {
log.Printf("Closing stale SSE connection: %s", clientID)
conn.Close()
cm.RemoveSSEConnection(clientID)
}
}
}
}
}Security & Authentication
Authentication Middleware
Secure SSE endpoints with authentication:
// authSSEMiddleware wraps handler so that it only runs for requests carrying
// a token accepted by validateToken.
//
// The token is read from the "token" query parameter, falling back to the
// Authorization header when the parameter is empty. On success the user is
// stored in the connection metadata under "authenticated_user" and its ID
// under "user_id" before handler is called. On failure handler is not called
// and an error wrapping the validation error is returned.
func authSSEMiddleware(handler router.SSEHandler) router.SSEHandler {
	return func(conn *router.SSEConnection, params interface{}) error {
		// Check authentication from query parameters or headers
		token := conn.Request().URL.Query().Get("token")
		if token == "" {
			// NOTE(review): the header value is passed on verbatim, including
			// any scheme prefix such as "Bearer "; confirm validateToken
			// expects that form.
			token = conn.Request().Header.Get("Authorization")
		}

		user, err := validateToken(token)
		if err != nil {
			// %w keeps the cause inspectable with errors.Is / errors.As.
			return fmt.Errorf("authentication failed: %w", err)
		}

		// Store authenticated user info
		conn.SetMetadata("authenticated_user", user)
		conn.SetMetadata("user_id", user.ID)

		// Call original handler
		return handler(conn, params)
	}
}
// Apply authentication to SSE handlers
r.SSE("/sse/secure/notifications/:user_id",
authSSEMiddleware(notificationHandler),
router.WithAsyncSummary("Secure Notifications"))Access Control
Implement fine-grained access control:
// Admin log stream: only runs streamSystemLogs when the connection metadata
// holds an authenticated *User whose IsAdmin flag is set.
r.SSE("/sse/admin/system-logs", func(conn *router.SSEConnection, params struct{}) error {
	// Check if user has admin privileges
	meta, ok := conn.GetMetadata("authenticated_user")
	if !ok {
		return fmt.Errorf("authentication required")
	}

	// Checked assertion: a value of any other type, or a nil pointer, is
	// treated as "not authenticated" instead of panicking the handler.
	user, ok := meta.(*User)
	if !ok || user == nil {
		return fmt.Errorf("authentication required")
	}

	if !user.IsAdmin {
		return fmt.Errorf("admin privileges required")
	}

	// Stream admin-only data...
	return streamSystemLogs(conn)
}, router.WithAsyncSummary("Admin System Logs"),
	router.WithAsyncDescription("Stream system logs (admin only)"),
	router.WithAsyncTags("admin", "logs", "sse"))
Best Practices
Efficiency Tip: Use appropriate retry intervals and implement connection timeouts to prevent resource exhaustion.
1. Connection Timeouts
Implement proper timeout handling:
// Timeout example: sends a data update on every tick and ends the stream
// after 10 minutes, when the client disconnects, or when a send fails.
r.SSE("/sse/data", func(conn *router.SSEConnection, params DataParams) error {
	timeout := time.After(10 * time.Minute) // 10-minute timeout

	// time.NewTicker panics on a non-positive duration, so an unset or
	// negative interval falls back to one second.
	interval := params.Interval
	if interval <= 0 {
		interval = 1
	}
	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Send data update
			err := conn.SendMessage(router.SSEMessage{
				Event: "data_update",
				Data:  getCurrentData(),
			})
			if err != nil {
				return err
			}

		case <-timeout:
			// Send timeout notification and close. The notification is
			// best-effort: the handler ends with the timeout error either way.
			_ = conn.SendMessage(router.SSEMessage{
				Event: "timeout",
				Data:  map[string]interface{}{"reason": "connection timeout"},
			})
			return fmt.Errorf("connection timeout")

		case <-conn.Request().Context().Done():
			// Client disconnected
			return nil
		}
	}
})
2. Error Handling
Implement comprehensive error handling:
// Error-handling example: streams data once per second, reports fetch
// failures to the client and retries after 5 seconds, and converts a panic
// into both a client-visible error event and a returned error.
r.SSE("/sse/stream", func(conn *router.SSEConnection, params StreamParams) (retErr error) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		// Best-effort notification; the connection may already be unusable.
		_ = conn.SendMessage(router.SSEMessage{
			Event: "error",
			Data: map[string]interface{}{
				"message": "Internal server error",
				"code":    "INTERNAL_ERROR",
			},
		})
		// Report the panic through the named result so the handler does not
		// appear to have finished successfully.
		retErr = fmt.Errorf("sse stream handler panicked: %v", rec)
	}()

	for !conn.IsClosed() {
		data, err := getStreamData()
		if err != nil {
			// Send error to client but continue streaming
			sendErr := conn.SendMessage(router.SSEMessage{
				Event: "error",
				Data: map[string]interface{}{
					"message": "Failed to get data",
					"error":   err.Error(),
					"retry":   true,
				},
			})
			if sendErr != nil {
				// Nobody is listening any more; retrying would be pointless.
				return sendErr
			}
			time.Sleep(5 * time.Second) // Wait before retrying
			continue
		}

		sendErr := conn.SendMessage(router.SSEMessage{
			Event: "data",
			Data:  data,
		})
		if sendErr != nil {
			return sendErr
		}

		time.Sleep(1 * time.Second)
	}
	return nil
})
3. Resource Cleanup
Ensure proper resource cleanup:
func setupSSECleanup(cm *router.ConnectionManager) {
go func() {
for {
time.Sleep(1 * time.Minute)
for clientID, conn := range cm.SSEConnections() {
if conn.IsClosed() {
// Clean up resources associated with this connection
if userID, ok := conn.GetMetadata("user_id"); ok {
cleanupUserResources(userID.(int))
}
if subscriptions, ok := conn.GetMetadata("subscriptions"); ok {
unsubscribeFromAll(subscriptions.([]string))
}
cm.RemoveSSEConnection(clientID)
}
}
}
}()
}4. Client Reconnection
Handle client reconnection gracefully:
// Reconnection example: when the client supplies Last-Event-ID, the events it
// missed are replayed before the handler switches to normal streaming.
r.SSE("/sse/events", func(conn *router.SSEConnection, params struct {
	LastEventID string `header:"Last-Event-ID"` // Standard SSE header
}) error {
	// Send missed events if client is reconnecting
	if params.LastEventID != "" {
		missedEvents := getEventsSince(params.LastEventID)
		for _, event := range missedEvents {
			err := conn.SendMessage(router.SSEMessage{
				ID:    event.ID,
				Event: event.Type,
				Data:  event.Data,
			})
			if err != nil {
				// Abort the replay: continuing would leave a gap in what the
				// client received while still moving on to live events.
				return err
			}
		}
	}

	// Continue with normal event streaming
	return streamEvents(conn)
})
Server-Sent Events in Steel provide an efficient way to stream real-time data to clients while maintaining excellent performance and comprehensive documentation through AsyncAPI generation.