Functioning v1, downloaded full Atlanta regional

This commit is contained in:
townandgown 2025-07-26 21:40:18 -05:00
parent 0e27ed0ebb
commit 0f4a39f426
10 changed files with 1047 additions and 6 deletions

View File

@ -5,16 +5,19 @@ import (
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/media"
"m3u8-downloader/pkg/transfer"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Goroutine to listen for shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
@ -23,13 +26,32 @@ func main() {
cancel()
}()
var wg sync.WaitGroup
var transferService *transfer.TransferService
if constants.EnableNASTransfer {
ts, err := transfer.NewTrasferService(constants.NASPath)
if err != nil {
log.Printf("Failed to create transfer service: %v", err)
log.Println("Continuing without transfer service...")
} else {
transferService = ts
wg.Add(1)
go func() {
defer wg.Done()
if err := transferService.Start(ctx); err != nil && err != context.Canceled {
log.Printf("Transfer service error: %v", err)
}
}()
log.Println("Transfer service started.")
}
}
variants, err := media.GetAllVariants(constants.MasterURL)
if err != nil {
log.Fatalf("Failed to get variants: %v", err)
}
log.Printf("Found %d variants", len(variants))
var wg sync.WaitGroup
sem := make(chan struct{}, constants.WorkerCount*len(variants))
for _, variant := range variants {
@ -42,4 +64,12 @@ func main() {
wg.Wait()
log.Println("All variant downloaders finished.")
if transferService != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
transferService.Shutdown(shutdownCtx)
}
log.Println("All Services shut down.")
}

7
go.mod
View File

@ -2,4 +2,9 @@ module m3u8-downloader
go 1.23.0
require github.com/grafov/m3u8 v0.12.1
require (
github.com/fsnotify/fsnotify v1.9.0
github.com/grafov/m3u8 v0.12.1
)
require golang.org/x/sys v0.13.0 // indirect

View File

@ -3,11 +3,27 @@ package constants
import "time"
const (
MasterURL = "https://d17cyqyz9yhmep.cloudfront.net/streams/234951/playlist_vo_1752978025523_1752978954944.m3u8"
MasterURL = "https://live-fastly.flosports.tv/streams/mr159021-260419/playlist.m3u8?token=st%3D1753571418%7Eexp%3D1753571448%7Eacl%3D%2Fstreams%2Fmr159021-260419%2Fplaylist.m3u8%7Edata%3Dssai%3A0%3BuserId%3A14025903%3BstreamId%3A260419%3BmediaPackageRegion%3Afalse%3BdvrMinutes%3A360%3BtokenId%3Abadd289a-ade5-48fe-852f-7dbd1d57aca8%3Bpv%3A86400%7Ehmac2%3D8de65c26b185084a6be77e788cb0ba41be5fcac3ab86159b06f7572ca925d77ba7bd182124af2a432953d4223548f198742d1a238e937d875976cd42fe549838&mid_origin=media_store&keyName=FLOSPORTS_TOKEN_KEY_2023-08-02&streamCode=mr159021-260419"
WorkerCount = 4
RefreshDelay = 3 * time.Second
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
REFERRER = "https://www.flomarching.com"
OutputDirPath = "./data"
OutputDirPath = "./data/flo_radio"
EnableNASTransfer = true
NASPath = "\\\\HomeLabNAS\\dci\\streams\\2025_Atlanta"
NASUsername = ""
NASPassword = ""
TransferWorkerCount = 2
TransferRetryLimit = 3
TransferTimeout = 30 * time.Second
FileSettlingDelay = 5 * time.Second
PersistencePath = "./data/transfer_queue.json"
TransferQueueSize = 1000
BatchSize = 10
CleanupAfterTransfer = true
CleanupBatchSize = 10
RetainLocalHours = 0
)

View File

@ -69,7 +69,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
BaseURL: base,
ID: 0,
Resolution: "unknown",
OutputDir: path.Join(constants.OutputDirPath, "unknown"),
OutputDir: path.Join(constants.NASPath, "unknown"),
}}, nil
}
@ -83,7 +83,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
vURL, _ := url.Parse(v.URI)
fullURL := base.ResolveReference(vURL).String()
resolution := extractResolution(v)
outputDir := path.Join(constants.OutputDirPath, resolution)
outputDir := path.Join(constants.NASPath, resolution)
variants = append(variants, &StreamVariant{
URL: fullURL,
Bandwidth: v.Bandwidth,

171
pkg/transfer/cleanup.go Normal file
View File

@ -0,0 +1,171 @@
package transfer
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
)
// CleanupService deletes local files after they have been transferred.
// Files are queued via ScheduleCleanup and removed in batches by the
// background loop started with Start.
type CleanupService struct {
	config       CleanupConfig
	pendingFiles []string // paths awaiting deletion; guarded by mu
	mu           sync.Mutex
}

// NewCleanupService returns a CleanupService configured by config.
// Call Start to begin the periodic batch-deletion loop.
func NewCleanupService(config CleanupConfig) *CleanupService {
	return &CleanupService{
		config:       config,
		pendingFiles: make([]string, 0),
	}
}

// ScheduleCleanup queues filePath for later deletion. It is a no-op when
// cleanup is disabled in the config. Always returns nil.
func (cs *CleanupService) ScheduleCleanup(filePath string) error {
	if !cs.config.Enabled {
		return nil
	}
	cs.mu.Lock()
	defer cs.mu.Unlock()
	cs.pendingFiles = append(cs.pendingFiles, filePath)
	log.Printf("Scheduled file for cleanup: %s", filePath)
	return nil
}

// Start runs the periodic cleanup loop until ctx is cancelled, executing
// one batch per CheckInterval tick. Returns nil immediately when cleanup
// is disabled; otherwise returns ctx.Err() on shutdown.
func (cs *CleanupService) Start(ctx context.Context) error {
	if !cs.config.Enabled {
		log.Println("Cleanup service disabled")
		return nil
	}
	log.Printf("Cleanup service started (retention: %v, batch: %d)", cs.config.RetentionPeriod, cs.config.BatchSize)
	ticker := time.NewTicker(cs.config.CheckInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("Cleanup service shutting down...")
			return ctx.Err()
		case <-ticker.C:
			// Batch errors are logged, not fatal: the loop keeps running.
			if err := cs.ExecuteCleanup(ctx); err != nil {
				log.Printf("Cleanup error: %v", err)
			}
		}
	}
}
// ExecuteCleanup removes one batch of files from disk. It snapshots and
// removes up to BatchSize entries from the pending list while holding the
// lock, then deletes the files outside the lock so slow disk I/O does not
// block ScheduleCleanup callers.
//
// Fix: the original unlocked the mutex, then mutated cs.pendingFiles and
// called Unlock a second time — a data race followed by a guaranteed
// "unlock of unlocked mutex" panic. The batch is now sliced off inside a
// single lock/unlock section.
func (cs *CleanupService) ExecuteCleanup(ctx context.Context) error {
	cs.mu.Lock()
	if len(cs.pendingFiles) == 0 {
		cs.mu.Unlock()
		return nil
	}
	batchSize := cs.config.BatchSize
	if batchSize > len(cs.pendingFiles) {
		batchSize = len(cs.pendingFiles)
	}
	batch := make([]string, batchSize)
	copy(batch, cs.pendingFiles[:batchSize])
	cs.pendingFiles = cs.pendingFiles[batchSize:]
	cs.mu.Unlock()

	log.Printf("Executing cleanup batch (size: %d)", batchSize)
	log.Printf("Processing %d files for cleanup", len(batch))
	var cleanedCount int
	var errors []error
	for _, filePath := range batch {
		// Abort promptly on shutdown; remaining batch entries are dropped.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := cs.cleanupFile(filePath); err != nil {
			errors = append(errors, fmt.Errorf("Failed to cleanup file %s: %w", filePath, err))
		} else {
			cleanedCount++
		}
	}
	log.Printf("Cleanup batch completed (cleaned: %d, errors: %d)", cleanedCount, len(errors))
	if len(errors) > 0 {
		// Log at most the first three errors to keep output bounded.
		for i, err := range errors {
			if i >= 3 {
				log.Printf("... and %d more errors", len(errors)-3)
				break
			}
			log.Printf("Error: %v", err)
		}
	}
	return nil
}
// cleanupFile deletes a single file, honoring the retention period: when
// RetentionPeriod > 0, files modified more recently than that are left in
// place. A file that no longer exists is treated as already cleaned (nil).
func (cs *CleanupService) cleanupFile(filePath string) error {
	info, err := os.Stat(filePath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return fmt.Errorf("Failed to get file info: %w", err)
	}
	if cs.config.RetentionPeriod > 0 {
		if time.Since(info.ModTime()) < cs.config.RetentionPeriod {
			// NOTE(review): a too-new file returns nil and has already been
			// taken off the pending list by ExecuteCleanup, so it is never
			// retried — confirm this is intended.
			log.Printf("File too new to cleanup: %s", filePath)
			return nil
		}
	}
	if err := os.Remove(filePath); err != nil {
		return fmt.Errorf("Failed to remove file: %w", err)
	}
	log.Printf("File cleaned up: %s", filePath)
	return nil
}

// GetPendingCount reports how many files are currently queued for deletion.
func (cs *CleanupService) GetPendingCount() int {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	return len(cs.pendingFiles)
}

// ForceCleanupAll drains the entire pending list by running batches until
// it is empty, pausing 100ms between batches. Intended for shutdown.
// Returns ctx.Err() if cancelled mid-drain.
func (cs *CleanupService) ForceCleanupAll(ctx context.Context) error {
	log.Println("Force cleanup requested")
	for {
		cs.mu.Lock()
		pendingCount := len(cs.pendingFiles)
		cs.mu.Unlock()
		if pendingCount == 0 {
			break
		}
		if err := cs.ExecuteCleanup(ctx); err != nil {
			return err
		}
		// Brief pause between batches to avoid a tight delete loop.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
	log.Println("Force cleanup complete")
	return nil
}

124
pkg/transfer/nas.go Normal file
View File

@ -0,0 +1,124 @@
package transfer
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
)
// NASTransfer copies files onto the NAS share rooted at config.Path.
// The Username/Password fields travel in the config but are not referenced
// in this file; access presumably relies on an OS-level mount of the share
// (TODO confirm).
type NASTransfer struct {
	config    NASConfig
	connected bool // set by TestConnection after a successful write probe
}

// NewNASTransfer returns a NASTransfer for the given config. Call
// TestConnection before first use to verify the share is writable.
func NewNASTransfer(config NASConfig) *NASTransfer {
	return &NASTransfer{
		config: config,
	}
}
// TransferFile copies item's source file to the NAS share (under
// nt.config.Path + item.DestinationPath) and, when VerifySize is enabled,
// confirms the destination size matches the source.
//
// Fix: a failed copy now removes the partial destination file — mirroring
// the existing verify-failure path — so a retried transfer never leaves a
// truncated segment behind on the NAS.
func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error {
	destPath := filepath.Join(nt.config.Path, item.DestinationPath)
	destDir := filepath.Dir(destPath)
	if err := nt.ensureDirectoryExists(destDir); err != nil {
		return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
	}
	// Bound each transfer so a stalled NAS cannot hang a worker forever.
	transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
	defer cancel()
	if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
		// Best-effort removal of the partial file before reporting failure.
		os.Remove(destPath)
		return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
	}
	if nt.config.VerifySize {
		if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
			os.Remove(destPath)
			return fmt.Errorf("Failed to verify transfer: %w", err)
		}
	}
	log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath)
	return nil
}
// copyFile streams srcPath into a newly created file at destPath, honoring
// ctx cancellation. The copy runs in a goroutine so this function can
// select on ctx alongside it; on cancellation it returns immediately and
// the deferred Close aborts the in-flight copy, which may leave a partial
// destination file — callers are expected to remove/overwrite partials.
func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return fmt.Errorf("Failed to open source file: %w", err)
	}
	defer src.Close()
	dest, err := os.Create(destPath)
	if err != nil {
		return fmt.Errorf("Failed to create destination file: %w", err)
	}
	defer dest.Close()
	// Buffered channel: the goroutine's send never blocks even if we
	// returned early on cancellation, so the goroutine cannot leak.
	done := make(chan error, 1)
	go func() {
		_, err := io.Copy(dest, src)
		done <- err
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-done:
		if err != nil {
			return err
		}
		// Flush to stable storage before reporting success.
		return dest.Sync()
	}
}
// VerifyTransfer checks that the destination file has the same byte size
// as the source. Size equality is the only check — no checksum is computed.
func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error {
	srcInfo, err := os.Stat(srcPath)
	if err != nil {
		return fmt.Errorf("Failed to stat source file: %w", err)
	}
	destInfo, err := os.Stat(destPath)
	if err != nil {
		return fmt.Errorf("Failed to stat destination file: %w", err)
	}
	if srcInfo.Size() != destInfo.Size() {
		return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
	}
	return nil
}

// ensureDirectoryExists creates path (and any missing parents) with 0755.
func (nt *NASTransfer) ensureDirectoryExists(path string) error {
	if err := os.MkdirAll(path, 0755); err != nil {
		return fmt.Errorf("Failed to create directory: %w", err)
	}
	return nil
}

// TestConnection probes the NAS by creating and removing a marker file at
// the share root, setting connected on success. Close/Remove errors on the
// probe file are ignored (best effort).
func (nt *NASTransfer) TestConnection() error {
	testFile := filepath.Join(nt.config.Path, ".connection_test")
	f, err := os.Create(testFile)
	if err != nil {
		return fmt.Errorf("Failed to create test file: %w", err)
	}
	f.Close()
	os.Remove(testFile)
	nt.connected = true
	log.Printf("Connected to NAS at %s", nt.config.Path)
	return nil
}

// IsConnected reports whether TestConnection has succeeded.
func (nt *NASTransfer) IsConnected() bool {
	return nt.connected
}

264
pkg/transfer/queue.go Normal file
View File

@ -0,0 +1,264 @@
package transfer
import (
"container/heap"
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
)
// TransferQueue coordinates pending transfers: items sit in a
// timestamp-ordered heap, are dispatched to a fixed set of workers, and
// are periodically persisted so the queue survives restarts.
type TransferQueue struct {
	config      QueueConfig
	items       *PriorityQueue
	stats       *QueueStats
	nasTransfer *NASTransfer
	cleanup     *CleanupService
	workers     []chan TransferItem // one single-slot channel per worker
	mu          sync.RWMutex        // guards items
}

// PriorityQueue implements heap.Interface over *TransferItem.
type PriorityQueue []*TransferItem

// Len reports the number of queued items.
func (pq PriorityQueue) Len() int {
	return len(pq)
}

// Less orders newest-first: the item with the later Timestamp has higher
// priority. NOTE(review): newest-first looks deliberate (keep up with the
// live stream edge) — confirm it is not an inverted comparison.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Timestamp.After(pq[j].Timestamp)
}

// Swap exchanges two items (heap.Interface requirement).
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push appends an item; called by heap.Push, not directly.
func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*TransferItem)
	*pq = append(*pq, item)
}

// Pop removes and returns the last element; called by heap.Pop.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

// NewTransferQueue builds a queue wired to the given NAS transfer and
// cleanup services, restoring any previously persisted state from disk.
// A failed state load is logged and treated as an empty queue.
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
	pq := &PriorityQueue{}
	heap.Init(pq)
	tq := &TransferQueue{
		config:      config,
		items:       pq,
		stats:       &QueueStats{},
		nasTransfer: nasTransfer,
		cleanup:     cleanup,
		workers:     make([]chan TransferItem, config.WorkerCount),
	}
	if err := tq.LoadState(); err != nil {
		log.Printf("Failed to load queue state: %v", err)
	}
	return tq
}
// Add enqueues item for transfer, rejecting it when the queue has reached
// MaxQueueSize. The item is copied; the heap holds a pointer to the copy.
func (tq *TransferQueue) Add(item TransferItem) error {
	tq.mu.Lock()
	defer tq.mu.Unlock()
	if tq.items.Len() >= tq.config.MaxQueueSize {
		return fmt.Errorf("Queue is full (max size: %d)", tq.config.MaxQueueSize)
	}
	heap.Push(tq.items, &item)
	tq.stats.IncrementAdded()
	log.Printf("Added file to queue: %s", item.SourcePath)
	return nil
}

// ProcessQueue starts WorkerCount worker goroutines, then loops once per
// second dispatching queued items to idle workers and occasionally saving
// the queue state. Blocks until ctx is cancelled; returns ctx.Err().
func (tq *TransferQueue) ProcessQueue(ctx context.Context) error {
	for i := 0; i < tq.config.WorkerCount; i++ {
		workerChan := make(chan TransferItem, 1)
		tq.workers[i] = workerChan
		go func(workerID int, workChan chan TransferItem) {
			log.Printf("Worker %d started", workerID)
			for {
				select {
				case <-ctx.Done():
					log.Printf("Transfer worker %d shutting down...", workerID)
					return
				case item := <-workChan:
					tq.processItem(ctx, item)
				}
			}
		}(i, workerChan)
	}
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("Transfer queue shutting down...")
			return ctx.Err()
		case <-ticker.C:
			tq.dispatchWork()
			// Persist on ticks whose wall-clock second is a multiple of 30;
			// with a 1s ticker this fires roughly twice a minute.
			if time.Now().Unix()%30 == 0 {
				if err := tq.SaveState(); err != nil {
					log.Printf("Failed to save queue state: %v", err)
				}
			}
		}
	}
}

