From 99594597dbd0282faaaf04e31e2d3b9ca360d0da Mon Sep 17 00:00:00 2001 From: townandgown Date: Sun, 27 Jul 2025 00:27:21 -0500 Subject: [PATCH] Manifest architecture present. Individual items not populating --- CLAUDE.md | 66 ++++++++++++++------- cmd/downloader/{main.go => download.go} | 23 ++++++-- cmd/main/main.go | 30 ++++++++++ cmd/proc/main.go | 1 - cmd/processor/process.go | 7 +++ pkg/constants/constants.go | 10 ++-- pkg/media/manifest.go | 78 +++++++++++++++++++++++++ pkg/media/stream.go | 22 +++++-- pkg/transfer/cleanup.go | 1 - pkg/transfer/nas.go | 7 ++- pkg/transfer/service.go | 6 +- 11 files changed, 210 insertions(+), 41 deletions(-) rename cmd/downloader/{main.go => download.go} (71%) create mode 100644 cmd/main/main.go delete mode 100644 cmd/proc/main.go create mode 100644 cmd/processor/process.go create mode 100644 pkg/media/manifest.go diff --git a/CLAUDE.md b/CLAUDE.md index 70ab624..6b54b32 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,31 +4,45 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -This is a Go-based M3U8 downloader that parses HLS (HTTP Live Streaming) playlists to extract video and audio stream metadata. The end goal of this project is to have a listening REST API take in m3u8 urls, parse them, and eventually send to a conversion service. +This is a Go-based HLS (HTTP Live Streaming) downloader that monitors M3U8 playlists and downloads video segments in real-time. The program takes a master M3U8 playlist URL, parses all available stream variants (different qualities/bitrates), and continuously monitors each variant's chunklist for new segments to download. ## Architecture -The project follows a clean separation of concerns: +The project follows a modular architecture with clear separation of concerns: -- **main.go**: Entry point that demonstrates usage of the media package -- **media/**: Core package containing M3U8 parsing logic - - **types.go**: Contains the main parsing logic and data structures (`StreamSet`, `VideoURL`, `AudioURL`) - - **utils.go**: Utility functions for parsing attributes and resolution calculations +- **cmd/**: Entry points for different execution modes + - **downloader/main.go**: Main downloader application that orchestrates variant downloading + - **proc/main.go**: Alternative processor entry point (currently minimal) +- **pkg/**: Core packages containing the application logic + - **media/**: HLS streaming and download logic + - **stream.go**: Stream variant parsing and downloading orchestration (`GetAllVariants`, `VariantDownloader`) + - **playlist.go**: M3U8 playlist loading and parsing (`LoadMediaPlaylist`) + - **segment.go**: Individual segment downloading logic (`DownloadSegment`, `SegmentJob`) + - **constants/constants.go**: Configuration constants (URLs, timeouts, output paths) + - **httpClient/error.go**: HTTP error handling utilities -The `GetStreamMetadata()` function is the main entry point that: -1. Fetches the M3U8 master playlist via HTTP -2. Parses the content line by line -3. Extracts video streams (`#EXT-X-STREAM-INF`) and audio streams (`#EXT-X-MEDIA`) -4. Returns a `StreamSet` containing all parsed metadata +## Core Functionality + +The main workflow is: +1. **Parse Master Playlist**: `GetAllVariants()` fetches and parses the master M3U8 to extract all stream variants with different qualities/bitrates +2. **Concurrent Monitoring**: Each variant gets its own goroutine running `VariantDownloader()` that continuously polls for playlist updates +3. **Segment Detection**: When new segments appear in a variant's playlist, they are queued for download +4. **Parallel Downloads**: Segments are downloaded concurrently with configurable worker pools and retry logic +5. **Quality Organization**: Downloaded segments are organized by resolution (1080p, 720p, etc.) in separate directories + +## Key Data Structures + +- `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, and output directory +- `SegmentJob`: Represents a segment download task with URI, sequence number, and variant info ## Common Development Commands ```bash -# Build the project -go build -o m3u8-downloader +# Build the downloader application +go build -o stream-recorder ./cmd/downloader -# Run the project -go run main.go +# Run the downloader +go run ./cmd/downloader/main.go # Run with module support go mod tidy @@ -40,12 +54,24 @@ go test ./... go fmt ./... ``` -## Key Data Structures +## Configuration -- `StreamSet`: Root structure containing playlist URL and all streams -- `VideoURL`: Represents video stream with bandwidth, codecs, resolution, frame rate -- `AudioURL`: Represents audio stream with media type, group ID, name, and selection flags +Key configuration is managed in `pkg/constants/constants.go`: +- `MasterURL`: The master M3U8 playlist URL to monitor +- `WorkerCount`: Number of concurrent segment downloaders per variant +- `RefreshDelay`: How often to check for playlist updates (3 seconds) +- `OutputDirPath`: Base directory for downloaded segments +- HTTP headers for requests (User-Agent, Referer) + +## Monitoring and Downloads + +The application implements real-time stream monitoring: +- **Continuous Polling**: Each variant playlist is checked every 3 seconds for new segments +- **Deduplication**: Uses segment URIs and sequence numbers to avoid re-downloading +- **Graceful Shutdown**: Responds to SIGINT/SIGTERM signals for clean exit +- **Error Resilience**: Retries failed downloads and handles HTTP 403 errors specially +- **Quality Detection**: Automatically determines resolution from bandwidth or explicit resolution data ## Error Handling -The current implementation uses `panic()` for error handling. When extending functionality, consider implementing proper error handling with returned error values following Go conventions. \ No newline at end of file +The implementation uses proper Go error handling patterns with custom HTTP error types. Failed downloads are logged with clear status indicators (✓ for success, ✗ for failure). \ No newline at end of file diff --git a/cmd/downloader/main.go b/cmd/downloader/download.go similarity index 71% rename from cmd/downloader/main.go rename to cmd/downloader/download.go index e252e9a..5dddd7f 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/download.go @@ -1,4 +1,4 @@ -package main +package downloader import ( "context" @@ -13,7 +13,7 @@ import ( "time" ) -func main() { +func Download(masterURL string, eventName string, debug bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -29,7 +29,7 @@ func main() { var wg sync.WaitGroup var transferService *transfer.TransferService if constants.EnableNASTransfer { - ts, err := transfer.NewTrasferService(constants.NASPath) + ts, err := transfer.NewTrasferService(constants.NASOutputPath, eventName) if err != nil { log.Printf("Failed to create transfer service: %v", err) log.Println("Continuing without transfer service...") @@ -46,7 +46,9 @@ func main() { } } - variants, err := media.GetAllVariants(constants.MasterURL) + manifestWriter := media.NewManifestWriter(eventName) + + variants, err := media.GetAllVariants(masterURL, constants.LocalOutputDirPath+"/"+eventName, manifestWriter) if err != nil { log.Fatalf("Failed to get variants: %v", err) } @@ -54,11 +56,19 @@ func main() { sem := make(chan struct{}, constants.WorkerCount*len(variants)) + manifest := media.NewManifestWriter(eventName) + for _, variant := range variants { + // Debug mode only tracks one variant for easier debugging + if debug { + if variant.Resolution != "1080p" { + continue + } + } wg.Add(1) go func(v *media.StreamVariant) { defer wg.Done() - media.VariantDownloader(ctx, v, sem) + media.VariantDownloader(ctx, v, sem, manifest) }(variant) } @@ -72,4 +82,7 @@ func main() { } log.Println("All Services shut down.") + + manifestWriter.WriteManifest() + log.Println("Manifest written.") } diff --git a/cmd/main/main.go b/cmd/main/main.go new file mode 100644 index 0000000..20d397d --- /dev/null +++ b/cmd/main/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "m3u8-downloader/cmd/downloader" + "os" + "strings" + "time" +) + +func main() { + + url := flag.String("url", "", "M3U8 playlist URL") + eventName := flag.String("event", time.Now().Format("2006-01-02"), "Event name") + debug := flag.Bool("debug", false, "Enable debug mode") + flag.Parse() + if *url == "" { + reader := bufio.NewReader(os.Stdin) + fmt.Print("Enter M3U8 playlist URL: ") + inputUrl, _ := reader.ReadString('\n') + inputUrl = strings.TrimSpace(inputUrl) + downloader.Download(inputUrl, *eventName, *debug) + return + } + + downloader.Download(*url, *eventName, *debug) + return +} diff --git a/cmd/proc/main.go b/cmd/proc/main.go deleted file mode 100644 index 6eb0478..0000000 --- a/cmd/proc/main.go +++ /dev/null @@ -1 +0,0 @@ -package proc diff --git a/cmd/processor/process.go b/cmd/processor/process.go new file mode 100644 index 0000000..e7002bb --- /dev/null +++ b/cmd/processor/process.go @@ -0,0 +1,7 @@ +package processor + +import "fmt" + +func Process() { + fmt.Println("Process") +} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 74fe6cb..eceaa1d 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -3,16 +3,15 @@ package constants import "time" const ( - MasterURL = "https://live-fastly.flosports.tv/streams/mr159021-260419/playlist.m3u8?token=st%3D1753571418%7Eexp%3D1753571448%7Eacl%3D%2Fstreams%2Fmr159021-260419%2Fplaylist.m3u8%7Edata%3Dssai%3A0%3BuserId%3A14025903%3BstreamId%3A260419%3BmediaPackageRegion%3Afalse%3BdvrMinutes%3A360%3BtokenId%3Abadd289a-ade5-48fe-852f-7dbd1d57aca8%3Bpv%3A86400%7Ehmac2%3D8de65c26b185084a6be77e788cb0ba41be5fcac3ab86159b06f7572ca925d77ba7bd182124af2a432953d4223548f198742d1a238e937d875976cd42fe549838&mid_origin=media_store&keyName=FLOSPORTS_TOKEN_KEY_2023-08-02&streamCode=mr159021-260419" WorkerCount = 4 RefreshDelay = 3 * time.Second - HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" - REFERRER = "https://www.flomarching.com" - OutputDirPath = "./data/flo_radio" + HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" + REFERRER = "https://www.flomarching.com" + LocalOutputDirPath = "./data/" EnableNASTransfer = true - NASPath = "\\\\HomeLabNAS\\dci\\streams\\2025_Atlanta" + NASOutputPath = "\\\\HomeLabNAS\\dci\\streams" NASUsername = "" NASPassword = "" TransferWorkerCount = 2 @@ -22,6 +21,7 @@ const ( PersistencePath = "./data/transfer_queue.json" TransferQueueSize = 1000 BatchSize = 10 + ManifestPath = "./data" CleanupAfterTransfer = true CleanupBatchSize = 10 diff --git a/pkg/media/manifest.go b/pkg/media/manifest.go new file mode 100644 index 0000000..5c594b6 --- /dev/null +++ b/pkg/media/manifest.go @@ -0,0 +1,78 @@ +package media + +import ( + "encoding/json" + "log" + "m3u8-downloader/pkg/constants" + "os" + "sort" +) + +type ManifestWriter struct { + ManifestPath string + Segments []ManifestItem + Index map[string]*ManifestItem +} + +type ManifestItem struct { + SeqNo string `json:"seqNo"` + Resolution string `json:"resolution"` +} + +func NewManifestWriter(eventName string) *ManifestWriter { + return &ManifestWriter{ + ManifestPath: constants.ManifestPath + "/" + eventName + ".json", + Segments: make([]ManifestItem, 0), + Index: make(map[string]*ManifestItem), + } +} + +func (m *ManifestWriter) AddOrUpdateSegment(seqNo string, resolution string) { + if m.Index == nil { + m.Index = make(map[string]*ManifestItem) + } + + if m.Segments == nil { + m.Segments = make([]ManifestItem, 0) + } + + if existing, ok := m.Index[seqNo]; ok { + if resolution > existing.Resolution { + existing.Resolution = resolution + } + return + } else { + item := ManifestItem{ + SeqNo: seqNo, + Resolution: resolution, + } + m.Segments = append(m.Segments, item) + m.Index[seqNo] = &item + } +} + +func (m *ManifestWriter) WriteManifest() { + sort.Slice(m.Segments, func(i, j int) bool { + return m.Segments[i].SeqNo < m.Segments[j].SeqNo + }) + + data, err := json.MarshalIndent(m.Segments, "", " ") + if err != nil { + log.Printf("Failed to marshal manifest: %v", err) + return + } + + file, err := os.Create(m.ManifestPath) + if err != nil { + log.Printf("Failed to create manifest file: %v", err) + return + } + + defer file.Close() + + _, err = file.Write(data) + if err != nil { + log.Printf("Failed to write manifest file: %v", err) + return + } +} diff --git a/pkg/media/stream.go b/pkg/media/stream.go index 373ca49..e7d03ac 100644 --- a/pkg/media/stream.go +++ b/pkg/media/stream.go @@ -2,6 +2,7 @@ package media import ( "context" + "errors" "fmt" "github.com/grafov/m3u8" "log" @@ -21,6 +22,7 @@ type StreamVariant struct { ID int Resolution string OutputDir string + Writer *ManifestWriter } func extractResolution(variant *m3u8.Variant) string { @@ -44,7 +46,7 @@ func extractResolution(variant *m3u8.Variant) string { } } -func GetAllVariants(masterURL string) ([]*StreamVariant, error) { +func GetAllVariants(masterURL string, outputDir string, writer *ManifestWriter) ([]*StreamVariant, error) { client := &http.Client{} req, _ := http.NewRequest("GET", masterURL, nil) req.Header.Set("User-Agent", constants.HTTPUserAgent) @@ -69,7 +71,8 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) { BaseURL: base, ID: 0, Resolution: "unknown", - OutputDir: path.Join(constants.NASPath, "unknown"), + OutputDir: path.Join(outputDir, "unknown"), + Writer: writer, }}, nil } @@ -83,7 +86,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) { vURL, _ := url.Parse(v.URI) fullURL := base.ResolveReference(vURL).String() resolution := extractResolution(v) - outputDir := path.Join(constants.NASPath, resolution) + outputDir := path.Join(outputDir, resolution) variants = append(variants, &StreamVariant{ URL: fullURL, Bandwidth: v.Bandwidth, @@ -96,7 +99,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) { return variants, nil } -func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) { +func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}, manifest *ManifestWriter) { log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth) ticker := time.NewTicker(constants.RefreshDelay) defer ticker.Stop() @@ -142,9 +145,18 @@ func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan str err := DownloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir) name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key()))) + if err == nil { log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name) - } else if httpClient.IsHTTPStatus(err, 403) { + return + } + + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + // Suppress log: shutdown in progress + return + } + + if httpClient.IsHTTPStatus(err, 403) { log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name) } else { log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err) diff --git a/pkg/transfer/cleanup.go b/pkg/transfer/cleanup.go index 8eda06f..6e3e786 100644 --- a/pkg/transfer/cleanup.go +++ b/pkg/transfer/cleanup.go @@ -69,7 +69,6 @@ func (cs *CleanupService) ExecuteCleanup(ctx context.Context) error { if batchSize > len(cs.pendingFiles) { batchSize = len(cs.pendingFiles) } - cs.mu.Unlock() log.Printf("Executing cleanup batch (size: %d)", batchSize) diff --git a/pkg/transfer/nas.go b/pkg/transfer/nas.go index 7172c6c..d8134aa 100644 --- a/pkg/transfer/nas.go +++ b/pkg/transfer/nas.go @@ -15,9 +15,14 @@ type NASTransfer struct { } func NewNASTransfer(config NASConfig) *NASTransfer { - return &NASTransfer{ + nt := &NASTransfer{ config: config, } + err := nt.ensureDirectoryExists(nt.config.Path) + if err != nil { + log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err) + } + return nt } func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error { diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go index fbb8c5f..2ba5957 100644 --- a/pkg/transfer/service.go +++ b/pkg/transfer/service.go @@ -17,9 +17,9 @@ type TransferService struct { stats *QueueStats } -func NewTrasferService(outputDir string) (*TransferService, error) { +func NewTrasferService(outputDir string, eventName string) (*TransferService, error) { nasConfig := NASConfig{ - Path: constants.NASPath, + Path: outputDir + "/" + eventName, Username: constants.NASUsername, Password: constants.NASPassword, Timeout: constants.TransferTimeout, @@ -48,7 +48,7 @@ func NewTrasferService(outputDir string) (*TransferService, error) { } queue := NewTransferQueue(queueConfig, nas, cleanup) - watcher, err := NewFileWatcher(outputDir, queue) + watcher, err := NewFileWatcher(constants.LocalOutputDirPath+"/"+eventName, queue) if err != nil { return nil, fmt.Errorf("Failed to create file watcher: %w", err) }