2025-08-10 15:16:04 -05:00

239 lines
5.6 KiB
Go

package processing
import (
	"bufio"
	"context"
	"fmt"
	"log"
	"os"
	"os/exec"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"

	"m3u8-downloader/pkg/constants"
	"m3u8-downloader/pkg/nas"
)
// ProcessingService bundles the processing configuration with the NAS service
// handle that NewProcessingService creates and connection-tests.
type ProcessingService struct {
// config holds the event name, source/destination paths and the enabled flag
// read by the methods of this type.
config *ProcessConfig
// nas is the NAS service returned by nas.NewNASService.
nas *nas.NASService
}
// NewProcessingService builds a ProcessingService for eventName and checks
// that the NAS can be reached before handing the service back.
//
// The source path is constants.NASOutputPath and eventName joined by "/";
// all other settings are taken from the constants package. When the NAS
// connection test fails, the returned service is nil and the error wraps the
// TestConnection failure.
func NewProcessingService(eventName string) (*ProcessingService, error) {
	nasService := nas.NewNASService(nas.NASConfig{
		Path:       constants.NASOutputPath,
		Username:   constants.NASUsername,
		Password:   constants.NASPassword,
		Timeout:    constants.TransferTimeout,
		RetryLimit: constants.TransferRetryLimit,
		VerifySize: true,
	})
	if connErr := nasService.TestConnection(); connErr != nil {
		return nil, fmt.Errorf("Failed to connect to NAS: %w", connErr)
	}

	service := &ProcessingService{
		config: &ProcessConfig{
			WorkerCount:     constants.ProcessWorkerCount,
			SourcePath:      constants.NASOutputPath + "/" + eventName,
			DestinationPath: constants.ProcessOutputPath,
			Enabled:         constants.ProcessingEnabled,
			EventName:       eventName,
		},
		nas: nasService,
	}
	return service, nil
}
// GetSegmentInfo currently returns a nil map and a nil error without doing
// any work.
// NOTE(review): this looks like an unimplemented placeholder — confirm
// whether any caller relies on it.
func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
return nil, nil
}
// GetEventDirs reports which event directories are candidates for processing.
//
// When an event name is configured it is returned as the only element and the
// filesystem is not touched. Otherwise the names of all directories found
// directly under the configured source path are returned; plain files are
// skipped. An error is returned when the source path cannot be read.
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
	if configured := ps.config.EventName; configured != "" {
		return []string{configured}, nil
	}

	entries, err := os.ReadDir(ps.config.SourcePath)
	if err != nil {
		return nil, fmt.Errorf("Failed to read directory: %w", err)
	}

	var names []string
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		names = append(names, entry.Name())
	}
	return names, nil
}
// Start runs the processing pipeline for a single event.
//
// Steps:
//  1. Return immediately (nil) when processing is disabled in the config.
//  2. If no event name is configured, list the event directories, let the
//     user pick one on stdin when there are several, and point the source
//     path at the chosen event directory.
//  3. Scan every resolution directory concurrently (one goroutine each) and
//     merge the results into one segment per sequence number.
//  4. Write the ffmpeg concat list to <DestinationPath>/concat.txt.
//  5. Run ffmpeg on that list, writing <DestinationPath>/<EventName>.mp4.
//
// ctx is checked before the scan and before ffmpeg is launched; a cancelled
// context yields ctx.Err(). Any failing step aborts the pipeline and its
// error is returned.
func (ps *ProcessingService) Start(ctx context.Context) error {
	if !ps.config.Enabled {
		log.Println("Processing service disabled")
		return nil
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	if ps.config.EventName == "" {
		events, err := ps.GetEventDirs()
		if err != nil {
			return fmt.Errorf("Failed to get event directories: %w", err)
		}
		event, err := promptForEvent(events)
		if err != nil {
			return err
		}
		ps.config.EventName = event
		// The events were listed from SourcePath, so the chosen event lives
		// directly below it. Without this step the resolution scan would run
		// against the directory that holds all events.
		ps.config.SourcePath = strings.TrimRight(ps.config.SourcePath, "/") + "/" + event
	}

	// Get all present resolutions.
	resolutions, err := ps.GetResolutions()
	if err != nil {
		return fmt.Errorf("Failed to get resolutions: %w", err)
	}

	// Spawn a worker per resolution; the channel is closed once all of them
	// are done so that the aggregation loop terminates.
	ch := make(chan SegmentInfo, 100)
	var wg sync.WaitGroup
	for _, resolution := range resolutions {
		wg.Add(1)
		go ps.ParseResolutionDirectory(resolution, ch, &wg)
	}
	go func() {
		wg.Wait()
		close(ch)
	}()

	segments, err := ps.AggregateSegmentInfo(ch)
	if err != nil {
		return fmt.Errorf("Failed to aggregate segment info: %w", err)
	}
	if len(segments) == 0 {
		return fmt.Errorf("No segments found in %s", ps.config.SourcePath)
	}

	if err := ps.WriteConcatFile(segments); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	// Feed the concat list (not the source directory) to ffmpeg and give it a
	// file, not a directory, to write to.
	concatPath := ps.config.DestinationPath + "/concat.txt"
	outputPath := ps.config.DestinationPath + "/" + ps.config.EventName + ".mp4"
	return ps.RunFFmpeg(concatPath, outputPath)
}

// promptForEvent picks the event to process from events. A single entry is
// returned as is; with several entries the list is printed and a 1-based
// index is read from stdin.
func promptForEvent(events []string) (string, error) {
	switch len(events) {
	case 0:
		return "", fmt.Errorf("No events found")
	case 1:
		return events[0], nil
	}

	fmt.Println("Multiple events found, please select one:")
	for i, event := range events {
		fmt.Printf("%d. %s\n", i+1, event)
	}

	line, err := bufio.NewReader(os.Stdin).ReadString('\n')
	line = strings.TrimSpace(line)
	// A final line without a trailing newline arrives together with an error;
	// only give up when nothing usable was read.
	if err != nil && line == "" {
		return "", fmt.Errorf("Failed to read input: %w", err)
	}
	index, err := strconv.Atoi(line)
	if err != nil {
		return "", fmt.Errorf("Failed to parse input: %w", err)
	}
	if index < 1 || index > len(events) {
		return "", fmt.Errorf("Invalid input")
	}
	return events[index-1], nil
}
// GetResolutions lists the resolution directories of the configured source
// path: every directory whose name consists of one or more digits followed
// by "p" (for example "720p"). Files and differently named directories are
// ignored. An error is returned when the source path cannot be read.
func (ps *ProcessingService) GetResolutions() ([]string, error) {
	entries, readErr := os.ReadDir(ps.config.SourcePath)
	if readErr != nil {
		return nil, fmt.Errorf("Failed to read source directory: %w", readErr)
	}

	pattern := regexp.MustCompile(`^\d+p$`)
	var found []string
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		if name := entry.Name(); pattern.MatchString(name) {
			found = append(found, name)
		}
	}
	return found, nil
}
// ParseResolutionDirectory scans <SourcePath>/<resolution> and sends one
// SegmentInfo per regular file on ch. wg.Done is called when the scan ends,
// whether it succeeded or not.
//
// The sequence number is read from bytes 7 through 10 of the file name.
// NOTE(review): this presumably matches a seven-character prefix followed by
// a four-digit counter — confirm against the code that writes the segments.
// Files whose name is too short or whose number does not parse are logged and
// skipped instead of aborting the scan. A directory that cannot be read is
// logged and produces no segments.
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
	defer wg.Done()

	// Byte range of the sequence number inside a segment file name.
	const seqStart, seqEnd = 7, 11

	entries, err := os.ReadDir(ps.config.SourcePath + "/" + resolution)
	if err != nil {
		log.Printf("Failed to read resolution directory: %v", err)
		return
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		// Slicing a shorter name would panic and take the whole process down.
		if len(name) < seqEnd {
			log.Printf("Failed to parse segment number: file name %q is too short", name)
			continue
		}
		seqNo, err := strconv.Atoi(name[seqStart:seqEnd])
		if err != nil {
			log.Printf("Failed to parse segment number: %v", err)
			continue
		}
		ch <- SegmentInfo{
			Name:       name,
			SeqNo:      seqNo,
			Resolution: resolution,
		}
	}
}
// AggregateSegmentInfo drains ch until it is closed and keeps, for every
// sequence number, the segment with the highest resolution. When two
// segments with the same sequence number have the same resolution, the one
// received first is kept.
//
// Resolutions are compared by the number in front of the "p" suffix, so any
// label of that form is ranked correctly; labels that do not parse lose
// against every valid one. The returned error is always nil.
func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) {
	best := make(map[int]SegmentInfo)
	for segment := range ch {
		current, exists := best[segment.SeqNo]
		if !exists || resolutionHeight(segment.Resolution) > resolutionHeight(current.Resolution) {
			best[segment.SeqNo] = segment
		}
	}
	return best, nil
}

