Separation to avoid a monolithic script

This commit is contained in:
kacarmichael 2025-07-24 01:45:24 -05:00
parent ca7bab9b34
commit 0e27ed0ebb
6 changed files with 330 additions and 291 deletions

View File

@ -2,64 +2,15 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/media"
"os"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"
"github.com/grafov/m3u8"
)
const (
MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8"
WorkerCount = 4
RefreshDelay = 3 * time.Second
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
REFERRER = "https://www.flomarching.com"
OutputDirPath = "./data"
)
type StreamVariant struct {
URL string
Bandwidth uint32
BaseURL *url.URL
ID int
Resolution string
OutputDir string
}
type SegmentJob struct {
URI string
Seq uint64
VariantID int
Variant *StreamVariant
}
func (j SegmentJob) AbsoluteURL() string {
rel, _ := url.Parse(j.URI)
return j.Variant.BaseURL.ResolveReference(rel).String()
}
func (j SegmentJob) Key() string {
return fmt.Sprintf("%d:%s", j.Seq, j.URI)
}
type httpError struct {
code int
}
func (e *httpError) Error() string { return fmt.Sprintf("http %d", e.code) }
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -72,259 +23,23 @@ func main() {
cancel()
}()
variants, err := getAllVariants(MasterURL)
variants, err := media.GetAllVariants(constants.MasterURL)
if err != nil {
log.Fatalf("Failed to get variants: %v", err)
}
log.Printf("Found %d variants", len(variants))
var wg sync.WaitGroup
sem := make(chan struct{}, WorkerCount*len(variants))
sem := make(chan struct{}, constants.WorkerCount*len(variants))
for _, variant := range variants {
wg.Add(1)
go func(v *StreamVariant) {
go func(v *media.StreamVariant) {
defer wg.Done()
variantDownloader(ctx, v, sem)
media.VariantDownloader(ctx, v, sem)
}(variant)
}
wg.Wait()
log.Println("All variant downloaders finished.")
}
func getAllVariants(masterURL string) ([]*StreamVariant, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", masterURL, nil)
req.Header.Set("User-Agent", HTTPUserAgent)
req.Header.Set("Referer", REFERRER)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
if err != nil {
return nil, err
}
base, _ := url.Parse(masterURL)
if listType == m3u8.MEDIA {
return []*StreamVariant{{
URL: masterURL,
Bandwidth: 0,
BaseURL: base,
ID: 0,
Resolution: "unknown",
OutputDir: path.Join(OutputDirPath, "unknown"),
}}, nil
}
master := playlist.(*m3u8.MasterPlaylist)
if len(master.Variants) == 0 {
return nil, fmt.Errorf("no variants found in master playlist")
}
variants := make([]*StreamVariant, 0, len(master.Variants))
for i, v := range master.Variants {
vURL, _ := url.Parse(v.URI)
fullURL := base.ResolveReference(vURL).String()
resolution := extractResolution(v)
outputDir := path.Join(OutputDirPath, resolution)
variants = append(variants, &StreamVariant{
URL: fullURL,
Bandwidth: v.Bandwidth,
BaseURL: base.ResolveReference(vURL),
ID: i,
Resolution: resolution,
OutputDir: outputDir,
})
}
return variants, nil
}
func extractResolution(variant *m3u8.Variant) string {
if variant.Resolution != "" {
parts := strings.Split(variant.Resolution, "x")
if len(parts) == 2 {
return parts[1] + "p"
}
}
switch {
case variant.Bandwidth >= 5000000:
return "1080p"
case variant.Bandwidth >= 3000000:
return "720p"
case variant.Bandwidth >= 1500000:
return "480p"
case variant.Bandwidth >= 800000:
return "360p"
default:
return "240p"
}
}
func variantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) {
log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth)
ticker := time.NewTicker(RefreshDelay)
defer ticker.Stop()
client := &http.Client{}
seen := make(map[string]bool)
for {
select {
case <-ctx.Done():
return
default:
}
media, err := loadMediaPlaylist(variant.URL)
seq := media.SeqNo
if err != nil {
log.Printf("%s: Error loading media playlist: %v", variant.Resolution, err)
goto waitTick
}
for _, seg := range media.Segments {
if seg == nil {
continue
}
job := SegmentJob{
URI: seg.URI,
Seq: seq,
VariantID: variant.ID,
Variant: variant,
}
segmentKey := job.Key()
if seen[segmentKey] {
seq++
continue
}
seen[segmentKey] = true
sem <- struct{}{} // Acquire
go func(j SegmentJob) {
defer func() { <-sem }() // Release
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := downloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir)
name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key())))
if err == nil {
log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name)
} else if isHTTPStatus(err, 403) {
log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name)
} else {
log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err)
}
}(job)
seq++
}
if media.Closed {
log.Printf("%s: Playlist closed (#EXT-X-ENDLIST)", variant.Resolution)
return
}
waitTick:
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}
func loadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", mediaURL, nil)
req.Header.Set("User-Agent", HTTPUserAgent)
req.Header.Set("Referer", REFERRER)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
pl, listType, err := m3u8.DecodeFrom(resp.Body, true)
if err != nil {
return nil, err
}
if listType == m3u8.MASTER {
return nil, fmt.Errorf("expected media playlist but got master")
}
return pl.(*m3u8.MediaPlaylist), nil
}
func downloadSegment(ctx context.Context, client *http.Client, segmentURL string, outputDir string) error {
for attempt := 0; attempt < 2; attempt++ {
if attempt > 0 {
time.Sleep(300 * time.Millisecond)
}
req, err := http.NewRequestWithContext(ctx, "GET", segmentURL, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", HTTPUserAgent)
req.Header.Set("Referer", REFERRER)
resp, err := client.Do(req)
if err != nil {
if attempt == 1 {
return err
}
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body)
httpErr := &httpError{code: resp.StatusCode}
if resp.StatusCode == 403 && attempt == 0 {
continue
}
return httpErr
}
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("failed to create output directory: %w", err)
}
fileName := safeFileName(path.Join(outputDir, path.Base(segmentURL)))
out, err := os.Create(fileName)
if err != nil {
return err
}
defer out.Close()
n, err := io.Copy(out, resp.Body)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("zero-byte download for %s", segmentURL)
}
return nil
}
return fmt.Errorf("exhausted retries")
}
func safeFileName(base string) string {
if i := strings.IndexAny(base, "?&#"); i >= 0 {
base = base[:i]
}
if base == "" {
base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano())
}
return base
}
func isHTTPStatus(err error, code int) bool {
var he *httpError
if errors.As(err, &he) {
return he.code == code
}
return false
}

View File

@ -0,0 +1,13 @@
package constants
import "time"
const (
MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8"
WorkerCount = 4
RefreshDelay = 3 * time.Second
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
REFERRER = "https://www.flomarching.com"
OutputDirPath = "./data"
)

20
pkg/httpClient/error.go Normal file
View File

@ -0,0 +1,20 @@
package httpClient
import (
"errors"
"fmt"
)
type HttpError struct {
Code int
}
func (e *HttpError) Error() string { return fmt.Sprintf("httpClient %d", e.Code) }
func IsHTTPStatus(err error, code int) bool {
var he *HttpError
if errors.As(err, &he) {
return he.Code == code
}
return false
}

29
pkg/media/playlist.go Normal file
View File

@ -0,0 +1,29 @@
package media
import (
"fmt"
"github.com/grafov/m3u8"
"m3u8-downloader/pkg/constants"
"net/http"
)
func LoadMediaPlaylist(mediaURL string) (*m3u8.MediaPlaylist, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", mediaURL, nil)
req.Header.Set("User-Agent", constants.HTTPUserAgent)
req.Header.Set("Referer", constants.REFERRER)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
pl, listType, err := m3u8.DecodeFrom(resp.Body, true)
if err != nil {
return nil, err
}
if listType == m3u8.MASTER {
return nil, fmt.Errorf("expected media playlist but got master")
}
return pl.(*m3u8.MediaPlaylist), nil
}

94
pkg/media/segment.go Normal file
View File

