From 0e27ed0ebb43c28a1827a8f3e2a53777ef47942c Mon Sep 17 00:00:00 2001 From: kacarmichael Date: Thu, 24 Jul 2025 01:45:24 -0500 Subject: [PATCH] Separation to avoid a monolithic script --- cmd/downloader/main.go | 297 +------------------------------------ pkg/constants/constants.go | 13 ++ pkg/httpClient/error.go | 20 +++ pkg/media/playlist.go | 29 ++++ pkg/media/segment.go | 94 ++++++++++++ pkg/media/stream.go | 168 +++++++++++++++++++++ 6 files changed, 330 insertions(+), 291 deletions(-) create mode 100644 pkg/constants/constants.go create mode 100644 pkg/httpClient/error.go create mode 100644 pkg/media/playlist.go create mode 100644 pkg/media/segment.go create mode 100644 pkg/media/stream.go diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 80a47ff..06ab5e1 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -2,64 +2,15 @@ package main import ( "context" - "errors" - "fmt" - "io" "log" - "net/http" - "net/url" + "m3u8-downloader/pkg/constants" + "m3u8-downloader/pkg/media" "os" "os/signal" - "path" - "strings" "sync" "syscall" - "time" - - "github.com/grafov/m3u8" ) -const ( - MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8" - WorkerCount = 4 - RefreshDelay = 3 * time.Second - - HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" - REFERRER = "https://www.flomarching.com" - OutputDirPath = "./data" -) - -type StreamVariant struct { - URL string - Bandwidth uint32 - BaseURL *url.URL - ID int - Resolution string - OutputDir string -} - -type SegmentJob struct { - URI string - Seq uint64 - VariantID int - Variant *StreamVariant -} - -func (j SegmentJob) AbsoluteURL() string { - rel, _ := url.Parse(j.URI) - return j.Variant.BaseURL.ResolveReference(rel).String() -} - -func (j SegmentJob) Key() string { - return fmt.Sprintf("%d:%s", j.Seq, j.URI) -} - -type httpError struct { - code int -} - -func (e *httpError) Error() string { return fmt.Sprintf("http %d", e.code) } - func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -72,259 +23,23 @@ func main() { cancel() }() - variants, err := getAllVariants(MasterURL) + variants, err := media.GetAllVariants(constants.MasterURL) if err != nil { log.Fatalf("Failed to get variants: %v", err) } log.Printf("Found %d variants", len(variants)) var wg sync.WaitGroup - sem := make(chan struct{}, WorkerCount*len(variants)) + sem := make(chan struct{}, constants.WorkerCount*len(variants)) for _, variant := range variants { wg.Add(1) - go func(v *StreamVariant) { + go func(v *media.StreamVariant) { defer wg.Done() - variantDownloader(ctx, v, sem) + media.VariantDownloader(ctx, v, sem) }(variant) } wg.Wait() log.Println("All variant downloaders finished.") } - -func getAllVariants(masterURL string) ([]*StreamVariant, error) { - client := &http.Client{} - req, _ := http.NewRequest("GET", masterURL, nil) - req.Header.Set("User-Agent", HTTPUserAgent) - req.Header.Set("Referer", REFERRER) - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - playlist, listType, err := m3u8.DecodeFrom(resp.Body, true) - if err != nil { - return nil, err - } - - base, _ := url.Parse(masterURL) - - if listType == m3u8.MEDIA { - return []*StreamVariant{{ - URL: masterURL, - Bandwidth: 0, - BaseURL: base, - ID: 0, - Resolution: "unknown", - OutputDir: path.Join(OutputDirPath, "unknown"), - }}, nil - } - - master := playlist.(*m3u8.MasterPlaylist) - if len(master.Variants) == 0 { - return nil, fmt.Errorf("no variants found in master playlist") - } - - variants := make([]*StreamVariant, 0, len(master.Variants)) - for i, v := range master.Variants { - vURL, _ := url.Parse(v.URI) - fullURL := base.ResolveReference(vURL).String() - resolution := extractResolution(v) - outputDir := path.Join(OutputDirPath, resolution) - variants = append(variants, &StreamVariant{ - URL: fullURL, - Bandwidth: v.Bandwidth, - BaseURL: base.ResolveReference(vURL), - ID: i, - Resolution: resolution, - OutputDir: outputDir, - }) - } - return variants, nil -} - -func extractResolution(variant *m3u8.Variant) string { - if variant.Resolution != "" { - parts := strings.Split(variant.Resolution, "x") - if len(parts) == 2 { - return parts[1] + "p" - } - } - switch { - case variant.Bandwidth >= 5000000: - return "1080p" - case variant.Bandwidth >= 3000000: - return "720p" - case variant.Bandwidth >= 1500000: - return "480p" - case variant.Bandwidth >= 800000: - return "360p" - default: - return "240p" - } -} - -func variantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) { - log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth) - ticker := time.NewTicker(RefreshDelay) - defer ticker.Stop() - client := &http.Client{} - seen := make(map[string]bool) - - for { - select { - case <-ctx.Done(): - return - default: - } - - media, err := loadMediaPlaylist(variant.URL) - seq := media.SeqNo - if err != nil { - log.Printf("%s: Error loading media playlist: %v", variant.Resolution, err) - goto waitTick - } - - for _, seg := range media.Segments { - if seg == nil { - continue - } - job := SegmentJob{ - URI: seg.URI, - Seq: seq, - VariantID: variant.ID, - Variant: variant, - } - segmentKey := job.Key() - if seen[segmentKey] { - seq++ - continue - } - seen[segmentKey] = true - - sem <- struct{}{} // Acquire - go func(j SegmentJob) { - defer func() { <-sem }() // Release - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - err := downloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir) - name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key()))) - if err == nil { - log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name) - } else if isHTTPStatus(err, 403) { - log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name) - } else { - log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err) - } - }(job) - seq++ - } - - if media.Closed { - log.Printf("%s: Playlist closed (#EXT-X-ENDLIST)", variant.Resolution) - return - } - - waitTick: - select { - case <-ctx.Done(): - return - case <-ticker.C: - } - } -} - -func loadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) { - client := &http.Client{} - req, _ := http.NewRequest("GET", mediaURL, nil) - req.Header.Set("User-Agent", HTTPUserAgent) - req.Header.Set("Referer", REFERRER) - resp, err := client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - pl, listType, err := m3u8.DecodeFrom(resp.Body, true) - if err != nil { - return nil, err - } - if listType == m3u8.MASTER { - return nil, fmt.Errorf("expected media playlist but got master") - } - return pl.(*m3u8.MediaPlaylist), nil -} - -func downloadSegment(ctx context.Context, client *http.Client, segmentURL string, outputDir string) error { - for attempt := 0; attempt < 2; attempt++ { - if attempt > 0 { - time.Sleep(300 * time.Millisecond) - } - req, err := http.NewRequestWithContext(ctx, "GET", segmentURL, nil) - if err != nil { - return err - } - req.Header.Set("User-Agent", HTTPUserAgent) - req.Header.Set("Referer", REFERRER) - - resp, err := client.Do(req) - if err != nil { - if attempt == 1 { - return err - } - continue - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - io.Copy(io.Discard, resp.Body) - httpErr := &httpError{code: resp.StatusCode} - if resp.StatusCode == 403 && attempt == 0 { - continue - } - return httpErr - } - - if err := os.MkdirAll(outputDir, 0755); err != nil { - return fmt.Errorf("failed to create output directory: %w", err) - } - - fileName := safeFileName(path.Join(outputDir, path.Base(segmentURL))) - out, err := os.Create(fileName) - if err != nil { - return err - } - defer out.Close() - - n, err := io.Copy(out, resp.Body) - if err != nil { - return err - } - if n == 0 { - return fmt.Errorf("zero-byte download for %s", segmentURL) - } - return nil - } - return fmt.Errorf("exhausted retries") -} - -func safeFileName(base string) string { - if i := strings.IndexAny(base, "?&#"); i >= 0 { - base = base[:i] - } - if base == "" { - base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano()) - } - return base -} - -func isHTTPStatus(err error, code int) bool { - var he *httpError - if errors.As(err, &he) { - return he.code == code - } - return false -} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go new file mode 100644 index 0000000..eef90c0 --- /dev/null +++ b/pkg/constants/constants.go @@ -0,0 +1,13 @@ +package constants + +import "time" + +const ( + MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8" + WorkerCount = 4 + RefreshDelay = 3 * time.Second + + HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" + REFERRER = "https://www.flomarching.com" + OutputDirPath = "./data" +) diff --git a/pkg/httpClient/error.go b/pkg/httpClient/error.go new file mode 100644 index 0000000..7012303 --- /dev/null +++ b/pkg/httpClient/error.go @@ -0,0 +1,20 @@ +package httpClient + +import ( + "errors" + "fmt" +) + +type HttpError struct { + Code int +} + +func (e *HttpError) Error() string { return fmt.Sprintf("httpClient %d", e.Code) } + +func IsHTTPStatus(err error, code int) bool { + var he *HttpError + if errors.As(err, &he) { + return he.Code == code + } + return false +} diff --git a/pkg/media/playlist.go b/pkg/media/playlist.go new file mode 100644 index 0000000..e327da0 --- /dev/null +++ b/pkg/media/playlist.go @@ -0,0 +1,29 @@ +package media + +import ( + "fmt" + "github.com/grafov/m3u8" + "m3u8-downloader/pkg/constants" + "net/http" +) + +func LoadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) { + client := &http.Client{} + req, _ := http.NewRequest("GET", mediaURL, nil) + req.Header.Set("User-Agent", constants.HTTPUserAgent) + req.Header.Set("Referer", constants.REFERRER) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + pl, listType, err := m3u8.DecodeFrom(resp.Body, true) + if err != nil { + return nil, err + } + if listType == m3u8.MASTER { + return nil, fmt.Errorf("expected media playlist but got master") + } + return pl.(*m3u8.MediaPlaylist), nil +} diff --git a/pkg/media/segment.go b/pkg/media/segment.go new file mode 100644 index 0000000..0ea4d13 --- /dev/null +++ b/pkg/media/segment.go @@ -0,0 +1,94 @@ +package media + +import ( + "context" + "fmt" + "io" + "m3u8-downloader/pkg/constants" + "m3u8-downloader/pkg/httpClient" + "net/http" + "net/url" + "os" + "path" + "strings" + "time" +) + +type SegmentJob struct { + URI string + Seq uint64 + VariantID int + Variant *StreamVariant +} + +func (j SegmentJob) AbsoluteURL() string { + rel, _ := url.Parse(j.URI) + return j.Variant.BaseURL.ResolveReference(rel).String() +} + +func (j SegmentJob) Key() string { + return fmt.Sprintf("%d:%s", j.Seq, j.URI) +} + +func DownloadSegment(ctx context.Context, client *http.Client, segmentURL string, outputDir string) error { + for attempt := 0; attempt < 2; attempt++ { + if attempt > 0 { + time.Sleep(300 * time.Millisecond) + } + req, err := http.NewRequestWithContext(ctx, "GET", segmentURL, nil) + if err != nil { + return err + } + req.Header.Set("User-Agent", constants.HTTPUserAgent) + req.Header.Set("Referer", constants.REFERRER) + + resp, err := client.Do(req) + if err != nil { + if attempt == 1 { + return err + } + continue + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + io.Copy(io.Discard, resp.Body) + httpErr := &httpClient.HttpError{Code: resp.StatusCode} + if resp.StatusCode == 403 && attempt == 0 { + continue + } + return httpErr + } + + if err := os.MkdirAll(outputDir, 0755); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + + fileName := safeFileName(path.Join(outputDir, path.Base(segmentURL))) + out, err := os.Create(fileName) + if err != nil { + return err + } + defer out.Close() + + n, err := io.Copy(out, resp.Body) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("zero-byte download for %s", segmentURL) + } + return nil + } + return fmt.Errorf("exhausted retries") +} + +func safeFileName(base string) string { + if i := strings.IndexAny(base, "?&#"); i >= 0 { + base = base[:i] + } + if base == "" { + base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano()) + } + return base +} diff --git a/pkg/media/stream.go b/pkg/media/stream.go new file mode 100644 index 0000000..ef3f4ba --- /dev/null +++ b/pkg/media/stream.go @@ -0,0 +1,168 @@ +package media + +import ( + "context" + "fmt" + "github.com/grafov/m3u8" + "log" + "m3u8-downloader/pkg/constants" + "m3u8-downloader/pkg/httpClient" + "net/http" + "net/url" + "path" + "strings" + "time" +) + +type StreamVariant struct { + URL string + Bandwidth uint32 + BaseURL *url.URL + ID int + Resolution string + OutputDir string +} + +func extractResolution(variant *m3u8.Variant) string { + if variant.Resolution != "" { + parts := strings.Split(variant.Resolution, "x") + if len(parts) == 2 { + return parts[1] + "p" + } + } + switch { + case variant.Bandwidth >= 5000000: + return "1080p" + case variant.Bandwidth >= 3000000: + return "720p" + case variant.Bandwidth >= 1500000: + return "480p" + case variant.Bandwidth >= 800000: + return "360p" + default: + return "240p" + } +} + +func GetAllVariants(masterURL string) ([]*StreamVariant, error) { + client := &http.Client{} + req, _ := http.NewRequest("GET", masterURL, nil) + req.Header.Set("User-Agent", constants.HTTPUserAgent) + req.Header.Set("Referer", constants.REFERRER) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + playlist, listType, err := m3u8.DecodeFrom(resp.Body, true) + if err != nil { + return nil, err + } + + base, _ := url.Parse(masterURL) + + if listType == m3u8.MEDIA { + return []*StreamVariant{{ + URL: masterURL, + Bandwidth: 0, + BaseURL: base, + ID: 0, + Resolution: "unknown", + OutputDir: path.Join(constants.OutputDirPath, "unknown"), + }}, nil + } + + master := playlist.(*m3u8.MasterPlaylist) + if len(master.Variants) == 0 { + return nil, fmt.Errorf("no variants found in master playlist") + } + + variants := make([]*StreamVariant, 0, len(master.Variants)) + for i, v := range master.Variants { + vURL, _ := url.Parse(v.URI) + fullURL := base.ResolveReference(vURL).String() + resolution := extractResolution(v) + outputDir := path.Join(constants.OutputDirPath, resolution) + variants = append(variants, &StreamVariant{ + URL: fullURL, + Bandwidth: v.Bandwidth, + BaseURL: base.ResolveReference(vURL), + ID: i, + Resolution: resolution, + OutputDir: outputDir, + }) + } + return variants, nil +} + +func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) { + log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth) + ticker := time.NewTicker(constants.RefreshDelay) + defer ticker.Stop() + client := &http.Client{} + seen := make(map[string]bool) + + for { + select { + case <-ctx.Done(): + return + default: + } + + playlist, err := LoadMediaPlaylist(variant.URL) + seq := playlist.SeqNo + if err != nil { + log.Printf("%s: Error loading playlist playlist: %v", variant.Resolution, err) + goto waitTick + } + + for _, seg := range playlist.Segments { + if seg == nil { + continue + } + job := SegmentJob{ + URI: seg.URI, + Seq: seq, + VariantID: variant.ID, + Variant: variant, + } + segmentKey := job.Key() + if seen[segmentKey] { + seq++ + continue + } + seen[segmentKey] = true + + sem <- struct{}{} // Acquire + go func(j SegmentJob) { + defer func() { <-sem }() // Release + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + err := DownloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir) + name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key()))) + if err == nil { + log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name) + } else if httpClient.IsHTTPStatus(err, 403) { + log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name) + } else { + log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err) + } + }(job) + seq++ + } + + if playlist.Closed { + log.Printf("%s: Playlist closed (#EXT-X-ENDLIST)", variant.Resolution) + return + } + + waitTick: + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } +}