Compare commits

...

4 Commits

Author SHA1 Message Date
townandgown
718d69c12a Adding CI/CD 2025-08-10 22:48:53 -05:00
townandgown
0a349cc406 Eliminating cyclical imports 2025-08-10 15:16:04 -05:00
a10688279f Non-tested processing service 2025-08-10 02:35:05 -05:00
4d73ce25c2 Linux compilation 2025-08-09 18:22:35 -05:00
14 changed files with 500 additions and 237 deletions

View File

@ -0,0 +1,37 @@
name: Build Go Binaries
on:
push:
branches:
- main
jobs:
build:
runs-on: docker
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.23
- name: Cross-Compile
run: |
mkdir -p dist
for GOOS in linux windows darwin; do
for GOARCH in arm64 amd64; do
OUTPUT="dist/myapp_${GOOS}_${GOARCH}"
echo "Building ${OUTPUT}"
if [ "$GOOS" = "windows" ]; then
OUTPUT="${OUTPUT}.exe"
fi
GOOS=$GOOS GOARCH=$GOARCH go build -o $OUTPUT ./cmd/main
done
done
- name: Upload Artifact
uses: actions/upload-artifact@v4
with:
name: flo_download-binaries
path: dist

BIN
bin/flo_download Normal file

Binary file not shown.

BIN
bin/flo_download.exe Normal file

Binary file not shown.

View File

@ -27,7 +27,7 @@ func main() {
}
if *processOnly {
processor.Process()
processor.Process(*eventName)
return
}

View File

