// Command main provides a small Server-Sent Events (SSE) broker that fans
// string messages out to every connected HTTP client.
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// SSEBroker distributes broadcast messages to a set of registered client
// channels. Use NewSSEBroker to create one; the zero value has a nil client
// map and must not be used.
type SSEBroker struct {
	// clients maps each registered message channel to a "done" channel that
	// RemoveClient closes, so a Broadcast blocked on a departing client can
	// give up instead of waiting forever.
	clients map[chan string]chan struct{}
	// lock guards clients.
	lock sync.RWMutex
}

// NewSSEBroker returns an empty broker ready to accept clients.
func NewSSEBroker() *SSEBroker {
	return &SSEBroker{
		clients: make(map[chan string]chan struct{}),
	}
}

// AddClient registers c so that subsequent Broadcast calls send to it.
// Registering a channel that is already registered is a no-op.
func (b *SSEBroker) AddClient(c chan string) {
	b.lock.Lock()
	defer b.lock.Unlock()
	if _, exists := b.clients[c]; exists {
		return
	}
	b.clients[c] = make(chan struct{})
}

// RemoveClient unregisters c and releases any Broadcast currently blocked
// sending to it. Removing a channel that is not registered is a no-op.
// The channel c itself is not closed.
func (b *SSEBroker) RemoveClient(c chan string) {
	b.lock.Lock()
	defer b.lock.Unlock()
	done, exists := b.clients[c]
	if !exists {
		return
	}
	close(done)
	delete(b.clients, c)
}

// Broadcast sends msg to every client registered at the time of the call.
//
// Each send blocks until the client receives the message or the client is
// removed via RemoveClient. The broker lock is held only while the client set
// is copied, never during a send, so a slow client cannot prevent other
// clients from registering or unregistering.
func (b *SSEBroker) Broadcast(msg string) {
	type target struct {
		ch   chan string
		done <-chan struct{}
	}

	b.lock.RLock()
	targets := make([]target, 0, len(b.clients))
	for ch, done := range b.clients {
		targets = append(targets, target{ch: ch, done: done})
	}
	b.lock.RUnlock()

	for _, t := range targets {
		select {
		case t.ch <- msg:
		case <-t.done:
			// Client was removed while we were waiting; skip it.
		}
	}
}

// Handler is an http.HandlerFunc-compatible method that streams broadcast
// messages to the client as SSE "data:" events.
//
// It responds with 500 if the ResponseWriter does not support flushing.
// Otherwise it registers a private channel with the broker and writes each
// received message, flushing after every event. It returns, unregistering the
// channel, when the request context is done (for example because the client
// disconnected), when a write fails, or when the channel is closed.
func (b *SSEBroker) Handler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	messageChannel := make(chan string)
	b.AddClient(messageChannel)
	defer b.RemoveClient(messageChannel)

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			// Without this case the handler would wait on messageChannel
			// forever after the client goes away.
			return
		case msg, open := <-messageChannel:
			if !open {
				return
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				// The connection is no longer writable; stop streaming.
				return
			}
			flusher.Flush()
		}
	}
}