@ -0,0 +1,94 @@
package media
import (
"context"
"fmt"
"io"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/httpClient"
"net/http"
"net/url"
"os"
"path"
"strings"
"time"
)
type SegmentJob struct {
URI string
Seq uint64
VariantID int
Variant *StreamVariant
}
func (j SegmentJob) AbsoluteURL() string {
rel, _ := url.Parse(j.URI)
return j.Variant.BaseURL.ResolveReference(rel).String()
}
func (j SegmentJob) Key() string {
return fmt.Sprintf("%d:%s", j.Seq, j.URI)
}
func DownloadSegment(ctx context.Context, client *http.Client, segmentURL string, outputDir string) error {
for attempt := 0; attempt < 2; attempt++ {
if attempt > 0 {
time.Sleep(300 * time.Millisecond)
}
req, err := http.NewRequestWithContext(ctx, "GET", segmentURL, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", constants.HTTPUserAgent)
req.Header.Set("Referer", constants.REFERRER)
resp, err := client.Do(req)
if err != nil {
if attempt == 1 {
return err
}
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body)
httpErr := &httpClient.HttpError{Code: resp.StatusCode}
if resp.StatusCode == 403 && attempt == 0 {
continue
}
return httpErr
}
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("failed to create output directory: %w", err)
}
fileName := safeFileName(path.Join(outputDir, path.Base(segmentURL)))
out, err := os.Create(fileName)
if err != nil {
return err
}
defer out.Close()
n, err := io.Copy(out, resp.Body)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("zero-byte download for %s", segmentURL)
}
return nil
}
return fmt.Errorf("exhausted retries")
}
func safeFileName(base string) string {
if i := strings.IndexAny(base, "?&#"); i >= 0 {
base = base[:i]
}
if base == "" {
base = fmt.Sprintf("seg-%d.ts", time.Now().UnixNano())
}
return base
}

168
pkg/media/stream.go Normal file
View File

@ -0,0 +1,168 @@
package media
import (
"context"
"fmt"
"github.com/grafov/m3u8"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/httpClient"
"net/http"
"net/url"
"path"
"strings"
"time"
)
type StreamVariant struct {
URL string
Bandwidth uint32
BaseURL *url.URL
ID int
Resolution string
OutputDir string
}
func extractResolution(variant *m3u8.Variant) string {
if variant.Resolution != "" {
parts := strings.Split(variant.Resolution, "x")
if len(parts) == 2 {
return parts[1] + "p"
}
}
switch {
case variant.Bandwidth >= 5000000:
return "1080p"
case variant.Bandwidth >= 3000000:
return "720p"
case variant.Bandwidth >= 1500000:
return "480p"
case variant.Bandwidth >= 800000:
return "360p"
default:
return "240p"
}
}
func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", masterURL, nil)
req.Header.Set("User-Agent", constants.HTTPUserAgent)
req.Header.Set("Referer", constants.REFERRER)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
if err != nil {
return nil, err
}
base, _ := url.Parse(masterURL)
if listType == m3u8.MEDIA {
return []*StreamVariant{{
URL: masterURL,
Bandwidth: 0,
BaseURL: base,
ID: 0,
Resolution: "unknown",
OutputDir: path.Join(constants.OutputDirPath, "unknown"),
}}, nil
}
master := playlist.(*m3u8.MasterPlaylist)
if len(master.Variants) == 0 {
return nil, fmt.Errorf("no variants found in master playlist")
}
variants := make([]*StreamVariant, 0, len(master.Variants))
for i, v := range master.Variants {
vURL, _ := url.Parse(v.URI)
fullURL := base.ResolveReference(vURL).String()
resolution := extractResolution(v)
outputDir := path.Join(constants.OutputDirPath, resolution)
variants = append(variants, &StreamVariant{
URL: fullURL,
Bandwidth: v.Bandwidth,
BaseURL: base.ResolveReference(vURL),
ID: i,
Resolution: resolution,
OutputDir: outputDir,
})
}
return variants, nil
}
func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) {
log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth)
ticker := time.NewTicker(constants.RefreshDelay)
defer ticker.Stop()
client := &http.Client{}
seen := make(map[string]bool)
for {
select {
case <-ctx.Done():
return
default:
}
playlist, err := LoadMediaPlaylist(variant.URL)
seq := playlist.SeqNo
if err != nil {
log.Printf("%s: Error loading playlist playlist: %v", variant.Resolution, err)
goto waitTick
}
for _, seg := range playlist.Segments {
if seg == nil {
continue
}
job := SegmentJob{
URI: seg.URI,
Seq: seq,
VariantID: variant.ID,
Variant: variant,
}
segmentKey := job.Key()
if seen[segmentKey] {
seq++
continue
}
seen[segmentKey] = true
sem <- struct{}{} // Acquire
go func(j SegmentJob) {
defer func() { <-sem }() // Release
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := DownloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir)
name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key())))
if err == nil {
log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name)
} else if httpClient.IsHTTPStatus(err, 403) {
log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name)
} else {
log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err)
}
}(job)
seq++
}
if playlist.Closed {
log.Printf("%s: Playlist closed (#EXT-X-ENDLIST)", variant.Resolution)
return
}
waitTick:
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}