// Archived 2025-07-22 00:34:04 -05:00 — 283 lines, 6.2 KiB, Go

package main
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/grafov/m3u8"
)
// ==== CONFIG ====
const (
	// MasterURL is the HLS playlist to record (master or media playlist).
	MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8" // Replace with your .m3u8
	// WorkerCount is the number of concurrent segment-download workers.
	WorkerCount = 4
	// RefreshDelay is the interval between media-playlist refreshes.
	RefreshDelay = 3 * time.Second
	// HTTPUserAgent and REFERRER are sent on every request to mimic a
	// browser session; presumably the CDN rejects requests without them
	// (the 403-retry logic suggests token/referrer checks — TODO confirm).
	HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
	REFERRER = "https://www.flomarching.com"
	// OutputDirPath is the directory downloaded segments are written into.
	OutputDirPath = "./data"
)
// ==== TYPES ====
type SegmentJob struct {
URI string
BaseURL *url.URL
Seq uint64
}
func (j SegmentJob) AbsoluteURL() string {
rel, _ := url.Parse(j.URI)
return j.BaseURL.ResolveReference(rel).String()
}
// httpError reports a non-200 HTTP response status code so callers can
// distinguish status failures (e.g. 403) from transport errors.
type httpError struct {
	code int
}

// Error implements the error interface, rendering as e.g. "http 403".
func (e *httpError) Error() string {
	return fmt.Sprintf("http %d", e.code)
}
// ==== MAIN ====

// main wires the pipeline together: one playlist-refresher goroutine
// discovers segments and feeds them over the jobs channel to WorkerCount
// download workers. Shutdown happens either via SIGINT/SIGTERM (context
// cancellation) or when the refresher closes the jobs channel.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Handle Ctrl+C: cancel the shared context so refresher and workers
	// all wind down; workers return and wg.Wait below unblocks.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		log.Println("Shutting down...")
		cancel()
	}()
	jobs := make(chan SegmentJob, 10) // smaller buffer to avoid stale tokens
	// seen deduplicates segments across refreshes (keys are "seq:uri",
	// written by the refresher; sync.Map because access spans goroutines).
	var seen sync.Map
	// Start playlist refresher — it is the only goroutine that closes jobs.
	go playlistRefresher(ctx, MasterURL, jobs, &seen, RefreshDelay)
	// Start download workers; each reports its own error tally on exit.
	var wg sync.WaitGroup
	for i := 0; i < WorkerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			numErrors, errorIDs := segmentDownloader(ctx, id, jobs)
			if numErrors > 0 {
				log.Printf("Worker %d: %d errors: %v", id, numErrors, errorIDs)
			}
		}(i)
	}
	wg.Wait()
	log.Println("All workers finished.")
}
// ==== PLAYLIST LOGIC ====

// playlistRefresher polls the media playlist every interval and enqueues
// segments not yet recorded in seen. It is the sole closer of the jobs
// channel, closing it on context cancellation, on an unusable variant
// URL, or when the playlist ends (#EXT-X-ENDLIST).
func playlistRefresher(ctx context.Context, masterURL string, jobs chan<- SegmentJob, seen *sync.Map, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	variantURL, err := chooseVariant(masterURL)
	if err != nil {
		log.Printf("Variant selection failed, using master as media: %v", err)
		variantURL = masterURL
	}
	baseVariant, err := url.Parse(variantURL)
	if err != nil {
		// Previously ignored: a nil base URL would panic later inside
		// SegmentJob.AbsoluteURL. Nothing useful can be done — shut down.
		log.Printf("Invalid variant URL %q: %v", variantURL, err)
		close(jobs)
		return
	}

	for {
		select {
		case <-ctx.Done():
			close(jobs)
			return
		default:
		}

		media, err := loadMediaPlaylist(variantURL)
		if err != nil {
			// BUG FIX: the old code read media.SeqNo *before* this check,
			// dereferencing a nil pointer on any fetch/parse failure.
			log.Printf("Error loading media playlist: %v", err)
		} else {
			seq := media.SeqNo // media sequence number of the first segment
			for _, seg := range media.Segments {
				if seg == nil {
					continue
				}
				key := fmt.Sprintf("%d:%s", seq, seg.URI)
				if _, loaded := seen.LoadOrStore(key, struct{}{}); !loaded {
					// The send may block while workers are busy; keep it
					// cancellable so shutdown cannot deadlock here.
					select {
					case jobs <- SegmentJob{URI: seg.URI, BaseURL: baseVariant, Seq: seq}:
					case <-ctx.Done():
						close(jobs)
						return
					}
				}
				seq++
			}
			if media.Closed {
				log.Println("Playlist closed (#EXT-X-ENDLIST); closing jobs.")
				close(jobs)
				return
			}
		}

		select {
		case <-ctx.Done():
			close(jobs)
			return
		case <-ticker.C:
		}
	}
}
// chooseVariant fetches masterURL and, if it is a master playlist, returns
// the absolute URL of the highest-bandwidth variant. If the URL already
// points at a media playlist it is returned unchanged. Non-200 responses
// are drained (so the connection can be reused) and reported as *httpError.
func chooseVariant(masterURL string) (string, error) {
	// Timeout so a stalled CDN cannot hang startup forever.
	client := &http.Client{Timeout: 15 * time.Second}
	req, err := http.NewRequest("GET", masterURL, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("User-Agent", HTTPUserAgent)
	req.Header.Set("Referer", REFERRER)
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		io.Copy(io.Discard, resp.Body)
		return "", &httpError{code: resp.StatusCode}
	}
	playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
	if err != nil {
		return "", err
	}
	if listType == m3u8.MEDIA {
		return masterURL, nil
	}
	master, ok := playlist.(*m3u8.MasterPlaylist)
	if !ok {
		return "", fmt.Errorf("unexpected playlist type %T", playlist)
	}
	if len(master.Variants) == 0 {
		return "", fmt.Errorf("no variants found in master playlist")
	}
	// Pick the highest-bandwidth variant.
	best := master.Variants[0]
	for _, v := range master.Variants {
		if v.Bandwidth > best.Bandwidth {
			best = v
		}
	}
	base, err := url.Parse(masterURL)
	if err != nil {
		return "", err
	}
	vURL, err := url.Parse(best.URI)
	if err != nil {
		return "", err
	}
	return base.ResolveReference(vURL).String(), nil
}
// loadMediaPlaylist fetches and parses mediaURL, expecting a media
// (segment) playlist. It returns an error for transport failures, non-200
// responses (*httpError, body drained for connection reuse), parse
// failures, or if the URL turns out to be a master playlist.
func loadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) {
	// Timeout so one stalled refresh cannot wedge the refresher loop.
	client := &http.Client{Timeout: 15 * time.Second}
	req, err := http.NewRequest("GET", mediaURL, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", HTTPUserAgent)
	req.Header.Set("Referer", REFERRER)
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		io.Copy(io.Discard, resp.Body)
		return nil, &httpError{code: resp.StatusCode}
	}
	pl, listType, err := m3u8.DecodeFrom(resp.Body, true)
	if err != nil {
		return nil, err
	}
	if listType == m3u8.MASTER {
		return nil, fmt.Errorf("expected media playlist but got master")
	}
	media, ok := pl.(*m3u8.MediaPlaylist)
	if !ok {
		return nil, fmt.Errorf("unexpected playlist type %T", pl)
	}
	return media, nil
}
// ==== WORKERS ====