// dispatchWork hands the highest-priority item to each idle worker. A
// worker is considered idle when its single-slot channel is empty; if the
// non-blocking send loses that race anyway, the item is pushed back onto
// the heap and its status reset.
func (tq *TransferQueue) dispatchWork() {
	tq.mu.Lock()
	defer tq.mu.Unlock()
	for i, workerChan := range tq.workers {
		if len(workerChan) == 0 && tq.items.Len() > 0 {
			item := heap.Pop(tq.items).(*TransferItem)
			item.Status = StatusInProgress
			select {
			case workerChan <- *item:
				log.Printf("Dispatched file to worker %d: %s", i, item.SourcePath)
			default:
				heap.Push(tq.items, item)
				item.Status = StatusPending
			}
		}
	}
}
// processItem transfers a single item, retrying with quadratic backoff
// (1s, 4s, 9s...). The item is passed by value: retry bookkeeping is
// worker-local, since the queue no longer tracks a dispatched item.
//
// Fixes: the original guard `attempt > 0` was always true (attempt starts
// at 1), so every item slept before its FIRST attempt; and the backoff was
// logged with %d on a time.Duration, printing nanoseconds. The guard is
// now `attempt > 1` and the duration is formatted with %v.
//
// NOTE(review): maxRetries is hard-coded to 3 rather than using the
// configured TransferRetryLimit — confirm which should win.
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
	maxRetries := 3
	for attempt := 1; attempt <= maxRetries; attempt++ {
		// Back off before each retry, but not before the first attempt.
		if attempt > 1 {
			item.Status = StatusRetrying
			backoff := time.Duration(attempt*attempt) * time.Second
			log.Printf("Backing off for %v before retrying (attempt %d/%d)", backoff, attempt, maxRetries)
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return
			}
		}
		err := tq.nasTransfer.TransferFile(ctx, &item)
		if err == nil {
			item.Status = StatusCompleted
			tq.stats.IncrementCompleted(item.FileSize)
			if tq.cleanup != nil {
				if err := tq.cleanup.ScheduleCleanup(item.SourcePath); err != nil {
					log.Printf("Failed to add file to cleanup list: %v", err)
				}
			}
			log.Printf("File transfer completed: %s", item.SourcePath)
			return
		}
		item.LastError = err.Error()
		item.RetryCount++
		log.Printf("File transfer failed: %s (attempt %d/%d): %v", item.SourcePath, item.RetryCount, maxRetries, err)
		if attempt == maxRetries {
			item.Status = StatusFailed
			tq.stats.IncrementFailed()
			log.Printf("Transfer permanently failed for file: %s", item.SourcePath)
			return
		}
	}
}
// SaveState writes the queue contents and stats to PersistencePath as
// indented JSON. Items are popped from a copy of the heap so the live
// queue is untouched; the copy shares the same *TransferItem pointers.
func (tq *TransferQueue) SaveState() error {
	tq.mu.Lock()
	defer tq.mu.Unlock()
	items := make([]*TransferItem, tq.items.Len())
	tempPQ := make(PriorityQueue, tq.items.Len())
	copy(tempPQ, *tq.items)
	// Drain the copy so items serialize in priority order.
	for i := 0; i < len(items); i++ {
		items[i] = heap.Pop(&tempPQ).(*TransferItem)
	}
	data, err := json.MarshalIndent(map[string]interface{}{
		"items": items,
		"stats": tq.stats,
		"timestamp": time.Now(),
	}, "", " ")
	if err != nil {
		return fmt.Errorf("Failed to marshal queue state: %w", err)
	}
	if err := os.WriteFile(tq.config.PersistencePath, data, 0644); err != nil {
		return fmt.Errorf("Failed to save queue state: %w", err)
	}
	return nil
}

// LoadState restores persisted queue state from PersistencePath. Only
// Pending and Failed items are re-queued (in-progress/completed items are
// dropped); a missing state file is not an error. Stats are replaced
// wholesale when present in the file.
func (tq *TransferQueue) LoadState() error {
	data, err := os.ReadFile(tq.config.PersistencePath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return fmt.Errorf("Failed to load queue state: %w", err)
	}
	var state struct {
		Items     []*TransferItem `json:"items"`
		Stats     *QueueStats     `json:"stats"`
		Timestamp time.Time       `json:"timestamp"`
	}
	if err := json.Unmarshal(data, &state); err != nil {
		return fmt.Errorf("Failed to load queue state: %w", err)
	}
	tq.mu.Lock()
	defer tq.mu.Unlock()
	for _, item := range state.Items {
		if item.Status == StatusPending || item.Status == StatusFailed {
			heap.Push(tq.items, item)
		}
	}
	if state.Stats != nil {
		tq.stats = state.Stats
	}
	log.Printf("Loaded queue state: %d items restored from %v",
		tq.items.Len(), state.Timestamp.Format(time.RFC3339))
	return nil
}

// GetStats returns (added, completed, failed, pending, bytes transferred).
func (tq *TransferQueue) GetStats() (int, int, int, int, int64) {
	return tq.stats.GetStats()
}

// GetQueueSize reports the number of items currently queued.
func (tq *TransferQueue) GetQueueSize() int {
	tq.mu.RLock()
	defer tq.mu.RUnlock()
	return tq.items.Len()
}

136
pkg/transfer/service.go Normal file
View File

@ -0,0 +1,136 @@
package transfer
import (
"context"
"fmt"
"log"
"m3u8-downloader/pkg/constants"
"sync"
"time"
)
// TransferService ties the file watcher, transfer queue, NAS copier and
// cleanup service together into one start/stop unit.
type TransferService struct {
	watcher *FileWatcher
	queue   *TransferQueue
	nas     *NASTransfer
	cleanup *CleanupService
	stats   *QueueStats // shared with queue; read by reportStats
}

