diff --git a/CLAUDE.md b/CLAUDE.md index 6b54b32..0c10a10 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,45 +4,98 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -This is a Go-based HLS (HTTP Live Streaming) downloader that monitors M3U8 playlists and downloads video segments in real-time. The program takes a master M3U8 playlist URL, parses all available stream variants (different qualities/bitrates), and continuously monitors each variant's chunklist for new segments to download. +This is a Go-based HLS (HTTP Live Streaming) recorder that monitors M3U8 playlists and downloads video segments in real-time with automatic NAS transfer capabilities. The program takes a master M3U8 playlist URL, parses all available stream variants (different qualities/bitrates), continuously monitors each variant's chunklist for new segments, downloads them locally, and optionally transfers them to network storage for long-term archival. ## Architecture The project follows a modular architecture with clear separation of concerns: - **cmd/**: Entry points for different execution modes - - **downloader/main.go**: Main downloader application that orchestrates variant downloading - - **proc/main.go**: Alternative processor entry point (currently minimal) + - **main/main.go**: Primary CLI entry point with URL input and event naming + - **downloader/download.go**: Core download orchestration logic with transfer service integration + - **processor/process.go**: Alternative processing entry point - **pkg/**: Core packages containing the application logic - **media/**: HLS streaming and download logic - **stream.go**: Stream variant parsing and downloading orchestration (`GetAllVariants`, `VariantDownloader`) - **playlist.go**: M3U8 playlist loading and parsing (`LoadMediaPlaylist`) - **segment.go**: Individual segment downloading logic (`DownloadSegment`, `SegmentJob`) - - **constants/constants.go**: Configuration constants (URLs, timeouts, output paths) + - **manifest.go**: Manifest generation and segment tracking (`ManifestWriter`, `ManifestItem`) + - **transfer/**: NAS transfer system (complete implementation available) + - **service.go**: Transfer service orchestration + - **watcher.go**: File system monitoring for new downloads + - **queue.go**: Priority queue with worker pool management + - **nas.go**: NAS file transfer with retry logic + - **cleanup.go**: Local file cleanup after successful transfer + - **types.go**: Transfer system data structures + - **constants/constants.go**: Configuration constants (paths, timeouts, NAS settings) - **httpClient/error.go**: HTTP error handling utilities ## Core Functionality -The main workflow is: +### Download Workflow 1. **Parse Master Playlist**: `GetAllVariants()` fetches and parses the master M3U8 to extract all stream variants with different qualities/bitrates 2. **Concurrent Monitoring**: Each variant gets its own goroutine running `VariantDownloader()` that continuously polls for playlist updates 3. **Segment Detection**: When new segments appear in a variant's playlist, they are queued for download 4. **Parallel Downloads**: Segments are downloaded concurrently with configurable worker pools and retry logic 5. **Quality Organization**: Downloaded segments are organized by resolution (1080p, 720p, etc.) in separate directories +6. **Manifest Generation**: `ManifestWriter` tracks all downloaded segments with sequence numbers and resolutions + +### NAS Transfer Workflow (Optional) +1. **File Watching**: `FileWatcher` monitors download directories for new `.ts` files +2. **Transfer Queuing**: New files are added to a priority queue after a settling delay +3. **Background Transfer**: Worker pool transfers files to NAS with retry logic and verification +4. **Local Cleanup**: Successfully transferred files are automatically cleaned up locally +5. **State Persistence**: Queue state is persisted to survive crashes and restarts ## Key Data Structures -- `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, and output directory +- `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, output directory, and manifest writer - `SegmentJob`: Represents a segment download task with URI, sequence number, and variant info +- `ManifestWriter`: Tracks downloaded segments and generates JSON manifests +- `ManifestItem`: Individual segment record with sequence number and resolution +- `TransferItem`: Transfer queue item with source, destination, retry count, and status +- `TransferService`: Orchestrates file watching, queuing, transfer, and cleanup + +## Configuration + +Key configuration is managed in `pkg/constants/constants.go`: + +### Core Settings +- `WorkerCount`: Number of concurrent segment downloaders per variant (4) +- `RefreshDelay`: How often to check for playlist updates (3 seconds) +- `LocalOutputDirPath`: Base directory for local downloads (`./data/`) +- `ManifestPath`: Directory for manifest JSON files (`./data`) + +### HTTP Settings +- `HTTPUserAgent`: User agent string for HTTP requests +- `REFERRER`: Referer header for HTTP requests + +### NAS Transfer Settings +- `EnableNASTransfer`: Enable/disable automatic NAS transfer (true) +- `NASOutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`) +- `NASUsername`/`NASPassword`: NAS credentials (empty for current user) +- `TransferWorkerCount`: Concurrent transfer workers (2) +- `TransferRetryLimit`: Max retry attempts per file (3) +- `TransferTimeout`: Timeout per file transfer (30 seconds) +- `FileSettlingDelay`: Wait before queuing new files (5 seconds) +- `PersistencePath`: Transfer queue state file (`./data/transfer_queue.json`) + +### Cleanup Settings +- `CleanupAfterTransfer`: Delete local files after NAS transfer (true) +- `CleanupBatchSize`: Files processed per cleanup batch (10) +- `RetainLocalHours`: Hours to keep local files (0 = immediate cleanup) ## Common Development Commands ```bash -# Build the downloader application -go build -o stream-recorder ./cmd/downloader +# Build the main application +go build -o stream-recorder ./cmd/main -# Run the downloader -go run ./cmd/downloader/main.go +# Run with URL prompt +go run ./cmd/main/main.go + +# Run with command line arguments +go run ./cmd/main/main.go -url="https://example.com/playlist.m3u8" -event="my-event" -debug=true # Run with module support go mod tidy @@ -54,24 +107,69 @@ go test ./... go fmt ./... ``` -## Configuration +## Command Line Options -Key configuration is managed in `pkg/constants/constants.go`: -- `MasterURL`: The master M3U8 playlist URL to monitor -- `WorkerCount`: Number of concurrent segment downloaders per variant -- `RefreshDelay`: How often to check for playlist updates (3 seconds) -- `OutputDirPath`: Base directory for downloaded segments -- HTTP headers for requests (User-Agent, Referer) +- `-url`: M3U8 playlist URL (if not provided, prompts for input) +- `-event`: Event name for organizing downloads (defaults to current date) +- `-debug`: Debug mode (only downloads 1080p variant for easier testing) ## Monitoring and Downloads -The application implements real-time stream monitoring: +The application implements comprehensive real-time stream monitoring: + +### Download Features - **Continuous Polling**: Each variant playlist is checked every 3 seconds for new segments - **Deduplication**: Uses segment URIs and sequence numbers to avoid re-downloading - **Graceful Shutdown**: Responds to SIGINT/SIGTERM signals for clean exit - **Error Resilience**: Retries failed downloads and handles HTTP 403 errors specially - **Quality Detection**: Automatically determines resolution from bandwidth or explicit resolution data +- **Context Cancellation**: Proper timeout and cancellation handling for clean shutdowns + +### Transfer Features (when enabled) +- **Real-time Transfer**: Files are transferred to NAS as soon as they're downloaded +- **Queue Persistence**: Transfer queue survives application restarts +- **Retry Logic**: Failed transfers are retried with exponential backoff +- **Verification**: File sizes are verified after transfer +- **Automatic Cleanup**: Local files are removed after successful NAS transfer +- **Statistics Reporting**: Transfer progress and statistics are logged regularly + +### Manifest Generation +- **Segment Tracking**: All downloaded segments are tracked with sequence numbers +- **Resolution Mapping**: Segments are associated with their quality variants +- **JSON Output**: Manifest files are generated as sorted JSON arrays for easy processing ## Error Handling -The implementation uses proper Go error handling patterns with custom HTTP error types. Failed downloads are logged with clear status indicators (✓ for success, ✗ for failure). \ No newline at end of file +The implementation uses proper Go error handling patterns: +- **Custom HTTP Errors**: Structured error types for HTTP failures +- **Context-Aware Cancellation**: Proper handling of shutdown scenarios +- **Retry Logic**: Exponential backoff for transient failures +- **Logging**: Clear status indicators (✓ for success, ✗ for failure) +- **Graceful Degradation**: Transfer service failures don't stop downloads + +## Dependencies + +- `github.com/grafov/m3u8`: M3U8 playlist parsing +- `github.com/fsnotify/fsnotify`: File system event monitoring for NAS transfers + +## Data Organization + +Downloaded files are organized as: +``` +./data/ +├── {event-name}.json # Manifest file +├── {event-name}/ # Event-specific directory +│ ├── 1080p/ # High quality segments +│ ├── 720p/ # Medium quality segments +│ └── 480p/ # Lower quality segments +└── transfer_queue.json # Transfer queue state +``` + +NAS files mirror the local structure: +``` +\\HomeLabNAS\dci\streams\ +└── {event-name}/ + ├── 1080p/ + ├── 720p/ + └── 480p/ +``` \ No newline at end of file diff --git a/cmd/main/main.go b/cmd/main/main.go index 20d397d..d0e641b 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -5,17 +5,32 @@ import ( "flag" "fmt" "m3u8-downloader/cmd/downloader" + "m3u8-downloader/cmd/processor" + "m3u8-downloader/cmd/transfer" "os" "strings" "time" ) func main() { - url := flag.String("url", "", "M3U8 playlist URL") eventName := flag.String("event", time.Now().Format("2006-01-02"), "Event name") debug := flag.Bool("debug", false, "Enable debug mode") + transferOnly := flag.Bool("transfer", false, "Transfer-only mode: transfer existing files without downloading") + processOnly := flag.Bool("process", false, "Process-only mode: process existing files without downloading") + flag.Parse() + + if *transferOnly { + transfer.RunTransferOnly(*eventName) + return + } + + if *processOnly { + processor.Process() + return + } + if *url == "" { reader := bufio.NewReader(os.Stdin) fmt.Print("Enter M3U8 playlist URL: ") @@ -26,5 +41,4 @@ func main() { } downloader.Download(*url, *eventName, *debug) - return } diff --git a/cmd/processor/process.go b/cmd/processor/process.go index e7002bb..a1c6049 100644 --- a/cmd/processor/process.go +++ b/cmd/processor/process.go @@ -1,7 +1,15 @@ package processor -import "fmt" +import ( + "m3u8-downloader/pkg/processing" +) func Process() { - fmt.Println("Process") + + config := processing.ProcessConfig{ + WorkerCount: 4, + DestinationPath: "/Users/andrey/Downloads", + Enabled: true, + } + processing.NewProcessingService(&config, nil).Start(nil) } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index eceaa1d..3ac8e6c 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -8,7 +8,7 @@ const ( HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" REFERRER = "https://www.flomarching.com" - LocalOutputDirPath = "./data/" + LocalOutputDirPath = "../data/" EnableNASTransfer = true NASOutputPath = "\\\\HomeLabNAS\\dci\\streams" @@ -18,12 +18,16 @@ const ( TransferRetryLimit = 3 TransferTimeout = 30 * time.Second FileSettlingDelay = 5 * time.Second - PersistencePath = "./data/transfer_queue.json" - TransferQueueSize = 1000 + PersistencePath = "../data/transfer_queue.json" + TransferQueueSize = 100000 BatchSize = 10 - ManifestPath = "./data" + ManifestPath = "../data" CleanupAfterTransfer = true CleanupBatchSize = 10 RetainLocalHours = 0 + + AutoProcess = true + ProcessingEnabled = true + ProcessWorkerCount = 2 ) diff --git a/pkg/nas/config.go b/pkg/nas/config.go new file mode 100644 index 0000000..cc40ac9 --- /dev/null +++ b/pkg/nas/config.go @@ -0,0 +1,12 @@ +package nas + +import "time" + +type NASConfig struct { + Path string + Username string + Password string + Timeout time.Duration + RetryLimit int + VerifySize bool +} diff --git a/pkg/processing/service.go b/pkg/processing/service.go new file mode 100644 index 0000000..e643ce0 --- /dev/null +++ b/pkg/processing/service.go @@ -0,0 +1,41 @@ +package processing + +import ( + "context" + "log" + "m3u8-downloader/pkg/nas" + "sync" +) + +type ProcessingService struct { + config *ProcessConfig + nas *nas.NASConfig +} + +func NewProcessingService(config *ProcessConfig, nas *nas.NASConfig) *ProcessingService { + return &ProcessingService{ + config: config, + nas: nas, + } +} + +func (ps *ProcessingService) Start(ctx context.Context) error { + if !ps.config.Enabled { + log.Println("Processing service disabled") + return nil + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + }() + wg.Wait() + + return nil +} + +func (ps *ProcessingService) ProcessEvent(eventName string) error { + return nil +} diff --git a/pkg/processing/types.go b/pkg/processing/types.go new file mode 100644 index 0000000..d29a70d --- /dev/null +++ b/pkg/processing/types.go @@ -0,0 +1,11 @@ +package processing + +type ProcessJob struct { + EventName string +} + +type ProcessConfig struct { + WorkerCount int + DestinationPath string + Enabled bool +} diff --git a/pkg/transfer/nas.go b/pkg/transfer/nas.go index d8134aa..65c38af 100644 --- a/pkg/transfer/nas.go +++ b/pkg/transfer/nas.go @@ -5,16 +5,17 @@ import ( "fmt" "io" "log" + "m3u8-downloader/pkg/nas" "os" "path/filepath" ) type NASTransfer struct { - config NASConfig + config nas.NASConfig connected bool } -func NewNASTransfer(config NASConfig) *NASTransfer { +func NewNASTransfer(config nas.NASConfig) *NASTransfer { nt := &NASTransfer{ config: config, } @@ -127,3 +128,37 @@ func (nt *NASTransfer) TestConnection() error { func (nt *NASTransfer) IsConnected() bool { return nt.connected } + +// FileExists checks if a file already exists on the NAS and optionally verifies size +func (nt *NASTransfer) FileExists(destinationPath string, expectedSize int64) (bool, error) { + fullDestPath := filepath.Join(nt.config.Path, destinationPath) + + destInfo, err := os.Stat(fullDestPath) + if err != nil { + if os.IsNotExist(err) { + return false, nil // File doesn't exist, no error + } + return false, fmt.Errorf("failed to stat NAS file: %w", err) + } + + // File exists, check size if expected size is provided + if expectedSize > 0 && destInfo.Size() != expectedSize { + log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d", + fullDestPath, expectedSize, destInfo.Size()) + return false, nil // File exists but wrong size, treat as not existing + } + + return true, nil +} + +// GetFileSize returns the size of a file on the NAS +func (nt *NASTransfer) GetFileSize(destinationPath string) (int64, error) { + fullDestPath := filepath.Join(nt.config.Path, destinationPath) + + destInfo, err := os.Stat(fullDestPath) + if err != nil { + return 0, fmt.Errorf("failed to stat NAS file: %w", err) + } + + return destInfo.Size(), nil +} diff --git a/pkg/transfer/queue.go b/pkg/transfer/queue.go index 56bcb0a..66c7747 100644 --- a/pkg/transfer/queue.go +++ b/pkg/transfer/queue.go @@ -145,6 +145,24 @@ func (tq *TransferQueue) dispatchWork() { } func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) { + // Check if file already exists on NAS before attempting transfer + if exists, err := tq.nasTransfer.FileExists(item.DestinationPath, item.FileSize); err != nil { + log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err) + // Continue with transfer attempt on error + } else if exists { + log.Printf("File already exists on NAS, skipping transfer: %s", item.SourcePath) + item.Status = StatusCompleted + tq.stats.IncrementCompleted(item.FileSize) + + // Schedule for cleanup + if tq.cleanup != nil { + if err := tq.cleanup.ScheduleCleanup(item.SourcePath); err != nil { + log.Printf("Failed to schedule cleanup for existing file %s: %v", item.SourcePath, err) + } + } + return + } + maxRetries := 3 for attempt := 1; attempt <= maxRetries; attempt++ { diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go index 2ba5957..03bc64e 100644 --- a/pkg/transfer/service.go +++ b/pkg/transfer/service.go @@ -5,6 +5,10 @@ import ( "fmt" "log" "m3u8-downloader/pkg/constants" + nas2 "m3u8-downloader/pkg/nas" + "os" + "path/filepath" + "strings" "sync" "time" ) @@ -18,8 +22,8 @@ type TransferService struct { } func NewTrasferService(outputDir string, eventName string) (*TransferService, error) { - nasConfig := NASConfig{ - Path: outputDir + "/" + eventName, + nasConfig := nas2.NASConfig{ + Path: outputDir, Username: constants.NASUsername, Password: constants.NASPassword, Timeout: constants.TransferTimeout, @@ -34,9 +38,9 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er cleanupConfig := CleanupConfig{ Enabled: constants.CleanupAfterTransfer, - RetentionPeriod: constants.RetainLocalHours, + RetentionPeriod: time.Duration(constants.RetainLocalHours) * time.Hour, BatchSize: constants.CleanupBatchSize, - CheckInterval: constants.FileSettlingDelay, + CheckInterval: 1 * time.Minute, } cleanup := NewCleanupService(cleanupConfig) @@ -48,7 +52,13 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er } queue := NewTransferQueue(queueConfig, nas, cleanup) - watcher, err := NewFileWatcher(constants.LocalOutputDirPath+"/"+eventName, queue) + // Create local output directory if it doesn't exist + localOutputPath := constants.LocalOutputDirPath + "/" + eventName + if err := os.MkdirAll(localOutputPath, 0755); err != nil { + return nil, fmt.Errorf("Failed to create local output directory: %w", err) + } + + watcher, err := NewFileWatcher(localOutputPath, queue) if err != nil { return nil, fmt.Errorf("Failed to create file watcher: %w", err) } @@ -134,3 +144,102 @@ func (ts *TransferService) Shutdown(ctx context.Context) error { return nil } + +// QueueExistingFiles scans a directory for .ts files and queues them for transfer +func (ts *TransferService) QueueExistingFiles(localEventPath string) error { + log.Printf("Scanning for existing files in: %s", localEventPath) + + var fileCount, alreadyTransferred, scheduledForCleanup int + + // Extract event name from path for NAS destination + eventName := filepath.Base(localEventPath) + + err := filepath.Walk(localEventPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Printf("Error accessing path %s: %v", path, err) + return nil // Continue walking + } + + // Only process .ts files + if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".ts") { + // Extract resolution from directory path + resolution := ts.extractResolutionFromPath(path) + + // Get relative path from event directory + relPath, err := filepath.Rel(localEventPath, path) + if err != nil { + log.Printf("Failed to get relative path for %s: %v", path, err) + return nil + } + + // Build NAS destination path (eventName/relPath) + nasDestPath := filepath.Join(eventName, relPath) + + // Check if file already exists on NAS with matching size + exists, err := ts.nas.FileExists(nasDestPath, info.Size()) + if err != nil { + log.Printf("Failed to check NAS file existence for %s: %v", path, err) + // Continue with transfer attempt on error + } else if exists { + log.Printf("File already exists on NAS: %s (%s, %d bytes)", path, resolution, info.Size()) + alreadyTransferred++ + + // Schedule for cleanup if cleanup is enabled + if constants.CleanupAfterTransfer { + if err := ts.cleanup.ScheduleCleanup(path); err != nil { + log.Printf("Failed to schedule cleanup for already-transferred file %s: %v", path, err) + } else { + scheduledForCleanup++ + } + } + return nil // Skip queuing this file + } + + // Create transfer item + item := TransferItem{ + ID: ts.generateTransferID(), + SourcePath: path, + DestinationPath: nasDestPath, + Resolution: resolution, + Timestamp: info.ModTime(), + Status: StatusPending, + FileSize: info.Size(), + } + + // Add to queue + if err := ts.queue.Add(item); err != nil { + log.Printf("Failed to queue file %s: %v", path, err) + } else { + log.Printf("Queued file: %s (%s, %d bytes)", path, resolution, info.Size()) + fileCount++ + } + } + + return nil + }) + + if err != nil { + return fmt.Errorf("failed to walk directory: %w", err) + } + + log.Printf("File scan completed - Queued: %d, Already transferred: %d, Scheduled for cleanup: %d", + fileCount, alreadyTransferred, scheduledForCleanup) + return nil +} + +func (ts *TransferService) extractResolutionFromPath(filePath string) string { + dir := filepath.Dir(filePath) + parts := strings.Split(dir, string(filepath.Separator)) + + for _, part := range parts { + if strings.HasSuffix(part, "p") { + return part + } + } + + return "unknown" +} + +func (ts *TransferService) generateTransferID() string { + return fmt.Sprintf("transfer_existing_%d", time.Now().UnixNano()) +} diff --git a/pkg/transfer/types.go b/pkg/transfer/types.go index aa4c7c7..be35896 100644 --- a/pkg/transfer/types.go +++ b/pkg/transfer/types.go @@ -44,15 +44,6 @@ func (s TransferStatus) String() string { } } -type NASConfig struct { - Path string - Username string - Password string - Timeout time.Duration - RetryLimit int - VerifySize bool -} - type QueueConfig struct { WorkerCount int PersistencePath string diff --git a/pkg/transfer/watcher.go b/pkg/transfer/watcher.go index 52e814c..5aca4f4 100644 --- a/pkg/transfer/watcher.go +++ b/pkg/transfer/watcher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "m3u8-downloader/pkg/constants" "math/rand" "os" "path/filepath" @@ -32,7 +33,7 @@ func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error outputDir: outputDir, queue: queue, watcher: watcher, - settingDelay: time.Second, + settingDelay: constants.FileSettlingDelay, pendingFiles: make(map[string]*time.Timer), }, nil }