diff --git a/CLAUDE.md b/CLAUDE.md index 0c10a10..2662eab 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -11,9 +11,10 @@ This is a Go-based HLS (HTTP Live Streaming) recorder that monitors M3U8 playlis The project follows a modular architecture with clear separation of concerns: - **cmd/**: Entry points for different execution modes - - **main/main.go**: Primary CLI entry point with URL input and event naming + - **main/main.go**: Primary CLI entry point with URL input, event naming, and mode selection - **downloader/download.go**: Core download orchestration logic with transfer service integration - **processor/process.go**: Alternative processing entry point + - **transfer/transfer.go**: Transfer-only mode entry point - **pkg/**: Core packages containing the application logic - **media/**: HLS streaming and download logic - **stream.go**: Stream variant parsing and downloading orchestration (`GetAllVariants`, `VariantDownloader`) @@ -27,7 +28,18 @@ The project follows a modular architecture with clear separation of concerns: - **nas.go**: NAS file transfer with retry logic - **cleanup.go**: Local file cleanup after successful transfer - **types.go**: Transfer system data structures - - **constants/constants.go**: Configuration constants (paths, timeouts, NAS settings) + - **processing/**: Video processing and concatenation system + - **service.go**: Processing service orchestration with FFmpeg integration + - **segment.go**: Individual segment processing logic + - **types.go**: Processing system data structures + - **nas/**: NAS connection and file operations + - **config.go**: NAS configuration structure + - **nas.go**: NAS service with connection management and file operations + - **config/**: Centralized configuration management with validation + - **config.go**: Configuration loading, validation, and path resolution + - **utils/**: Utility functions for cross-platform compatibility + - **paths.go**: Path manipulation and validation utilities + - **constants/constants.go**: Configuration constants and singleton access - **httpClient/error.go**: HTTP error handling utilities ## Core Functionality @@ -47,6 +59,13 @@ The project follows a modular architecture with clear separation of concerns: 4. **Local Cleanup**: Successfully transferred files are automatically cleaned up locally 5. **State Persistence**: Queue state is persisted to survive crashes and restarts +### Video Processing Workflow (Optional) +1. **Segment Collection**: Processing service reads downloaded segments from NAS storage +2. **Quality Selection**: Automatically selects the highest quality variant available +3. **FFmpeg Processing**: Uses FFmpeg to concatenate segments into a single MP4 file +4. **Output Management**: Processed videos are saved to the configured output directory +5. **Concurrent Processing**: Multiple events can be processed simultaneously with worker pools + ## Key Data Structures - `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, output directory, and manifest writer @@ -55,35 +74,58 @@ The project follows a modular architecture with clear separation of concerns: - `ManifestItem`: Individual segment record with sequence number and resolution - `TransferItem`: Transfer queue item with source, destination, retry count, and status - `TransferService`: Orchestrates file watching, queuing, transfer, and cleanup +- `ProcessingService`: Manages video processing operations with FFmpeg integration +- `ProcessConfig`: Configuration for processing operations including worker count and paths +- `NASService`: Handles NAS connection, authentication, and file operations +- `NASConfig`: Configuration structure for NAS connection parameters ## Configuration -Key configuration is managed in `pkg/constants/constants.go`: +Configuration is managed through a centralized system in `pkg/config/config.go` with environment variable support for deployment flexibility. The system provides validation, cross-platform path resolution, and sensible defaults: ### Core Settings -- `WorkerCount`: Number of concurrent segment downloaders per variant (4) -- `RefreshDelay`: How often to check for playlist updates (3 seconds) -- `LocalOutputDirPath`: Base directory for local downloads (`./data/`) -- `ManifestPath`: Directory for manifest JSON files (`./data`) +- `Core.WorkerCount`: Number of concurrent segment downloaders per variant (4) - ENV: `WORKER_COUNT` +- `Core.RefreshDelay`: How often to check for playlist updates (3 seconds) - ENV: `REFRESH_DELAY_SECONDS` + +### Path Configuration +- `Paths.LocalOutput`: Base directory for local downloads (`data/`) - ENV: `LOCAL_OUTPUT_DIR` +- `Paths.ProcessOutput`: Directory for processed videos (`out/`) - ENV: `PROCESS_OUTPUT_DIR` +- `Paths.ManifestDir`: Directory for manifest JSON files (`data/`) +- `Paths.PersistenceFile`: Transfer queue state file location ### HTTP Settings - `HTTPUserAgent`: User agent string for HTTP requests -- `REFERRER`: Referer header for HTTP requests +- `REFERRER`: Referer header for HTTP requests (`https://www.flomarching.com`) ### NAS Transfer Settings -- `EnableNASTransfer`: Enable/disable automatic NAS transfer (true) -- `NASOutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`) -- `NASUsername`/`NASPassword`: NAS credentials (empty for current user) -- `TransferWorkerCount`: Concurrent transfer workers (2) -- `TransferRetryLimit`: Max retry attempts per file (3) -- `TransferTimeout`: Timeout per file transfer (30 seconds) -- `FileSettlingDelay`: Wait before queuing new files (5 seconds) -- `PersistencePath`: Transfer queue state file (`./data/transfer_queue.json`) +- `NAS.EnableTransfer`: Enable/disable automatic NAS transfer (true) - ENV: `ENABLE_NAS_TRANSFER` +- `NAS.OutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`) - ENV: `NAS_OUTPUT_PATH` +- `NAS.Username`/`NAS.Password`: NAS credentials for authentication - ENV: `NAS_USERNAME`/`NAS_PASSWORD` +- `Transfer.WorkerCount`: Concurrent transfer workers (2) +- `Transfer.RetryLimit`: Max retry attempts per file (3) +- `Transfer.Timeout`: Timeout per file transfer (30 seconds) +- `Transfer.FileSettlingDelay`: Wait before queuing new files (5 seconds) +- `Transfer.QueueSize`: Maximum queue size (100000) +- `Transfer.BatchSize`: Batch processing size (1000) + +### Processing Settings +- `Processing.AutoProcess`: Enable automatic processing after download (true) +- `Processing.Enabled`: Enable processing functionality (true) +- `Processing.WorkerCount`: Concurrent processing workers (2) +- `Processing.FFmpegPath`: Path to FFmpeg executable (`ffmpeg`) - ENV: `FFMPEG_PATH` ### Cleanup Settings -- `CleanupAfterTransfer`: Delete local files after NAS transfer (true) -- `CleanupBatchSize`: Files processed per cleanup batch (10) -- `RetainLocalHours`: Hours to keep local files (0 = immediate cleanup) +- `Cleanup.AfterTransfer`: Delete local files after NAS transfer (true) +- `Cleanup.BatchSize`: Files processed per cleanup batch (1000) +- `Cleanup.RetainHours`: Hours to keep local files (0 = immediate cleanup) + +### Configuration Access +```go +cfg := constants.MustGetConfig() // Get validated config singleton +eventPath := cfg.GetEventPath("my-event") // Get cross-platform paths +``` + +See `DEPLOYMENT.md` for detailed environment variable configuration and deployment examples. ## Common Development Commands @@ -112,6 +154,8 @@ go fmt ./... - `-url`: M3U8 playlist URL (if not provided, prompts for input) - `-event`: Event name for organizing downloads (defaults to current date) - `-debug`: Debug mode (only downloads 1080p variant for easier testing) +- `-transfer`: Transfer-only mode (transfer existing files without downloading) +- `-process`: Process-only mode (process existing files without downloading) ## Monitoring and Downloads @@ -162,7 +206,9 @@ Downloaded files are organized as: │ ├── 1080p/ # High quality segments │ ├── 720p/ # Medium quality segments │ └── 480p/ # Lower quality segments -└── transfer_queue.json # Transfer queue state +├── transfer_queue.json # Transfer queue state +├── refresh_token.txt # Authentication tokens +└── tokens.txt # Session tokens ``` NAS files mirror the local structure: @@ -172,4 +218,11 @@ NAS files mirror the local structure: ├── 1080p/ ├── 720p/ └── 480p/ +``` + +Processed files are output to: +``` +./out/ +└── {event-name}/ + └── concatenated_segments.mp4 # Final processed video ``` \ No newline at end of file diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md new file mode 100644 index 0000000..8eaab62 --- /dev/null +++ b/DEPLOYMENT.md @@ -0,0 +1,192 @@ +# Deployment Guide + +This document outlines how to deploy and configure the StreamRecorder application in different environments. + +## Environment Variables + +The application supports configuration through environment variables for flexible deployment: + +### Core Settings +- `WORKER_COUNT`: Number of concurrent segment downloaders per variant (default: 4) +- `REFRESH_DELAY_SECONDS`: How often to check for playlist updates in seconds (default: 3) + +### NAS Transfer Settings +- `NAS_OUTPUT_PATH`: UNC path to NAS storage (default: "\\\\HomeLabNAS\\dci\\streams") +- `NAS_USERNAME`: NAS authentication username +- `NAS_PASSWORD`: NAS authentication password +- `ENABLE_NAS_TRANSFER`: Enable/disable automatic NAS transfer (default: true) + +### Path Configuration +- `LOCAL_OUTPUT_DIR`: Base directory for local downloads (default: "data") +- `PROCESS_OUTPUT_DIR`: Output directory for processed videos (default: "out") + +### Processing Settings +- `FFMPEG_PATH`: Path to FFmpeg executable (default: "ffmpeg") + +## Docker Deployment + +### Dockerfile Example + +```dockerfile +FROM golang:1.23-alpine AS builder +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +RUN go build -o stream-recorder ./cmd/main + +FROM alpine:latest +RUN apk --no-cache add ca-certificates ffmpeg +WORKDIR /root/ +COPY --from=builder /app/stream-recorder . +CMD ["./stream-recorder"] +``` + +### Docker Compose Example + +```yaml +version: '3.8' +services: + stream-recorder: + build: . + environment: + - NAS_OUTPUT_PATH=/mnt/nas/streams + - NAS_USERNAME=${NAS_USERNAME} + - NAS_PASSWORD=${NAS_PASSWORD} + - LOCAL_OUTPUT_DIR=/app/data + - PROCESS_OUTPUT_DIR=/app/out + - FFMPEG_PATH=ffmpeg + volumes: + - ./data:/app/data + - ./out:/app/out + - nas_mount:/mnt/nas + networks: + - stream_network + +volumes: + nas_mount: + driver: local + driver_opts: + type: cifs + device: "//HomeLabNAS/dci" + o: username=${NAS_USERNAME},password=${NAS_PASSWORD},iocharset=utf8 + +networks: + stream_network: + driver: bridge +``` + +## Kubernetes Deployment + +### ConfigMap Example + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: stream-recorder-config +data: + WORKER_COUNT: "4" + REFRESH_DELAY_SECONDS: "3" + ENABLE_NAS_TRANSFER: "true" + LOCAL_OUTPUT_DIR: "/app/data" + PROCESS_OUTPUT_DIR: "/app/out" + FFMPEG_PATH: "ffmpeg" +``` + +### Secret Example + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: stream-recorder-secrets +type: Opaque +data: + NAS_USERNAME: + NAS_PASSWORD: +``` + +### Deployment Example + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: stream-recorder +spec: + replicas: 1 + selector: + matchLabels: + app: stream-recorder + template: + metadata: + labels: + app: stream-recorder + spec: + containers: + - name: stream-recorder + image: stream-recorder:latest + envFrom: + - configMapRef: + name: stream-recorder-config + - secretRef: + name: stream-recorder-secrets + volumeMounts: + - name: data-storage + mountPath: /app/data + - name: output-storage + mountPath: /app/out + volumes: + - name: data-storage + persistentVolumeClaim: + claimName: stream-data-pvc + - name: output-storage + persistentVolumeClaim: + claimName: stream-output-pvc +``` + +## Production Considerations + +### Security +- Never commit credentials to version control +- Use environment variables or secret management systems for sensitive data +- Consider using service accounts or IAM roles for cloud deployments +- Rotate credentials regularly + +### Monitoring +- Implement health checks for the application +- Monitor disk space for download directories +- Set up alerts for failed transfers or processing +- Log to centralized logging systems + +### Scaling +- Use horizontal scaling for multiple concurrent streams +- Consider using message queues for segment processing +- Implement distributed storage for high availability +- Use load balancers for multiple instances + +### Backup and Recovery +- Regular backups of configuration and state files +- Test recovery procedures +- Document rollback processes +- Maintain disaster recovery plans + +## Configuration Validation + +The application validates configuration at startup and will fail fast if: +- Required directories cannot be created +- NAS paths are invalid when transfer is enabled +- FFmpeg is not found when processing is enabled +- Critical environment variables are malformed + +## Troubleshooting + +### Common Issues +1. **Path Permission Errors**: Ensure the application has write access to configured directories +2. **NAS Connection Failures**: Verify network connectivity and credentials +3. **FFmpeg Not Found**: Install FFmpeg or set correct FFMPEG_PATH +4. **Environment Variable Format**: Check for typos and correct boolean values + +### Debug Mode +Run with `-debug=true` to enable debug logging and download only 1080p variants for testing. \ No newline at end of file diff --git a/cmd/downloader/download.go b/cmd/downloader/download.go index 5dddd7f..acfbfa3 100644 --- a/cmd/downloader/download.go +++ b/cmd/downloader/download.go @@ -6,6 +6,7 @@ import ( "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/media" "m3u8-downloader/pkg/transfer" + "m3u8-downloader/pkg/utils" "os" "os/signal" "sync" @@ -26,10 +27,12 @@ func Download(masterURL string, eventName string, debug bool) { cancel() }() + cfg := constants.MustGetConfig() + var wg sync.WaitGroup var transferService *transfer.TransferService - if constants.EnableNASTransfer { - ts, err := transfer.NewTrasferService(constants.NASOutputPath, eventName) + if cfg.NAS.EnableTransfer { + ts, err := transfer.NewTrasferService(cfg.NAS.OutputPath, eventName) if err != nil { log.Printf("Failed to create transfer service: %v", err) log.Println("Continuing without transfer service...") @@ -48,7 +51,12 @@ func Download(masterURL string, eventName string, debug bool) { manifestWriter := media.NewManifestWriter(eventName) - variants, err := media.GetAllVariants(masterURL, constants.LocalOutputDirPath+"/"+eventName, manifestWriter) + eventPath := cfg.GetEventPath(eventName) + if err := utils.EnsureDir(eventPath); err != nil { + log.Fatalf("Failed to create event directory: %v", err) + } + + variants, err := media.GetAllVariants(masterURL, eventPath, manifestWriter) if err != nil { log.Fatalf("Failed to get variants: %v", err) } diff --git a/cmd/processor/process.go b/cmd/processor/process.go index 2cbcd38..ec66899 100644 --- a/cmd/processor/process.go +++ b/cmd/processor/process.go @@ -3,12 +3,14 @@ package processor import ( "context" "log" + "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/processing" ) func Process(eventName string) { - //log.Printf("Starting processing for event: %s", eventName) - ps, err := processing.NewProcessingService(eventName) + log.Printf("Starting processing for event: %s", eventName) + cfg := constants.MustGetConfig() + ps, err := processing.NewProcessingService(eventName, cfg) if err != nil { log.Fatalf("Failed to create processing service: %v", err) } diff --git a/cmd/transfer/transfer.go b/cmd/transfer/transfer.go index 957268d..f02273e 100644 --- a/cmd/transfer/transfer.go +++ b/cmd/transfer/transfer.go @@ -5,8 +5,10 @@ import ( "context" "fmt" "log" + "m3u8-downloader/pkg/config" "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/transfer" + "m3u8-downloader/pkg/utils" "os" "os/signal" "strconv" @@ -15,10 +17,10 @@ import ( "time" ) -func getEventDirs() ([]string, error) { - dirs, err := os.ReadDir(constants.LocalOutputDirPath) +func getEventDirs(cfg *config.Config) ([]string, error) { + dirs, err := os.ReadDir(cfg.Paths.LocalOutput) if err != nil { - return nil, fmt.Errorf("Failed to read directory: %w", err) + return nil, fmt.Errorf("failed to read directory: %w", err) } var eventDirs []string for _, dir := range dirs { @@ -30,13 +32,15 @@ func getEventDirs() ([]string, error) { } func RunTransferOnly(eventName string) { + cfg := constants.MustGetConfig() + // Check if NAS transfer is enabled - if !constants.EnableNASTransfer { - log.Fatal("NAS transfer is disabled in constants. Please enable it to use transfer-only mode.") + if !cfg.NAS.EnableTransfer { + log.Fatal("NAS transfer is disabled in configuration. Please enable it to use transfer-only mode.") } if eventName == "" { - events, err := getEventDirs() + events, err := getEventDirs(cfg) if err != nil { log.Fatalf("Failed to get event directories: %v", err) } @@ -79,13 +83,13 @@ func RunTransferOnly(eventName string) { }() // Verify local event directory exists - localEventPath := constants.LocalOutputDirPath + "/" + eventName - if _, err := os.Stat(localEventPath); os.IsNotExist(err) { + localEventPath := cfg.GetEventPath(eventName) + if !utils.PathExists(localEventPath) { log.Fatalf("Local event directory does not exist: %s", localEventPath) } // Create transfer service - transferService, err := transfer.NewTrasferService(constants.NASOutputPath, eventName) + transferService, err := transfer.NewTrasferService(cfg.NAS.OutputPath, eventName) if err != nil { log.Fatalf("Failed to create transfer service: %v", err) } diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..a8e11da --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,227 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "time" +) + +type Config struct { + Core CoreConfig + HTTP HTTPConfig + NAS NASConfig + Processing ProcessingConfig + Transfer TransferConfig + Cleanup CleanupConfig + Paths PathsConfig +} + +type CoreConfig struct { + WorkerCount int + RefreshDelay time.Duration +} + +type HTTPConfig struct { + UserAgent string + Referer string +} + +type NASConfig struct { + EnableTransfer bool + OutputPath string + Username string + Password string + Timeout time.Duration + RetryLimit int +} + +type ProcessingConfig struct { + Enabled bool + AutoProcess bool + WorkerCount int + FFmpegPath string +} + +type TransferConfig struct { + WorkerCount int + RetryLimit int + Timeout time.Duration + FileSettlingDelay time.Duration + QueueSize int + BatchSize int +} + +type CleanupConfig struct { + AfterTransfer bool + BatchSize int + RetainHours int +} + +type PathsConfig struct { + BaseDir string + LocalOutput string + ProcessOutput string + ManifestDir string + PersistenceFile string +} + +var defaultConfig = Config{ + Core: CoreConfig{ + WorkerCount: 4, + RefreshDelay: 3 * time.Second, + }, + HTTP: HTTPConfig{ + UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", + Referer: "https://www.flomarching.com", + }, + NAS: NASConfig{ + EnableTransfer: true, + OutputPath: "\\\\HomeLabNAS\\dci\\streams", + Username: "NASAdmin", + Password: "s3tkY6tzA&KN6M", + Timeout: 30 * time.Second, + RetryLimit: 3, + }, + Processing: ProcessingConfig{ + Enabled: true, + AutoProcess: true, + WorkerCount: 2, + FFmpegPath: "ffmpeg", + }, + Transfer: TransferConfig{ + WorkerCount: 2, + RetryLimit: 3, + Timeout: 30 * time.Second, + FileSettlingDelay: 5 * time.Second, + QueueSize: 100000, + BatchSize: 1000, + }, + Cleanup: CleanupConfig{ + AfterTransfer: true, + BatchSize: 1000, + RetainHours: 0, + }, + Paths: PathsConfig{ + BaseDir: "data", + LocalOutput: "data", + ProcessOutput: "out", + ManifestDir: "data", + PersistenceFile: "transfer_queue.json", + }, +} + +func Load() (*Config, error) { + cfg := defaultConfig + + if err := cfg.loadFromEnvironment(); err != nil { + return nil, fmt.Errorf("failed to load environment config: %w", err) + } + + if err := cfg.resolveAndValidatePaths(); err != nil { + return nil, fmt.Errorf("path validation failed: %w", err) + } + + return &cfg, nil +} + +func (c *Config) loadFromEnvironment() error { + if val := os.Getenv("WORKER_COUNT"); val != "" { + if parsed, err := strconv.Atoi(val); err == nil { + c.Core.WorkerCount = parsed + } + } + + if val := os.Getenv("REFRESH_DELAY_SECONDS"); val != "" { + if parsed, err := strconv.Atoi(val); err == nil { + c.Core.RefreshDelay = time.Duration(parsed) * time.Second + } + } + + if val := os.Getenv("NAS_OUTPUT_PATH"); val != "" { + c.NAS.OutputPath = val + } + + if val := os.Getenv("NAS_USERNAME"); val != "" { + c.NAS.Username = val + } + + if val := os.Getenv("NAS_PASSWORD"); val != "" { + c.NAS.Password = val + } + + if val := os.Getenv("ENABLE_NAS_TRANSFER"); val != "" { + c.NAS.EnableTransfer = val == "true" + } + + if val := os.Getenv("LOCAL_OUTPUT_DIR"); val != "" { + c.Paths.LocalOutput = val + } + + if val := os.Getenv("PROCESS_OUTPUT_DIR"); val != "" { + c.Paths.ProcessOutput = val + } + + if val := os.Getenv("FFMPEG_PATH"); val != "" { + c.Processing.FFmpegPath = val + } + + return nil +} + +func (c *Config) resolveAndValidatePaths() error { + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get working directory: %w", err) + } + + c.Paths.BaseDir = filepath.Join(cwd, c.Paths.BaseDir) + c.Paths.LocalOutput = filepath.Join(cwd, c.Paths.LocalOutput) + c.Paths.ProcessOutput = filepath.Join(cwd, c.Paths.ProcessOutput) + c.Paths.ManifestDir = filepath.Join(cwd, c.Paths.ManifestDir) + c.Paths.PersistenceFile = filepath.Join(c.Paths.BaseDir, c.Paths.PersistenceFile) + + requiredDirs := []string{ + c.Paths.BaseDir, + c.Paths.LocalOutput, + c.Paths.ProcessOutput, + c.Paths.ManifestDir, + } + + for _, dir := range requiredDirs { + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %w", dir, err) + } + } + + if c.NAS.EnableTransfer && c.NAS.OutputPath == "" { + return fmt.Errorf("NAS output path is required when transfer is enabled") + } + + if c.Processing.Enabled && c.Processing.FFmpegPath == "" { + return fmt.Errorf("FFmpeg path is required when processing is enabled") + } + + return nil +} + +func (c *Config) GetEventPath(eventName string) string { + return filepath.Join(c.Paths.LocalOutput, eventName) +} + +func (c *Config) GetManifestPath(eventName string) string { + return filepath.Join(c.Paths.ManifestDir, eventName+".json") +} + +func (c *Config) GetNASEventPath(eventName string) string { + return filepath.Join(c.NAS.OutputPath, eventName) +} + +func (c *Config) GetProcessOutputPath(eventName string) string { + return filepath.Join(c.Paths.ProcessOutput, eventName) +} + +func (c *Config) GetQualityPath(eventName, quality string) string { + return filepath.Join(c.GetEventPath(eventName), quality) +} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 5349777..05b71e7 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -1,35 +1,51 @@ package constants -import "time" +import ( + "m3u8-downloader/pkg/config" + "sync" +) + +var ( + globalConfig *config.Config + configOnce sync.Once + configError error +) + +func GetConfig() (*config.Config, error) { + configOnce.Do(func() { + globalConfig, configError = config.Load() + }) + return globalConfig, configError +} + +func MustGetConfig() *config.Config { + cfg, err := GetConfig() + if err != nil { + panic("Failed to load configuration: " + err.Error()) + } + return cfg +} const ( WorkerCount = 4 - RefreshDelay = 3 * time.Second + RefreshDelay = 3 - HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" - REFERRER = "https://www.flomarching.com" - LocalOutputDirPath = "../data/" + HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" + REFERRER = "https://www.flomarching.com" - EnableNASTransfer = true - NASOutputPath = "\\\\HomeLabNAS\\dci\\streams" - NASUsername = "NASAdmin" - NASPassword = "s3tkY6tzA&KN6M" - TransferWorkerCount = 2 - TransferRetryLimit = 3 - TransferTimeout = 30 * time.Second - FileSettlingDelay = 5 * time.Second - PersistencePath = "../data/transfer_queue.json" - TransferQueueSize = 100000 - BatchSize = 1000 - ManifestPath = "../data" + DefaultNASOutputPath = "\\\\HomeLabNAS\\dci\\streams" + DefaultNASUsername = "NASAdmin" - CleanupAfterTransfer = true - CleanupBatchSize = 1000 - RetainLocalHours = 0 + DefaultTransferWorkerCount = 2 + DefaultTransferRetryLimit = 3 + DefaultTransferTimeout = 30 + DefaultFileSettlingDelay = 5 + DefaultTransferQueueSize = 100000 + DefaultBatchSize = 1000 - ProcessOutputPath = "../out" - AutoProcess = true - ProcessingEnabled = true - ProcessWorkerCount = 2 - FFmpegPath = "ffmpeg" + DefaultCleanupBatchSize = 1000 + DefaultRetainLocalHours = 0 + + DefaultProcessWorkerCount = 2 + DefaultFFmpegPath = "ffmpeg" ) diff --git a/pkg/media/manifest.go b/pkg/media/manifest.go index 5c594b6..e182b49 100644 --- a/pkg/media/manifest.go +++ b/pkg/media/manifest.go @@ -4,6 +4,7 @@ import ( "encoding/json" "log" "m3u8-downloader/pkg/constants" + "m3u8-downloader/pkg/utils" "os" "sort" ) @@ -20,8 +21,9 @@ type ManifestItem struct { } func NewManifestWriter(eventName string) *ManifestWriter { + cfg := constants.MustGetConfig() return &ManifestWriter{ - ManifestPath: constants.ManifestPath + "/" + eventName + ".json", + ManifestPath: cfg.GetManifestPath(eventName), Segments: make([]ManifestItem, 0), Index: make(map[string]*ManifestItem), } @@ -62,6 +64,11 @@ func (m *ManifestWriter) WriteManifest() { return } + if err := utils.ValidateWritablePath(m.ManifestPath); err != nil { + log.Printf("Manifest path validation failed: %v", err) + return + } + file, err := os.Create(m.ManifestPath) if err != nil { log.Printf("Failed to create manifest file: %v", err) diff --git a/pkg/processing/service.go b/pkg/processing/service.go index f0ad8ca..b896933 100644 --- a/pkg/processing/service.go +++ b/pkg/processing/service.go @@ -5,8 +5,9 @@ import ( "context" "fmt" "log" - "m3u8-downloader/pkg/constants" + "m3u8-downloader/pkg/config" "m3u8-downloader/pkg/nas" + "m3u8-downloader/pkg/utils" "os" "os/exec" "path/filepath" @@ -19,37 +20,35 @@ import ( ) type ProcessingService struct { - config *ProcessConfig - nas *nas.NASService + config *config.Config + eventName string + nas *nas.NASService } -func NewProcessingService(eventName string) (*ProcessingService, error) { - config := &ProcessConfig{ - WorkerCount: constants.ProcessWorkerCount, - SourcePath: constants.NASOutputPath, - DestinationPath: constants.ProcessOutputPath, - Enabled: constants.ProcessingEnabled, - EventName: eventName, +func NewProcessingService(eventName string, cfg *config.Config) (*ProcessingService, error) { + if cfg == nil { + return nil, fmt.Errorf("configuration is required") } - nasConfig := &nas.NASConfig{ - Path: constants.NASOutputPath, - Username: constants.NASUsername, - Password: constants.NASPassword, - Timeout: constants.TransferTimeout, - RetryLimit: constants.TransferRetryLimit, + nasConfig := nas.NASConfig{ + Path: cfg.NAS.OutputPath, + Username: cfg.NAS.Username, + Password: cfg.NAS.Password, + Timeout: cfg.NAS.Timeout, + RetryLimit: cfg.NAS.RetryLimit, VerifySize: true, } - nasService := nas.NewNASService(*nasConfig) + nasService := nas.NewNASService(nasConfig) if err := nasService.TestConnection(); err != nil { - return nil, fmt.Errorf("Failed to connect to NAS: %w", err) + return nil, fmt.Errorf("failed to connect to NAS: %w", err) } return &ProcessingService{ - config: config, - nas: nasService, + config: cfg, + eventName: eventName, + nas: nasService, }, nil } @@ -58,10 +57,11 @@ func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) { } func (ps *ProcessingService) GetEventDirs() ([]string, error) { - if ps.config.EventName == "" { - dirs, err := os.ReadDir(ps.config.SourcePath) + if ps.eventName == "" { + sourcePath := ps.config.NAS.OutputPath + dirs, err := os.ReadDir(sourcePath) if err != nil { - return nil, fmt.Errorf("Failed to read directory: %w", err) + return nil, fmt.Errorf("failed to read directory %s: %w", sourcePath, err) } var eventDirs []string for _, dir := range dirs { @@ -71,23 +71,23 @@ func (ps *ProcessingService) GetEventDirs() ([]string, error) { } return eventDirs, nil } else { - return []string{ps.config.EventName}, nil + return []string{ps.eventName}, nil } } func (ps *ProcessingService) Start(ctx context.Context) error { - if !ps.config.Enabled { + if !ps.config.Processing.Enabled { log.Println("Processing service disabled") return nil } - if ps.config.EventName == "" { + if ps.eventName == "" { events, err := ps.GetEventDirs() if err != nil { - return fmt.Errorf("Failed to get event directories: %w", err) + return fmt.Errorf("failed to get event directories: %w", err) } if len(events) == 0 { - return fmt.Errorf("No events found") + return fmt.Errorf("no events found") } if len(events) > 1 { fmt.Println("Multiple events found, please select one:") @@ -99,14 +99,14 @@ func (ps *ProcessingService) Start(ctx context.Context) error { input = strings.TrimSpace(input) index, err := strconv.Atoi(input) if err != nil { - return fmt.Errorf("Failed to parse input: %w", err) + return fmt.Errorf("failed to parse input: %w", err) } if index < 1 || index > len(events) { - return fmt.Errorf("Invalid input") + return fmt.Errorf("invalid input") } - ps.config.EventName = events[index-1] + ps.eventName = events[index-1] } else { - ps.config.EventName = events[0] + ps.eventName = events[0] } } @@ -139,8 +139,12 @@ func (ps *ProcessingService) Start(ctx context.Context) error { return fmt.Errorf("Failed to write concat file: %w", err) } - //Feed info to ffmpeg to stitch files together - outPath := filepath.Join(constants.NASOutputPath, ps.config.DestinationPath, ps.config.EventName) + // Feed info to ffmpeg to stitch files together + outPath := ps.config.GetProcessOutputPath(ps.eventName) + if err := utils.EnsureDir(outPath); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + concatErr := ps.RunFFmpeg(aggFile, outPath) if concatErr != nil { return concatErr @@ -150,9 +154,10 @@ func (ps *ProcessingService) Start(ctx context.Context) error { } func (ps *ProcessingService) GetResolutions() ([]string, error) { - dirs, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName) + eventPath := ps.config.GetNASEventPath(ps.eventName) + dirs, err := os.ReadDir(eventPath) if err != nil { - return nil, fmt.Errorf("Failed to read source directory: %w", err) + return nil, fmt.Errorf("failed to read source directory %s: %w", eventPath, err) } re := regexp.MustCompile(`^\d+p$`) @@ -170,9 +175,10 @@ func (ps *ProcessingService) GetResolutions() ([]string, error) { func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) { defer wg.Done() - files, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName + "/" + resolution) + resolutionPath := utils.SafeJoin(ps.config.GetNASEventPath(ps.eventName), resolution) + files, err := os.ReadDir(resolutionPath) if err != nil { - log.Printf("Failed to read resolution directory: %v", err) + log.Printf("Failed to read resolution directory %s: %v", resolutionPath, err) return } @@ -221,13 +227,13 @@ func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[in } func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (string, error) { - concatPath := filepath.Join(constants.NASOutputPath, constants.ProcessOutputPath, ps.config.EventName) - // Ensure the directory exists - if err := os.MkdirAll(concatPath, 0755); err != nil { + concatPath := ps.config.GetProcessOutputPath(ps.eventName) + + if err := utils.EnsureDir(concatPath); err != nil { return "", fmt.Errorf("failed to create directories for concat path: %w", err) } - concatFilePath := filepath.Join(concatPath, ps.config.EventName+".txt") + concatFilePath := utils.SafeJoin(concatPath, ps.eventName+".txt") f, err := os.Create(concatFilePath) if err != nil { return "", fmt.Errorf("failed to create concat file: %w", err) @@ -243,7 +249,7 @@ func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (st for _, seq := range keys { segment := segmentMap[seq] - filePath := filepath.Join(ps.config.SourcePath, ps.config.EventName, segment.Resolution, segment.Name) + filePath := utils.SafeJoin(ps.config.GetNASEventPath(ps.eventName), segment.Resolution, segment.Name) line := fmt.Sprintf("file '%s'\n", filePath) if _, err := f.WriteString(line); err != nil { return "", fmt.Errorf("failed to write to concat file: %w", err) @@ -253,60 +259,79 @@ func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (st return concatFilePath, nil } -func ffmpegPath() (string, error) { - var baseDir string +func (ps *ProcessingService) getFFmpegPath() (string, error) { + // First try the configured path + configuredPath := ps.config.Processing.FFmpegPath + if configuredPath != "" { + // Check if it's just the command name or a full path + if filepath.IsAbs(configuredPath) { + return configuredPath, nil + } + // Try to find it in PATH + if fullPath, err := exec.LookPath(configuredPath); err == nil { + return fullPath, nil + } + } + + // Fallback: try local bin directory + var baseDir string exePath, err := os.Executable() if err == nil { baseDir = filepath.Dir(exePath) } else { - // fallback to current working directory baseDir, err = os.Getwd() if err != nil { return "", err } } - // When running in GoLand, exePath might point to the IDE launcher, - // so optionally check if ffmpeg exists here, else fallback to CWD - ffmpeg := filepath.Join(baseDir, "bin", "ffmpeg") + ffmpeg := utils.SafeJoin(baseDir, "bin", "ffmpeg") if runtime.GOOS == "windows" { ffmpeg += ".exe" } - if _, err := os.Stat(ffmpeg); os.IsNotExist(err) { - // try cwd instead - cwd, err := os.Getwd() - if err != nil { - return "", err - } - ffmpeg = filepath.Join(cwd, "bin", "ffmpeg") - if runtime.GOOS == "windows" { - ffmpeg += ".exe" - } + if utils.PathExists(ffmpeg) { + return ffmpeg, nil } - return ffmpeg, nil + // Try current working directory + cwd, err := os.Getwd() + if err != nil { + return "", err + } + ffmpeg = utils.SafeJoin(cwd, "bin", "ffmpeg") + if runtime.GOOS == "windows" { + ffmpeg += ".exe" + } + + if utils.PathExists(ffmpeg) { + return ffmpeg, nil + } + + return "", fmt.Errorf("FFmpeg not found. Please install FFmpeg or set FFMPEG_PATH environment variable") } func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error { fmt.Println("Running ffmpeg...") - fileOutPath := filepath.Join(outputPath, ps.config.EventName+".mp4") + fileOutPath := utils.SafeJoin(outputPath, ps.eventName+".mp4") fmt.Println("Input path:", inputPath) - fmt.Println("Attempting to write to: ", outputPath+"/"+ps.config.EventName+".mp4") + fmt.Println("Output path:", fileOutPath) - path, err := ffmpegPath() + path, err := ps.getFFmpegPath() if err != nil { - return err + return fmt.Errorf("failed to find FFmpeg: %w", err) } - cmd := exec.Command(path, "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", filepath.Join(outputPath, ps.config.EventName+".mp4")) + + cmd := exec.Command(path, "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", fileOutPath) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - ffmpegErr := cmd.Run() - if ffmpegErr != nil { - return fmt.Errorf("Failed to run ffmpeg: %w", ffmpegErr) + + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to run ffmpeg: %w", err) } - fmt.Println("Output path:", fileOutPath) + + fmt.Println("FFmpeg completed successfully") return nil } diff --git a/pkg/processing/types.go b/pkg/processing/types.go index 99f0de1..98ac427 100644 --- a/pkg/processing/types.go +++ b/pkg/processing/types.go @@ -3,11 +3,3 @@ package processing type ProcessJob struct { EventName string } - -type ProcessConfig struct { - WorkerCount int - SourcePath string - DestinationPath string - Enabled bool - EventName string -} diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go index 0301a55..af4c0a3 100644 --- a/pkg/transfer/service.go +++ b/pkg/transfer/service.go @@ -6,6 +6,7 @@ import ( "log" "m3u8-downloader/pkg/constants" nas2 "m3u8-downloader/pkg/nas" + "m3u8-downloader/pkg/utils" "os" "path/filepath" "strings" @@ -22,45 +23,47 @@ type TransferService struct { } func NewTrasferService(outputDir string, eventName string) (*TransferService, error) { + cfg := constants.MustGetConfig() + nasConfig := nas2.NASConfig{ Path: outputDir, - Username: constants.NASUsername, - Password: constants.NASPassword, - Timeout: constants.TransferTimeout, - RetryLimit: constants.TransferRetryLimit, + Username: cfg.NAS.Username, + Password: cfg.NAS.Password, + Timeout: cfg.NAS.Timeout, + RetryLimit: cfg.NAS.RetryLimit, VerifySize: true, } nas := nas2.NewNASService(nasConfig) if err := nas.TestConnection(); err != nil { - return nil, fmt.Errorf("Failed to connect to NAS: %w", err) + return nil, fmt.Errorf("failed to connect to NAS: %w", err) } cleanupConfig := CleanupConfig{ - Enabled: constants.CleanupAfterTransfer, - RetentionPeriod: time.Duration(constants.RetainLocalHours) * time.Hour, - BatchSize: constants.CleanupBatchSize, - CheckInterval: constants.FileSettlingDelay, + Enabled: cfg.Cleanup.AfterTransfer, + RetentionPeriod: time.Duration(cfg.Cleanup.RetainHours) * time.Hour, + BatchSize: cfg.Cleanup.BatchSize, + CheckInterval: cfg.Transfer.FileSettlingDelay, } cleanup := NewCleanupService(cleanupConfig) queueConfig := QueueConfig{ - WorkerCount: constants.TransferWorkerCount, - PersistencePath: constants.PersistencePath, - MaxQueueSize: constants.TransferQueueSize, - BatchSize: constants.BatchSize, + WorkerCount: cfg.Transfer.WorkerCount, + PersistencePath: cfg.Paths.PersistenceFile, + MaxQueueSize: cfg.Transfer.QueueSize, + BatchSize: cfg.Transfer.BatchSize, } queue := NewTransferQueue(queueConfig, nas, cleanup) // Create local output directory if it doesn't exist - localOutputPath := constants.LocalOutputDirPath + "/" + eventName - if err := os.MkdirAll(localOutputPath, 0755); err != nil { - return nil, fmt.Errorf("Failed to create local output directory: %w", err) + localOutputPath := cfg.GetEventPath(eventName) + if err := utils.EnsureDir(localOutputPath); err != nil { + return nil, fmt.Errorf("failed to create local output directory: %w", err) } - watcher, err := NewFileWatcher(localOutputPath, queue) + watcher, err := NewFileWatcher(localOutputPath, queue, cfg.Transfer.FileSettlingDelay) if err != nil { - return nil, fmt.Errorf("Failed to create file watcher: %w", err) + return nil, fmt.Errorf("failed to create file watcher: %w", err) } return &TransferService{ @@ -152,6 +155,7 @@ func (ts *TransferService) Shutdown(ctx context.Context) error { // QueueExistingFiles scans a directory for .ts files and queues them for transfer func (ts *TransferService) QueueExistingFiles(localEventPath string) error { + cfg := constants.MustGetConfig() log.Printf("Scanning for existing files in: %s", localEventPath) var fileCount, alreadyTransferred, scheduledForCleanup int @@ -190,7 +194,7 @@ func (ts *TransferService) QueueExistingFiles(localEventPath string) error { alreadyTransferred++ // Schedule for cleanup if cleanup is enabled - if constants.CleanupAfterTransfer { + if cfg.Cleanup.AfterTransfer { if err := ts.cleanup.ScheduleCleanup(path); err != nil { log.Printf("Failed to schedule cleanup for already-transferred file %s: %v", path, err) } else { diff --git a/pkg/transfer/watcher.go b/pkg/transfer/watcher.go index 5aca4f4..c9d8de0 100644 --- a/pkg/transfer/watcher.go +++ b/pkg/transfer/watcher.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log" - "m3u8-downloader/pkg/constants" "math/rand" "os" "path/filepath" @@ -24,7 +23,7 @@ type FileWatcher struct { mu sync.Mutex } -func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error) { +func NewFileWatcher(outputDir string, queue *TransferQueue, settlingDelay time.Duration) (*FileWatcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, err @@ -33,7 +32,7 @@ func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error outputDir: outputDir, queue: queue, watcher: watcher, - settingDelay: constants.FileSettlingDelay, + settingDelay: settlingDelay, pendingFiles: make(map[string]*time.Timer), }, nil } diff --git a/pkg/utils/paths.go b/pkg/utils/paths.go new file mode 100644 index 0000000..83a698a --- /dev/null +++ b/pkg/utils/paths.go @@ -0,0 +1,62 @@ +package utils + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +func SafeJoin(base string, elements ...string) string { + path := filepath.Join(append([]string{base}, elements...)...) + return filepath.Clean(path) +} + +func EnsureDir(path string) error { + if err := os.MkdirAll(path, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %w", path, err) + } + return nil +} + +func PathExists(path string) bool { + _, err := os.Stat(path) + return !os.IsNotExist(err) +} + +func IsValidPath(path string) bool { + if path == "" { + return false + } + + return !strings.ContainsAny(path, "<>:\"|?*") +} + +func NormalizePath(path string) string { + return filepath.Clean(strings.ReplaceAll(path, "\\", string(filepath.Separator))) +} + +func GetRelativePath(basePath, targetPath string) (string, error) { + rel, err := filepath.Rel(basePath, targetPath) + if err != nil { + return "", fmt.Errorf("failed to get relative path: %w", err) + } + return rel, nil +} + +func ValidateWritablePath(path string) error { + dir := filepath.Dir(path) + if err := EnsureDir(dir); err != nil { + return err + } + + testFile := filepath.Join(dir, ".write_test") + file, err := os.Create(testFile) + if err != nil { + return fmt.Errorf("path %s is not writable: %w", dir, err) + } + file.Close() + os.Remove(testFile) + + return nil +}