diff --git a/cmd/processor/process.go b/cmd/processor/process.go index 6a748aa..2cbcd38 100644 --- a/cmd/processor/process.go +++ b/cmd/processor/process.go @@ -7,7 +7,7 @@ import ( ) func Process(eventName string) { - log.Printf("Starting processing for event: %s", eventName) + //log.Printf("Starting processing for event: %s", eventName) ps, err := processing.NewProcessingService(eventName) if err != nil { log.Fatalf("Failed to create processing service: %v", err) diff --git a/pkg/nas/nas.go b/pkg/nas/nas.go index 33b6753..f748c77 100644 --- a/pkg/nas/nas.go +++ b/pkg/nas/nas.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "log" - "m3u8-downloader/pkg/transfer" "os" "os/exec" "path/filepath" @@ -13,55 +12,28 @@ import ( ) type NASService struct { - config NASConfig + Config NASConfig connected bool } func NewNASService(config NASConfig) *NASService { nt := &NASService{ - config: config, + Config: config, } // Establish network connection with credentials before accessing the path - if err := nt.establishConnection(); err != nil { - log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err) + if err := nt.EstablishConnection(); err != nil { + log.Fatalf("Failed to establish network connection to %s: %v", nt.Config.Path, err) } - err := nt.ensureDirectoryExists(nt.config.Path) + err := nt.EnsureDirectoryExists(nt.Config.Path) if err != nil { - log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err) + log.Fatalf("Failed to create directory %s: %v", nt.Config.Path, err) } return nt } -func (nt *NASService) TransferFile(ctx context.Context, item *transfer.TransferItem) error { - destPath := filepath.Join(nt.config.Path, item.DestinationPath) - - destDir := filepath.Dir(destPath) - if err := nt.ensureDirectoryExists(destDir); err != nil { - return fmt.Errorf("Failed to create directory %s: %w", destDir, err) - } - - transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout) - defer cancel() - - if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil { - return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err) - } - - if nt.config.VerifySize { - if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil { - os.Remove(destPath) - return fmt.Errorf("Failed to verify transfer: %w", err) - } - } - - log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath) - - return nil -} - -func (nt *NASService) copyFile(ctx context.Context, srcPath, destPath string) error { +func (nt *NASService) CopyFile(ctx context.Context, srcPath, destPath string) error { src, err := os.Open(srcPath) if err != nil { return fmt.Errorf("Failed to open source file: %w", err) @@ -110,27 +82,27 @@ func (nt *NASService) VerifyTransfer(srcPath, destPath string) error { return nil } -func (nt *NASService) ensureDirectoryExists(path string) error { +func (nt *NASService) EnsureDirectoryExists(path string) error { if err := os.MkdirAll(path, 0755); err != nil { return fmt.Errorf("Failed to create directory: %w", err) } return nil } -func (nt *NASService) establishConnection() error { +func (nt *NASService) EstablishConnection() error { // Extract the network path (\\server\share) from the full path - networkPath := nt.extractNetworkPath(nt.config.Path) + networkPath := nt.ExtractNetworkPath(nt.Config.Path) if networkPath == "" { // Local path, no authentication needed return nil } - log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username) + log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username) // Use Windows net use command to establish authenticated connection var cmd *exec.Cmd - if nt.config.Username != "" && nt.config.Password != "" { - cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no") + if nt.Config.Username != "" && nt.Config.Password != "" { + cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no") } else { cmd = exec.Command("net", "use", networkPath, "/persistent:no") } @@ -144,7 +116,7 @@ func (nt *NASService) establishConnection() error { return nil } -func (nt *NASService) extractNetworkPath(fullPath string) string { +func (nt *NASService) ExtractNetworkPath(fullPath string) string { // Extract \\server\share from paths like \\server\share\folder\subfolder if !strings.HasPrefix(fullPath, "\\\\") { return "" // Not a UNC path @@ -159,7 +131,7 @@ func (nt *NASService) extractNetworkPath(fullPath string) string { } func (nt *NASService) TestConnection() error { - testFile := filepath.Join(nt.config.Path, ".connection_test") + testFile := filepath.Join(nt.Config.Path, ".connection_test") f, err := os.Create(testFile) if err != nil { @@ -170,7 +142,7 @@ func (nt *NASService) TestConnection() error { os.Remove(testFile) nt.connected = true - log.Printf("Connected to NAS at %s", nt.config.Path) + log.Printf("Connected to NAS at %s", nt.Config.Path) return nil } @@ -180,7 +152,7 @@ func (nt *NASService) IsConnected() bool { // Disconnect removes the network connection func (nt *NASService) Disconnect() error { - networkPath := nt.extractNetworkPath(nt.config.Path) + networkPath := nt.ExtractNetworkPath(nt.Config.Path) if networkPath == "" { return nil // Local path, nothing to disconnect } @@ -200,7 +172,7 @@ func (nt *NASService) Disconnect() error { // FileExists checks if a file already exists on the NAS and optionally verifies size func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) { - fullDestPath := filepath.Join(nt.config.Path, destinationPath) + fullDestPath := filepath.Join(nt.Config.Path, destinationPath) destInfo, err := os.Stat(fullDestPath) if err != nil { @@ -222,7 +194,7 @@ func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bo // GetFileSize returns the size of a file on the NAS func (nt *NASService) GetFileSize(destinationPath string) (int64, error) { - fullDestPath := filepath.Join(nt.config.Path, destinationPath) + fullDestPath := filepath.Join(nt.Config.Path, destinationPath) destInfo, err := os.Stat(fullDestPath) if err != nil { diff --git a/pkg/processing/service.go b/pkg/processing/service.go index e27d91f..a09d830 100644 --- a/pkg/processing/service.go +++ b/pkg/processing/service.go @@ -1,6 +1,7 @@ package processing import ( + "bufio" "context" "fmt" "log" @@ -10,6 +11,7 @@ import ( "os/exec" "regexp" "strconv" + "strings" "sync" ) @@ -24,6 +26,7 @@ func NewProcessingService(eventName string) (*ProcessingService, error) { SourcePath: constants.NASOutputPath + "/" + eventName, DestinationPath: constants.ProcessOutputPath, Enabled: constants.ProcessingEnabled, + EventName: eventName, } nasConfig := &nas.NASConfig{ @@ -51,12 +54,59 @@ func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) { return nil, nil } +func (ps *ProcessingService) GetEventDirs() ([]string, error) { + if ps.config.EventName == "" { + dirs, err := os.ReadDir(ps.config.SourcePath) + if err != nil { + return nil, fmt.Errorf("Failed to read directory: %w", err) + } + var eventDirs []string + for _, dir := range dirs { + if dir.IsDir() { + eventDirs = append(eventDirs, dir.Name()) + } + } + return eventDirs, nil + } else { + return []string{ps.config.EventName}, nil + } +} + func (ps *ProcessingService) Start(ctx context.Context) error { if !ps.config.Enabled { log.Println("Processing service disabled") return nil } + if ps.config.EventName == "" { + events, err := ps.GetEventDirs() + if err != nil { + return fmt.Errorf("Failed to get event directories: %w", err) + } + if len(events) == 0 { + return fmt.Errorf("No events found") + } + if len(events) > 1 { + fmt.Println("Multiple events found, please select one:") + for i, event := range events { + fmt.Printf("%d. %s\n", i+1, event) + } + reader := bufio.NewReader(os.Stdin) + input, _ := reader.ReadString('\n') + input = strings.TrimSpace(input) + index, err := strconv.Atoi(input) + if err != nil { + return fmt.Errorf("Failed to parse input: %w", err) + } + if index < 1 || index > len(events) { + return fmt.Errorf("Invalid input") + } + ps.config.EventName = events[index-1] + } else { + ps.config.EventName = events[0] + } + } + //Get all present resolutions dirs, err := ps.GetResolutions() if err != nil { diff --git a/pkg/processing/types.go b/pkg/processing/types.go index 5d11f22..99f0de1 100644 --- a/pkg/processing/types.go +++ b/pkg/processing/types.go @@ -9,4 +9,5 @@ type ProcessConfig struct { SourcePath string DestinationPath string Enabled bool + EventName string } diff --git a/pkg/transfer/nas.go b/pkg/transfer/nas.go index 3e884d3..5490387 100644 --- a/pkg/transfer/nas.go +++ b/pkg/transfer/nas.go @@ -1 +1,37 @@ package transfer + +import ( + "context" + "fmt" + "log" + "m3u8-downloader/pkg/nas" + "os" + "path/filepath" +) + +func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) error { + destPath := filepath.Join(nt.Config.Path, item.DestinationPath) + + destDir := filepath.Dir(destPath) + if err := nt.EnsureDirectoryExists(destDir); err != nil { + return fmt.Errorf("Failed to create directory %s: %w", destDir, err) + } + + transferCtx, cancel := context.WithTimeout(ctx, nt.Config.Timeout) + defer cancel() + + if err := nt.CopyFile(transferCtx, item.SourcePath, destPath); err != nil { + return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err) + } + + if nt.Config.VerifySize { + if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil { + os.Remove(destPath) + return fmt.Errorf("Failed to verify transfer: %w", err) + } + } + + log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath) + + return nil +} diff --git a/pkg/transfer/queue.go b/pkg/transfer/queue.go index 0ff7ef9..055e816 100644 --- a/pkg/transfer/queue.go +++ b/pkg/transfer/queue.go @@ -13,13 +13,13 @@ import ( ) type TransferQueue struct { - config QueueConfig - items *PriorityQueue - stats *QueueStats - nasTransfer *nas.NASService - cleanup *CleanupService - workers []chan TransferItem - mu sync.RWMutex + config QueueConfig + items *PriorityQueue + stats *QueueStats + nasService *nas.NASService + cleanup *CleanupService + workers []chan TransferItem + mu sync.RWMutex } type PriorityQueue []*TransferItem @@ -54,12 +54,12 @@ func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup * heap.Init(pq) tq := &TransferQueue{ - config: config, - items: pq, - stats: &QueueStats{}, - nasTransfer: nasTransfer, - cleanup: cleanup, - workers: make([]chan TransferItem, config.WorkerCount), + config: config, + items: pq, + stats: &QueueStats{}, + nasService: nasTransfer, + cleanup: cleanup, + workers: make([]chan TransferItem, config.WorkerCount), } if err := tq.LoadState(); err != nil { @@ -147,7 +147,7 @@ func (tq *TransferQueue) dispatchWork() { func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) { // Check if file already exists on NAS before attempting transfer - if exists, err := tq.nasTransfer.FileExists(item.DestinationPath, item.FileSize); err != nil { + if exists, err := tq.nasService.FileExists(item.DestinationPath, item.FileSize); err != nil { log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err) // Continue with transfer attempt on error } else if exists { @@ -179,7 +179,7 @@ func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) { } } - err := tq.nasTransfer.TransferFile(ctx, &item) + err := TransferFile(tq.nasService, ctx, &item) if err == nil { item.Status = StatusCompleted tq.stats.IncrementCompleted(item.FileSize)