package main import ( "context" "errors" "fmt" "io" "log" "net/http" "net/url" "os" "os/signal" "path" "strings" "sync" "syscall" "time" "github.com/grafov/m3u8" ) const ( MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8" WorkerCount = 4 RefreshDelay = 3 * time.Second HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" REFERRER = "https://www.flomarching.com" OutputDirPath = "./data" ) type StreamVariant struct { URL string Bandwidth uint32 BaseURL *url.URL ID int Resolution string OutputDir string } type SegmentJob struct { URI string Seq uint64 VariantID int Variant *StreamVariant } func (j SegmentJob) AbsoluteURL() string { rel, _ := url.Parse(j.URI) return j.Variant.BaseURL.ResolveReference(rel).String() } func (j SegmentJob) Key() string { return fmt.Sprintf("%d:%s", j.Seq, j.URI) } type httpError struct { code int } func (e *httpError) Error() string { return fmt.Sprintf("http %d", e.code) } func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Println("Shutting down...") cancel() }() variants, err := getAllVariants(MasterURL) if err != nil { log.Fatalf("Failed to get variants: %v", err) } log.Printf("Found %d variants", len(variants)) var wg sync.WaitGroup sem := make(chan struct{}, WorkerCount*len(variants)) for _, variant := range variants { wg.Add(1) go func(v *StreamVariant) { defer wg.Done() variantDownloader(ctx, v, sem) }(variant) } wg.Wait() log.Println("All variant downloaders finished.") } func getAllVariants(masterURL string) ([]*StreamVariant, error) { client := &http.Client{} req, _ := http.NewRequest("GET", masterURL, nil) req.Header.Set("User-Agent", HTTPUserAgent) req.Header.Set("Referer", REFERRER) resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() playlist, listType, err := m3u8.DecodeFrom(resp.Body, true) if err != nil { return nil, err } base, _ := url.Parse(masterURL) if listType == m3u8.MEDIA { return []*StreamVariant{{ URL: masterURL, Bandwidth: 0, BaseURL: base, ID: 0, Resolution: "unknown", OutputDir: path.Join(OutputDirPath, "unknown"), }}, nil } master := playlist.(*m3u8.MasterPlaylist) if len(master.Variants) == 0 { return nil, fmt.Errorf("no variants found in master playlist") } variants := make([]*StreamVariant, 0, len(master.Variants)) for i, v := range master.Variants { vURL, _ := url.Parse(v.URI) fullURL := base.ResolveReference(vURL).String() resolution := extractResolution(v) outputDir := path.Join(OutputDirPath, resolution) variants = append(variants, &StreamVariant{ URL: fullURL, Bandwidth: v.Bandwidth, BaseURL: base.ResolveReference(vURL), ID: i, Resolution: resolution, OutputDir: outputDir, }) } return variants, nil } func extractResolution(variant *m3u8.Variant) string { if variant.Resolution != "" { parts := strings.Split(variant.Resolution, "x") if len(parts) == 2 { return parts[1] + "p" } } switch { case variant.Bandwidth >= 5000000: return "1080p" case variant.Bandwidth >= 3000000: return "720p" case variant.Bandwidth >= 1500000: return "480p" case variant.Bandwidth >= 800000: return "360p" default: return "240p" } } func variantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) { log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth) ticker := time.NewTicker(RefreshDelay) defer ticker.Stop() client := &http.Client{} seen := make(map[string]bool) for { select { case <-ctx.Done(): return default: } media, err := loadMediaPlaylist(variant.URL) seq := media.SeqNo if err != nil { log.Printf("%s: Error loading media playlist: %v", variant.Resolution, err) goto waitTick } for _, seg := range media.Segments { if seg == nil { continue } job := SegmentJob{ URI: seg.URI, Seq: seq, VariantID: variant.ID, Variant: variant, } segmentKey := job.Key() if seen[segmentKey] { seq++ continue } seen[segmentKey] = true sem <- struct{}{} // Acquire go func(j SegmentJob) { defer func() { <-sem }() // Release ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() err := downloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir) name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key()))) if err == nil { log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name) } else if isHTTPStatus(err, 403) { log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name) } else { log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err) } }(job) seq++ } if media.Closed { log.Printf("%s: Playlist closed (#EXT-X-ENDLIST)", variant.Resolution) return } waitTick: select { case <-ctx.Done(): return case <-ticker.C: } } } func loadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) { client := &http.Client{} req, _ := http.NewRequest("GET", mediaURL, nil) req.Header.Set("User-Agent", HTTPUserAgent) req.Header.Set("Referer", REFERRER) resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() pl, listType, err := m3u8.DecodeFrom(resp.Body, true) if err != nil { return nil, err } if listType == m3u8.MASTER { return nil, fmt.Errorf("expected media playlist but got master") } return pl.(*m3u8.MediaPlaylist), nil } func downloadSegment(ctx context.Context, client *http.Client, segmentURL string, outputDir string) error { for attempt := 0; attempt < 2; attempt++ { if attempt > 0 { time.Sleep(300 * time.Millisecond) } req, err := http.NewRequestWithContext(ctx, "GET", segmentURL, nil) if err != nil { return err } req.Header.Set("User-Agent", HTTPUserAgent) req.Header.Set("Referer", REFERRER) resp, err := client.Do(req) if err != nil { if attempt == 1 { return err } continue } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { io.Copy(io.Discard, resp.Body) httpErr := &httpError{code: resp.StatusCode} if resp.StatusCode == 403 && attempt == 0 { continue } return httpErr } if err := os.MkdirAll(outputDir, 0755); err != nil { return fmt.Errorf("failed to create output directory: %w", err) } fileName := safeFileName(path.Join(outputDir, path.Base(segmentURL))) out, err := os.Create(fileName) if err != nil { return err } defer out.Close() n, err := io.Copy(out, resp.Body) if err != nil { return err } if n == 0 { return fmt.Errorf("zero-byte download for %s", segmentURL) } return nil } return fmt.Errorf("exhausted retries") } func safeFileName(base string) string { if i := strings.IndexAny(base, "?&#"); i >= 0 { base = base[:i] } if base == "" { base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano()) } return base } func isHTTPStatus(err error, code int) bool { var he *httpError if errors.As(err, &he) { return he.code == code } return false }