package transfer import ( "context" "fmt" "log" "m3u8-downloader/pkg/constants" "sync" "time" ) type TransferService struct { watcher *FileWatcher queue *TransferQueue nas *NASTransfer cleanup *CleanupService stats *QueueStats } func NewTrasferService(outputDir string, eventName string) (*TransferService, error) { nasConfig := NASConfig{ Path: outputDir + "/" + eventName, Username: constants.NASUsername, Password: constants.NASPassword, Timeout: constants.TransferTimeout, RetryLimit: constants.TransferRetryLimit, VerifySize: true, } nas := NewNASTransfer(nasConfig) if err := nas.TestConnection(); err != nil { return nil, fmt.Errorf("Failed to connect to NAS: %w", err) } cleanupConfig := CleanupConfig{ Enabled: constants.CleanupAfterTransfer, RetentionPeriod: constants.RetainLocalHours, BatchSize: constants.CleanupBatchSize, CheckInterval: constants.FileSettlingDelay, } cleanup := NewCleanupService(cleanupConfig) queueConfig := QueueConfig{ WorkerCount: constants.TransferWorkerCount, PersistencePath: constants.PersistencePath, MaxQueueSize: constants.TransferQueueSize, BatchSize: constants.BatchSize, } queue := NewTransferQueue(queueConfig, nas, cleanup) watcher, err := NewFileWatcher(constants.LocalOutputDirPath+"/"+eventName, queue) if err != nil { return nil, fmt.Errorf("Failed to create file watcher: %w", err) } return &TransferService{ watcher: watcher, queue: queue, nas: nas, cleanup: cleanup, stats: queue.stats, }, nil } func (ts *TransferService) Start(ctx context.Context) error { var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() if err := ts.cleanup.Start(ctx); err != nil && err != context.Canceled { log.Printf("Cleanup error: %v", err) } }() wg.Add(1) go func() { defer wg.Done() if err := ts.watcher.Start(ctx); err != nil && err != context.Canceled { log.Printf("Watcher error: %v", err) } }() wg.Add(1) go func() { defer wg.Done() if err := ts.queue.ProcessQueue(ctx); err != nil && err != context.Canceled { log.Printf("Queue error: %v", err) } }() wg.Add(1) go func() { defer wg.Done() ts.reportStats(ctx) }() log.Println("Transfer service started") wg.Wait() return nil } func (ts *TransferService) reportStats(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: added, completed, failed, pending, bytes := ts.stats.GetStats() queueSize := ts.queue.GetQueueSize() cleanupPending := ts.cleanup.GetPendingCount() log.Printf("Transfer Stats: Added: %d, Completed: %d, Failed: %d, Pending: %d, Bytes: %d, Queue Size: %d, Cleanup Pending: %d", added, completed, failed, pending, bytes, queueSize, cleanupPending) } } } func (ts *TransferService) Shutdown(ctx context.Context) error { log.Println("Shutting down transfer service...") if err := ts.queue.SaveState(); err != nil { return fmt.Errorf("Failed to save queue state: %w", err) } if err := ts.cleanup.ForceCleanupAll(ctx); err != nil { return fmt.Errorf("Failed to force cleanup: %w", err) } log.Println("Transfer service shut down") return nil }