Compare commits
No commits in common. "manifest" and "1.0.1" have entirely different histories.
@ -1,37 +0,0 @@
|
|||||||
name: Build Go Binaries
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- main
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
runs-on: docker
|
|
||||||
steps:
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Set up Go
|
|
||||||
uses: actions/setup-go@v5
|
|
||||||
with:
|
|
||||||
go-version: 1.23
|
|
||||||
|
|
||||||
- name: Cross-Compile
|
|
||||||
run: |
|
|
||||||
mkdir -p dist
|
|
||||||
for GOOS in linux windows darwin; do
|
|
||||||
for GOARCH in arm64 amd64; do
|
|
||||||
OUTPUT="dist/myapp_${GOOS}_${GOARCH}"
|
|
||||||
echo "Building ${OUTPUT}"
|
|
||||||
if [ "$GOOS" = "windows" ]; then
|
|
||||||
OUTPUT="${OUTPUT}.exe"
|
|
||||||
fi
|
|
||||||
GOOS=$GOOS GOARCH=$GOARCH go build -o $OUTPUT ./cmd/main
|
|
||||||
done
|
|
||||||
done
|
|
||||||
|
|
||||||
- name: Upload Artifact
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: flo_download-binaries
|
|
||||||
path: dist
|
|
||||||
BIN
bin/flo_download
BIN
bin/flo_download
Binary file not shown.
Binary file not shown.
@ -27,7 +27,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if *processOnly {
|
if *processOnly {
|
||||||
processor.Process(*eventName)
|
processor.Process()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,18 +1,15 @@
|
|||||||
package processor
|
package processor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"m3u8-downloader/pkg/processing"
|
"m3u8-downloader/pkg/processing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Process(eventName string) {
|
func Process() {
|
||||||
//log.Printf("Starting processing for event: %s", eventName)
|
|
||||||
ps, err := processing.NewProcessingService(eventName)
|
config := processing.ProcessConfig{
|
||||||
if err != nil {
|
WorkerCount: 4,
|
||||||
log.Fatalf("Failed to create processing service: %v", err)
|
DestinationPath: "/Users/andrey/Downloads",
|
||||||
}
|
Enabled: true,
|
||||||
if err := ps.Start(context.Background()); err != nil {
|
|
||||||
log.Fatalf("Failed to run processing service: %v", err)
|
|
||||||
}
|
}
|
||||||
|
processing.NewProcessingService(&config, nil).Start(nil)
|
||||||
}
|
}
|
||||||
|
|||||||
6
go.sum
6
go.sum
@ -1,6 +0,0 @@
|
|||||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
|
||||||
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
|
||||||
github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s=
|
|
||||||
github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
|
|
||||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
|
||||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
|
||||||
@ -10,7 +10,7 @@ const (
|
|||||||
REFERRER = "https://www.flomarching.com"
|
REFERRER = "https://www.flomarching.com"
|
||||||
LocalOutputDirPath = "../data/"
|
LocalOutputDirPath = "../data/"
|
||||||
|
|
||||||
EnableNASTransfer = false
|
EnableNASTransfer = true
|
||||||
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
|
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
|
||||||
NASUsername = "NASAdmin"
|
NASUsername = "NASAdmin"
|
||||||
NASPassword = "s3tkY6tzA&KN6M"
|
NASPassword = "s3tkY6tzA&KN6M"
|
||||||
@ -23,11 +23,10 @@ const (
|
|||||||
BatchSize = 1000
|
BatchSize = 1000
|
||||||
ManifestPath = "../data"
|
ManifestPath = "../data"
|
||||||
|
|
||||||
CleanupAfterTransfer = false
|
CleanupAfterTransfer = true
|
||||||
CleanupBatchSize = 1000
|
CleanupBatchSize = 1000
|
||||||
RetainLocalHours = 0
|
RetainLocalHours = 0
|
||||||
|
|
||||||
ProcessOutputPath = "../out"
|
|
||||||
AutoProcess = true
|
AutoProcess = true
|
||||||
ProcessingEnabled = true
|
ProcessingEnabled = true
|
||||||
ProcessWorkerCount = 2
|
ProcessWorkerCount = 2
|
||||||
|
|||||||
205
pkg/nas/nas.go
205
pkg/nas/nas.go
@ -1,205 +0,0 @@
|
|||||||
package nas
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
type NASService struct {
|
|
||||||
Config NASConfig
|
|
||||||
connected bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewNASService(config NASConfig) *NASService {
|
|
||||||
nt := &NASService{
|
|
||||||
Config: config,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Establish network connection with credentials before accessing the path
|
|
||||||
if err := nt.EstablishConnection(); err != nil {
|
|
||||||
log.Fatalf("Failed to establish network connection to %s: %v", nt.Config.Path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := nt.EnsureDirectoryExists(nt.Config.Path)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to create directory %s: %v", nt.Config.Path, err)
|
|
||||||
}
|
|
||||||
return nt
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) CopyFile(ctx context.Context, srcPath, destPath string) error {
|
|
||||||
src, err := os.Open(srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to open source file: %w", err)
|
|
||||||
}
|
|
||||||
defer src.Close()
|
|
||||||
|
|
||||||
dest, err := os.Create(destPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create destination file: %w", err)
|
|
||||||
}
|
|
||||||
defer dest.Close()
|
|
||||||
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
_, err := io.Copy(dest, src)
|
|
||||||
done <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case err := <-done:
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return dest.Sync()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) VerifyTransfer(srcPath, destPath string) error {
|
|
||||||
srcInfo, err := os.Stat(srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to stat source file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
destInfo, err := os.Stat(destPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to stat destination file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if srcInfo.Size() != destInfo.Size() {
|
|
||||||
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) EnsureDirectoryExists(path string) error {
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
return fmt.Errorf("Failed to create directory: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) EstablishConnection() error {
|
|
||||||
// Extract the network path (\\server\share) from the full path
|
|
||||||
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
|
||||||
if networkPath == "" {
|
|
||||||
// Local path, no authentication needed
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username)
|
|
||||||
|
|
||||||
// Use Windows net use command to establish authenticated connection
|
|
||||||
var cmd *exec.Cmd
|
|
||||||
if nt.Config.Username != "" && nt.Config.Password != "" {
|
|
||||||
cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no")
|
|
||||||
} else {
|
|
||||||
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
|
|
||||||
}
|
|
||||||
|
|
||||||
output, err := cmd.CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Network connection established successfully")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) ExtractNetworkPath(fullPath string) string {
|
|
||||||
// Extract \\server\share from paths like \\server\share\folder\subfolder
|
|
||||||
if !strings.HasPrefix(fullPath, "\\\\") {
|
|
||||||
return "" // Not a UNC path
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
|
|
||||||
if len(parts) < 2 {
|
|
||||||
return "" // Invalid UNC path
|
|
||||||
}
|
|
||||||
|
|
||||||
return "\\\\" + parts[0] + "\\" + parts[1]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) TestConnection() error {
|
|
||||||
testFile := filepath.Join(nt.Config.Path, ".connection_test")
|
|
||||||
|
|
||||||
f, err := os.Create(testFile)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create test file: %w", err)
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
|
|
||||||
os.Remove(testFile)
|
|
||||||
|
|
||||||
nt.connected = true
|
|
||||||
log.Printf("Connected to NAS at %s", nt.Config.Path)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) IsConnected() bool {
|
|
||||||
return nt.connected
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disconnect removes the network connection
|
|
||||||
func (nt *NASService) Disconnect() error {
|
|
||||||
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
|
||||||
if networkPath == "" {
|
|
||||||
return nil // Local path, nothing to disconnect
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := exec.Command("net", "use", networkPath, "/delete")
|
|
||||||
output, err := cmd.CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
|
|
||||||
// Don't return error since this is cleanup
|
|
||||||
} else {
|
|
||||||
log.Printf("Disconnected from network path: %s", networkPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
nt.connected = false
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FileExists checks if a file already exists on the NAS and optionally verifies size
|
|
||||||
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
|
|
||||||
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
|
||||||
|
|
||||||
destInfo, err := os.Stat(fullDestPath)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return false, nil // File doesn't exist, no error
|
|
||||||
}
|
|
||||||
return false, fmt.Errorf("failed to stat NAS file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// File exists, check size if expected size is provided
|
|
||||||
if expectedSize > 0 && destInfo.Size() != expectedSize {
|
|
||||||
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
|
|
||||||
fullDestPath, expectedSize, destInfo.Size())
|
|
||||||
return false, nil // File exists but wrong size, treat as not existing
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetFileSize returns the size of a file on the NAS
|
|
||||||
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
|
|
||||||
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
|
||||||
|
|
||||||
destInfo, err := os.Stat(fullDestPath)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return destInfo.Size(), nil
|
|
||||||
}
|
|
||||||
@ -1,7 +0,0 @@
|
|||||||
package processing
|
|
||||||
|
|
||||||
type SegmentInfo struct {
|
|
||||||
Name string
|
|
||||||
SeqNo int
|
|
||||||
Resolution string
|
|
||||||
}
|
|
||||||
@ -1,74 +1,21 @@
|
|||||||
package processing
|
package processing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"m3u8-downloader/pkg/constants"
|
|
||||||
"m3u8-downloader/pkg/nas"
|
"m3u8-downloader/pkg/nas"
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProcessingService struct {
|
type ProcessingService struct {
|
||||||
config *ProcessConfig
|
config *ProcessConfig
|
||||||
nas *nas.NASService
|
nas *nas.NASConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProcessingService(eventName string) (*ProcessingService, error) {
|
func NewProcessingService(config *ProcessConfig, nas *nas.NASConfig) *ProcessingService {
|
||||||
config := &ProcessConfig{
|
|
||||||
WorkerCount: constants.ProcessWorkerCount,
|
|
||||||
SourcePath: constants.NASOutputPath + "/" + eventName,
|
|
||||||
DestinationPath: constants.ProcessOutputPath,
|
|
||||||
Enabled: constants.ProcessingEnabled,
|
|
||||||
EventName: eventName,
|
|
||||||
}
|
|
||||||
|
|
||||||
nasConfig := &nas.NASConfig{
|
|
||||||
Path: constants.NASOutputPath,
|
|
||||||
Username: constants.NASUsername,
|
|
||||||
Password: constants.NASPassword,
|
|
||||||
Timeout: constants.TransferTimeout,
|
|
||||||
RetryLimit: constants.TransferRetryLimit,
|
|
||||||
VerifySize: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
nasService := nas.NewNASService(*nasConfig)
|
|
||||||
|
|
||||||
if err := nasService.TestConnection(); err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &ProcessingService{
|
return &ProcessingService{
|
||||||
config: config,
|
config: config,
|
||||||
nas: nasService,
|
nas: nas,
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
|
|
||||||
if ps.config.EventName == "" {
|
|
||||||
dirs, err := os.ReadDir(ps.config.SourcePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to read directory: %w", err)
|
|
||||||
}
|
|
||||||
var eventDirs []string
|
|
||||||
for _, dir := range dirs {
|
|
||||||
if dir.IsDir() {
|
|
||||||
eventDirs = append(eventDirs, dir.Name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return eventDirs, nil
|
|
||||||
} else {
|
|
||||||
return []string{ps.config.EventName}, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,161 +25,17 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if ps.config.EventName == "" {
|
|
||||||
events, err := ps.GetEventDirs()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to get event directories: %w", err)
|
|
||||||
}
|
|
||||||
if len(events) == 0 {
|
|
||||||
return fmt.Errorf("No events found")
|
|
||||||
}
|
|
||||||
if len(events) > 1 {
|
|
||||||
fmt.Println("Multiple events found, please select one:")
|
|
||||||
for i, event := range events {
|
|
||||||
fmt.Printf("%d. %s\n", i+1, event)
|
|
||||||
}
|
|
||||||
reader := bufio.NewReader(os.Stdin)
|
|
||||||
input, _ := reader.ReadString('\n')
|
|
||||||
input = strings.TrimSpace(input)
|
|
||||||
index, err := strconv.Atoi(input)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to parse input: %w", err)
|
|
||||||
}
|
|
||||||
if index < 1 || index > len(events) {
|
|
||||||
return fmt.Errorf("Invalid input")
|
|
||||||
}
|
|
||||||
ps.config.EventName = events[index-1]
|
|
||||||
} else {
|
|
||||||
ps.config.EventName = events[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//Get all present resolutions
|
|
||||||
dirs, err := ps.GetResolutions()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to get resolutions: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
//Spawn a worker per resolution
|
|
||||||
ch := make(chan SegmentInfo, 100)
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, resolution := range dirs {
|
wg.Add(1)
|
||||||
wg.Add(1)
|
|
||||||
go ps.ParseResolutionDirectory(resolution, ch, &wg)
|
|
||||||
}
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Wait()
|
defer wg.Done()
|
||||||
close(ch)
|
|
||||||
}()
|
}()
|
||||||
|
wg.Wait()
|
||||||
segments, err := ps.AggregateSegmentInfo(ch)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to aggregate segment info: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ps.WriteConcatFile(segments)
|
|
||||||
|
|
||||||
//Feed info to ffmpeg to stitch files together
|
|
||||||
concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath)
|
|
||||||
if concatErr != nil {
|
|
||||||
return concatErr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *ProcessingService) GetResolutions() ([]string, error) {
|
func (ps *ProcessingService) ProcessEvent(eventName string) error {
|
||||||
dirs, err := os.ReadDir(ps.config.SourcePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to read source directory: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
re := regexp.MustCompile(`^\d+p$`)
|
|
||||||
|
|
||||||
var resolutions []string
|
|
||||||
for _, dir := range dirs {
|
|
||||||
if dir.IsDir() && re.MatchString(dir.Name()) {
|
|
||||||
resolutions = append(resolutions, dir.Name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return resolutions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to read resolution directory: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, file := range files {
|
|
||||||
if !file.IsDir() {
|
|
||||||
no, err := strconv.Atoi(file.Name()[7:11])
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to parse segment number: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
ch <- SegmentInfo{
|
|
||||||
Name: file.Name(),
|
|
||||||
SeqNo: no,
|
|
||||||
Resolution: resolution,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) {
|
|
||||||
segmentMap := make(map[int]SegmentInfo)
|
|
||||||
|
|
||||||
rank := map[string]int{
|
|
||||||
"1080p": 1,
|
|
||||||
"720p": 2,
|
|
||||||
"540p": 3,
|
|
||||||
"480p": 4,
|
|
||||||
"450p": 5,
|
|
||||||
"360p": 6,
|
|
||||||
"270p": 7,
|
|
||||||
"240p": 8,
|
|
||||||
}
|
|
||||||
|
|
||||||
for segment := range ch {
|
|
||||||
current, exists := segmentMap[segment.SeqNo]
|
|
||||||
if !exists || rank[segment.Resolution] > rank[current.Resolution] {
|
|
||||||
segmentMap[segment.SeqNo] = segment
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return segmentMap, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error {
|
|
||||||
f, err := os.Create(ps.config.DestinationPath + "/concat.txt")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create concat file: %w", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
for _, segment := range segmentMap {
|
|
||||||
_, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to write to concat file: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
|
|
||||||
cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath)
|
|
||||||
cmd.Stdout = os.Stdout
|
|
||||||
cmd.Stderr = os.Stderr
|
|
||||||
err := cmd.Run()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to run ffmpeg: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,8 +6,6 @@ type ProcessJob struct {
|
|||||||
|
|
||||||
type ProcessConfig struct {
|
type ProcessConfig struct {
|
||||||
WorkerCount int
|
WorkerCount int
|
||||||
SourcePath string
|
|
||||||
DestinationPath string
|
DestinationPath string
|
||||||
Enabled bool
|
Enabled bool
|
||||||
EventName string
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,28 +3,53 @@ package transfer
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"m3u8-downloader/pkg/nas"
|
"m3u8-downloader/pkg/nas"
|
||||||
"os"
|
"os"
|
||||||
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) error {
|
type NASTransfer struct {
|
||||||
destPath := filepath.Join(nt.Config.Path, item.DestinationPath)
|
config nas.NASConfig
|
||||||
|
connected bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNASTransfer(config nas.NASConfig) *NASTransfer {
|
||||||
|
nt := &NASTransfer{
|
||||||
|
config: config,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Establish network connection with credentials before accessing the path
|
||||||
|
if err := nt.establishConnection(); err != nil {
|
||||||
|
log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := nt.ensureDirectoryExists(nt.config.Path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err)
|
||||||
|
}
|
||||||
|
return nt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error {
|
||||||
|
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
|
||||||
|
|
||||||
destDir := filepath.Dir(destPath)
|
destDir := filepath.Dir(destPath)
|
||||||
if err := nt.EnsureDirectoryExists(destDir); err != nil {
|
if err := nt.ensureDirectoryExists(destDir); err != nil {
|
||||||
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
transferCtx, cancel := context.WithTimeout(ctx, nt.Config.Timeout)
|
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
if err := nt.CopyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
||||||
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if nt.Config.VerifySize {
|
if nt.config.VerifySize {
|
||||||
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
||||||
os.Remove(destPath)
|
os.Remove(destPath)
|
||||||
return fmt.Errorf("Failed to verify transfer: %w", err)
|
return fmt.Errorf("Failed to verify transfer: %w", err)
|
||||||
@ -35,3 +60,174 @@ func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) e
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error {
|
||||||
|
src, err := os.Open(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to open source file: %w", err)
|
||||||
|
}
|
||||||
|
defer src.Close()
|
||||||
|
|
||||||
|
dest, err := os.Create(destPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create destination file: %w", err)
|
||||||
|
}
|
||||||
|
defer dest.Close()
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, err := io.Copy(dest, src)
|
||||||
|
done <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dest.Sync()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error {
|
||||||
|
srcInfo, err := os.Stat(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to stat source file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(destPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to stat destination file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if srcInfo.Size() != destInfo.Size() {
|
||||||
|
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) ensureDirectoryExists(path string) error {
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
return fmt.Errorf("Failed to create directory: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) establishConnection() error {
|
||||||
|
// Extract the network path (\\server\share) from the full path
|
||||||
|
networkPath := nt.extractNetworkPath(nt.config.Path)
|
||||||
|
if networkPath == "" {
|
||||||
|
// Local path, no authentication needed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username)
|
||||||
|
|
||||||
|
// Use Windows net use command to establish authenticated connection
|
||||||
|
var cmd *exec.Cmd
|
||||||
|
if nt.config.Username != "" && nt.config.Password != "" {
|
||||||
|
cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no")
|
||||||
|
} else {
|
||||||
|
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Network connection established successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) extractNetworkPath(fullPath string) string {
|
||||||
|
// Extract \\server\share from paths like \\server\share\folder\subfolder
|
||||||
|
if !strings.HasPrefix(fullPath, "\\\\") {
|
||||||
|
return "" // Not a UNC path
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
|
||||||
|
if len(parts) < 2 {
|
||||||
|
return "" // Invalid UNC path
|
||||||
|
}
|
||||||
|
|
||||||
|
return "\\\\" + parts[0] + "\\" + parts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) TestConnection() error {
|
||||||
|
testFile := filepath.Join(nt.config.Path, ".connection_test")
|
||||||
|
|
||||||
|
f, err := os.Create(testFile)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create test file: %w", err)
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
os.Remove(testFile)
|
||||||
|
|
||||||
|
nt.connected = true
|
||||||
|
log.Printf("Connected to NAS at %s", nt.config.Path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASTransfer) IsConnected() bool {
|
||||||
|
return nt.connected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disconnect removes the network connection
|
||||||
|
func (nt *NASTransfer) Disconnect() error {
|
||||||
|
networkPath := nt.extractNetworkPath(nt.config.Path)
|
||||||
|
if networkPath == "" {
|
||||||
|
return nil // Local path, nothing to disconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := exec.Command("net", "use", networkPath, "/delete")
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
|
||||||
|
// Don't return error since this is cleanup
|
||||||
|
} else {
|
||||||
|
log.Printf("Disconnected from network path: %s", networkPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
nt.connected = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FileExists checks if a file already exists on the NAS and optionally verifies size
|
||||||
|
func (nt *NASTransfer) FileExists(destinationPath string, expectedSize int64) (bool, error) {
|
||||||
|
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return false, nil // File doesn't exist, no error
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("failed to stat NAS file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// File exists, check size if expected size is provided
|
||||||
|
if expectedSize > 0 && destInfo.Size() != expectedSize {
|
||||||
|
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
|
||||||
|
fullDestPath, expectedSize, destInfo.Size())
|
||||||
|
return false, nil // File exists but wrong size, treat as not existing
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFileSize returns the size of a file on the NAS
|
||||||
|
func (nt *NASTransfer) GetFileSize(destinationPath string) (int64, error) {
|
||||||
|
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return destInfo.Size(), nil
|
||||||
|
}
|
||||||
|
|||||||
@ -6,20 +6,19 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"m3u8-downloader/pkg/nas"
|
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TransferQueue struct {
|
type TransferQueue struct {
|
||||||
config QueueConfig
|
config QueueConfig
|
||||||
items *PriorityQueue
|
items *PriorityQueue
|
||||||
stats *QueueStats
|
stats *QueueStats
|
||||||
nasService *nas.NASService
|
nasTransfer *NASTransfer
|
||||||
cleanup *CleanupService
|
cleanup *CleanupService
|
||||||
workers []chan TransferItem
|
workers []chan TransferItem
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type PriorityQueue []*TransferItem
|
type PriorityQueue []*TransferItem
|
||||||
@ -49,17 +48,17 @@ func (pq *PriorityQueue) Pop() interface{} {
|
|||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *CleanupService) *TransferQueue {
|
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
|
||||||
pq := &PriorityQueue{}
|
pq := &PriorityQueue{}
|
||||||
heap.Init(pq)
|
heap.Init(pq)
|
||||||
|
|
||||||
tq := &TransferQueue{
|
tq := &TransferQueue{
|
||||||
config: config,
|
config: config,
|
||||||
items: pq,
|
items: pq,
|
||||||
stats: &QueueStats{},
|
stats: &QueueStats{},
|
||||||
nasService: nasTransfer,
|
nasTransfer: nasTransfer,
|
||||||
cleanup: cleanup,
|
cleanup: cleanup,
|
||||||
workers: make([]chan TransferItem, config.WorkerCount),
|
workers: make([]chan TransferItem, config.WorkerCount),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tq.LoadState(); err != nil {
|
if err := tq.LoadState(); err != nil {
|
||||||
@ -147,7 +146,7 @@ func (tq *TransferQueue) dispatchWork() {
|
|||||||
|
|
||||||
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
||||||
// Check if file already exists on NAS before attempting transfer
|
// Check if file already exists on NAS before attempting transfer
|
||||||
if exists, err := tq.nasService.FileExists(item.DestinationPath, item.FileSize); err != nil {
|
if exists, err := tq.nasTransfer.FileExists(item.DestinationPath, item.FileSize); err != nil {
|
||||||
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
|
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
|
||||||
// Continue with transfer attempt on error
|
// Continue with transfer attempt on error
|
||||||
} else if exists {
|
} else if exists {
|
||||||
@ -179,7 +178,7 @@ func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := TransferFile(tq.nasService, ctx, &item)
|
err := tq.nasTransfer.TransferFile(ctx, &item)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
item.Status = StatusCompleted
|
item.Status = StatusCompleted
|
||||||
tq.stats.IncrementCompleted(item.FileSize)
|
tq.stats.IncrementCompleted(item.FileSize)
|
||||||
|
|||||||
@ -16,7 +16,7 @@ import (
|
|||||||
type TransferService struct {
|
type TransferService struct {
|
||||||
watcher *FileWatcher
|
watcher *FileWatcher
|
||||||
queue *TransferQueue
|
queue *TransferQueue
|
||||||
nas *nas2.NASService
|
nas *NASTransfer
|
||||||
cleanup *CleanupService
|
cleanup *CleanupService
|
||||||
stats *QueueStats
|
stats *QueueStats
|
||||||
}
|
}
|
||||||
@ -30,7 +30,7 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er
|
|||||||
RetryLimit: constants.TransferRetryLimit,
|
RetryLimit: constants.TransferRetryLimit,
|
||||||
VerifySize: true,
|
VerifySize: true,
|
||||||
}
|
}
|
||||||
nas := nas2.NewNASService(nasConfig)
|
nas := NewNASTransfer(nasConfig)
|
||||||
|
|
||||||
if err := nas.TestConnection(); err != nil {
|
if err := nas.TestConnection(); err != nil {
|
||||||
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user