// segmentDownloader drains the jobs channel until it is closed or ctx is
// cancelled. It returns the number of failed segments and their base
// names. HTTP 403 (typically an expired CDN token) gets one delayed
// retry before being counted as a failure.
func segmentDownloader(ctx context.Context, id int, jobs <-chan SegmentJob) (int, []string) {
	client := &http.Client{}
	numErrors := 0
	errorIDs := make([]string, 0)
	for {
		select {
		case <-ctx.Done():
			return numErrors, errorIDs
		case job, ok := <-jobs:
			if !ok {
				return numErrors, errorIDs
			}
			abs := job.AbsoluteURL()
			name := path.Base(abs)
			err := downloadSegment(client, abs)
			if err == nil {
				log.Printf("Worker %d: downloaded %s", id, name)
				continue
			}
			if !isHTTPStatus(err, 403) {
				// BUG FIX: non-403 failures were logged but never counted,
				// so main under-reported errors. Count them like 403s.
				log.Printf("Worker %d: failed %s: %v", id, name, err)
				numErrors++
				errorIDs = append(errorIDs, name)
				continue
			}
			// 403: back off briefly and retry once.
			time.Sleep(300 * time.Millisecond)
			if err2 := downloadSegment(client, abs); err2 != nil {
				log.Printf("Worker %d: 403 retry failed (%s): %v", id, name, err2)
				numErrors++
				errorIDs = append(errorIDs, name)
			} else {
				log.Printf("Worker %d: recovered 403 (%s)", id, name)
			}
		}
	}
}
// downloadSegment GETs one segment URL and writes it under OutputDirPath,
// named after the URL's final path element (query/fragment stripped by
// safeFileName). Non-200 responses are drained (so the transport can
// reuse the connection) and reported as *httpError.
func downloadSegment(client *http.Client, segmentURL string) error {
	req, err := http.NewRequest("GET", segmentURL, nil)
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", HTTPUserAgent)
	req.Header.Set("Referer", REFERRER)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		io.Copy(io.Discard, resp.Body)
		return &httpError{code: resp.StatusCode}
	}
	// BUG FIX: os.Create does not create parent directories, so the very
	// first download failed when ./data did not already exist.
	if err := os.MkdirAll(OutputDirPath, 0755); err != nil {
		return err
	}
	fileName := safeFileName(filepath.Join(OutputDirPath, path.Base(segmentURL)))
	out, err := os.Create(fileName)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		// Remove the truncated file so a retry starts clean.
		out.Close()
		os.Remove(fileName)
		return err
	}
	// Surface close errors (e.g. delayed write failures) instead of
	// silently dropping them as the deferred Close did before.
	return out.Close()
}
// ==== HELPERS ====
func safeFileName(base string) string {
if i := strings.IndexAny(base, "?&#"); i >= 0 {
base = base[:i]
}
if base == "" {
base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano())
}
return base
}
// isHTTPStatus reports whether err is (or wraps) an *httpError carrying
// exactly the given status code.
func isHTTPStatus(err error, code int) bool {
	var target *httpError
	return errors.As(err, &target) && target.code == code
}