// NewTrasferService wires up all sub-services from package constants and
// verifies NAS connectivity before returning. outputDir is the local
// directory the watcher monitors for new segments.
// (The "Trasfer" misspelling is kept: callers reference this name.)
func NewTrasferService(outputDir string) (*TransferService, error) {
	nasConfig := NASConfig{
		Path:       constants.NASPath,
		Username:   constants.NASUsername,
		Password:   constants.NASPassword,
		Timeout:    constants.TransferTimeout,
		RetryLimit: constants.TransferRetryLimit,
		VerifySize: true,
	}
	nas := NewNASTransfer(nasConfig)
	// Fail fast if the NAS share is not reachable/writable.
	if err := nas.TestConnection(); err != nil {
		return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
	}
	cleanupConfig := CleanupConfig{
		Enabled:         constants.CleanupAfterTransfer,
		RetentionPeriod: constants.RetainLocalHours,
		BatchSize:       constants.CleanupBatchSize,
		// NOTE(review): CheckInterval reuses FileSettlingDelay — confirm
		// this coupling is intentional rather than a missing constant.
		CheckInterval: constants.FileSettlingDelay,
	}
	cleanup := NewCleanupService(cleanupConfig)
	queueConfig := QueueConfig{
		WorkerCount:     constants.TransferWorkerCount,
		PersistencePath: constants.PersistencePath,
		MaxQueueSize:    constants.TransferQueueSize,
		BatchSize:       constants.BatchSize,
	}
	queue := NewTransferQueue(queueConfig, nas, cleanup)
	watcher, err := NewFileWatcher(outputDir, queue)
	if err != nil {
		return nil, fmt.Errorf("Failed to create file watcher: %w", err)
	}
	return &TransferService{
		watcher: watcher,
		queue:   queue,
		nas:     nas,
		cleanup: cleanup,
		stats:   queue.stats,
	}, nil
}
// Start launches the cleanup loop, file watcher, queue processor and the
// stats reporter, then blocks until all four return — which happens when
// ctx is cancelled. context.Canceled results are suppressed; other errors
// from the sub-services are logged, not returned. Always returns nil.
func (ts *TransferService) Start(ctx context.Context) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := ts.cleanup.Start(ctx); err != nil && err != context.Canceled {
			log.Printf("Cleanup error: %v", err)
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := ts.watcher.Start(ctx); err != nil && err != context.Canceled {
			log.Printf("Watcher error: %v", err)
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := ts.queue.ProcessQueue(ctx); err != nil && err != context.Canceled {
			log.Printf("Queue error: %v", err)
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		ts.reportStats(ctx)
	}()
	log.Println("Transfer service started")
	wg.Wait()
	return nil
}

// reportStats logs queue and cleanup statistics every 30 seconds until
// ctx is cancelled.
func (ts *TransferService) reportStats(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			added, completed, failed, pending, bytes := ts.stats.GetStats()
			queueSize := ts.queue.GetQueueSize()
			cleanupPending := ts.cleanup.GetPendingCount()
			log.Printf("Transfer Stats: Added: %d, Completed: %d, Failed: %d, Pending: %d, Bytes: %d, Queue Size: %d, Cleanup Pending: %d", added, completed, failed, pending, bytes, queueSize, cleanupPending)
		}
	}
}

// Shutdown persists the queue state and force-drains the cleanup backlog.
// It does not wait for the goroutines launched by Start — cancel their
// context first, then call Shutdown with a bounded ctx.
func (ts *TransferService) Shutdown(ctx context.Context) error {
	log.Println("Shutting down transfer service...")
	if err := ts.queue.SaveState(); err != nil {
		return fmt.Errorf("Failed to save queue state: %w", err)
	}
	if err := ts.cleanup.ForceCleanupAll(ctx); err != nil {
		return fmt.Errorf("Failed to force cleanup: %w", err)
	}
	log.Println("Transfer service shut down")
	return nil
}

105
pkg/transfer/types.go Normal file
View File

@ -0,0 +1,105 @@
package transfer
import (
"sync"
"time"
)
// TransferItem describes one file queued for transfer to the NAS.
type TransferItem struct {
	ID              string    // unique id (see generateID)
	SourcePath      string    // local path to copy from
	DestinationPath string    // path relative to the NAS root
	Resolution      string    // e.g. "720p"; may be "" when not derivable
	Timestamp       time.Time // enqueue time; also the queue priority key
	RetryCount      int       // transfer attempts made so far
	Status          TransferStatus
	FileSize        int64  // size in bytes at enqueue time
	LastError       string // message from the most recent failed attempt
}
// TransferStatus enumerates the lifecycle states of a TransferItem.
type TransferStatus int

const (
	StatusPending TransferStatus = iota
	StatusInProgress
	StatusCompleted
	StatusFailed
	StatusRetrying
)

// statusNames maps each known status (by ordinal) to its display label.
var statusNames = [...]string{
	"Pending",
	"In Progress",
	"Completed",
	"Failed",
	"Retrying",
}

// String returns a human-readable label for the status, or "Unknown" for
// any value outside the declared set.
func (s TransferStatus) String() string {
	if s >= 0 && int(s) < len(statusNames) {
		return statusNames[s]
	}
	return "Unknown"
}
// NASConfig configures the NAS copier.
type NASConfig struct {
	Path       string        // root of the NAS share
	Username   string        // credentials; carried but not read in this package's transfer code — TODO confirm
	Password   string
	Timeout    time.Duration // per-file transfer deadline
	RetryLimit int
	VerifySize bool // compare source/dest sizes after each copy
}

