Eliminating cyclical imports
This commit is contained in:
parent
a10688279f
commit
0a349cc406
@ -7,7 +7,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func Process(eventName string) {
|
func Process(eventName string) {
|
||||||
log.Printf("Starting processing for event: %s", eventName)
|
//log.Printf("Starting processing for event: %s", eventName)
|
||||||
ps, err := processing.NewProcessingService(eventName)
|
ps, err := processing.NewProcessingService(eventName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to create processing service: %v", err)
|
log.Fatalf("Failed to create processing service: %v", err)
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"m3u8-downloader/pkg/transfer"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -13,55 +12,28 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type NASService struct {
|
type NASService struct {
|
||||||
config NASConfig
|
Config NASConfig
|
||||||
connected bool
|
connected bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNASService(config NASConfig) *NASService {
|
func NewNASService(config NASConfig) *NASService {
|
||||||
nt := &NASService{
|
nt := &NASService{
|
||||||
config: config,
|
Config: config,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Establish network connection with credentials before accessing the path
|
// Establish network connection with credentials before accessing the path
|
||||||
if err := nt.establishConnection(); err != nil {
|
if err := nt.EstablishConnection(); err != nil {
|
||||||
log.Fatalf("Failed to establish network connection to %s: %v", nt.config.Path, err)
|
log.Fatalf("Failed to establish network connection to %s: %v", nt.Config.Path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := nt.ensureDirectoryExists(nt.config.Path)
|
err := nt.EnsureDirectoryExists(nt.Config.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to create directory %s: %v", nt.config.Path, err)
|
log.Fatalf("Failed to create directory %s: %v", nt.Config.Path, err)
|
||||||
}
|
}
|
||||||
return nt
|
return nt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASService) TransferFile(ctx context.Context, item *transfer.TransferItem) error {
|
func (nt *NASService) CopyFile(ctx context.Context, srcPath, destPath string) error {
|
||||||
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
|
|
||||||
|
|
||||||
destDir := filepath.Dir(destPath)
|
|
||||||
if err := nt.ensureDirectoryExists(destDir); err != nil {
|
|
||||||
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
|
||||||
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if nt.config.VerifySize {
|
|
||||||
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
|
||||||
os.Remove(destPath)
|
|
||||||
return fmt.Errorf("Failed to verify transfer: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASService) copyFile(ctx context.Context, srcPath, destPath string) error {
|
|
||||||
src, err := os.Open(srcPath)
|
src, err := os.Open(srcPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to open source file: %w", err)
|
return fmt.Errorf("Failed to open source file: %w", err)
|
||||||
@ -110,27 +82,27 @@ func (nt *NASService) VerifyTransfer(srcPath, destPath string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASService) ensureDirectoryExists(path string) error {
|
func (nt *NASService) EnsureDirectoryExists(path string) error {
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
return fmt.Errorf("Failed to create directory: %w", err)
|
return fmt.Errorf("Failed to create directory: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASService) establishConnection() error {
|
func (nt *NASService) EstablishConnection() error {
|
||||||
// Extract the network path (\\server\share) from the full path
|
// Extract the network path (\\server\share) from the full path
|
||||||
networkPath := nt.extractNetworkPath(nt.config.Path)
|
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
||||||
if networkPath == "" {
|
if networkPath == "" {
|
||||||
// Local path, no authentication needed
|
// Local path, no authentication needed
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.config.Username)
|
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username)
|
||||||
|
|
||||||
// Use Windows net use command to establish authenticated connection
|
// Use Windows net use command to establish authenticated connection
|
||||||
var cmd *exec.Cmd
|
var cmd *exec.Cmd
|
||||||
if nt.config.Username != "" && nt.config.Password != "" {
|
if nt.Config.Username != "" && nt.Config.Password != "" {
|
||||||
cmd = exec.Command("net", "use", networkPath, nt.config.Password, "/user:"+nt.config.Username, "/persistent:no")
|
cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no")
|
||||||
} else {
|
} else {
|
||||||
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
|
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
|
||||||
}
|
}
|
||||||
@ -144,7 +116,7 @@ func (nt *NASService) establishConnection() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASService) extractNetworkPath(fullPath string) string {
|
func (nt *NASService) ExtractNetworkPath(fullPath string) string {
|
||||||
// Extract \\server\share from paths like \\server\share\folder\subfolder
|
// Extract \\server\share from paths like \\server\share\folder\subfolder
|
||||||
if !strings.HasPrefix(fullPath, "\\\\") {
|
if !strings.HasPrefix(fullPath, "\\\\") {
|
||||||
return "" // Not a UNC path
|
return "" // Not a UNC path
|
||||||
@ -159,7 +131,7 @@ func (nt *NASService) extractNetworkPath(fullPath string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASService) TestConnection() error {
|
func (nt *NASService) TestConnection() error {
|
||||||
testFile := filepath.Join(nt.config.Path, ".connection_test")
|
testFile := filepath.Join(nt.Config.Path, ".connection_test")
|
||||||
|
|
||||||
f, err := os.Create(testFile)
|
f, err := os.Create(testFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -170,7 +142,7 @@ func (nt *NASService) TestConnection() error {
|
|||||||
os.Remove(testFile)
|
os.Remove(testFile)
|
||||||
|
|
||||||
nt.connected = true
|
nt.connected = true
|
||||||
log.Printf("Connected to NAS at %s", nt.config.Path)
|
log.Printf("Connected to NAS at %s", nt.Config.Path)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,7 +152,7 @@ func (nt *NASService) IsConnected() bool {
|
|||||||
|
|
||||||
// Disconnect removes the network connection
|
// Disconnect removes the network connection
|
||||||
func (nt *NASService) Disconnect() error {
|
func (nt *NASService) Disconnect() error {
|
||||||
networkPath := nt.extractNetworkPath(nt.config.Path)
|
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
||||||
if networkPath == "" {
|
if networkPath == "" {
|
||||||
return nil // Local path, nothing to disconnect
|
return nil // Local path, nothing to disconnect
|
||||||
}
|
}
|
||||||
@ -200,7 +172,7 @@ func (nt *NASService) Disconnect() error {
|
|||||||
|
|
||||||
// FileExists checks if a file already exists on the NAS and optionally verifies size
|
// FileExists checks if a file already exists on the NAS and optionally verifies size
|
||||||
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
|
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
|
||||||
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
|
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
||||||
|
|
||||||
destInfo, err := os.Stat(fullDestPath)
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -222,7 +194,7 @@ func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bo
|
|||||||
|
|
||||||
// GetFileSize returns the size of a file on the NAS
|
// GetFileSize returns the size of a file on the NAS
|
||||||
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
|
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
|
||||||
fullDestPath := filepath.Join(nt.config.Path, destinationPath)
|
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
||||||
|
|
||||||
destInfo, err := os.Stat(fullDestPath)
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package processing
|
package processing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
@ -10,6 +11,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -24,6 +26,7 @@ func NewProcessingService(eventName string) (*ProcessingService, error) {
|
|||||||
SourcePath: constants.NASOutputPath + "/" + eventName,
|
SourcePath: constants.NASOutputPath + "/" + eventName,
|
||||||
DestinationPath: constants.ProcessOutputPath,
|
DestinationPath: constants.ProcessOutputPath,
|
||||||
Enabled: constants.ProcessingEnabled,
|
Enabled: constants.ProcessingEnabled,
|
||||||
|
EventName: eventName,
|
||||||
}
|
}
|
||||||
|
|
||||||
nasConfig := &nas.NASConfig{
|
nasConfig := &nas.NASConfig{
|
||||||
@ -51,12 +54,59 @@ func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
|
||||||
|
if ps.config.EventName == "" {
|
||||||
|
dirs, err := os.ReadDir(ps.config.SourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to read directory: %w", err)
|
||||||
|
}
|
||||||
|
var eventDirs []string
|
||||||
|
for _, dir := range dirs {
|
||||||
|
if dir.IsDir() {
|
||||||
|
eventDirs = append(eventDirs, dir.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return eventDirs, nil
|
||||||
|
} else {
|
||||||
|
return []string{ps.config.EventName}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ps *ProcessingService) Start(ctx context.Context) error {
|
func (ps *ProcessingService) Start(ctx context.Context) error {
|
||||||
if !ps.config.Enabled {
|
if !ps.config.Enabled {
|
||||||
log.Println("Processing service disabled")
|
log.Println("Processing service disabled")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ps.config.EventName == "" {
|
||||||
|
events, err := ps.GetEventDirs()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to get event directories: %w", err)
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
return fmt.Errorf("No events found")
|
||||||
|
}
|
||||||
|
if len(events) > 1 {
|
||||||
|
fmt.Println("Multiple events found, please select one:")
|
||||||
|
for i, event := range events {
|
||||||
|
fmt.Printf("%d. %s\n", i+1, event)
|
||||||
|
}
|
||||||
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
input, _ := reader.ReadString('\n')
|
||||||
|
input = strings.TrimSpace(input)
|
||||||
|
index, err := strconv.Atoi(input)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to parse input: %w", err)
|
||||||
|
}
|
||||||
|
if index < 1 || index > len(events) {
|
||||||
|
return fmt.Errorf("Invalid input")
|
||||||
|
}
|
||||||
|
ps.config.EventName = events[index-1]
|
||||||
|
} else {
|
||||||
|
ps.config.EventName = events[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//Get all present resolutions
|
//Get all present resolutions
|
||||||
dirs, err := ps.GetResolutions()
|
dirs, err := ps.GetResolutions()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -9,4 +9,5 @@ type ProcessConfig struct {
|
|||||||
SourcePath string
|
SourcePath string
|
||||||
DestinationPath string
|
DestinationPath string
|
||||||
Enabled bool
|
Enabled bool
|
||||||
|
EventName string
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1 +1,37 @@
|
|||||||
package transfer
|
package transfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"m3u8-downloader/pkg/nas"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) error {
|
||||||
|
destPath := filepath.Join(nt.Config.Path, item.DestinationPath)
|
||||||
|
|
||||||
|
destDir := filepath.Dir(destPath)
|
||||||
|
if err := nt.EnsureDirectoryExists(destDir); err != nil {
|
||||||
|
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
transferCtx, cancel := context.WithTimeout(ctx, nt.Config.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := nt.CopyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
||||||
|
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if nt.Config.VerifySize {
|
||||||
|
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
||||||
|
os.Remove(destPath)
|
||||||
|
return fmt.Errorf("Failed to verify transfer: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("File transfer completed: %s -> %s", item.SourcePath, destPath)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -13,13 +13,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TransferQueue struct {
|
type TransferQueue struct {
|
||||||
config QueueConfig
|
config QueueConfig
|
||||||
items *PriorityQueue
|
items *PriorityQueue
|
||||||
stats *QueueStats
|
stats *QueueStats
|
||||||
nasTransfer *nas.NASService
|
nasService *nas.NASService
|
||||||
cleanup *CleanupService
|
cleanup *CleanupService
|
||||||
workers []chan TransferItem
|
workers []chan TransferItem
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type PriorityQueue []*TransferItem
|
type PriorityQueue []*TransferItem
|
||||||
@ -54,12 +54,12 @@ func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *
|
|||||||
heap.Init(pq)
|
heap.Init(pq)
|
||||||
|
|
||||||
tq := &TransferQueue{
|
tq := &TransferQueue{
|
||||||
config: config,
|
config: config,
|
||||||
items: pq,
|
items: pq,
|
||||||
stats: &QueueStats{},
|
stats: &QueueStats{},
|
||||||
nasTransfer: nasTransfer,
|
nasService: nasTransfer,
|
||||||
cleanup: cleanup,
|
cleanup: cleanup,
|
||||||
workers: make([]chan TransferItem, config.WorkerCount),
|
workers: make([]chan TransferItem, config.WorkerCount),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tq.LoadState(); err != nil {
|
if err := tq.LoadState(); err != nil {
|
||||||
@ -147,7 +147,7 @@ func (tq *TransferQueue) dispatchWork() {
|
|||||||
|
|
||||||
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
||||||
// Check if file already exists on NAS before attempting transfer
|
// Check if file already exists on NAS before attempting transfer
|
||||||
if exists, err := tq.nasTransfer.FileExists(item.DestinationPath, item.FileSize); err != nil {
|
if exists, err := tq.nasService.FileExists(item.DestinationPath, item.FileSize); err != nil {
|
||||||
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
|
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
|
||||||
// Continue with transfer attempt on error
|
// Continue with transfer attempt on error
|
||||||
} else if exists {
|
} else if exists {
|
||||||
@ -179,7 +179,7 @@ func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := tq.nasTransfer.TransferFile(ctx, &item)
|
err := TransferFile(tq.nasService, ctx, &item)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
item.Status = StatusCompleted
|
item.Status = StatusCompleted
|
||||||
tq.stats.IncrementCompleted(item.FileSize)
|
tq.stats.IncrementCompleted(item.FileSize)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user