@ -1,15 +1,18 @@
package processor
import (
"context"
"log"
"m3u8-downloader/pkg/processing"
)
func Process() {
config := processing.ProcessConfig{
WorkerCount: 4,
DestinationPath: "/Users/andrey/Downloads",
Enabled: true,
func Process(eventName string) {
//log.Printf("Starting processing for event: %s", eventName)
ps, err := processing.NewProcessingService(eventName)
if err != nil {
log.Fatalf("Failed to create processing service: %v", err)
}
if err := ps.Start(context.Background()); err != nil {
log.Fatalf("Failed to run processing service: %v", err)
}
processing.NewProcessingService(&config, nil).Start(nil)
}

6
go.sum Normal file
View File

@ -0,0 +1,6 @@
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s=
github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -10,7 +10,7 @@ const (
REFERRER = "https://www.flomarching.com"
LocalOutputDirPath = "../data/"
EnableNASTransfer = true
EnableNASTransfer = false
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
NASUsername = "NASAdmin"
NASPassword = "s3tkY6tzA&KN6M"
@ -23,10 +23,11 @@ const (
BatchSize = 1000
ManifestPath = "../data"
CleanupAfterTransfer = true
CleanupAfterTransfer = false
CleanupBatchSize = 1000
RetainLocalHours = 0
ProcessOutputPath = "../out"
AutoProcess = true
ProcessingEnabled = true
ProcessWorkerCount = 2

205
pkg/nas/nas.go Normal file
View File

@ -0,0 +1,205 @@
package nas
import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
)
type NASService struct {
Config NASConfig
connected bool
}
func NewNASService(config NASConfig) *NASService {
nt := &NASService{
Config: config,
}
// Establish network connection with credentials before accessing the path
if err := nt.EstablishConnection(); err != nil {
log.Fatalf("Failed to establish network connection to %s: %v", nt.Config.Path, err)
}
err := nt.EnsureDirectoryExists(nt.Config.Path)
if err != nil {
log.Fatalf("Failed to create directory %s: %v", nt.Config.Path, err)
}
return nt
}
func (nt *NASService) CopyFile(ctx context.Context, srcPath, destPath string) error {
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("Failed to open source file: %w", err)
}
defer src.Close()
dest, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Failed to create destination file: %w", err)
}
defer dest.Close()
done := make(chan error, 1)
go func() {
_, err := io.Copy(dest, src)
done <- err
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
if err != nil {
return err
}
return dest.Sync()
}
}
func (nt *NASService) VerifyTransfer(srcPath, destPath string) error {
srcInfo, err := os.Stat(srcPath)
if err != nil {
return fmt.Errorf("Failed to stat source file: %w", err)
}
destInfo, err := os.Stat(destPath)
if err != nil {
return fmt.Errorf("Failed to stat destination file: %w", err)
}
if srcInfo.Size() != destInfo.Size() {
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
}
return nil
}
func (nt *NASService) EnsureDirectoryExists(path string) error {
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("Failed to create directory: %w", err)
}
return nil
}
func (nt *NASService) EstablishConnection() error {
// Extract the network path (\\server\share) from the full path
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
if networkPath == "" {
// Local path, no authentication needed
return nil
}
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username)
// Use Windows net use command to establish authenticated connection
var cmd *exec.Cmd
if nt.Config.Username != "" && nt.Config.Password != "" {
cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no")
} else {
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
}
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
}
log.Printf("Network connection established successfully")
return nil
}
func (nt *NASService) ExtractNetworkPath(fullPath string) string {
// Extract \\server\share from paths like \\server\share\folder\subfolder
if !strings.HasPrefix(fullPath, "\\\\") {
return "" // Not a UNC path
}
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
if len(parts) < 2 {
return "" // Invalid UNC path
}
return "\\\\" + parts[0] + "\\" + parts[1]
}
func (nt *NASService) TestConnection() error {
testFile := filepath.Join(nt.Config.Path, ".connection_test")
f, err := os.Create(testFile)
if err != nil {
return fmt.Errorf("Failed to create test file: %w", err)
}
f.Close()
os.Remove(testFile)
nt.connected = true
log.Printf("Connected to NAS at %s", nt.Config.Path)
return nil
}
func (nt *NASService) IsConnected() bool {
return nt.connected
}
// Disconnect removes the network connection
func (nt *NASService) Disconnect() error {
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
if networkPath == "" {
return nil // Local path, nothing to disconnect
}
cmd := exec.Command("net", "use", networkPath, "/delete")
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
// Don't return error since this is cleanup
} else {
log.Printf("Disconnected from network path: %s", networkPath)
}
nt.connected = false
return nil
}
// FileExists checks if a file already exists on the NAS and optionally verifies size
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil // File doesn't exist, no error
}
return false, fmt.Errorf("failed to stat NAS file: %w", err)
}
// File exists, check size if expected size is provided
if expectedSize > 0 && destInfo.Size() != expectedSize {
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
fullDestPath, expectedSize, destInfo.Size())
return false, nil // File exists but wrong size, treat as not existing
}
return true, nil
}
// GetFileSize returns the size of a file on the NAS
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
}
return destInfo.Size(), nil
}

View File

@ -0,0 +1,7 @@
package processing
type SegmentInfo struct {
Name string
SeqNo int
Resolution string
}

View File

