Compare commits

..

No commits in common. "a10688279f1db7053ba5180d0d221604365c210b" and "899ca31bb310f97aaadf484b0c4f768e05be9d22" have entirely different histories.

13 changed files with 254 additions and 421 deletions

Binary file not shown.

Binary file not shown.

View File

@ -27,7 +27,7 @@ func main() {
}
if *processOnly {
processor.Process(*eventName)
processor.Process()
return
}

View File

@ -1,18 +1,15 @@
package processor
import (
"context"
"log"
"m3u8-downloader/pkg/processing"
)
func Process(eventName string) {
log.Printf("Starting processing for event: %s", eventName)
ps, err := processing.NewProcessingService(eventName)
if err != nil {
log.Fatalf("Failed to create processing service: %v", err)
}
if err := ps.Start(context.Background()); err != nil {
log.Fatalf("Failed to run processing service: %v", err)
func Process() {
config := processing.ProcessConfig{
WorkerCount: 4,
DestinationPath: "/Users/andrey/Downloads",
Enabled: true,
}
processing.NewProcessingService(&config, nil).Start(nil)
}

6
go.sum
View File

@ -1,6 +0,0 @@
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s=
github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -10,7 +10,7 @@ const (
REFERRER = "https://www.flomarching.com"
LocalOutputDirPath = "../data/"
EnableNASTransfer = false
EnableNASTransfer = true
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
NASUsername = "NASAdmin"
NASPassword = "s3tkY6tzA&KN6M"
@ -23,11 +23,10 @@ const (
BatchSize = 1000
ManifestPath = "../data"
CleanupAfterTransfer = false
CleanupAfterTransfer = true
CleanupBatchSize = 1000
RetainLocalHours = 0
ProcessOutputPath = "../out"
AutoProcess = true
ProcessingEnabled = true
ProcessWorkerCount = 2

View File

@ -1,233 +0,0 @@
package nas
import (
"context"
"fmt"
"io"
"log"
"m3u8-downloader/pkg/transfer"
"os"
"os/exec"
"path/filepath"
"strings"
)
type NASService struct {
config NASConfig
connected bool
}
func NewNASService(config NASConfig) *NASService {
nt := &NASService{
config: config,
}
// Establish network connection with credentials before accessing the path
if err := nt.establishConnection(); err != nil {
log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err)
}
err := nt.ensureDirectoryExists(nt.config.Path)
if err != nil {
log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err)
}
return nt
}
func (nt *NASService) TransferFile(ctx context.Context, item *transfer.TransferItem) error {
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
destDir := filepath.Dir(destPath)
if err := nt.ensureDirectoryExists(destDir); err != nil {
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
}
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
defer cancel()
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
}
if nt.config.VerifySize {
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
os.Remove(destPath)
return fmt.Errorf("Failed to verify transfer: %w", err)
}
}
log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath)
return nil
}
func (nt *NASService) copyFile(ctx context.Context, srcPath, destPath string) error {
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("Failed to open source file: %w", err)
}
defer src.Close()
dest, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Failed to create destination file: %w", err)
}
defer dest.Close()
done := make(chan error, 1)
go func() {
_, err := io.Copy(dest, src)
done <- err
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
if err != nil {
return err
}
return dest.Sync()
}
}
func (nt *NASService) VerifyTransfer(srcPath, destPath string) error {
srcInfo, err := os.Stat(srcPath)
if err != nil {
return fmt.Errorf("Failed to stat source file: %w", err)
}
destInfo, err := os.Stat(destPath)
if err != nil {
return fmt.Errorf("Failed to stat destination file: %w", err)
}
if srcInfo.Size() != destInfo.Size() {
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
}
return nil
}
func (nt *NASService) ensureDirectoryExists(path string) error {
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("Failed to create directory: %w", err)
}
return nil
}
func (nt *NASService) establishConnection() error {
// Extract the network path (\\server\share) from the full path
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
// Local path, no authentication needed
return nil
}
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username)
// Use Windows net use command to establish authenticated connection
var cmd *exec.Cmd
if nt.config.Username != "" && nt.config.Password != "" {
cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no")
} else {
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
}
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
}
log.Printf("Network connection established successfully")
return nil
}
func (nt *NASService) extractNetworkPath(fullPath string) string {
// Extract \\server\share from paths like \\server\share\folder\subfolder
if !strings.HasPrefix(fullPath, "\\\\") {
return "" // Not a UNC path
}
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
if len(parts) < 2 {
return "" // Invalid UNC path
}
return "\\\\" + parts[0] + "\\" + parts[1]
}
func (nt *NASService) TestConnection() error {
testFile := filepath.Join(nt.config.Path, ".connection_test")
f, err := os.Create(testFile)
if err != nil {
return fmt.Errorf("Failed to create test file: %w", err)
}
f.Close()
os.Remove(testFile)
nt.connected = true
log.Printf("Connected to NAS at %s", nt.config.Path)
return nil
}
func (nt *NASService) IsConnected() bool {
return nt.connected
}
// Disconnect removes the network connection
func (nt *NASService) Disconnect() error {
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
return nil // Local path, nothing to disconnect
}
cmd := exec.Command("net", "use", networkPath, "/delete")
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
// Don't return error since this is cleanup
} else {
log.Printf("Disconnected from network path: %s", networkPath)
}
nt.connected = false
return nil
}
// FileExists checks if a file already exists on the NAS and optionally verifies size
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil // File doesn't exist, no error
}
return false, fmt.Errorf("failed to stat NAS file: %w", err)
}
// File exists, check size if expected size is provided
if expectedSize > 0 && destInfo.Size() != expectedSize {
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
fullDestPath, expectedSize, destInfo.Size())
return false, nil // File exists but wrong size, treat as not existing
}
return true, nil
}
// GetFileSize returns the size of a file on the NAS
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
}
return destInfo.Size(), nil
}

