From 0f4a39f4260d960058d25119213ad74d4ed296c4 Mon Sep 17 00:00:00 2001 From: townandgown Date: Sat, 26 Jul 2025 21:40:18 -0500 Subject: [PATCH] Functioning v1, downloaded full Atlanta regional --- cmd/downloader/main.go | 32 ++++- go.mod | 7 +- pkg/constants/constants.go | 20 ++- pkg/media/stream.go | 4 +- pkg/transfer/cleanup.go | 171 ++++++++++++++++++++++++ pkg/transfer/nas.go | 124 +++++++++++++++++ pkg/transfer/queue.go | 264 +++++++++++++++++++++++++++++++++++++ pkg/transfer/service.go | 136 +++++++++++++++++++ pkg/transfer/types.go | 105 +++++++++++++++ pkg/transfer/watcher.go | 190 ++++++++++++++++++++++++++ 10 files changed, 1047 insertions(+), 6 deletions(-) create mode 100644 pkg/transfer/cleanup.go create mode 100644 pkg/transfer/nas.go create mode 100644 pkg/transfer/queue.go create mode 100644 pkg/transfer/service.go create mode 100644 pkg/transfer/types.go create mode 100644 pkg/transfer/watcher.go diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 06ab5e1..e252e9a 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -5,16 +5,19 @@ import ( "log" "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/media" + "m3u8-downloader/pkg/transfer" "os" "os/signal" "sync" "syscall" + "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Goroutine to listen for shutdown signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { @@ -23,13 +26,32 @@ func main() { cancel() }() + var wg sync.WaitGroup + var transferService *transfer.TransferService + if constants.EnableNASTransfer { + ts, err := transfer.NewTrasferService(constants.NASPath) + if err != nil { + log.Printf("Failed to create transfer service: %v", err) + log.Println("Continuing without transfer service...") + } else { + transferService = ts + wg.Add(1) + go func() { + defer wg.Done() + if err := transferService.Start(ctx); err != nil && err != context.Canceled { + log.Printf("Transfer service error: %v", err) + } + }() + log.Println("Transfer service started.") + } + } + variants, err := media.GetAllVariants(constants.MasterURL) if err != nil { log.Fatalf("Failed to get variants: %v", err) } log.Printf("Found %d variants", len(variants)) - var wg sync.WaitGroup sem := make(chan struct{}, constants.WorkerCount*len(variants)) for _, variant := range variants { @@ -42,4 +64,12 @@ func main() { wg.Wait() log.Println("All variant downloaders finished.") + + if transferService != nil { + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + transferService.Shutdown(shutdownCtx) + } + + log.Println("All Services shut down.") } diff --git a/go.mod b/go.mod index 7d6be6a..c751d04 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,9 @@ module m3u8-downloader go 1.23.0 -require github.com/grafov/m3u8 v0.12.1 +require ( + github.com/fsnotify/fsnotify v1.9.0 + github.com/grafov/m3u8 v0.12.1 +) + +require golang.org/x/sys v0.13.0 // indirect diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index eef90c0..74fe6cb 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -3,11 +3,27 @@ package constants import "time" const ( - MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8" + MasterURL = "https://live-fastly.flosports.tv/streams/mr159021-260419/playlist.m3u8?token=st%3D1753571418%7Eexp%3D1753571448%7Eacl%3D%2Fstreams%2Fmr159021-260419%2Fplaylist.m3u8%7Edata%3Dssai%3A0%3BuserId%3A14025903%3BstreamId%3A260419%3BmediaPackageRegion%3Afalse%3BdvrMinutes%3A360%3BtokenId%3Abadd289a-ade5-48fe-852f-7dbd1d57aca8%3Bpv%3A86400%7Ehmac2%3D8de65c26b185084a6be77e788cb0ba41be5fcac3ab86159b06f7572ca925d77ba7bd182124af2a432953d4223548f198742d1a238e937d875976cd42fe549838&mid_origin=media_store&keyName=FLOSPORTS_TOKEN_KEY_2023-08-02&streamCode=mr159021-260419" WorkerCount = 4 RefreshDelay = 3 * time.Second HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" REFERRER = "https://www.flomarching.com" - OutputDirPath = "./data" + OutputDirPath = "./data/flo_radio" + + EnableNASTransfer = true + NASPath = "\\\\HomeLabNAS\\dci\\streams\\2025_Atlanta" + NASUsername = "" + NASPassword = "" + TransferWorkerCount = 2 + TransferRetryLimit = 3 + TransferTimeout = 30 * time.Second + FileSettlingDelay = 5 * time.Second + PersistencePath = "./data/transfer_queue.json" + TransferQueueSize = 1000 + BatchSize = 10 + + CleanupAfterTransfer = true + CleanupBatchSize = 10 + RetainLocalHours = 0 ) diff --git a/pkg/media/stream.go b/pkg/media/stream.go index ef3f4ba..373ca49 100644 --- a/pkg/media/stream.go +++ b/pkg/media/stream.go @@ -69,7 +69,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) { BaseURL: base, ID: 0, Resolution: "unknown", - OutputDir: path.Join(constants.OutputDirPath, "unknown"), + OutputDir: path.Join(constants.NASPath, "unknown"), }}, nil } @@ -83,7 +83,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) { vURL, _ := url.Parse(v.URI) fullURL := base.ResolveReference(vURL).String() resolution := extractResolution(v) - outputDir := path.Join(constants.OutputDirPath, resolution) + outputDir := path.Join(constants.NASPath, resolution) variants = append(variants, &StreamVariant{ URL: fullURL, Bandwidth: v.Bandwidth, diff --git a/pkg/transfer/cleanup.go b/pkg/transfer/cleanup.go new file mode 100644 index 0000000..8eda06f --- /dev/null +++ b/pkg/transfer/cleanup.go @@ -0,0 +1,171 @@ +package transfer + +import ( + "context" + "fmt" + "log" + "os" + "sync" + "time" +) + +type CleanupService struct { + config CleanupConfig + pendingFiles []string + mu sync.Mutex +} + +func NewCleanupService(config CleanupConfig) *CleanupService { + return &CleanupService{ + config: config, + pendingFiles: make([]string, 0), + } +} + +func (cs *CleanupService) ScheduleCleanup(filePath string) error { + if !cs.config.Enabled { + return nil + } + + cs.mu.Lock() + defer cs.mu.Unlock() + cs.pendingFiles = append(cs.pendingFiles, filePath) + log.Printf("Scheduled file for cleanup: %s", filePath) + return nil +} + +func (cs *CleanupService) Start(ctx context.Context) error { + if !cs.config.Enabled { + log.Println("Cleanup service disabled") + return nil + } + + log.Printf("Cleanup service started (retention: %v, batch: %d)", cs.config.RetentionPeriod, cs.config.BatchSize) + + ticker := time.NewTicker(cs.config.CheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Println("Cleanup service shutting down...") + return ctx.Err() + case <-ticker.C: + if err := cs.ExecuteCleanup(ctx); err != nil { + log.Printf("Cleanup error: %v", err) + } + } + } +} + +func (cs *CleanupService) ExecuteCleanup(ctx context.Context) error { + cs.mu.Lock() + if len(cs.pendingFiles) == 0 { + cs.mu.Unlock() + return nil + } + + batchSize := cs.config.BatchSize + if batchSize > len(cs.pendingFiles) { + batchSize = len(cs.pendingFiles) + } + cs.mu.Unlock() + + log.Printf("Executing cleanup batch (size: %d)", batchSize) + + batch := make([]string, batchSize) + copy(batch, cs.pendingFiles[:batchSize]) + cs.pendingFiles = cs.pendingFiles[batchSize:] + cs.mu.Unlock() + + log.Printf("Processing %d files for cleanup", len(batch)) + + var cleanedCount int + var errors []error + + for _, filePath := range batch { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := cs.cleanupFile(filePath); err != nil { + errors = append(errors, fmt.Errorf("Failed to cleanup file %s: %w", filePath, err)) + } else { + cleanedCount++ + } + } + + log.Printf("Cleanup batch completed (cleaned: %d, errors: %d)", cleanedCount, len(errors)) + + if len(errors) > 0 { + for i, err := range errors { + if i >= 3 { + log.Printf("... and %d more errors", len(errors)-3) + break + } + log.Printf("Error: %v", err) + } + } + + return nil + +} + +func (cs *CleanupService) cleanupFile(filePath string) error { + info, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("Failed to get file info: %w", err) + } + + if cs.config.RetentionPeriod > 0 { + if time.Since(info.ModTime()) < cs.config.RetentionPeriod { + log.Printf("File too new to cleanup: %s", filePath) + return nil + } + } + + if err := os.Remove(filePath); err != nil { + return fmt.Errorf("Failed to remove file: %w", err) + } + + log.Printf("File cleaned up: %s", filePath) + return nil +} + +func (cs *CleanupService) GetPendingCount() int { + cs.mu.Lock() + defer cs.mu.Unlock() + return len(cs.pendingFiles) +} + +func (cs *CleanupService) ForceCleanupAll(ctx context.Context) error { + log.Println("Force cleanup requested") + + for { + cs.mu.Lock() + pendingCount := len(cs.pendingFiles) + cs.mu.Unlock() + + if pendingCount == 0 { + break + } + + if err := cs.ExecuteCleanup(ctx); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(100 * time.Millisecond): + } + } + + log.Println("Force cleanup complete") + return nil +} diff --git a/pkg/transfer/nas.go b/pkg/transfer/nas.go new file mode 100644 index 0000000..7172c6c --- /dev/null +++ b/pkg/transfer/nas.go @@ -0,0 +1,124 @@ +package transfer + +import ( + "context" + "fmt" + "io" + "log" + "os" + "path/filepath" +) + +type NASTransfer struct { + config NASConfig + connected bool +} + +func NewNASTransfer(config NASConfig) *NASTransfer { + return &NASTransfer{ + config: config, + } +} + +func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error { + destPath := filepath.Join(nt.config.Path, item.DestinationPath) + + destDir := filepath.Dir(destPath) + if err := nt.ensureDirectoryExists(destDir); err != nil { + return fmt.Errorf("Failed to create directory %s: %w", destDir, err) + } + + transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout) + defer cancel() + + if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil { + return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err) + } + + if nt.config.VerifySize { + if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil { + os.Remove(destPath) + return fmt.Errorf("Failed to verify transfer: %w", err) + } + } + + log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath) + + return nil +} + +func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error { + src, err := os.Open(srcPath) + if err != nil { + return fmt.Errorf("Failed to open source file: %w", err) + } + defer src.Close() + + dest, err := os.Create(destPath) + if err != nil { + return fmt.Errorf("Failed to create destination file: %w", err) + } + defer dest.Close() + + done := make(chan error, 1) + go func() { + _, err := io.Copy(dest, src) + done <- err + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-done: + if err != nil { + return err + } + + return dest.Sync() + } +} + +func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error { + srcInfo, err := os.Stat(srcPath) + if err != nil { + return fmt.Errorf("Failed to stat source file: %w", err) + } + + destInfo, err := os.Stat(destPath) + if err != nil { + return fmt.Errorf("Failed to stat destination file: %w", err) + } + + if srcInfo.Size() != destInfo.Size() { + return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size()) + } + + return nil +} + +func (nt *NASTransfer) ensureDirectoryExists(path string) error { + if err := os.MkdirAll(path, 0755); err != nil { + return fmt.Errorf("Failed to create directory: %w", err) + } + return nil +} + +func (nt *NASTransfer) TestConnection() error { + testFile := filepath.Join(nt.config.Path, ".connection_test") + + f, err := os.Create(testFile) + if err != nil { + return fmt.Errorf("Failed to create test file: %w", err) + } + f.Close() + + os.Remove(testFile) + + nt.connected = true + log.Printf("Connected to NAS at %s", nt.config.Path) + return nil +} + +func (nt *NASTransfer) IsConnected() bool { + return nt.connected +} diff --git a/pkg/transfer/queue.go b/pkg/transfer/queue.go new file mode 100644 index 0000000..56bcb0a --- /dev/null +++ b/pkg/transfer/queue.go @@ -0,0 +1,264 @@ +package transfer + +import ( + "container/heap" + "context" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" +) + +type TransferQueue struct { + config QueueConfig + items *PriorityQueue + stats *QueueStats + nasTransfer *NASTransfer + cleanup *CleanupService + workers []chan TransferItem + mu sync.RWMutex +} + +type PriorityQueue []*TransferItem + +func (pq PriorityQueue) Len() int { + return len(pq) +} + +func (pq PriorityQueue) Less(i, j int) bool { + return pq[i].Timestamp.After(pq[j].Timestamp) +} + +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + +func (pq *PriorityQueue) Push(x interface{}) { + item := x.(*TransferItem) + *pq = append(*pq, item) +} + +func (pq *PriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + *pq = old[0 : n-1] + return item +} + +func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue { + pq := &PriorityQueue{} + heap.Init(pq) + + tq := &TransferQueue{ + config: config, + items: pq, + stats: &QueueStats{}, + nasTransfer: nasTransfer, + cleanup: cleanup, + workers: make([]chan TransferItem, config.WorkerCount), + } + + if err := tq.LoadState(); err != nil { + log.Printf("Failed to load queue state: %v", err) + } + + return tq +} + +func (tq *TransferQueue) Add(item TransferItem) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + if tq.items.Len() >= tq.config.MaxQueueSize { + return fmt.Errorf("Queue is full (max size: %d)", tq.config.MaxQueueSize) + } + + heap.Push(tq.items, &item) + tq.stats.IncrementAdded() + + log.Printf("Added file to queue: %s", item.SourcePath) + + return nil +} + +func (tq *TransferQueue) ProcessQueue(ctx context.Context) error { + for i := 0; i < tq.config.WorkerCount; i++ { + workerChan := make(chan TransferItem, 1) + tq.workers[i] = workerChan + + go func(workerID int, workChan chan TransferItem) { + log.Printf("Worker %d started", workerID) + for { + select { + case <-ctx.Done(): + log.Printf("Transfer worker %d shutting down...", workerID) + return + case item := <-workChan: + tq.processItem(ctx, item) + } + } + }(i, workerChan) + } + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Println("Transfer queue shutting down...") + return ctx.Err() + case <-ticker.C: + tq.dispatchWork() + + if time.Now().Unix()%30 == 0 { + if err := tq.SaveState(); err != nil { + log.Printf("Failed to save queue state: %v", err) + } + } + } + } +} + +func (tq *TransferQueue) dispatchWork() { + tq.mu.Lock() + defer tq.mu.Unlock() + + for i, workerChan := range tq.workers { + if len(workerChan) == 0 && tq.items.Len() > 0 { + item := heap.Pop(tq.items).(*TransferItem) + item.Status = StatusInProgress + + select { + case workerChan <- *item: + log.Printf("Dispatched file to worker %d: %s", i, item.SourcePath) + default: + heap.Push(tq.items, item) + item.Status = StatusPending + + } + } + } +} + +func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) { + maxRetries := 3 + + for attempt := 1; attempt <= maxRetries; attempt++ { + if attempt > 0 { + item.Status = StatusRetrying + backoff := time.Duration(attempt*attempt) * time.Second + log.Printf("Backing off for %d seconds before retrying (attempt %d/%d)", backoff, attempt, maxRetries) + + select { + case <-time.After(backoff): + case <-ctx.Done(): + return + } + } + + err := tq.nasTransfer.TransferFile(ctx, &item) + if err == nil { + item.Status = StatusCompleted + tq.stats.IncrementCompleted(item.FileSize) + + if tq.cleanup != nil { + if err := tq.cleanup.ScheduleCleanup(item.SourcePath); err != nil { + log.Printf("Failed to add file to cleanup list: %v", err) + } + } + log.Printf("File transfer completed: %s", item.SourcePath) + return + } + + item.LastError = err.Error() + item.RetryCount++ + + log.Printf("File transfer failed: %s (attempt %d/%d): %v", item.SourcePath, item.RetryCount, maxRetries, err) + + if attempt == maxRetries { + item.Status = StatusFailed + tq.stats.IncrementFailed() + log.Printf("Transfer permanently failed for file: %s", item.SourcePath) + return + } + } +} + +func (tq *TransferQueue) SaveState() error { + tq.mu.Lock() + defer tq.mu.Unlock() + + items := make([]*TransferItem, tq.items.Len()) + tempPQ := make(PriorityQueue, tq.items.Len()) + copy(tempPQ, *tq.items) + + for i := 0; i < len(items); i++ { + items[i] = heap.Pop(&tempPQ).(*TransferItem) + } + + data, err := json.MarshalIndent(map[string]interface{}{ + "items": items, + "stats": tq.stats, + "timestamp": time.Now(), + }, "", " ") + if err != nil { + return fmt.Errorf("Failed to marshal queue state: %w", err) + } + + if err := os.WriteFile(tq.config.PersistencePath, data, 0644); err != nil { + return fmt.Errorf("Failed to save queue state: %w", err) + } + + return nil +} + +func (tq *TransferQueue) LoadState() error { + data, err := os.ReadFile(tq.config.PersistencePath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("Failed to load queue state: %w", err) + } + + var state struct { + Items []*TransferItem `json:"items"` + Stats *QueueStats `json:"stats"` + Timestamp time.Time `json:"timestamp"` + } + + if err := json.Unmarshal(data, &state); err != nil { + return fmt.Errorf("Failed to load queue state: %w", err) + } + + tq.mu.Lock() + defer tq.mu.Unlock() + + for _, item := range state.Items { + if item.Status == StatusPending || item.Status == StatusFailed { + heap.Push(tq.items, item) + } + } + + if state.Stats != nil { + tq.stats = state.Stats + } + + log.Printf("Loaded queue state: %d items restored from %v", + tq.items.Len(), state.Timestamp.Format(time.RFC3339)) + return nil +} + +func (tq *TransferQueue) GetStats() (int, int, int, int, int64) { + return tq.stats.GetStats() +} + +func (tq *TransferQueue) GetQueueSize() int { + tq.mu.RLock() + defer tq.mu.RUnlock() + return tq.items.Len() +} diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go new file mode 100644 index 0000000..fbb8c5f --- /dev/null +++ b/pkg/transfer/service.go @@ -0,0 +1,136 @@ +package transfer + +import ( + "context" + "fmt" + "log" + "m3u8-downloader/pkg/constants" + "sync" + "time" +) + +type TransferService struct { + watcher *FileWatcher + queue *TransferQueue + nas *NASTransfer + cleanup *CleanupService + stats *QueueStats +} + +func NewTrasferService(outputDir string) (*TransferService, error) { + nasConfig := NASConfig{ + Path: constants.NASPath, + Username: constants.NASUsername, + Password: constants.NASPassword, + Timeout: constants.TransferTimeout, + RetryLimit: constants.TransferRetryLimit, + VerifySize: true, + } + nas := NewNASTransfer(nasConfig) + + if err := nas.TestConnection(); err != nil { + return nil, fmt.Errorf("Failed to connect to NAS: %w", err) + } + + cleanupConfig := CleanupConfig{ + Enabled: constants.CleanupAfterTransfer, + RetentionPeriod: constants.RetainLocalHours, + BatchSize: constants.CleanupBatchSize, + CheckInterval: constants.FileSettlingDelay, + } + cleanup := NewCleanupService(cleanupConfig) + + queueConfig := QueueConfig{ + WorkerCount: constants.TransferWorkerCount, + PersistencePath: constants.PersistencePath, + MaxQueueSize: constants.TransferQueueSize, + BatchSize: constants.BatchSize, + } + queue := NewTransferQueue(queueConfig, nas, cleanup) + + watcher, err := NewFileWatcher(outputDir, queue) + if err != nil { + return nil, fmt.Errorf("Failed to create file watcher: %w", err) + } + + return &TransferService{ + watcher: watcher, + queue: queue, + nas: nas, + cleanup: cleanup, + stats: queue.stats, + }, nil +} + +func (ts *TransferService) Start(ctx context.Context) error { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + if err := ts.cleanup.Start(ctx); err != nil && err != context.Canceled { + log.Printf("Cleanup error: %v", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := ts.watcher.Start(ctx); err != nil && err != context.Canceled { + log.Printf("Watcher error: %v", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := ts.queue.ProcessQueue(ctx); err != nil && err != context.Canceled { + log.Printf("Queue error: %v", err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + ts.reportStats(ctx) + }() + + log.Println("Transfer service started") + wg.Wait() + + return nil +} + +func (ts *TransferService) reportStats(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + added, completed, failed, pending, bytes := ts.stats.GetStats() + queueSize := ts.queue.GetQueueSize() + cleanupPending := ts.cleanup.GetPendingCount() + + log.Printf("Transfer Stats: Added: %d, Completed: %d, Failed: %d, Pending: %d, Bytes: %d, Queue Size: %d, Cleanup Pending: %d", added, completed, failed, pending, bytes, queueSize, cleanupPending) + } + } +} + +func (ts *TransferService) Shutdown(ctx context.Context) error { + log.Println("Shutting down transfer service...") + + if err := ts.queue.SaveState(); err != nil { + return fmt.Errorf("Failed to save queue state: %w", err) + } + + if err := ts.cleanup.ForceCleanupAll(ctx); err != nil { + return fmt.Errorf("Failed to force cleanup: %w", err) + } + + log.Println("Transfer service shut down") + + return nil +} diff --git a/pkg/transfer/types.go b/pkg/transfer/types.go new file mode 100644 index 0000000..aa4c7c7 --- /dev/null +++ b/pkg/transfer/types.go @@ -0,0 +1,105 @@ +package transfer + +import ( + "sync" + "time" +) + +type TransferItem struct { + ID string + SourcePath string + DestinationPath string + Resolution string + Timestamp time.Time + RetryCount int + Status TransferStatus + FileSize int64 + LastError string +} + +type TransferStatus int + +const ( + StatusPending TransferStatus = iota + StatusInProgress + StatusCompleted + StatusFailed + StatusRetrying +) + +func (s TransferStatus) String() string { + switch s { + case StatusPending: + return "Pending" + case StatusInProgress: + return "In Progress" + case StatusCompleted: + return "Completed" + case StatusFailed: + return "Failed" + case StatusRetrying: + return "Retrying" + default: + return "Unknown" + } +} + +type NASConfig struct { + Path string + Username string + Password string + Timeout time.Duration + RetryLimit int + VerifySize bool +} + +type QueueConfig struct { + WorkerCount int + PersistencePath string + MaxQueueSize int + BatchSize int +} + +type CleanupConfig struct { + Enabled bool + RetentionPeriod time.Duration + BatchSize int + CheckInterval time.Duration +} + +type QueueStats struct { + mu sync.Mutex + TotalAdded int + TotalCompleted int + TotalFailed int + CurrentPending int + BytesTransferred int64 +} + +func (qs *QueueStats) IncrementAdded() { + qs.mu.Lock() + defer qs.mu.Unlock() + qs.TotalAdded++ + qs.CurrentPending++ +} + +func (qs *QueueStats) IncrementCompleted(bytes int64) { + qs.mu.Lock() + defer qs.mu.Unlock() + qs.TotalCompleted++ + qs.CurrentPending-- + qs.BytesTransferred += bytes +} + +func (qs *QueueStats) IncrementFailed() { + qs.mu.Lock() + defer qs.mu.Unlock() + qs.TotalFailed++ + qs.CurrentPending-- +} + +func (qs *QueueStats) GetStats() (int, int, int, int, int64) { + qs.mu.Lock() + defer qs.mu.Unlock() + return qs.TotalAdded, qs.TotalCompleted, qs.TotalFailed, qs.CurrentPending, qs.BytesTransferred +} diff --git a/pkg/transfer/watcher.go b/pkg/transfer/watcher.go new file mode 100644 index 0000000..52e814c --- /dev/null +++ b/pkg/transfer/watcher.go @@ -0,0 +1,190 @@ +package transfer + +import ( + "context" + "fmt" + "log" + "math/rand" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +type FileWatcher struct { + outputDir string + queue *TransferQueue + watcher *fsnotify.Watcher + settingDelay time.Duration + pendingFiles map[string]*time.Timer + mu sync.Mutex +} + +func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &FileWatcher{ + outputDir: outputDir, + queue: queue, + watcher: watcher, + settingDelay: time.Second, + pendingFiles: make(map[string]*time.Timer), + }, nil +} + +func (fw *FileWatcher) Start(ctx context.Context) error { + defer fw.watcher.Close() + + if err := fw.addWatchRecursive(fw.outputDir); err != nil { + return fmt.Errorf("Failed to add watch paths: %w", err) + } + + log.Printf("Starting file watcher on %s", fw.outputDir) + + for { + select { + case <-ctx.Done(): + log.Println("File watcher shutting down...") + return ctx.Err() + + case event, ok := <-fw.watcher.Events: + if !ok { + return fmt.Errorf("Watcher events channel closed") + } + fw.handleFileEvent(event) + + case err, ok := <-fw.watcher.Errors: + if !ok { + return fmt.Errorf("Watcher errors channel closed") + } + log.Printf("Watcher error: %v", err) + } + } +} + +func (fw *FileWatcher) addWatchRecursive(root string) error { + return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Printf("Error walking path %s: %v", path, err) + return nil + } + + if info.IsDir() { + if err := fw.watcher.Add(path); err != nil { + log.Printf("Failed to watch directory %s: %v", path, err) + } else { + log.Printf("Watching directory %s", path) + } + } + + return nil + }) +} + +func (fw *FileWatcher) handleFileEvent(event fsnotify.Event) { + if !strings.HasSuffix(strings.ToLower(event.Name), ".ts") { + return + } + + switch { + case event.Op&fsnotify.Create == fsnotify.Create: + fw.scheduleTransfer(event.Name) + case event.Op&fsnotify.Write == fsnotify.Write: + fw.scheduleTransfer(event.Name) + case event.Op&fsnotify.Remove == fsnotify.Remove: + fw.cancelPendingTransfer(event.Name) + } + + if event.Op&fsnotify.Create == fsnotify.Create { + if info, err := os.Stat(event.Name); err == nil && info.IsDir() { + if err := fw.watcher.Add(event.Name); err != nil { + log.Printf("Failed to watch directory %s: %v", event.Name, err) + } else { + log.Printf("Watching directory %s", event.Name) + } + } + } +} + +func (fw *FileWatcher) scheduleTransfer(filePath string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + if timer, exists := fw.pendingFiles[filePath]; exists { + timer.Stop() + } + + fw.pendingFiles[filePath] = time.AfterFunc(fw.settingDelay, func() { + fw.processFile(filePath) + fw.mu.Lock() + delete(fw.pendingFiles, filePath) + fw.mu.Unlock() + }) + + log.Printf("Scheduled file for transfer: %s", filePath) +} + +func (fw *FileWatcher) cancelPendingTransfer(filePath string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + if timer, exists := fw.pendingFiles[filePath]; exists { + timer.Stop() + delete(fw.pendingFiles, filePath) + log.Printf("Canceled pending transfer for file: %s", filePath) + } +} + +func (fw *FileWatcher) processFile(filePath string) { + info, err := os.Stat(filePath) + if err != nil { + log.Printf("Failed to stat file %s: %v", filePath, err) + return + } + + resolution := fw.extractResolution(filePath) + + relPath, err := filepath.Rel(fw.outputDir, filePath) + if err != nil { + log.Printf("Failed to get relative path for file %s: %v", filePath, err) + return + } + + item := TransferItem{ + ID: generateID(), + SourcePath: filePath, + DestinationPath: relPath, + Resolution: resolution, + Timestamp: time.Now(), + Status: StatusPending, + FileSize: info.Size(), + } + + if err := fw.queue.Add(item); err != nil { + log.Printf("Failed to add file to queue: %v", err) + } else { + log.Printf("Added file to queue: %s", filePath) + } +} + +func (fw *FileWatcher) extractResolution(filePath string) string { + dir := filepath.Dir(filePath) + parts := strings.Split(dir, string(filepath.Separator)) + + for _, part := range parts { + if strings.HasSuffix(part, "p") { + return part + } + } + + return "" +} + +func generateID() string { + return fmt.Sprintf("transfer_%d_%d", time.Now().UnixNano(), rand.Intn(1000)) +}