@ -1,21 +1,74 @@
package processing
import (
"bufio"
"context"
"fmt"
"log"
"m3u8-downloader/pkg/constants"
"m3u8-downloader/pkg/nas"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
)
type ProcessingService struct {
config *ProcessConfig
nas *nas.NASConfig
nas *nas.NASService
}
func NewProcessingService(config *ProcessConfig, nas *nas.NASConfig) *ProcessingService {
func NewProcessingService(eventName string) (*ProcessingService, error) {
config := &ProcessConfig{
WorkerCount: constants.ProcessWorkerCount,
SourcePath: constants.NASOutputPath + "/" + eventName,
DestinationPath: constants.ProcessOutputPath,
Enabled: constants.ProcessingEnabled,
EventName: eventName,
}
nasConfig := &nas.NASConfig{
Path: constants.NASOutputPath,
Username: constants.NASUsername,
Password: constants.NASPassword,
Timeout: constants.TransferTimeout,
RetryLimit: constants.TransferRetryLimit,
VerifySize: true,
}
nasService := nas.NewNASService(*nasConfig)
if err := nasService.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
}
return &ProcessingService{
config: config,
nas: nas,
nas: nasService,
}, nil
}
func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
return nil, nil
}
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
if ps.config.EventName == "" {
dirs, err := os.ReadDir(ps.config.SourcePath)
if err != nil {
return nil, fmt.Errorf("Failed to read directory: %w", err)
}
var eventDirs []string
for _, dir := range dirs {
if dir.IsDir() {
eventDirs = append(eventDirs, dir.Name())
}
}
return eventDirs, nil
} else {
return []string{ps.config.EventName}, nil
}
}
@ -25,17 +78,161 @@ func (ps *ProcessingService) Start(ctx context.Context) error {
return nil
}
if ps.config.EventName == "" {
events, err := ps.GetEventDirs()
if err != nil {
return fmt.Errorf("Failed to get event directories: %w", err)
}
if len(events) == 0 {
return fmt.Errorf("No events found")
}
if len(events) > 1 {
fmt.Println("Multiple events found, please select one:")
for i, event := range events {
fmt.Printf("%d. %s\n", i+1, event)
}
reader := bufio.NewReader(os.Stdin)
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
index, err := strconv.Atoi(input)
if err != nil {
return fmt.Errorf("Failed to parse input: %w", err)
}
if index < 1 || index > len(events) {
return fmt.Errorf("Invalid input")
}
ps.config.EventName = events[index-1]
} else {
ps.config.EventName = events[0]
}
}
//Get all present resolutions
dirs, err := ps.GetResolutions()
if err != nil {
return fmt.Errorf("Failed to get resolutions: %w", err)
}
//Spawn a worker per resolution
ch := make(chan SegmentInfo, 100)
var wg sync.WaitGroup
wg.Add(1)
for _, resolution := range dirs {
wg.Add(1)
go ps.ParseResolutionDirectory(resolution, ch, &wg)
}
go func() {
defer wg.Done()
wg.Wait()
close(ch)
}()
wg.Wait()
segments, err := ps.AggregateSegmentInfo(ch)
if err != nil {
return fmt.Errorf("Failed to aggregate segment info: %w", err)
}
ps.WriteConcatFile(segments)
//Feed info to ffmpeg to stitch files together
concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath)
if concatErr != nil {
return concatErr
}
return nil
}
func (ps *ProcessingService) ProcessEvent(eventName string) error {
func (ps *ProcessingService) GetResolutions() ([]string, error) {
dirs, err := os.ReadDir(ps.config.SourcePath)
if err != nil {
return nil, fmt.Errorf("Failed to read source directory: %w", err)
}
re := regexp.MustCompile(`^\d+p$`)
var resolutions []string
for _, dir := range dirs {
if dir.IsDir() && re.MatchString(dir.Name()) {
resolutions = append(resolutions, dir.Name())
}
}
return resolutions, nil
}
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
defer wg.Done()
files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution)
if err != nil {
log.Printf("Failed to read resolution directory: %v", err)
return
}
for _, file := range files {
if !file.IsDir() {
no, err := strconv.Atoi(file.Name()[7:11])
if err != nil {
log.Printf("Failed to parse segment number: %v", err)
continue
}
ch <- SegmentInfo{
Name: file.Name(),
SeqNo: no,
Resolution: resolution,
}
}
}
}
func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) {
segmentMap := make(map[int]SegmentInfo)
rank := map[string]int{
"1080p": 1,
"720p": 2,
"540p": 3,
"480p": 4,
"450p": 5,
"360p": 6,
"270p": 7,
"240p": 8,
}
for segment := range ch {
current, exists := segmentMap[segment.SeqNo]
if !exists || rank[segment.Resolution] > rank[current.Resolution] {
segmentMap[segment.SeqNo] = segment
}
}
return segmentMap, nil
}
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error {
f, err := os.Create(ps.config.DestinationPath + "/concat.txt")
if err != nil {
return fmt.Errorf("Failed to create concat file: %w", err)
}
defer f.Close()
for _, segment := range segmentMap {
_, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n")
if err != nil {
return fmt.Errorf("Failed to write to concat file: %w", err)
}
}
return nil
}
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return fmt.Errorf("Failed to run ffmpeg: %w", err)
}
return nil
}

