From 3c49a3fa9f41072192eab9b25a3a9cf5b4996365 Mon Sep 17 00:00:00 2001 From: townandgown Date: Sun, 10 Aug 2025 22:56:26 -0500 Subject: [PATCH] pre-cleanup --- cmd/main/main.go | 3 +- cmd/transfer/transfer.go | 57 +++++++++++++++++-- pkg/constants/constants.go | 5 +- pkg/nas/nas.go | 7 +-- pkg/processing/service.go | 110 +++++++++++++++++++++++++++++++------ pkg/transfer/service.go | 2 +- 6 files changed, 151 insertions(+), 33 deletions(-) diff --git a/cmd/main/main.go b/cmd/main/main.go index f2f3521..63037eb 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -9,12 +9,11 @@ import ( "m3u8-downloader/cmd/transfer" "os" "strings" - "time" ) func main() { url := flag.String("url", "", "M3U8 playlist URL") - eventName := flag.String("event", time.Now().Format("2006-01-02"), "Event name") + eventName := flag.String("event", "", "Event name") debug := flag.Bool("debug", false, "Enable debug mode") transferOnly := flag.Bool("transfer", false, "Transfer-only mode: transfer existing files without downloading") processOnly := flag.Bool("process", false, "Process-only mode: process existing files without downloading") diff --git a/cmd/transfer/transfer.go b/cmd/transfer/transfer.go index c26ccdd..957268d 100644 --- a/cmd/transfer/transfer.go +++ b/cmd/transfer/transfer.go @@ -1,17 +1,69 @@ package transfer import ( + "bufio" "context" + "fmt" "log" "m3u8-downloader/pkg/constants" "m3u8-downloader/pkg/transfer" "os" "os/signal" + "strconv" + "strings" "syscall" "time" ) +func getEventDirs() ([]string, error) { + dirs, err := os.ReadDir(constants.LocalOutputDirPath) + if err != nil { + return nil, fmt.Errorf("Failed to read directory: %w", err) + } + var eventDirs []string + for _, dir := range dirs { + if dir.IsDir() { + eventDirs = append(eventDirs, dir.Name()) + } + } + return eventDirs, nil +} + func RunTransferOnly(eventName string) { + // Check if NAS transfer is enabled + if !constants.EnableNASTransfer { + log.Fatal("NAS transfer is disabled in constants. Please enable it to use transfer-only mode.") + } + + if eventName == "" { + events, err := getEventDirs() + if err != nil { + log.Fatalf("Failed to get event directories: %v", err) + } + if len(events) == 0 { + log.Fatal("No events found") + } + if len(events) > 1 { + fmt.Println("Multiple events found, please select one:") + for i, event := range events { + fmt.Printf("%d. %s\n", i+1, event) + } + reader := bufio.NewReader(os.Stdin) + input, _ := reader.ReadString('\n') + input = strings.TrimSpace(input) + index, err := strconv.Atoi(input) + if err != nil { + log.Fatalf("Failed to parse input: %v", err) + } + if index < 1 || index > len(events) { + log.Fatal("Invalid input") + } + eventName = events[index-1] + } else { + eventName = events[0] + } + } + log.Printf("Starting transfer-only mode for event: %s", eventName) // Setup context and signal handling @@ -26,11 +78,6 @@ func RunTransferOnly(eventName string) { cancel() }() - // Check if NAS transfer is enabled - if !constants.EnableNASTransfer { - log.Fatal("NAS transfer is disabled in constants. Please enable it to use transfer-only mode.") - } - // Verify local event directory exists localEventPath := constants.LocalOutputDirPath + "/" + eventName if _, err := os.Stat(localEventPath); os.IsNotExist(err) { diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index aa6da22..5349777 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -10,7 +10,7 @@ const ( REFERRER = "https://www.flomarching.com" LocalOutputDirPath = "../data/" - EnableNASTransfer = false + EnableNASTransfer = true NASOutputPath = "\\\\HomeLabNAS\\dci\\streams" NASUsername = "NASAdmin" NASPassword = "s3tkY6tzA&KN6M" @@ -23,7 +23,7 @@ const ( BatchSize = 1000 ManifestPath = "../data" - CleanupAfterTransfer = false + CleanupAfterTransfer = true CleanupBatchSize = 1000 RetainLocalHours = 0 @@ -31,4 +31,5 @@ const ( AutoProcess = true ProcessingEnabled = true ProcessWorkerCount = 2 + FFmpegPath = "ffmpeg" ) diff --git a/pkg/nas/nas.go b/pkg/nas/nas.go index f748c77..9a59663 100644 --- a/pkg/nas/nas.go +++ b/pkg/nas/nas.go @@ -90,19 +90,16 @@ func (nt *NASService) EnsureDirectoryExists(path string) error { } func (nt *NASService) EstablishConnection() error { - // Extract the network path (\\server\share) from the full path networkPath := nt.ExtractNetworkPath(nt.Config.Path) if networkPath == "" { - // Local path, no authentication needed - return nil + return nil // local path, no network mount needed } log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username) - // Use Windows net use command to establish authenticated connection var cmd *exec.Cmd if nt.Config.Username != "" && nt.Config.Password != "" { - cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no") + cmd = exec.Command("net", "use", networkPath, "/user:"+nt.Config.Username, nt.Config.Password, "/persistent:no") } else { cmd = exec.Command("net", "use", networkPath, "/persistent:no") } diff --git a/pkg/processing/service.go b/pkg/processing/service.go index a09d830..f0ad8ca 100644 --- a/pkg/processing/service.go +++ b/pkg/processing/service.go @@ -9,7 +9,10 @@ import ( "m3u8-downloader/pkg/nas" "os" "os/exec" + "path/filepath" "regexp" + "runtime" + "sort" "strconv" "strings" "sync" @@ -23,7 +26,7 @@ type ProcessingService struct { func NewProcessingService(eventName string) (*ProcessingService, error) { config := &ProcessConfig{ WorkerCount: constants.ProcessWorkerCount, - SourcePath: constants.NASOutputPath + "/" + eventName, + SourcePath: constants.NASOutputPath, DestinationPath: constants.ProcessOutputPath, Enabled: constants.ProcessingEnabled, EventName: eventName, @@ -131,10 +134,14 @@ func (ps *ProcessingService) Start(ctx context.Context) error { return fmt.Errorf("Failed to aggregate segment info: %w", err) } - ps.WriteConcatFile(segments) + aggFile, err := ps.WriteConcatFile(segments) + if err != nil { + return fmt.Errorf("Failed to write concat file: %w", err) + } //Feed info to ffmpeg to stitch files together - concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath) + outPath := filepath.Join(constants.NASOutputPath, ps.config.DestinationPath, ps.config.EventName) + concatErr := ps.RunFFmpeg(aggFile, outPath) if concatErr != nil { return concatErr } @@ -143,7 +150,7 @@ func (ps *ProcessingService) Start(ctx context.Context) error { } func (ps *ProcessingService) GetResolutions() ([]string, error) { - dirs, err := os.ReadDir(ps.config.SourcePath) + dirs, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName) if err != nil { return nil, fmt.Errorf("Failed to read source directory: %w", err) } @@ -163,7 +170,7 @@ func (ps *ProcessingService) GetResolutions() ([]string, error) { func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) { defer wg.Done() - files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution) + files, err := os.ReadDir(ps.config.SourcePath + "/" + ps.config.EventName + "/" + resolution) if err != nil { log.Printf("Failed to read resolution directory: %v", err) return @@ -171,7 +178,10 @@ func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan for _, file := range files { if !file.IsDir() { - no, err := strconv.Atoi(file.Name()[7:11]) + if !strings.HasSuffix(strings.ToLower(file.Name()), ".ts") { + continue + } + no, err := strconv.Atoi(file.Name()[6:10]) if err != nil { log.Printf("Failed to parse segment number: %v", err) continue @@ -200,6 +210,7 @@ func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[in } for segment := range ch { + fmt.Printf("Received segment %s in resolution %s \n", segment.Name, segment.Resolution) current, exists := segmentMap[segment.SeqNo] if !exists || rank[segment.Resolution] > rank[current.Resolution] { segmentMap[segment.SeqNo] = segment @@ -209,30 +220,93 @@ func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[in return segmentMap, nil } -func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error { - f, err := os.Create(ps.config.DestinationPath + "/concat.txt") +func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (string, error) { + concatPath := filepath.Join(constants.NASOutputPath, constants.ProcessOutputPath, ps.config.EventName) + // Ensure the directory exists + if err := os.MkdirAll(concatPath, 0755); err != nil { + return "", fmt.Errorf("failed to create directories for concat path: %w", err) + } + + concatFilePath := filepath.Join(concatPath, ps.config.EventName+".txt") + f, err := os.Create(concatFilePath) if err != nil { - return fmt.Errorf("Failed to create concat file: %w", err) + return "", fmt.Errorf("failed to create concat file: %w", err) } defer f.Close() - for _, segment := range segmentMap { - _, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n") - if err != nil { - return fmt.Errorf("Failed to write to concat file: %w", err) + // Sort keys to preserve order + keys := make([]int, 0, len(segmentMap)) + for k := range segmentMap { + keys = append(keys, k) + } + sort.Ints(keys) + + for _, seq := range keys { + segment := segmentMap[seq] + filePath := filepath.Join(ps.config.SourcePath, ps.config.EventName, segment.Resolution, segment.Name) + line := fmt.Sprintf("file '%s'\n", filePath) + if _, err := f.WriteString(line); err != nil { + return "", fmt.Errorf("failed to write to concat file: %w", err) } } - return nil + return concatFilePath, nil +} + +func ffmpegPath() (string, error) { + var baseDir string + + exePath, err := os.Executable() + if err == nil { + baseDir = filepath.Dir(exePath) + } else { + // fallback to current working directory + baseDir, err = os.Getwd() + if err != nil { + return "", err + } + } + + // When running in GoLand, exePath might point to the IDE launcher, + // so optionally check if ffmpeg exists here, else fallback to CWD + ffmpeg := filepath.Join(baseDir, "bin", "ffmpeg") + if runtime.GOOS == "windows" { + ffmpeg += ".exe" + } + + if _, err := os.Stat(ffmpeg); os.IsNotExist(err) { + // try cwd instead + cwd, err := os.Getwd() + if err != nil { + return "", err + } + ffmpeg = filepath.Join(cwd, "bin", "ffmpeg") + if runtime.GOOS == "windows" { + ffmpeg += ".exe" + } + } + + return ffmpeg, nil } func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error { - cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath) + fmt.Println("Running ffmpeg...") + + fileOutPath := filepath.Join(outputPath, ps.config.EventName+".mp4") + fmt.Println("Input path:", inputPath) + fmt.Println("Attempting to write to: ", outputPath+"/"+ps.config.EventName+".mp4") + + path, err := ffmpegPath() + if err != nil { + return err + } + cmd := exec.Command(path, "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", filepath.Join(outputPath, ps.config.EventName+".mp4")) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - err := cmd.Run() - if err != nil { - return fmt.Errorf("Failed to run ffmpeg: %w", err) + ffmpegErr := cmd.Run() + if ffmpegErr != nil { + return fmt.Errorf("Failed to run ffmpeg: %w", ffmpegErr) } + fmt.Println("Output path:", fileOutPath) return nil } diff --git a/pkg/transfer/service.go b/pkg/transfer/service.go index 658a284..0301a55 100644 --- a/pkg/transfer/service.go +++ b/pkg/transfer/service.go @@ -40,7 +40,7 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er Enabled: constants.CleanupAfterTransfer, RetentionPeriod: time.Duration(constants.RetainLocalHours) * time.Hour, BatchSize: constants.CleanupBatchSize, - CheckInterval: 1 * time.Minute, + CheckInterval: constants.FileSettlingDelay, } cleanup := NewCleanupService(cleanupConfig)