// resolutionHeight returns the numeric part of a resolution label such as
// "720p", or -1 when the label is not a number followed by an optional "p".
func resolutionHeight(resolution string) int {
	height, err := strconv.Atoi(strings.TrimSuffix(resolution, "p"))
	if err != nil {
		return -1
	}
	return height
}
// WriteConcatFile writes the ffmpeg concat list for segmentMap to
// <DestinationPath>/concat.txt, replacing any existing file.
//
// One "file '<SourcePath>/<resolution>/<name>'" line is written per segment,
// in ascending sequence-number order. Go randomises map iteration, so the
// keys are sorted first; otherwise the stitched output would play the
// segments in arbitrary order. Single quotes inside a path are escaped the
// way the concat demuxer expects ('\'').
//
// An error is returned when the file cannot be created, written, flushed or
// closed.
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) (err error) {
	seqNos := make([]int, 0, len(segmentMap))
	for seqNo := range segmentMap {
		seqNos = append(seqNos, seqNo)
	}
	sort.Ints(seqNos)

	f, err := os.Create(ps.config.DestinationPath + "/concat.txt")
	if err != nil {
		return fmt.Errorf("Failed to create concat file: %w", err)
	}
	defer func() {
		// A failed close can mean lost data, so report it unless an earlier
		// error is already being returned.
		if closeErr := f.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("Failed to close concat file: %w", closeErr)
		}
	}()

	w := bufio.NewWriter(f)
	for _, seqNo := range seqNos {
		segment := segmentMap[seqNo]
		path := ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name
		path = strings.ReplaceAll(path, "'", `'\''`)
		if _, err := w.WriteString("file '" + path + "'\n"); err != nil {
			return fmt.Errorf("Failed to write to concat file: %w", err)
		}
	}
	if err := w.Flush(); err != nil {
		return fmt.Errorf("Failed to write to concat file: %w", err)
	}
	return nil
}
// RunFFmpeg invokes the ffmpeg binary found on PATH with the concat demuxer,
// reading inputPath as the input and writing to outputPath with "-c copy".
// ffmpeg's stdout and stderr are forwarded to this process. The call blocks
// until ffmpeg exits; a start failure or non-zero exit is returned as a
// wrapped error.
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
	args := []string{
		"-f", "concat",
		"-safe", "0",
		"-i", inputPath,
		"-c", "copy",
		outputPath,
	}

	ffmpeg := exec.Command("ffmpeg", args...)
	ffmpeg.Stdout, ffmpeg.Stderr = os.Stdout, os.Stderr

	if runErr := ffmpeg.Run(); runErr != nil {
		return fmt.Errorf("Failed to run ffmpeg: %w", runErr)
	}
	return nil
}