Post-Claude Cleanup

This commit is contained in:
townandgown 2025-08-10 23:41:45 -05:00
parent 3c49a3fa9f
commit 20df800715
13 changed files with 750 additions and 159 deletions

View File

@ -11,9 +11,10 @@ This is a Go-based HLS (HTTP Live Streaming) recorder that monitors M3U8 playlis
The project follows a modular architecture with clear separation of concerns:
- **cmd/**: Entry points for different execution modes
- **main/main.go**: Primary CLI entry point with URL input and event naming
- **main/main.go**: Primary CLI entry point with URL input, event naming, and mode selection
- **downloader/download.go**: Core download orchestration logic with transfer service integration
- **processor/process.go**: Alternative processing entry point
- **transfer/transfer.go**: Transfer-only mode entry point
- **pkg/**: Core packages containing the application logic
- **media/**: HLS streaming and download logic
- **stream.go**: Stream variant parsing and downloading orchestration (`GetAllVariants`, `VariantDownloader`)
@ -27,7 +28,18 @@ The project follows a modular architecture with clear separation of concerns:
- **nas.go**: NAS file transfer with retry logic
- **cleanup.go**: Local file cleanup after successful transfer
- **types.go**: Transfer system data structures
- **constants/constants.go**: Configuration constants (paths, timeouts, NAS settings)
- **processing/**: Video processing and concatenation system
- **service.go**: Processing service orchestration with FFmpeg integration
- **segment.go**: Individual segment processing logic
- **types.go**: Processing system data structures
- **nas/**: NAS connection and file operations
- **config.go**: NAS configuration structure
- **nas.go**: NAS service with connection management and file operations
- **config/**: Centralized configuration management with validation
- **config.go**: Configuration loading, validation, and path resolution
- **utils/**: Utility functions for cross-platform compatibility
- **paths.go**: Path manipulation and validation utilities
- **constants/constants.go**: Configuration constants and singleton access
- **httpClient/error.go**: HTTP error handling utilities
## Core Functionality
@ -47,6 +59,13 @@ The project follows a modular architecture with clear separation of concerns:
4. **Local Cleanup**: Successfully transferred files are automatically cleaned up locally
5. **State Persistence**: Queue state is persisted to survive crashes and restarts
### Video Processing Workflow (Optional)
1. **Segment Collection**: Processing service reads downloaded segments from NAS storage
2. **Quality Selection**: Automatically selects the highest quality variant available
3. **FFmpeg Processing**: Uses FFmpeg to concatenate segments into a single MP4 file
4. **Output Management**: Processed videos are saved to the configured output directory
5. **Concurrent Processing**: Multiple events can be processed simultaneously with worker pools
## Key Data Structures
- `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, output directory, and manifest writer
@ -55,35 +74,58 @@ The project follows a modular architecture with clear separation of concerns:
- `ManifestItem`: Individual segment record with sequence number and resolution
- `TransferItem`: Transfer queue item with source, destination, retry count, and status
- `TransferService`: Orchestrates file watching, queuing, transfer, and cleanup
- `ProcessingService`: Manages video processing operations with FFmpeg integration
- `ProcessConfig`: Configuration for processing operations including worker count and paths
- `NASService`: Handles NAS connection, authentication, and file operations
- `NASConfig`: Configuration structure for NAS connection parameters
## Configuration
Key configuration is managed in `pkg/constants/constants.go`:
Configuration is managed through a centralized system in `pkg/config/config.go` with environment variable support for deployment flexibility. The system provides validation, cross-platform path resolution, and sensible defaults:
### Core Settings
- `WorkerCount`: Number of concurrent segment downloaders per variant (4)
- `RefreshDelay`: How often to check for playlist updates (3 seconds)
- `LocalOutputDirPath`: Base directory for local downloads (`./data/`)
- `ManifestPath`: Directory for manifest JSON files (`./data`)
- `Core.WorkerCount`: Number of concurrent segment downloaders per variant (4) - ENV: `WORKER_COUNT`
- `Core.RefreshDelay`: How often to check for playlist updates (3 seconds) - ENV: `REFRESH_DELAY_SECONDS`
### Path Configuration
- `Paths.LocalOutput`: Base directory for local downloads (`data/`) - ENV: `LOCAL_OUTPUT_DIR`
- `Paths.ProcessOutput`: Directory for processed videos (`out/`) - ENV: `PROCESS_OUTPUT_DIR`
- `Paths.ManifestDir`: Directory for manifest JSON files (`data/`)
- `Paths.PersistenceFile`: Transfer queue state file location
### HTTP Settings
- `HTTPUserAgent`: User agent string for HTTP requests
- `REFERRER`: Referer header for HTTP requests
- `REFERRER`: Referer header for HTTP requests (`https://www.flomarching.com`)
### NAS Transfer Settings
- `EnableNASTransfer`: Enable/disable automatic NAS transfer (true)
- `NASOutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`)
- `NASUsername`/`NASPassword`: NAS credentials (empty for current user)
- `TransferWorkerCount`: Concurrent transfer workers (2)
- `TransferRetryLimit`: Max retry attempts per file (3)
- `TransferTimeout`: Timeout per file transfer (30 seconds)
- `FileSettlingDelay`: Wait before queuing new files (5 seconds)
- `PersistencePath`: Transfer queue state file (`./data/transfer_queue.json`)
- `NAS.EnableTransfer`: Enable/disable automatic NAS transfer (true) - ENV: `ENABLE_NAS_TRANSFER`
- `NAS.OutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`) - ENV: `NAS_OUTPUT_PATH`
- `NAS.Username`/`NAS.Password`: NAS credentials for authentication - ENV: `NAS_USERNAME`/`NAS_PASSWORD`
- `Transfer.WorkerCount`: Concurrent transfer workers (2)
- `Transfer.RetryLimit`: Max retry attempts per file (3)
- `Transfer.Timeout`: Timeout per file transfer (30 seconds)
- `Transfer.FileSettlingDelay`: Wait before queuing new files (5 seconds)
- `Transfer.QueueSize`: Maximum queue size (100000)
- `Transfer.BatchSize`: Batch processing size (1000)
### Processing Settings
- `Processing.AutoProcess`: Enable automatic processing after download (true)
- `Processing.Enabled`: Enable processing functionality (true)
- `Processing.WorkerCount`: Concurrent processing workers (2)
- `Processing.FFmpegPath`: Path to FFmpeg executable (`ffmpeg`) - ENV: `FFMPEG_PATH`
### Cleanup Settings
- `CleanupAfterTransfer`: Delete local files after NAS transfer (true)
- `CleanupBatchSize`: Files processed per cleanup batch (10)
- `RetainLocalHours`: Hours to keep local files (0 = immediate cleanup)
- `Cleanup.AfterTransfer`: Delete local files after NAS transfer (true)
- `Cleanup.BatchSize`: Files processed per cleanup batch (1000)
- `Cleanup.RetainHours`: Hours to keep local files (0 = immediate cleanup)
### Configuration Access
```go
cfg := constants.MustGetConfig() // Get validated config singleton
eventPath := cfg.GetEventPath("my-event") // Get cross-platform paths
```
See `DEPLOYMENT.md` for detailed environment variable configuration and deployment examples.
## Common Development Commands
@ -112,6 +154,8 @@ go fmt ./...
- `-url`: M3U8 playlist URL (if not provided, prompts for input)
- `-event`: Event name for organizing downloads (defaults to current date)
- `-debug`: Debug mode (only downloads 1080p variant for easier testing)
- `-transfer`: Transfer-only mode (transfer existing files without downloading)
- `-process`: Process-only mode (process existing files without downloading)
## Monitoring and Downloads
@ -162,7 +206,9 @@ Downloaded files are organized as:
│ ├── 1080p/ # High quality segments
│ ├── 720p/ # Medium quality segments
│ └── 480p/ # Lower quality segments
└── transfer_queue.json # Transfer queue state
├── transfer_queue.json # Transfer queue state
├── refresh_token.txt # Authentication tokens
└── tokens.txt # Session tokens
```
NAS files mirror the local structure:
@ -173,3 +219,10 @@ NAS files mirror the local structure:
├── 720p/
└── 480p/
```
Processed files are output to:
```
./out/
└── {event-name}/
└── concatenated_segments.mp4 # Final processed video
```

192
DEPLOYMENT.md Normal file
View File

@ -0,0 +1,192 @@
# Deployment Guide
This document outlines how to deploy and configure the StreamRecorder application in different environments.
## Environment Variables
The application supports configuration through environment variables for flexible deployment:
### Core Settings
- `WORKER_COUNT`: Number of concurrent segment downloaders per variant (default: 4)
- `REFRESH_DELAY_SECONDS`: How often to check for playlist updates in seconds (default: 3)
### NAS Transfer Settings
- `NAS_OUTPUT_PATH`: UNC path to NAS storage (default: "\\\\HomeLabNAS\\dci\\streams")
- `NAS_USERNAME`: NAS authentication username
- `NAS_PASSWORD`: NAS authentication password
- `ENABLE_NAS_TRANSFER`: Enable/disable automatic NAS transfer (default: true)
### Path Configuration
- `LOCAL_OUTPUT_DIR`: Base directory for local downloads (default: "data")
- `PROCESS_OUTPUT_DIR`: Output directory for processed videos (default: "out")
### Processing Settings
- `FFMPEG_PATH`: Path to FFmpeg executable (default: "ffmpeg")
## Docker Deployment
### Dockerfile Example
```dockerfile
FROM golang:1.23-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o stream-recorder ./cmd/main
FROM alpine:latest
RUN apk --no-cache add ca-certificates ffmpeg
WORKDIR /root/
COPY --from=builder /app/stream-recorder .
CMD ["./stream-recorder"]
```
### Docker Compose Example
```yaml
version: '3.8'
services:
stream-recorder:
build: .
environment:
- NAS_OUTPUT_PATH=/mnt/nas/streams
- NAS_USERNAME=${NAS_USERNAME}
- NAS_PASSWORD=${NAS_PASSWORD}
- LOCAL_OUTPUT_DIR=/app/data
- PROCESS_OUTPUT_DIR=/app/out
- FFMPEG_PATH=ffmpeg
volumes:
- ./data:/app/data
- ./out:/app/out
- nas_mount:/mnt/nas
networks:
- stream_network
volumes:
nas_mount:
driver: local
driver_opts:
type: cifs
device: "//HomeLabNAS/dci"
o: username=${NAS_USERNAME},password=${NAS_PASSWORD},iocharset=utf8
networks:
stream_network:
driver: bridge
```
## Kubernetes Deployment
### ConfigMap Example
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: stream-recorder-config
data:
WORKER_COUNT: "4"
REFRESH_DELAY_SECONDS: "3"
ENABLE_NAS_TRANSFER: "true"
LOCAL_OUTPUT_DIR: "/app/data"
PROCESS_OUTPUT_DIR: "/app/out"
FFMPEG_PATH: "ffmpeg"
```
### Secret Example
```yaml
apiVersion: v1
kind: Secret
metadata:
name: stream-recorder-secrets
type: Opaque
data:
NAS_USERNAME: <base64-encoded-username>
NAS_PASSWORD: <base64-encoded-password>
```
### Deployment Example
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: stream-recorder
spec:
replicas: 1
selector:
matchLabels:
app: stream-recorder
template:
metadata:
labels:
app: stream-recorder
spec:
containers:
- name: stream-recorder
image: stream-recorder:latest
envFrom:
- configMapRef:
name: stream-recorder-config
- secretRef:
name: stream-recorder-secrets
volumeMounts:
- name: data-storage
mountPath: /app/data
- name: output-storage
mountPath: /app/out
volumes:
- name: data-storage
persistentVolumeClaim:
claimName: stream-data-pvc
- name: output-storage
persistentVolumeClaim:
claimName: stream-output-pvc
```
## Production Considerations
### Security
- Never commit credentials to version control
- Use environment variables or secret management systems for sensitive data
- Consider using service accounts or IAM roles for cloud deployments
- Rotate credentials regularly
### Monitoring
- Implement health checks for the application
- Monitor disk space for download directories
- Set up alerts for failed transfers or processing
- Log to centralized logging systems
### Scaling
- Use horizontal scaling for multiple concurrent streams
- Consider using message queues for segment processing
- Implement distributed storage for high availability
- Use load balancers for multiple instances
### Backup and Recovery
- Regular backups of configuration and state files
- Test recovery procedures
- Document rollback processes
- Maintain disaster recovery plans
## Configuration Validation
The application validates configuration at startup and will fail fast if:
- Required directories cannot be created
- NAS paths are invalid when transfer is enabled
- FFmpeg is not found when processing is enabled
- Critical environment variables are malformed
## Troubleshooting
### Common Issues
1. **Path Permission Errors**: Ensure the application has write access to configured directories
2. **NAS Connection Failures**: Verify network connectivity and credentials
3. **FFmpeg Not Found**: Install FFmpeg or set correct FFMPEG_PATH
4. **Environment Variable Format**: Check for typos and correct boolean values
### Debug Mode
Run with `-debug=true` to enable debug logging and download only 1080p variants for testing.

View File

@ -6,6 +6,7 @@ import (
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/media"
"m3u8-downloader/pkg/transfer"
"m3u8-downloader/pkg/utils"
"os"
"os/signal"
"sync"
@ -26,10 +27,12 @@ func Download(masterURL string, eventName string, debug bool) {
cancel()
}()
cfg := constants.MustGetConfig()
var wg sync.WaitGroup
var transferService *transfer.TransferService
if constants.EnableNASTransfer {
ts, err := transfer.NewTrasferService(constants.NASOutputPath, eventName)
if cfg.NAS.EnableTransfer {
ts, err := transfer.NewTrasferService(cfg.NAS.OutputPath, eventName)
if err != nil {
log.Printf("Failed to create transfer service: %v", err)
log.Println("Continuing without transfer service...")
@ -48,7 +51,12 @@ func Download(masterURL string, eventName string, debug bool) {
manifestWriter := media.NewManifestWriter(eventName)
variants, err := media.GetAllVariants(masterURL, constants.LocalOutputDirPath+"/"+eventName, manifestWriter)
eventPath := cfg.GetEventPath(eventName)
if err := utils.EnsureDir(eventPath); err != nil {
log.Fatalf("Failed to create event directory: %v", err)
}
variants, err := media.GetAllVariants(masterURL, eventPath, manifestWriter)
if err != nil {
log.Fatalf("Failed to get variants: %v", err)
}

View File

@ -3,12 +3,14 @@ package processor
import (
"context"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/processing"
)
func Process(eventName string) {
//log.Printf("Starting processing for event: %s", eventName)
ps, err := processing.NewProcessingService(eventName)
log.Printf("Starting processing for event: %s", eventName)
cfg := constants.MustGetConfig()
ps, err := processing.NewProcessingService(eventName, cfg)
if err != nil {
log.Fatalf("Failed to create processing service: %v", err)
}

View File

@ -5,8 +5,10 @@ import (
"context"
"fmt"
"log"
"m3u8-downloader/pkg/config"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/transfer"
"m3u8-downloader/pkg/utils"
"os"
"os/signal"
"strconv"
@ -15,10 +17,10 @@ import (
"time"
)
func getEventDirs() ([]string, error) {
dirs, err := os.ReadDir(constants.LocalOutputDirPath)
func getEventDirs(cfg *config.Config) ([]string, error) {
dirs, err := os.ReadDir(cfg.Paths.LocalOutput)
if err != nil {
return nil, fmt.Errorf("Failed to read directory: %w", err)
return nil, fmt.Errorf("failed to read directory: %w", err)
}
var eventDirs []string
for _, dir := range dirs {
@ -30,13 +32,15 @@ func getEventDirs() ([]string, error) {
}
func RunTransferOnly(eventName string) {
cfg := constants.MustGetConfig()
// Check if NAS transfer is enabled
if !constants.EnableNASTransfer {
log.Fatal("NAS transfer is disabled in constants. Please enable it to use transfer-only mode.")
if !cfg.NAS.EnableTransfer {
log.Fatal("NAS transfer is disabled in configuration. Please enable it to use transfer-only mode.")
}
if eventName == "" {
events, err := getEventDirs()
events, err := getEventDirs(cfg)
if err != nil {
log.Fatalf("Failed to get event directories: %v", err)
}
@ -79,13 +83,13 @@ func RunTransferOnly(eventName string) {
}()
// Verify local event directory exists
localEventPath := constants.LocalOutputDirPath + "/" + eventName
if _, err := os.Stat(localEventPath); os.IsNotExist(err) {
localEventPath := cfg.GetEventPath(eventName)
if !utils.PathExists(localEventPath) {
log.Fatalf("Local event directory does not exist: %s", localEventPath)
}
// Create transfer service
transferService, err := transfer.NewTrasferService(constants.NASOutputPath, eventName)
transferService, err := transfer.NewTrasferService(cfg.NAS.OutputPath, eventName)
if err != nil {
log.Fatalf("Failed to create transfer service: %v", err)
}

227
pkg/config/config.go Normal file
View File

@ -0,0 +1,227 @@
package config
import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"
)
type Config struct {
Core CoreConfig
HTTP HTTPConfig
NAS NASConfig
Processing ProcessingConfig
Transfer TransferConfig
Cleanup CleanupConfig
Paths PathsConfig
}
type CoreConfig struct {
WorkerCount int
RefreshDelay time.Duration
}
type HTTPConfig struct {
UserAgent string
Referer string
}
type NASConfig struct {
EnableTransfer bool
OutputPath string
Username string
Password string
Timeout time.Duration
RetryLimit int
}
type ProcessingConfig struct {
Enabled bool
AutoProcess bool
WorkerCount int
FFmpegPath string
}
type TransferConfig struct {
WorkerCount int
RetryLimit int
Timeout time.Duration
FileSettlingDelay time.Duration
QueueSize int
BatchSize int
}
type CleanupConfig struct {
AfterTransfer bool
BatchSize int
RetainHours int
}
type PathsConfig struct {
BaseDir string
LocalOutput string
ProcessOutput string
ManifestDir string
PersistenceFile string
}
var defaultConfig = Config{
Core: CoreConfig{
WorkerCount: 4,
RefreshDelay: 3 * time.Second,
},
HTTP: HTTPConfig{
UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
Referer: "https://www.flomarching.com",
},
NAS: NASConfig{
EnableTransfer: true,
OutputPath: "\\\\HomeLabNAS\\dci\\streams",
Username: "NASAdmin",
Password: "s3tkY6tzA&KN6M",
Timeout: 30 * time.Second,
RetryLimit: 3,
},
Processing: ProcessingConfig{
Enabled: true,
AutoProcess: true,
WorkerCount: 2,
FFmpegPath: "ffmpeg",
},
Transfer: TransferConfig{
WorkerCount: 2,
RetryLimit: 3,
Timeout: 30 * time.Second,
FileSettlingDelay: 5 * time.Second,
QueueSize: 100000,
BatchSize: 1000,
},
Cleanup: CleanupConfig{
AfterTransfer: true,
BatchSize: 1000,
RetainHours: 0,
},
Paths: PathsConfig{
BaseDir: "data",
LocalOutput: "data",
ProcessOutput: "out",
ManifestDir: "data",
PersistenceFile: "transfer_queue.json",
},
}
func Load() (*Config, error) {
cfg := defaultConfig
if err := cfg.loadFromEnvironment(); err != nil {
return nil, fmt.Errorf("failed to load environment config: %w", err)
}
if err := cfg.resolveAndValidatePaths(); err != nil {
return nil, fmt.Errorf("path validation failed: %w", err)
}
return &cfg, nil
}
func (c *Config) loadFromEnvironment() error {
if val := os.Getenv("WORKER_COUNT"); val != "" {
if parsed, err := strconv.Atoi(val); err == nil {
c.Core.WorkerCount = parsed
}
}
if val := os.Getenv("REFRESH_DELAY_SECONDS"); val != "" {
if parsed, err := strconv.Atoi(val); err == nil {
c.Core.RefreshDelay = time.Duration(parsed) * time.Second
}
}
if val := os.Getenv("NAS_OUTPUT_PATH"); val != "" {
c.NAS.OutputPath = val
}
if val := os.Getenv("NAS_USERNAME"); val != "" {
c.NAS.Username = val
}
if val := os.Getenv("NAS_PASSWORD"); val != "" {
c.NAS.Password = val
}
if val := os.Getenv("ENABLE_NAS_TRANSFER"); val != "" {
c.NAS.EnableTransfer = val == "true"
}
if val := os.Getenv("LOCAL_OUTPUT_DIR"); val != "" {
c.Paths.LocalOutput = val
}
if val := os.Getenv("PROCESS_OUTPUT_DIR"); val != "" {
c.Paths.ProcessOutput = val
}
if val := os.Getenv("FFMPEG_PATH"); val != "" {
c.Processing.FFmpegPath = val
}
return nil
}
func (c *Config) resolveAndValidatePaths() error {
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("failed to get working directory: %w", err)
}
c.Paths.BaseDir = filepath.Join(cwd, c.Paths.BaseDir)
c.Paths.LocalOutput = filepath.Join(cwd, c.Paths.LocalOutput)
c.Paths.ProcessOutput = filepath.Join(cwd, c.Paths.ProcessOutput)
c.Paths.ManifestDir = filepath.Join(cwd, c.Paths.ManifestDir)
c.Paths.PersistenceFile = filepath.Join(c.Paths.BaseDir, c.Paths.PersistenceFile)
requiredDirs := []string{
c.Paths.BaseDir,
c.Paths.LocalOutput,
c.Paths.ProcessOutput,
c.Paths.ManifestDir,
}
for _, dir := range requiredDirs {
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dir, err)
}
}
if c.NAS.EnableTransfer && c.NAS.OutputPath == "" {
return fmt.Errorf("NAS output path is required when transfer is enabled")
}
if c.Processing.Enabled && c.Processing.FFmpegPath == "" {
return fmt.Errorf("FFmpeg path is required when processing is enabled")
}
return nil
}
func (c *Config) GetEventPath(eventName string) string {
return filepath.Join(c.Paths.LocalOutput, eventName)
}
func (c *Config) GetManifestPath(eventName string) string {
return filepath.Join(c.Paths.ManifestDir, eventName+".json")
}
func (c *Config) GetNASEventPath(eventName string) string {
return filepath.Join(c.NAS.OutputPath, eventName)
}
func (c *Config) GetProcessOutputPath(eventName string) string {
return filepath.Join(c.Paths.ProcessOutput, eventName)
}
func (c *Config) GetQualityPath(eventName, quality string) string {
return filepath.Join(c.GetEventPath(eventName), quality)
}

View File

@ -1,35 +1,51 @@
package constants
import "time"
import (
"m3u8-downloader/pkg/config"
"sync"
)
var (
globalConfig *config.Config
configOnce sync.Once
configError error
)
func GetConfig() (*config.Config, error) {
configOnce.Do(func() {
globalConfig, configError = config.Load()
})
return globalConfig, configError
}
func MustGetConfig() *config.Config {
cfg, err := GetConfig()
if err != nil {
panic("Failed to load configuration: " + err.Error())
}
return cfg
}
const (
WorkerCount = 4
RefreshDelay = 3 * time.Second
RefreshDelay = 3
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
REFERRER = "https://www.flomarching.com"
LocalOutputDirPath = "../data/"
EnableNASTransfer = true
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
NASUsername = "NASAdmin"
NASPassword = "s3tkY6tzA&KN6M"
TransferWorkerCount = 2
TransferRetryLimit = 3
TransferTimeout = 30 * time.Second
FileSettlingDelay = 5 * time.Second
PersistencePath = "../data/transfer_queue.json"
TransferQueueSize = 100000
BatchSize = 1000
ManifestPath = "../data"
DefaultNASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
DefaultNASUsername = "NASAdmin"
CleanupAfterTransfer = true
CleanupBatchSize = 1000
RetainLocalHours = 0
DefaultTransferWorkerCount = 2
DefaultTransferRetryLimit = 3
DefaultTransferTimeout = 30
DefaultFileSettlingDelay = 5
DefaultTransferQueueSize = 100000
DefaultBatchSize = 1000
ProcessOutputPath = "../out"
AutoProcess = true
ProcessingEnabled = true
ProcessWorkerCount = 2
FFmpegPath = "ffmpeg"
DefaultCleanupBatchSize = 1000
DefaultRetainLocalHours = 0
DefaultProcessWorkerCount = 2
DefaultFFmpegPath = "ffmpeg"
)

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/utils"
"os"
"sort"
)
@ -20,8 +21,9 @@ type ManifestItem struct {
}
func NewManifestWriter(eventName string) *ManifestWriter {
cfg := constants.MustGetConfig()
return &ManifestWriter{
ManifestPath: constants.ManifestPath + "/" + eventName + ".json",
ManifestPath: cfg.GetManifestPath(eventName),
Segments: make([]ManifestItem, 0),
Index: make(map[string]*ManifestItem),
}
@ -62,6 +64,11 @@ func (m *ManifestWriter) WriteManifest() {
return
}
if err := utils.ValidateWritablePath(m.ManifestPath); err != nil {
log.Printf("Manifest path validation failed: %v", err)
return
}
file, err := os.Create(m.ManifestPath)
if err != nil {
log.Printf("Failed to create manifest file: %v", err)

View File

@ -5,8 +5,9 @@ import (
"context"
"fmt"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/config"
"m3u8-downloader/pkg/nas"
"m3u8-downloader/pkg/utils"
"os"
"os/exec"
"path/filepath"
@ -19,36 +20,34 @@ import (
)
type ProcessingService struct {
config *ProcessConfig
config *config.Config
eventName string
nas *nas.NASService
}
func NewProcessingService(eventName string) (*ProcessingService, error) {
config := &ProcessConfig{
WorkerCount: constants.ProcessWorkerCount,
SourcePath: constants.NASOutputPath,
DestinationPath: constants.ProcessOutputPath,
Enabled: constants.ProcessingEnabled,
EventName: eventName,
func NewProcessingService(eventName string, cfg *config.Config) (*ProcessingService, error) {
if cfg == nil {
return nil, fmt.Errorf("configuration is required")
}
nasConfig := &nas.NASConfig{
Path: constants.NASOutputPath,
Username: constants.NASUsername,
Password: constants.NASPassword,
Timeout: constants.TransferTimeout,
RetryLimit: constants.TransferRetryLimit,
nasConfig := nas.NASConfig{
Path: cfg.NAS.OutputPath,
Username: cfg.NAS.Username,
Password: cfg.NAS.Password,
Timeout: cfg.NAS.Timeout,
RetryLimit: cfg.NAS.RetryLimit,
VerifySize: true,
}
nasService := nas.NewNASService(*nasConfig)
nasService := nas.NewNASService(nasConfig)
if err := nasService.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
return nil, fmt.Errorf("failed to connect to NAS: %w", err)
}
return &ProcessingService{
config: config,
config: cfg,
eventName: eventName,
nas: nasService,
}, nil
}
@ -58,10 +57,11 @@ func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
}
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
if ps.config.EventName == "" {
dirs, err := os.ReadDir(ps.config.SourcePath)
if ps.eventName == "" {
sourcePath := ps.config.NAS.OutputPath
dirs, err := os.ReadDir(sourcePath)
if err != nil {
return nil, fmt.Errorf("Failed to read directory: %w", err)
return nil, fmt.Errorf("failed to read directory %s: %w", sourcePath, err)
}
var eventDirs []string
for _, dir := range dirs {
@ -71,23 +71,23 @@ func (ps *ProcessingService) GetEventDirs() ([]string, error) {
}
return eventDirs, nil
} else {
return []string{ps.config.EventName}, nil
return []string{ps.eventName}, nil
}
}
func (ps *ProcessingService) Start(ctx context.Context) error {
if !ps.config.Enabled {
if !ps.config.Processing.Enabled {
log.Println("Processing service disabled")
return nil
}
if ps.config.EventName == "" {
if ps.eventName == "" {
events, err := ps.GetEventDirs()
if err != nil {
return fmt.Errorf("Failed to get event directories: %w", err)
return fmt.Errorf("failed to get event directories: %w", err)
}
if len(events) == 0 {
return fmt.Errorf("No events found")
return fmt.Errorf("no events found")
}
if len(events) > 1 {
fmt.Println("Multiple events found, please select one:")
@ -99,14 +99,14 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
input = strings.TrimSpace(input)
index, err := strconv.Atoi(input)
if err != nil {
return fmt.Errorf("Failed to parse input: %w", err)
return fmt.Errorf("failed to parse input: %w", err)
}
if index < 1 || index > len(events) {
return fmt.Errorf("Invalid input")
return fmt.Errorf("invalid input")
}
ps.config.EventName = events[index-1]
ps.eventName = events[index-1]
} else {
ps.config.EventName = events[0]
ps.eventName = events[0]
}
}
@ -140,7 +140,11 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
}
// Feed info to ffmpeg to stitch files together
outPath := filepath.Join(constants.NASOutputPath, ps.config.DestinationPath, ps.config.EventName)
outPath := ps.config.GetProcessOutputPath(ps.eventName)
if err := utils.EnsureDir(outPath); err != nil {
return fmt.Errorf("failed to create output directory: %w", err)
}
concatErr := ps.RunFFmpeg(aggFile, outPath)
if concatErr != nil {
return concatErr
@ -150,9 +154,10 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
}
func (ps *ProcessingService) GetResolutions() ([]string, error) {
dirs, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName)
eventPath := ps.config.GetNASEventPath(ps.eventName)
dirs, err := os.ReadDir(eventPath)
if err != nil {
return nil, fmt.Errorf("Failed to read source directory: %w", err)
return nil, fmt.Errorf("failed to read source directory %s: %w", eventPath, err)
}
re := regexp.MustCompile(`^\d+p$`)
@ -170,9 +175,10 @@ func (ps *ProcessingService) GetResolutions() ([]string, error) {
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
defer wg.Done()
files, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName + "/" + resolution)
resolutionPath := utils.SafeJoin(ps.config.GetNASEventPath(ps.eventName), resolution)
files, err := os.ReadDir(resolutionPath)
if err != nil {
log.Printf("Failed to read resolution directory: %v", err)
log.Printf("Failed to read resolution directory %s: %v", resolutionPath, err)
return
}
@ -221,13 +227,13 @@ func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[in
}
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (string, error) {
concatPath := filepath.Join(constants.NASOutputPath, constants.ProcessOutputPath, ps.config.EventName)
// Ensure the directory exists
if err := os.MkdirAll(concatPath, 0755); err != nil {
concatPath := ps.config.GetProcessOutputPath(ps.eventName)
if err := utils.EnsureDir(concatPath); err != nil {
return "", fmt.Errorf("failed to create directories for concat path: %w", err)
}
concatFilePath := filepath.Join(concatPath, ps.config.EventName+".txt")
concatFilePath := utils.SafeJoin(concatPath, ps.eventName+".txt")
f, err := os.Create(concatFilePath)
if err != nil {
return "", fmt.Errorf("failed to create concat file: %w", err)
@ -243,7 +249,7 @@ func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (st
for _, seq := range keys {
segment := segmentMap[seq]
filePath := filepath.Join(ps.config.SourcePath, ps.config.EventName, segment.Resolution, segment.Name)
filePath := utils.SafeJoin(ps.config.GetNASEventPath(ps.eventName), segment.Resolution, segment.Name)
line := fmt.Sprintf("file '%s'\n", filePath)
if _, err := f.WriteString(line); err != nil {
return "", fmt.Errorf("failed to write to concat file: %w", err)
@ -253,60 +259,79 @@ func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (st
return concatFilePath, nil
}
func ffmpegPath() (string, error) {
var baseDir string
func (ps *ProcessingService) getFFmpegPath() (string, error) {
// First try the configured path
configuredPath := ps.config.Processing.FFmpegPath
if configuredPath != "" {
// Check if it's just the command name or a full path
if filepath.IsAbs(configuredPath) {
return configuredPath, nil
}
// Try to find it in PATH
if fullPath, err := exec.LookPath(configuredPath); err == nil {
return fullPath, nil
}
}
// Fallback: try local bin directory
var baseDir string
exePath, err := os.Executable()
if err == nil {
baseDir = filepath.Dir(exePath)
} else {
// fallback to current working directory
baseDir, err = os.Getwd()
if err != nil {
return "", err
}
}
// When running in GoLand, exePath might point to the IDE launcher,
// so optionally check if ffmpeg exists here, else fallback to CWD
ffmpeg := filepath.Join(baseDir, "bin", "ffmpeg")
ffmpeg := utils.SafeJoin(baseDir, "bin", "ffmpeg")
if runtime.GOOS == "windows" {
ffmpeg += ".exe"
}
if _, err := os.Stat(ffmpeg); os.IsNotExist(err) {
// try cwd instead
if utils.PathExists(ffmpeg) {
return ffmpeg, nil
}
// Try current working directory
cwd, err := os.Getwd()
if err != nil {
return "", err
}
ffmpeg = filepath.Join(cwd, "bin", "ffmpeg")
ffmpeg = utils.SafeJoin(cwd, "bin", "ffmpeg")
if runtime.GOOS == "windows" {
ffmpeg += ".exe"
}
if utils.PathExists(ffmpeg) {
return ffmpeg, nil
}
return ffmpeg, nil
return "", fmt.Errorf("FFmpeg not found. Please install FFmpeg or set FFMPEG_PATH environment variable")
}
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
fmt.Println("Running ffmpeg...")
fileOutPath := filepath.Join(outputPath, ps.config.EventName+".mp4")
fileOutPath := utils.SafeJoin(outputPath, ps.eventName+".mp4")
fmt.Println("Input path:", inputPath)
fmt.Println("Attempting to write to: ", outputPath+"/"+ps.config.EventName+".mp4")
fmt.Println("Output path:", fileOutPath)
path, err := ffmpegPath()
path, err := ps.getFFmpegPath()
if err != nil {
return err
return fmt.Errorf("failed to find FFmpeg: %w", err)
}
cmd := exec.Command(path, "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", filepath.Join(outputPath, ps.config.EventName+".mp4"))
cmd := exec.Command(path, "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", fileOutPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
ffmpegErr := cmd.Run()
if ffmpegErr != nil {
return fmt.Errorf("Failed to run ffmpeg: %w", ffmpegErr)
if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to run ffmpeg: %w", err)
}
fmt.Println("Output path:", fileOutPath)
fmt.Println("FFmpeg completed successfully")
return nil
}

View File

@ -3,11 +3,3 @@ package processing
type ProcessJob struct {
EventName string
}
type ProcessConfig struct {
WorkerCount int
SourcePath string
DestinationPath string
Enabled bool
EventName string
}

View File

@ -6,6 +6,7 @@ import (
"log"
"m3u8-downloader/pkg/constants"
nas2 "m3u8-downloader/pkg/nas"
"m3u8-downloader/pkg/utils"
"os"
"path/filepath"
"strings"
@ -22,45 +23,47 @@ type TransferService struct {
}
func NewTrasferService(outputDir string, eventName string) (*TransferService, error) {
cfg := constants.MustGetConfig()
nasConfig := nas2.NASConfig{
Path: outputDir,
Username: constants.NASUsername,
Password: constants.NASPassword,
Timeout: constants.TransferTimeout,
RetryLimit: constants.TransferRetryLimit,
Username: cfg.NAS.Username,
Password: cfg.NAS.Password,
Timeout: cfg.NAS.Timeout,
RetryLimit: cfg.NAS.RetryLimit,
VerifySize: true,
}
nas := nas2.NewNASService(nasConfig)
if err := nas.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
return nil, fmt.Errorf("failed to connect to NAS: %w", err)
}
cleanupConfig := CleanupConfig{
Enabled: constants.CleanupAfterTransfer,
RetentionPeriod: time.Duration(constants.RetainLocalHours) * time.Hour,
BatchSize: constants.CleanupBatchSize,
CheckInterval: constants.FileSettlingDelay,
Enabled: cfg.Cleanup.AfterTransfer,
RetentionPeriod: time.Duration(cfg.Cleanup.RetainHours) * time.Hour,
BatchSize: cfg.Cleanup.BatchSize,
CheckInterval: cfg.Transfer.FileSettlingDelay,
}
cleanup := NewCleanupService(cleanupConfig)
queueConfig := QueueConfig{
WorkerCount: constants.TransferWorkerCount,
PersistencePath: constants.PersistencePath,
MaxQueueSize: constants.TransferQueueSize,
BatchSize: constants.BatchSize,
WorkerCount: cfg.Transfer.WorkerCount,
PersistencePath: cfg.Paths.PersistenceFile,
MaxQueueSize: cfg.Transfer.QueueSize,
BatchSize: cfg.Transfer.BatchSize,
}
queue := NewTransferQueue(queueConfig, nas, cleanup)
// Create local output directory if it doesn't exist
localOutputPath := constants.LocalOutputDirPath + "/" + eventName
if err := os.MkdirAll(localOutputPath, 0755); err != nil {
return nil, fmt.Errorf("Failed to create local output directory: %w", err)
localOutputPath := cfg.GetEventPath(eventName)
if err := utils.EnsureDir(localOutputPath); err != nil {
return nil, fmt.Errorf("failed to create local output directory: %w", err)
}
watcher, err := NewFileWatcher(localOutputPath, queue)
watcher, err := NewFileWatcher(localOutputPath, queue, cfg.Transfer.FileSettlingDelay)
if err != nil {
return nil, fmt.Errorf("Failed to create file watcher: %w", err)
return nil, fmt.Errorf("failed to create file watcher: %w", err)
}
return &TransferService{
@ -152,6 +155,7 @@ func (ts *TransferService) Shutdown(ctx context.Context) error {
// QueueExistingFiles scans a directory for .ts files and queues them for transfer
func (ts *TransferService) QueueExistingFiles(localEventPath string) error {
cfg := constants.MustGetConfig()
log.Printf("Scanning for existing files in: %s", localEventPath)
var fileCount, alreadyTransferred, scheduledForCleanup int
@ -190,7 +194,7 @@ func (ts *TransferService) QueueExistingFiles(localEventPath string) error {
alreadyTransferred++
// Schedule for cleanup if cleanup is enabled
if constants.CleanupAfterTransfer {
if cfg.Cleanup.AfterTransfer {
if err := ts.cleanup.ScheduleCleanup(path); err != nil {
log.Printf("Failed to schedule cleanup for already-transferred file %s: %v", path, err)
} else {

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"m3u8-downloader/pkg/constants"
"math/rand"
"os"
"path/filepath"
@ -24,7 +23,7 @@ type FileWatcher struct {
mu sync.Mutex
}
func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error) {
func NewFileWatcher(outputDir string, queue *TransferQueue, settlingDelay time.Duration) (*FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
@ -33,7 +32,7 @@ func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error
outputDir: outputDir,
queue: queue,
watcher: watcher,
settingDelay: constants.FileSettlingDelay,
settingDelay: settlingDelay,
pendingFiles: make(map[string]*time.Timer),
}, nil
}

62
pkg/utils/paths.go Normal file
View File

@ -0,0 +1,62 @@
package utils
import (
"fmt"
"os"
"path/filepath"
"strings"
)
func SafeJoin(base string, elements ...string) string {
path := filepath.Join(append([]string{base}, elements...)...)
return filepath.Clean(path)
}
func EnsureDir(path string) error {
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", path, err)
}
return nil
}
func PathExists(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
func IsValidPath(path string) bool {
if path == "" {
return false
}
return !strings.ContainsAny(path, "<>:\"|?*")
}
func NormalizePath(path string) string {
return filepath.Clean(strings.ReplaceAll(path, "\\", string(filepath.Separator)))
}
func GetRelativePath(basePath, targetPath string) (string, error) {
rel, err := filepath.Rel(basePath, targetPath)
if err != nil {
return "", fmt.Errorf("failed to get relative path: %w", err)
}
return rel, nil
}
func ValidateWritablePath(path string) error {
dir := filepath.Dir(path)
if err := EnsureDir(dir); err != nil {
return err
}
testFile := filepath.Join(dir, ".write_test")
file, err := os.Create(testFile)
if err != nil {
return fmt.Errorf("path %s is not writable: %w", dir, err)
}
file.Close()
os.Remove(testFile)
return nil
}