63 lines
1.1 KiB
Go

package main
import (
"fmt"
"net/http"
"sync"
)
// SSEBroker keeps track of the channels that want to receive broadcast
// messages. Each registered channel represents one subscriber.
type SSEBroker struct {
	// lock guards clients; the methods on SSEBroker take it before
	// reading or modifying the map.
	lock sync.RWMutex

	// clients is used as a set: the keys are the subscriber channels and
	// the empty-struct values carry no information.
	clients map[chan string]struct{}
}
// NewSSEBroker returns a pointer to a new SSEBroker whose client set is
// allocated and empty.
func NewSSEBroker() *SSEBroker {
	broker := new(SSEBroker)
	broker.clients = map[chan string]struct{}{}
	return broker
}
// AddClient registers c as a subscriber. Registering the same channel more
// than once has no additional effect because the clients map acts as a set.
//
// The map is allocated on first use, so a zero-value SSEBroker (one not
// created through NewSSEBroker) can register clients instead of panicking
// on a write to a nil map.
func (b *SSEBroker) AddClient(c chan string) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.clients == nil {
		b.clients = make(map[chan string]struct{})
	}
	b.clients[c] = struct{}{}
}
// RemoveClient unregisters c under the broker lock. Removing a channel that
// is not registered is a no-op. The channel itself is not closed.
func (b *SSEBroker) RemoveClient(c chan string) {
	b.lock.Lock()
	delete(b.clients, c)
	b.lock.Unlock()
}
// Broadcast delivers msg to every registered client channel.
//
// The broker lock is held exclusively for the whole call, so broadcasts run
// one at a time and AddClient/RemoveClient wait until delivery has finished.
// Each send is a plain blocking channel send: the call does not return until
// every client has accepted the message.
func (b *SSEBroker) Broadcast(msg string) {
	b.lock.Lock()
	defer b.lock.Unlock()

	for subscriber := range b.clients {
		subscriber <- msg
	}
}
// Handler serves a Server-Sent Events stream. It registers a fresh channel
// with the broker and writes every message received on it to the response as
// a "data:" event, flushing after each one.
//
// The handler returns, and unregisters its channel, when any of these occurs:
//   - the request context is done (client disconnected or request cancelled),
//   - a write to the response fails,
//   - the channel is closed.
//
// If w does not implement http.Flusher the handler replies with
// 500 Internal Server Error and never registers a client.
//
// NOTE(review): msg is written verbatim after "data: ". A message containing
// a line break would not be framed as a single event; callers are presumably
// expected to send single-line payloads — confirm.
func (b *SSEBroker) Handler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	messageChannel := make(chan string)
	b.AddClient(messageChannel)
	defer func() {
		// Broadcast sends on client channels while holding the same lock
		// RemoveClient needs. Keep receiving (and discarding) until the
		// removal has completed, so a broadcast that is already in flight
		// can never block forever on a channel nobody reads any more.
		removed := make(chan struct{})
		go func() {
			defer close(removed)
			b.RemoveClient(messageChannel)
		}()
		pending := messageChannel
		for {
			select {
			case _, open := <-pending:
				if !open {
					// A closed channel is always ready; disable the
					// case so this loop does not spin.
					pending = nil
				}
			case <-removed:
				return
			}
		}
	}()

	// Send the headers right away so the client sees the stream as open
	// before the first event arrives.
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			// Without this case the handler would stay registered and
			// keep running after the client has gone away.
			return
		case msg, open := <-messageChannel:
			if !open {
				return
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				// The connection can no longer be written to.
				return
			}
			flusher.Flush()
		}
	}
}