View File

@ -6,6 +6,8 @@ type ProcessJob struct {
type ProcessConfig struct {
WorkerCount int
SourcePath string
DestinationPath string
Enabled bool
EventName string
}

View File

@ -3,53 +3,28 @@ package transfer
import (
"context"
"fmt"
"io"
"log"
"m3u8-downloader/pkg/nas"
"os"
"os/exec"
"path/filepath"
"strings"
)
type NASTransfer struct {
config nas.NASConfig
connected bool
}
func NewNASTransfer(config nas.NASConfig) *NASTransfer {
nt := &NASTransfer{
config: config,
}
// Establish network connection with credentials before accessing the path
if err := nt.establishConnection(); err != nil {
log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err)
}
err := nt.ensureDirectoryExists(nt.config.Path)
if err != nil {
log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err)
}
return nt
}
func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error {
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) error {
destPath := filepath.Join(nt.Config.Path, item.DestinationPath)
destDir := filepath.Dir(destPath)
if err := nt.ensureDirectoryExists(destDir); err != nil {
if err := nt.EnsureDirectoryExists(destDir); err != nil {
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
}
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
transferCtx, cancel := context.WithTimeout(ctx, nt.Config.Timeout)
defer cancel()
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
if err := nt.CopyFile(transferCtx, item.SourcePath, destPath); err != nil {
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
}
if nt.config.VerifySize {
if nt.Config.VerifySize {
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
os.Remove(destPath)
return fmt.Errorf("Failed to verify transfer: %w", err)
@ -60,174 +35,3 @@ func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) err
return nil
}
func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error {
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("Failed to open source file: %w", err)
}
defer src.Close()
dest, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Failed to create destination file: %w", err)
}
defer dest.Close()
done := make(chan error, 1)
go func() {
_, err := io.Copy(dest, src)
done <- err
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
if err != nil {
return err
}
return dest.Sync()
}
}
func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error {
srcInfo, err := os.Stat(srcPath)
if err != nil {
return fmt.Errorf("Failed to stat source file: %w", err)
}
destInfo, err := os.Stat(destPath)
if err != nil {
return fmt.Errorf("Failed to stat destination file: %w", err)
}
if srcInfo.Size() != destInfo.Size() {
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
}
return nil
}
func (nt *NASTransfer) ensureDirectoryExists(path string) error {
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("Failed to create directory: %w", err)
}
return nil
}
func (nt *NASTransfer) establishConnection() error {
// Extract the network path (\\server\share) from the full path
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
// Local path, no authentication needed
return nil
}
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username)
// Use Windows net use command to establish authenticated connection
var cmd *exec.Cmd
if nt.config.Username != "" && nt.config.Password != "" {
cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no")
} else {
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
}
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
}
log.Printf("Network connection established successfully")
return nil
}
func (nt *NASTransfer) extractNetworkPath(fullPath string) string {
// Extract \\server\share from paths like \\server\share\folder\subfolder
if !strings.HasPrefix(fullPath, "\\\\") {
return "" // Not a UNC path
}
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
if len(parts) < 2 {
return "" // Invalid UNC path
}
return "\\\\" + parts[0] + "\\" + parts[1]
}
func (nt *NASTransfer) TestConnection() error {
testFile := filepath.Join(nt.config.Path, ".connection_test")
f, err := os.Create(testFile)
if err != nil {
return fmt.Errorf("Failed to create test file: %w", err)
}
f.Close()
os.Remove(testFile)
nt.connected = true
log.Printf("Connected to NAS at %s", nt.config.Path)
return nil
}
func (nt *NASTransfer) IsConnected() bool {
return nt.connected
}
// Disconnect removes the network connection
func (nt *NASTransfer) Disconnect() error {
networkPath := nt.extractNetworkPath(nt.config.Path)
if networkPath == "" {
return nil // Local path, nothing to disconnect
}
cmd := exec.Command("net", "use", networkPath, "/delete")
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
// Don't return error since this is cleanup
} else {
log.Printf("Disconnected from network path: %s", networkPath)
}
nt.connected = false
return nil
}
// FileExists checks if a file already exists on the NAS and optionally verifies size
func (nt *NASTransfer) FileExists(destinationPath string, expectedSize int64) (bool, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil // File doesn't exist, no error
}
return false, fmt.Errorf("failed to stat NAS file: %w", err)
}
// File exists, check size if expected size is provided
if expectedSize > 0 && destInfo.Size() != expectedSize {
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
fullDestPath, expectedSize, destInfo.Size())
return false, nil // File exists but wrong size, treat as not existing
}
return true, nil
}
// GetFileSize returns the size of a file on the NAS
func (nt *NASTransfer) GetFileSize(destinationPath string) (int64, error) {
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
destInfo, err := os.Stat(fullDestPath)
if err != nil {
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
}
return destInfo.Size(), nil
}