View File

@ -1,7 +0,0 @@
package processing
type SegmentInfo struct {
Name string
SeqNo int
Resolution string
}

View File

@ -2,53 +2,21 @@ package processing
import (
"context"
"fmt"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/nas"
"os"
"os/exec"
"regexp"
"strconv"
"sync"
)
type ProcessingService struct {
config *ProcessConfig
nas *nas.NASService
nas *nas.NASConfig
}
func NewProcessingService(eventName string) (*ProcessingService, error) {
config := &ProcessConfig{
WorkerCount: constants.ProcessWorkerCount,
SourcePath: constants.NASOutputPath + "/" + eventName,
DestinationPath: constants.ProcessOutputPath,
Enabled: constants.ProcessingEnabled,
}
nasConfig := &nas.NASConfig{
Path: constants.NASOutputPath,
Username: constants.NASUsername,
Password: constants.NASPassword,
Timeout: constants.TransferTimeout,
RetryLimit: constants.TransferRetryLimit,
VerifySize: true,
}
nasService := nas.NewNASService(*nasConfig)
if err := nasService.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
}
func NewProcessingService(config *ProcessConfig, nas *nas.NASConfig) *ProcessingService {
return &ProcessingService{
config: config,
nas: nasService,
}, nil
}
func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
return nil, nil
nas: nas,
}
}
func (ps *ProcessingService) Start(ctx context.Context) error {
@ -57,132 +25,17 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
return nil
}
//Get all present resolutions
dirs, err := ps.GetResolutions()
if err != nil {
return fmt.Errorf("Failed to get resolutions: %w", err)
}
//Spawn a worker per resolution
ch := make(chan SegmentInfo, 100)
var wg sync.WaitGroup
for _, resolution := range dirs {
wg.Add(1)
go ps.ParseResolutionDirectory(resolution, ch, &wg)
}
wg.Add(1)
go func() {
wg.Wait()
close(ch)
defer wg.Done()
}()
segments, err := ps.AggregateSegmentInfo(ch)
if err != nil {
return fmt.Errorf("Failed to aggregate segment info: %w", err)
}
ps.WriteConcatFile(segments)
//Feed info to ffmpeg to stitch files together
concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath)
if concatErr != nil {
return concatErr
}
wg.Wait()
return nil
}
func (ps *ProcessingService) GetResolutions() ([]string, error) {
dirs, err := os.ReadDir(ps.config.SourcePath)
if err != nil {
return nil, fmt.Errorf("Failed to read source directory: %w", err)
}
re := regexp.MustCompile(`^\d+p$`)
var resolutions []string
for _, dir := range dirs {
if dir.IsDir() && re.MatchString(dir.Name()) {
resolutions = append(resolutions, dir.Name())
}
}
return resolutions, nil
}
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
defer wg.Done()
files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution)
if err != nil {
log.Printf("Failed to read resolution directory: %v", err)
return
}
for _, file := range files {
if !file.IsDir() {
no, err := strconv.Atoi(file.Name()[7:11])
if err != nil {
log.Printf("Failed to parse segment number: %v", err)
continue
}
ch <- SegmentInfo{
Name: file.Name(),
SeqNo: no,
Resolution: resolution,
}
}
}
}
func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) {
segmentMap := make(map[int]SegmentInfo)
rank := map[string]int{
"1080p": 1,
"720p": 2,
"540p": 3,
"480p": 4,
"450p": 5,
"360p": 6,
"270p": 7,
"240p": 8,
}
for segment := range ch {
current, exists := segmentMap[segment.SeqNo]
if !exists || rank[segment.Resolution] > rank[current.Resolution] {
segmentMap[segment.SeqNo] = segment
}
}
return segmentMap, nil
}
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error {
f, err := os.Create(ps.config.DestinationPath + "/concat.txt")
if err != nil {
return fmt.Errorf("Failed to create concat file: %w", err)
}
defer f.Close()
for _, segment := range segmentMap {
_, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n")
if err != nil {
return fmt.Errorf("Failed to write to concat file: %w", err)
}
}
return nil
}
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return fmt.Errorf("Failed to run ffmpeg: %w", err)
}
func (ps *ProcessingService) ProcessEvent(eventName string) error {
return nil
}

View File