// QueueConfig configures the transfer queue.
type QueueConfig struct {
	WorkerCount     int    // number of concurrent transfer workers
	PersistencePath string // JSON file used by SaveState/LoadState
	MaxQueueSize    int    // Add rejects items beyond this count
	BatchSize       int
}

// CleanupConfig configures post-transfer local file deletion.
type CleanupConfig struct {
	Enabled         bool
	RetentionPeriod time.Duration // files newer than this are kept (0 = delete immediately)
	BatchSize       int           // max files deleted per cleanup pass
	CheckInterval   time.Duration // period between cleanup passes
}
// QueueStats holds cumulative transfer-queue counters. All methods are
// safe for concurrent use; the mutex guards every field.
type QueueStats struct {
	mu               sync.Mutex
	TotalAdded       int
	TotalCompleted   int
	TotalFailed      int
	CurrentPending   int
	BytesTransferred int64
}

// IncrementAdded records a newly enqueued item.
func (qs *QueueStats) IncrementAdded() {
	qs.mu.Lock()
	qs.TotalAdded++
	qs.CurrentPending++
	qs.mu.Unlock()
}

// IncrementCompleted records one successful transfer of the given size.
func (qs *QueueStats) IncrementCompleted(bytes int64) {
	qs.mu.Lock()
	qs.TotalCompleted++
	qs.CurrentPending--
	qs.BytesTransferred += bytes
	qs.mu.Unlock()
}

// IncrementFailed records one permanently failed transfer.
func (qs *QueueStats) IncrementFailed() {
	qs.mu.Lock()
	qs.TotalFailed++
	qs.CurrentPending--
	qs.mu.Unlock()
}

// GetStats returns a consistent snapshot:
// (added, completed, failed, pending, bytes transferred).
func (qs *QueueStats) GetStats() (int, int, int, int, int64) {
	qs.mu.Lock()
	defer qs.mu.Unlock()
	return qs.TotalAdded, qs.TotalCompleted, qs.TotalFailed, qs.CurrentPending, qs.BytesTransferred
}

190
pkg/transfer/watcher.go Normal file
View File

@ -0,0 +1,190 @@
package transfer
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// FileWatcher watches outputDir (recursively) for new .ts segments and
// enqueues them on the transfer queue after a settling delay, so files
// still being written are not transferred mid-write.
type FileWatcher struct {
	outputDir    string
	queue        *TransferQueue
	watcher      *fsnotify.Watcher
	settingDelay time.Duration          // settle delay before transfer (field name sic: "setting")
	pendingFiles map[string]*time.Timer // per-path settle timers; guarded by mu
	mu           sync.Mutex
}

// NewFileWatcher creates a watcher for outputDir feeding queue.
// NOTE(review): the settle delay is hard-coded to 1s here rather than
// using constants.FileSettlingDelay — confirm which is intended.
func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	return &FileWatcher{
		outputDir:    outputDir,
		queue:        queue,
		watcher:      watcher,
		settingDelay: time.Second,
		pendingFiles: make(map[string]*time.Timer),
	}, nil
}

// Start registers watches on outputDir and all existing subdirectories,
// then processes fsnotify events until ctx is cancelled. Returns ctx.Err()
// on shutdown or an error if the watcher channels close unexpectedly.
func (fw *FileWatcher) Start(ctx context.Context) error {
	defer fw.watcher.Close()
	if err := fw.addWatchRecursive(fw.outputDir); err != nil {
		return fmt.Errorf("Failed to add watch paths: %w", err)
	}
	log.Printf("Starting file watcher on %s", fw.outputDir)
	for {
		select {
		case <-ctx.Done():
			log.Println("File watcher shutting down...")
			return ctx.Err()
		case event, ok := <-fw.watcher.Events:
			if !ok {
				return fmt.Errorf("Watcher events channel closed")
			}
			fw.handleFileEvent(event)
		case err, ok := <-fw.watcher.Errors:
			if !ok {
				return fmt.Errorf("Watcher errors channel closed")
			}
			log.Printf("Watcher error: %v", err)
		}
	}
}