View File

@ -6,19 +6,20 @@ import (
"encoding/json"
"fmt"
"log"
"m3u8-downloader/pkg/nas"
"os"
"sync"
"time"
)
type TransferQueue struct {
config QueueConfig
items *PriorityQueue
stats *QueueStats
nasTransfer *NASTransfer
cleanup *CleanupService
workers []chan TransferItem
mu sync.RWMutex
config QueueConfig
items *PriorityQueue
stats *QueueStats
nasService *nas.NASService
cleanup *CleanupService
workers []chan TransferItem
mu sync.RWMutex
}
type PriorityQueue []*TransferItem
@ -48,17 +49,17 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *CleanupService) *TransferQueue {
pq := &PriorityQueue{}
heap.Init(pq)
tq := &TransferQueue{
config: config,
items: pq,
stats: &QueueStats{},
nasTransfer: nasTransfer,
cleanup: cleanup,
workers: make([]chan TransferItem, config.WorkerCount),
config: config,
items: pq,
stats: &QueueStats{},
nasService: nasTransfer,
cleanup: cleanup,
workers: make([]chan TransferItem, config.WorkerCount),
}
if err := tq.LoadState(); err != nil {
@ -146,7 +147,7 @@ func (tq *TransferQueue) dispatchWork() {
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
// Check if file already exists on NAS before attempting transfer
if exists, err := tq.nasTransfer.FileExists(item.DestinationPath, item.FileSize); err != nil {
if exists, err := tq.nasService.FileExists(item.DestinationPath, item.FileSize); err != nil {
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
// Continue with transfer attempt on error
} else if exists {
@ -178,7 +179,7 @@ func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
}
}
err := tq.nasTransfer.TransferFile(ctx, &item)
err := TransferFile(tq.nasService, ctx, &item)
if err == nil {
item.Status = StatusCompleted
tq.stats.IncrementCompleted(item.FileSize)

View File

@ -16,7 +16,7 @@ import (
type TransferService struct {
watcher *FileWatcher
queue *TransferQueue
nas *NASTransfer
nas *nas2.NASService
cleanup *CleanupService
stats *QueueStats
}
@ -30,7 +30,7 @@ func NewTrasferService(outputDir string, eventName string) (*TransferService, er
RetryLimit: constants.TransferRetryLimit,
VerifySize: true,
}
nas := NewNASTransfer(nasConfig)
nas := nas2.NewNASService(nasConfig)
if err := nas.TestConnection(); err != nil {
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)