63 lines
1.1 KiB
Go
63 lines
1.1 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
)
|
|
|
|
type SSEBroker struct {
|
|
clients map[chan string]struct{}
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func NewSSEBroker() *SSEBroker {
|
|
return &SSEBroker{
|
|
clients: make(map[chan string]struct{}),
|
|
}
|
|
}
|
|
|
|
func (b *SSEBroker) AddClient(c chan string) {
|
|
b.lock.Lock()
|
|
defer b.lock.Unlock()
|
|
b.clients[c] = struct{}{}
|
|
}
|
|
|
|
func (b *SSEBroker) RemoveClient(c chan string) {
|
|
b.lock.Lock()
|
|
defer b.lock.Unlock()
|
|
delete(b.clients, c)
|
|
}
|
|
|
|
func (b *SSEBroker) Broadcast(msg string) {
|
|
b.lock.Lock()
|
|
defer b.lock.Unlock()
|
|
for c := range b.clients {
|
|
c <- msg
|
|
}
|
|
}
|
|
|
|
func (b *SSEBroker) Handler(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "Streaming not supported!", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
|
|
messageChannel := make(chan string)
|
|
|
|
b.AddClient(messageChannel)
|
|
defer b.RemoveClient(messageChannel)
|
|
for {
|
|
msg, open := <-messageChannel
|
|
if !open {
|
|
return
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", msg)
|
|
flusher.Flush()
|
|
}
|
|
}
|