diff --git a/cmd/main/main.go b/cmd/main/main.go index d0e641b..f2f3521 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -27,7 +27,7 @@ func main() { } if *processOnly { - processor.Process() + processor.Process(*eventName) return } diff --git a/cmd/processor/process.go b/cmd/processor/process.go index a1c6049..6a748aa 100644 --- a/cmd/processor/process.go +++ b/cmd/processor/process.go @@ -1,15 +1,18 @@ package processor import ( + "context" + "log" "m3u8-downloader/pkg/processing" ) -func Process() { - - config := processing.ProcessConfig{ - WorkerCount: 4, - DestinationPath: "/Users/andrey/Downloads", - Enabled: true, +func Process(eventName string) { + log.Printf("Starting processing for event: %s", eventName) + ps, err := processing.NewProcessingService(eventName) + if err != nil { + log.Fatalf("Failed to create processing service: %v", err) + } + if err := ps.Start(context.Background()); err != nil { + log.Fatalf("Failed to run processing service: %v", err) } - processing.NewProcessingService(&config, nil).Start(nil) } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 5abfed7..aa6da22 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -10,7 +10,7 @@ const ( REFERRER = "https://www.flomarching.com" LocalOutputDirPath = "../data/" - EnableNASTransfer = true + EnableNASTransfer = false NASOutputPath = "\\\\HomeLabNAS\\dci\\streams" NASUsername = "NASAdmin" NASPassword = "s3tkY6tzA&KN6M" @@ -23,10 +23,11 @@ const ( BatchSize = 1000 ManifestPath = "../data" - CleanupAfterTransfer = true + CleanupAfterTransfer = false CleanupBatchSize = 1000 RetainLocalHours = 0 + ProcessOutputPath = "../out" AutoProcess = true ProcessingEnabled = true ProcessWorkerCount = 2 diff --git a/pkg/nas/nas.go b/pkg/nas/nas.go new file mode 100644 index 0000000..33b6753 --- /dev/null +++ b/pkg/nas/nas.go @@ -0,0 +1,233 @@ +package nas + +import ( + "context" + "fmt" + "io" + "log" + "m3u8-downloader/pkg/transfer" + "os" + "os/exec" + "path/filepath" + "strings" +) + +type NASService struct { + config NASConfig + connected bool +} + +func NewNASService(config NASConfig) *NASService { + nt := &NASService{ + config: config, + } + + // Establish network connection with credentials before accessing the path + if err := nt.establishConnection(); err != nil { + log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err) + } + + err := nt.ensureDirectoryExists(nt.config.Path) + if err != nil { + log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err) + } + return nt +} + +func (nt *NASService) TransferFile(ctx context.Context, item *transfer.TransferItem) error { + destPath := filepath.Join(nt.config.Path, item.DestinationPath) + + destDir := filepath.Dir(destPath) + if err := nt.ensureDirectoryExists(destDir); err != nil { + return fmt.Errorf("Failed to create directory %s: %w", destDir, err) + } + + transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout) + defer cancel() + + if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil { + return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err) + } + + if nt.config.VerifySize { + if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil { + os.Remove(destPath) + return fmt.Errorf("Failed to verify transfer: %w", err) + } + } + + log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath) + + return nil +} + +func (nt *NASService) copyFile(ctx context.Context, srcPath, destPath string) error { + src, err := os.Open(srcPath) + if err != nil { + return fmt.Errorf("Failed to open source file: %w", err) + } + defer src.Close() + + dest, err := os.Create(destPath) + if err != nil { + return fmt.Errorf("Failed to create destination file: %w", err) + } + defer dest.Close() + + done := make(chan error, 1) + go func() { + _, err := io.Copy(dest, src) + done <- err + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-done: + if err != nil { + return err + } + + return dest.Sync() + } +} + +func (nt *NASService) VerifyTransfer(srcPath, destPath string) error { + srcInfo, err := os.Stat(srcPath) + if err != nil { + return fmt.Errorf("Failed to stat source file: %w", err) + } + + destInfo, err := os.Stat(destPath) + if err != nil { + return fmt.Errorf("Failed to stat destination file: %w", err) + } + + if srcInfo.Size() != destInfo.Size() { + return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size()) + } + + return nil +} + +func (nt *NASService) ensureDirectoryExists(path string) error { + if err := os.MkdirAll(path, 0755); err != nil { + return fmt.Errorf("Failed to create directory: %w", err) + } + return nil +} + +func (nt *NASService) establishConnection() error { + // Extract the network path (\\server\share) from the full path + networkPath := nt.extractNetworkPath(nt.config.Path) + if networkPath == "" { + // Local path, no authentication needed + return nil + } + + log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username) + + // Use Windows net use command to establish authenticated connection + var cmd *exec.Cmd + if nt.config.Username != "" && nt.config.Password != "" { + cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no") + } else { + cmd = exec.Command("net", "use", networkPath, "/persistent:no") + } + + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output)) + } + + log.Printf("Network connection established successfully") + return nil +} + +func (nt *NASService) extractNetworkPath(fullPath string) string { + // Extract \\server\share from paths like \\server\share\folder\subfolder + if !strings.HasPrefix(fullPath, "\\\\") { + return "" // Not a UNC path + } + + parts := strings.Split(fullPath[2:], "\\") // Remove leading \\ + if len(parts) < 2 { + return "" // Invalid UNC path + } + + return "\\\\" + parts[0] + "\\" + parts[1] +} + +func (nt *NASService) TestConnection() error { + testFile := filepath.Join(nt.config.Path, ".connection_test") + + f, err := os.Create(testFile) + if err != nil { + return fmt.Errorf("Failed to create test file: %w", err) + } + f.Close() + + os.Remove(testFile) + + nt.connected = true + log.Printf("Connected to NAS at %s", nt.config.Path) + return nil +} + +func (nt *NASService) IsConnected() bool { + return nt.connected +} + +// Disconnect removes the network connection +func (nt *NASService) Disconnect() error { + networkPath := nt.extractNetworkPath(nt.config.Path) + if networkPath == "" { + return nil // Local path, nothing to disconnect + } + + cmd := exec.Command("net", "use", networkPath, "/delete") + output, err := cmd.CombinedOutput() + if err != nil { + log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output)) + // Don't return error since this is cleanup + } else { + log.Printf("Disconnected from network path: %s", networkPath) + } + + nt.connected = false + return nil +} + +// FileExists checks if a file already exists on the NAS and optionally verifies size +func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) { + fullDestPath := filepath.Join(nt.config.Path, destinationPath) + + destInfo, err := os.Stat(fullDestPath) + if err != nil { + if os.IsNotExist(err) { + return false, nil // File doesn't exist, no error + } + return false, fmt.Errorf("failed to stat NAS file: %w", err) + } + + // File exists, check size if expected size is provided + if expectedSize > 0 && destInfo.Size() != expectedSize { + log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d", + fullDestPath, expectedSize, destInfo.Size()) + return false, nil // File exists but wrong size, treat as not existing + } + + return true, nil +} + +// GetFileSize returns the size of a file on the NAS +func (nt *NASService) GetFileSize(destinationPath string) (int64, error) { + fullDestPath := filepath.Join(nt.config.Path, destinationPath) + + destInfo, err := os.Stat(fullDestPath) + if err != nil { + return 0, fmt.Errorf("failed to stat NAS file: %w", err) + } + + return destInfo.Size(), nil +} diff --git a/pkg/processing/segment.go b/pkg/processing/segment.go new file mode 100644 index 0000000..89eb23c --- /dev/null +++ b/pkg/processing/segment.go @@ -0,0 +1,7 @@ +package processing + +type SegmentInfo struct { + Name string + SeqNo int + Resolution string +} diff --git a/pkg/processing/service.go b/pkg/processing/service.go index e643ce0..e27d91f 100644 --- a/pkg/processing/service.go +++ b/pkg/processing/service.go @@ -2,21 +2,53 @@ package processing import ( "context" + "fmt" "log" + "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/nas" + "os" + "os/exec" + "regexp" + "strconv" "sync" ) type ProcessingService struct { config *ProcessConfig - nas *nas.NASConfig + nas *nas.NASService } -func NewProcessingService(config *ProcessConfig, nas *nas.NASConfig) *ProcessingService { +func NewProcessingService(eventName string) (*ProcessingService, error) { + config := &ProcessConfig{ + WorkerCount: constants.ProcessWorkerCount, + SourcePath: constants.NASOutputPath + "/" + eventName, + DestinationPath: constants.ProcessOutputPath, + Enabled: constants.ProcessingEnabled, + } + + nasConfig := &nas.NASConfig{ + Path: constants.NASOutputPath, + Username: constants.NASUsername, + Password: constants.NASPassword, + Timeout: constants.TransferTimeout, + RetryLimit: constants.TransferRetryLimit, + VerifySize: true, + } + + nasService := nas.NewNASService(*nasConfig) + + if err := nasService.TestConnection(); err != nil { + return nil, fmt.Errorf("Failed to connect to NAS: %w", err) + } + return &ProcessingService{ config: config, - nas: nas, - } + nas: nasService, + }, nil +} + +func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) { + return nil, nil } func (ps *ProcessingService) Start(ctx context.Context) error { @@ -25,17 +57,132 @@ func (ps *ProcessingService) Start(ctx context.Context) error { return nil } + //Get all present resolutions + dirs, err := ps.GetResolutions() + if err != nil { + return fmt.Errorf("Failed to get resolutions: %w", err) + } + + //Spawn a worker per resolution + ch := make(chan SegmentInfo, 100) var wg sync.WaitGroup - wg.Add(1) + for _, resolution := range dirs { + wg.Add(1) + go ps.ParseResolutionDirectory(resolution, ch, &wg) + } go func() { - defer wg.Done() + wg.Wait() + close(ch) }() - wg.Wait() + + segments, err := ps.AggregateSegmentInfo(ch) + if err != nil { + return fmt.Errorf("Failed to aggregate segment info: %w", err) + } + + ps.WriteConcatFile(segments) + + //Feed info to ffmpeg to stitch files together + concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath) + if concatErr != nil { + return concatErr + } return nil } -func (ps *ProcessingService) ProcessEvent(eventName string) error { +func (ps *ProcessingService) GetResolutions() ([]string, error) { + dirs, err := os.ReadDir(ps.config.SourcePath) + if err != nil { + return nil, fmt.Errorf("Failed to read source directory: %w", err) + } + + re := regexp.MustCompile(`^\d+p$`) + + var resolutions []string + for _, dir := range dirs { + if dir.IsDir() && re.MatchString(dir.Name()) { + resolutions = append(resolutions, dir.Name()) + } + } + + return resolutions, nil +} + +func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) { + defer wg.Done() + + files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution) + if err != nil { + log.Printf("Failed to read resolution directory: %v", err) + return + } + + for _, file := range files { + if !file.IsDir() { + no, err := strconv.Atoi(file.Name()[7:11]) + if err != nil { + log.Printf("Failed to parse segment number: %v", err) + continue + } + ch <- SegmentInfo{ + Name: file.Name(), + SeqNo: no, + Resolution: resolution, + } + } + } +} + +func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) { + segmentMap := make(map[int]SegmentInfo) + + rank := map[string]int{ + "1080p": 1, + "720p": 2, + "540p": 3, + "480p": 4, + "450p": 5, + "360p": 6, + "270p": 7, + "240p": 8, + } + + for segment := range ch { + current, exists := segmentMap[segment.SeqNo] + if !exists || rank[segment.Resolution] > rank[current.Resolution] { + segmentMap[segment.SeqNo] = segment + } + } + + return segmentMap, nil +} + +func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error { + f, err := os.Create(ps.config.DestinationPath + "/concat.txt") + if err != nil { + return fmt.Errorf("Failed to create concat file: %w", err) + } + defer f.Close() + + for _, segment := range segmentMap { + _, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n") + if err != nil { + return fmt.Errorf("Failed to write to concat file: %w", err) + } + } + + return nil +} + +func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error { + cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + return fmt.Errorf("Failed to run ffmpeg: %w", err) + } return nil } diff --git a/pkg/processing/types.go b/pkg/processing/types.go index d29a70d..5d11f22 100644 --- a/pkg/processing/types.go +++ b/pkg/processing/types.go @@ -6,6 +6,7 @@ type ProcessJob struct { type ProcessConfig struct { WorkerCount int + SourcePath string DestinationPath string Enabled bool } diff --git a/pkg/transfer/nas.go b/pkg/transfer/nas.go index ecbc146..3e884d3 100644 --- a/pkg/transfer/nas.go +++ b/pkg/transfer/nas.go @@ -1,233 +1 @@ package transfer - -import ( - "context" - "fmt" - "io" - "log" - "m3u8-downloader/pkg/nas" - "os" - "os/exec" - "path/filepath" - "strings" -) - -type NASTransfer struct { - config nas.NASConfig - connected bool -} - -func NewNASTransfer(config nas.NASConfig) *NASTransfer { - nt := &NASTransfer{ - config: config, - } - - // Establish network connection with credentials before accessing the path - if err := nt.establishConnection(); err != nil { - log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err) - } - - err := nt.ensureDirectoryExists(nt.config.Path) - if err != nil { - log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err) - } - return nt -} - -func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error { - destPath := filepath.Join(nt.config.Path, item.DestinationPath) - - destDir := filepath.Dir(destPath) - if err := nt.ensureDirectoryExists(destDir); err != nil { - return fmt.Errorf("Failed to create directory %s: %w", destDir, err) - } - - transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout) - defer cancel() - - if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil { - return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err) - } - - if nt.config.VerifySize { - if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil { - os.Remove(destPath) - return fmt.Errorf("Failed to verify transfer: %w", err) - } - } - - log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath) - - return nil -} - -func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error { - src, err := os.Open(srcPath) - if err != nil { - return fmt.Errorf("Failed to open source file: %w", err) - } - defer src.Close() - - dest, err := os.Create(destPath) - if err != nil { - return fmt.Errorf("Failed to create destination file: %w", err) - } - defer dest.Close() - - done := make(chan error, 1) - go func() { - _, err := io.Copy(dest, src) - done <- err - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-done: - if err != nil { - return err - } - - return dest.Sync() - } -} - -func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error { - srcInfo, err := os.Stat(srcPath) - if err != nil { - return fmt.Errorf("Failed to stat source file: %w", err) - } - - destInfo, err := os.Stat(destPath) - if err != nil { - return fmt.Errorf("Failed to stat destination file: %w", err) - } - - if srcInfo.Size() != destInfo.Size() { - return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size()) - } - - return nil -} - -func (nt *NASTransfer) ensureDirectoryExists(path string) error { - if err := os.MkdirAll(path, 0755); err != nil { - return fmt.Errorf("Failed to create directory: %w", err) - } - return nil -} - -func (nt *NASTransfer) establishConnection() error { - // Extract the network path (\\server\share) from the full path - networkPath := nt.extractNetworkPath(nt.config.Path) - if networkPath == "" { - // Local path, no authentication needed - return nil - } - - log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username) - - // Use Windows net use command to establish authenticated connection - var cmd *exec.Cmd - if nt.config.Username != "" && nt.config.Password != "" { - cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no") - } else { - cmd = exec.Command("net", "use", networkPath, "/persistent:no") - } - - output, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output)) - } - - log.Printf("Network connection established successfully") - return nil -} - -func (nt *NASTransfer) extractNetworkPath(fullPath string) string { - // Extract \\server\share from paths like \\server\share\folder\subfolder - if !strings.HasPrefix(fullPath, "\\\\") { - return "" // Not a UNC path - } - - parts := strings.Split(fullPath[2:], "\\") // Remove leading \\ - if len(parts) < 2 { - return "" // Invalid UNC path - } - - return "\\\\" + parts[0] + "\\" + parts[1] -} - -func (nt *NASTransfer) TestConnection() error { - testFile := filepath.Join(nt.config.Path, ".connection_test") - - f, err := os.Create(testFile) - if err != nil { - return fmt.Errorf("Failed to create test file: %w", err) - } - f.Close() - - os.Remove(testFile) - - nt.connected = true - log.Printf("Connected to NAS at %s", nt.config.Path) - return nil -} - -func (nt *NASTransfer) IsConnected() bool { - return nt.connected -} - -// Disconnect removes the network connection -func (nt *NASTransfer) Disconnect() error { - networkPath := nt.extractNetworkPath(nt.config.Path) - if networkPath == "" { - return nil // Local path, nothing to disconnect - } - - cmd := exec.Command("net", "use", networkPath, "/delete") - output, err := cmd.CombinedOutput() - if err != nil { - log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output)) - // Don't return error since this is cleanup - } else { - log.Printf("Disconnected from network path: %s", networkPath) - } - - nt.connected = false - return nil -} - -// FileExists checks if a file already exists on the NAS and optionally verifies size -func (nt *NASTransfer) FileExists(destinationPath string, expectedSize int64) (bool, error) { - fullDestPath := filepath.Join(nt.config.Path, destinationPath) - - destInfo, err := os.Stat(fullDestPath) - if err != nil { - if os.IsNotExist(err) { - return false, nil // File doesn't exist, no error - } - return false, fmt.Errorf("failed to stat NAS file: %w", err) - } - - // File exists, check size if expected size is provided - if expectedSize > 0 && destInfo.Size() != expectedSize { - log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d", - fullDestPath, expectedSize, destInfo.Size()) - return false, nil // File exists but wrong size, treat as not existing - } - - return true, nil -} - -// GetFileSize returns the size of a file on the NAS -func (nt *NASTransfer) GetFileSize(destinationPath string) (int64, error) { - fullDestPath := filepath.Join(nt.config.Path, destinationPath) - - destInfo, err := os.Stat(fullDestPath) - if err != nil { - return 0, fmt.Errorf("failed to stat NAS file: %w", err) - } - - return destInfo.Size(), nil -} diff --git a/pkg/transfer/queue.go b/pkg/transfer/queue.go index 66c7747..0ff7ef9 100644 --- a/pkg/transfer/queue.go +++ b/pkg/transfer/queue.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log" + "m3u8-downloader/pkg/nas" "os" "sync" "time" @@ -15,7 +16,7 @@ type TransferQueue struct { config QueueConfig items *PriorityQueue stats *QueueStats - nasTransfer *NASTransfer + nasTransfer *nas.NASService cleanup *CleanupService workers []chan TransferItem mu sync.RWMutex @@ -48,7 +49,7 @@ func (pq *PriorityQueue) Pop() interface{} { return item } -func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue { +func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *CleanupService) *TransferQueue { pq := &PriorityQueue{} heap.Init(pq) diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go index d65a646..658a284 100644 --- a/pkg/transfer/service.go +++ b/pkg/transfer/service.go @@ -16,7 +16,7 @@ import ( type TransferService struct { watcher *FileWatcher queue *TransferQueue - nas *NASTransfer + nas *nas2.NASService cleanup *CleanupService stats *QueueStats } @@ -30,7 +30,7 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er RetryLimit: constants.TransferRetryLimit, VerifySize: true, } - nas := NewNASTransfer(nasConfig) + nas := nas2.NewNASService(nasConfig) if err := nas.TestConnection(); err != nil { return nil, fmt.Errorf("Failed to connect to NAS: %w", err)