@ -6,7 +6,6 @@ type ProcessJob struct {
type ProcessConfig struct {
WorkerCount int
SourcePath string
DestinationPath string
Enabled bool
}

View File

@ -1 +1,233 @@
package transfer
import (
"context"
"fmt"
"io"
"log"
"m3u8-downloader/pkg/nas"
"os"
"os/exec"
"path/filepath"
"strings"
)
type NASTransfer struct {
config nas.NASConfig
connected bool
}
func NewNASTransfer(config nas.NASConfig) *NASTransfer {
nt := &NASTransfer{
config: config,
}
// Establish network connection with credentials before accessing the path
if err := nt.establishConnection(); err != nil {
log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err)
}
err := nt.ensureDirectoryExists(nt.config.Path)
if err != nil {
log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err)
}
return nt
}
func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error {
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
destDir := filepath.Dir(destPath)
if err := nt.ensureDirectoryExists(destDir); err != nil {
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
}
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
defer cancel()
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
}
if nt.config.VerifySize {
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
os.Remove(destPath)
return fmt.Errorf("Failed to verify transfer: %w", err)
}
}
log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath)
return nil
}
func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error {
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("Failed to open source file: %w", err)
}
defer src.Close()
dest, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Failed to create destination file: %w", err)
}
defer dest.Close()
done := make(chan error, 1)
go func() {
_, err := io.Copy(dest, src)
done <- err
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
if err != nil {
return err
}
return dest.Sync()
}
}
func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error {
srcInfo, err := os.Stat(srcPath)
if err != nil {
return fmt.Errorf("Failed to stat source file: %w", err)
}
destInfo, err := os.Stat(destPath)
if err != nil {
return fmt.Errorf("Failed to stat destination file: %w", err)
}
if srcInfo.Size() != destInfo.Size() {
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
}
return nil
}
func (nt *NASTransfer) ensureDirectoryExists(path string) error {
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("Failed to create directory: %w", err)
}
return nil
}
func (nt *NASTransfer) establishConnection() error {
// Extract the network path (\\server\share) from the full path
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
// Local path, no authentication needed
return nil
}
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username)
// Use Windows net use command to establish authenticated connection
var cmd *exec.Cmd
if nt.config.Username != "" && nt.config.Password != "" {
cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no")
} else {
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
}
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
}
log.Printf("Network connection established successfully")
return nil
}
func (nt *NASTransfer) extractNetworkPath(fullPath string) string {
// Extract \\server\share from paths like \\server\share\folder\subfolder
if !strings.HasPrefix(fullPath, "\\\\") {
return "" // Not a UNC path
}
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
if len(parts) < 2 {
return "" // Invalid UNC path
}
return "\\\\" + parts[0] + "\\" + parts[1]
}
func (nt *NASTransfer) TestConnection() error {
testFile := filepath.Join(nt.config.Path, ".connection_test")
f, err := os.Create(testFile)
if err != nil {
return fmt.Errorf("Failed to create test file: %w", err)
}
f.Close()
os.Remove(testFile)
nt.connected = true
log.Printf("Connected to NAS at %s", nt.config.Path)
return nil
}
func (nt *NASTransfer) IsConnected() bool {
return nt.connected
}
// Disconnect removes the network connection
func (nt *NASTransfer) Disconnect() error {
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
return nil // Local path, nothing to disconnect
}
cmd := exec.Command("net", "use", networkPath, "/delete")
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
// Don't return error since this is cleanup
} else {
log.Printf("Disconnected from network path: %s", networkPath)
}
nt.connected = false
return nil
}
// FileExists checks if a file already exists on the NAS and optionally verifies size
func (nt *NASTransfer) FileExists(destinationPath string, expectedSize int64) (bool, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil // File doesn't exist, no error
}
return false, fmt.Errorf("failed to stat NAS file: %w", err)
}
// File exists, check size if expected size is provided
if expectedSize > 0 && destInfo.Size() != expectedSize {
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
fullDestPath, expectedSize, destInfo.Size())
return false, nil // File exists but wrong size, treat as not existing
}
return true, nil
}
// GetFileSize returns the size of a file on the NAS
func (nt *NASTransfer) GetFileSize(destinationPath string) (int64, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
}
return destInfo.Size(), nil
}

View File

@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"log"
"m3u8-downloader/pkg/nas"
"os"
"sync"
"time"
@ -16,7 +15,7 @@ type TransferQueue struct {
config QueueConfig
items *PriorityQueue
stats *QueueStats
nasTransfer *nas.NASService
nasTransfer *NASTransfer
cleanup *CleanupService
workers []chan TransferItem
mu sync.RWMutex
@ -49,7 +48,7 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}
func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *CleanupService) *TransferQueue {
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
pq := &PriorityQueue{}
heap.Init(pq)

View File

@ -16,7 +16,7 @@ import (
type TransferService struct {
watcher *FileWatcher
queue *TransferQueue
nas *nas2.NASService
nas *NASTransfer
cleanup *CleanupService
stats *QueueStats
}
@ -30,7 +30,7 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er
RetryLimit: constants.TransferRetryLimit,
VerifySize: true,
}
nas := nas2.NewNASService(nasConfig)
nas := NewNASTransfer(nasConfig)
if err := nas.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)