// addWatchRecursive walks root and adds a watch on every directory found.
// Individual failures are logged and skipped so one bad directory does not
// abort the walk; fsnotify watches are per-directory, hence the recursion.
func (fw *FileWatcher) addWatchRecursive(root string) error {
	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			log.Printf("Error walking path %s: %v", path, err)
			return nil
		}
		if info.IsDir() {
			if err := fw.watcher.Add(path); err != nil {
				log.Printf("Failed to watch directory %s: %v", path, err)
			} else {
				log.Printf("Watching directory %s", path)
			}
		}
		return nil
	})
}
// handleFileEvent routes a single fsnotify event: newly created
// directories are added to the watch set, and .ts segment files are
// (re)scheduled for transfer or cancelled on removal.
//
// Fix: the directory-watch logic previously sat AFTER the ".ts" suffix
// early-return. Directory names never end in ".ts", so new subdirectories
// were never watched and their segments were silently missed. Directory
// creation is now handled before the suffix filter.
func (fw *FileWatcher) handleFileEvent(event fsnotify.Event) {
	if event.Op&fsnotify.Create == fsnotify.Create {
		if info, err := os.Stat(event.Name); err == nil && info.IsDir() {
			if err := fw.watcher.Add(event.Name); err != nil {
				log.Printf("Failed to watch directory %s: %v", event.Name, err)
			} else {
				log.Printf("Watching directory %s", event.Name)
			}
			// A directory is never a segment; nothing more to do.
			return
		}
	}
	if !strings.HasSuffix(strings.ToLower(event.Name), ".ts") {
		return
	}
	switch {
	case event.Op&fsnotify.Create == fsnotify.Create:
		fw.scheduleTransfer(event.Name)
	case event.Op&fsnotify.Write == fsnotify.Write:
		// Each write resets the settle timer (file still being written).
		fw.scheduleTransfer(event.Name)
	case event.Op&fsnotify.Remove == fsnotify.Remove:
		fw.cancelPendingTransfer(event.Name)
	}
}
// scheduleTransfer (re)arms a settle timer for filePath; the file is only
// processed once no further events arrive within settingDelay.
func (fw *FileWatcher) scheduleTransfer(filePath string) {
	fw.mu.Lock()
	defer fw.mu.Unlock()
	// A fresh event resets any pending timer so in-progress writes wait.
	if timer, exists := fw.pendingFiles[filePath]; exists {
		timer.Stop()
	}
	fw.pendingFiles[filePath] = time.AfterFunc(fw.settingDelay, func() {
		fw.processFile(filePath)
		fw.mu.Lock()
		delete(fw.pendingFiles, filePath)
		fw.mu.Unlock()
	})
	log.Printf("Scheduled file for transfer: %s", filePath)
}

// cancelPendingTransfer drops the settle timer for a file that was removed
// before its transfer fired. No-op if nothing is pending for the path.
func (fw *FileWatcher) cancelPendingTransfer(filePath string) {
	fw.mu.Lock()
	defer fw.mu.Unlock()
	if timer, exists := fw.pendingFiles[filePath]; exists {
		timer.Stop()
		delete(fw.pendingFiles, filePath)
		log.Printf("Canceled pending transfer for file: %s", filePath)
	}
}

// processFile builds a TransferItem for a settled file and enqueues it.
// The destination path preserves the file's location relative to
// outputDir, so the NAS mirrors the local directory layout.
func (fw *FileWatcher) processFile(filePath string) {
	info, err := os.Stat(filePath)
	if err != nil {
		log.Printf("Failed to stat file %s: %v", filePath, err)
		return
	}
	resolution := fw.extractResolution(filePath)
	relPath, err := filepath.Rel(fw.outputDir, filePath)
	if err != nil {
		log.Printf("Failed to get relative path for file %s: %v", filePath, err)
		return
	}
	item := TransferItem{
		ID:              generateID(),
		SourcePath:      filePath,
		DestinationPath: relPath,
		Resolution:      resolution,
		Timestamp:       time.Now(),
		Status:          StatusPending,
		FileSize:        info.Size(),
	}
	if err := fw.queue.Add(item); err != nil {
		log.Printf("Failed to add file to queue: %v", err)
	} else {
		log.Printf("Added file to queue: %s", filePath)
	}
}
// extractResolution derives a resolution label ("720p", "1080p", ...) from
// the file's directory path. Returns "" when no component matches.
//
// Fix: the original accepted ANY path component ending in "p" (so a
// directory named "tmp" or "backup" would be misread as a resolution).
// A component now qualifies only if it is one or more digits followed by
// a trailing 'p'.
func (fw *FileWatcher) extractResolution(filePath string) string {
	dir := filepath.Dir(filePath)
	parts := strings.Split(dir, string(filepath.Separator))
	for _, part := range parts {
		if len(part) < 2 || !strings.HasSuffix(part, "p") {
			continue
		}
		digits := strings.TrimSuffix(part, "p")
		valid := true
		for _, r := range digits {
			if r < '0' || r > '9' {
				valid = false
				break
			}
		}
		if valid {
			return part
		}
	}
	return ""
}
// generateID returns a unique-ish transfer ID composed of the current
// nanosecond timestamp plus a small random suffix to break ties between
// IDs generated within the same nanosecond.
func generateID() string {
	suffix := rand.Intn(1000)
	return fmt.Sprintf("transfer_%d_%d", time.Now().UnixNano(), suffix)
}