265 lines
5.8 KiB
Go
265 lines
5.8 KiB
Go
package transfer
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type TransferQueue struct {
|
|
config QueueConfig
|
|
items *PriorityQueue
|
|
stats *QueueStats
|
|
nasTransfer *NASTransfer
|
|
cleanup *CleanupService
|
|
workers []chan TransferItem
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// PriorityQueue is a slice of transfer items that implements heap.Interface
// through its Len, Less, Swap, Push and Pop methods.
type PriorityQueue []*TransferItem
|
|
|
|
func (pq PriorityQueue) Len() int {
|
|
return len(pq)
|
|
}
|
|
|
|
func (pq PriorityQueue) Less(i, j int) bool {
|
|
return pq[i].Timestamp.After(pq[j].Timestamp)
|
|
}
|
|
|
|
func (pq PriorityQueue) Swap(i, j int) {
|
|
pq[i], pq[j] = pq[j], pq[i]
|
|
}
|
|
|
|
func (pq *PriorityQueue) Push(x interface{}) {
|
|
item := x.(*TransferItem)
|
|
*pq = append(*pq, item)
|
|
}
|
|
|
|
func (pq *PriorityQueue) Pop() interface{} {
|
|
old := *pq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*pq = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
|
|
pq := &PriorityQueue{}
|
|
heap.Init(pq)
|
|
|
|
tq := &TransferQueue{
|
|
config: config,
|
|
items: pq,
|
|
stats: &QueueStats{},
|
|
nasTransfer: nasTransfer,
|
|
cleanup: cleanup,
|
|
workers: make([]chan TransferItem, config.WorkerCount),
|
|
}
|
|
|
|
if err := tq.LoadState(); err != nil {
|
|
log.Printf("Failed to load queue state: %v", err)
|
|
}
|
|
|
|
return tq
|
|
}
|
|
|
|
func (tq *TransferQueue) Add(item TransferItem) error {
|
|
tq.mu.Lock()
|
|
defer tq.mu.Unlock()
|
|
|
|
if tq.items.Len() >= tq.config.MaxQueueSize {
|
|
return fmt.Errorf("Queue is full (max size: %d)", tq.config.MaxQueueSize)
|
|
}
|
|
|
|
heap.Push(tq.items, &item)
|
|
tq.stats.IncrementAdded()
|
|
|
|
log.Printf("Added file to queue: %s", item.SourcePath)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tq *TransferQueue) ProcessQueue(ctx context.Context) error {
|
|
for i := 0; i < tq.config.WorkerCount; i++ {
|
|
workerChan := make(chan TransferItem, 1)
|
|
tq.workers[i] = workerChan
|
|
|
|
go func(workerID int, workChan chan TransferItem) {
|
|
log.Printf("Worker %d started", workerID)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("Transfer worker %d shutting down...", workerID)
|
|
return
|
|
case item := <-workChan:
|
|
tq.processItem(ctx, item)
|
|
}
|
|
}
|
|
}(i, workerChan)
|
|
}
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Println("Transfer queue shutting down...")
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
tq.dispatchWork()
|
|
|
|
if time.Now().Unix()%30 == 0 {
|
|
if err := tq.SaveState(); err != nil {
|
|
log.Printf("Failed to save queue state: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tq *TransferQueue) dispatchWork() {
|
|
tq.mu.Lock()
|
|
defer tq.mu.Unlock()
|
|
|
|
for i, workerChan := range tq.workers {
|
|
if len(workerChan) == 0 && tq.items.Len() > 0 {
|
|
item := heap.Pop(tq.items).(*TransferItem)
|
|
item.Status = StatusInProgress
|
|
|
|
select {
|
|
case workerChan <- *item:
|
|
log.Printf("Dispatched file to worker %d: %s", i, item.SourcePath)
|
|
default:
|
|
heap.Push(tq.items, item)
|
|
item.Status = StatusPending
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
|
maxRetries := 3
|
|
|
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
|
if attempt > 0 {
|
|
item.Status = StatusRetrying
|
|
backoff := time.Duration(attempt*attempt) * time.Second
|
|
log.Printf("Backing off for %d seconds before retrying (attempt %d/%d)", backoff, attempt, maxRetries)
|
|
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
err := tq.nasTransfer.TransferFile(ctx, &item)
|
|
if err == nil {
|
|
item.Status = StatusCompleted
|
|
tq.stats.IncrementCompleted(item.FileSize)
|
|
|
|
if tq.cleanup != nil {
|
|
if err := tq.cleanup.ScheduleCleanup(item.SourcePath); err != nil {
|
|
log.Printf("Failed to add file to cleanup list: %v", err)
|
|
}
|
|
}
|
|
log.Printf("File transfer completed: %s", item.SourcePath)
|
|
return
|
|
}
|
|
|
|
item.LastError = err.Error()
|
|
item.RetryCount++
|
|
|
|
log.Printf("File transfer failed: %s (attempt %d/%d): %v", item.SourcePath, item.RetryCount, maxRetries, err)
|
|
|
|
if attempt == maxRetries {
|
|
item.Status = StatusFailed
|
|
tq.stats.IncrementFailed()
|
|
log.Printf("Transfer permanently failed for file: %s", item.SourcePath)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tq *TransferQueue) SaveState() error {
|
|
tq.mu.Lock()
|
|
defer tq.mu.Unlock()
|
|
|
|
items := make([]*TransferItem, tq.items.Len())
|
|
tempPQ := make(PriorityQueue, tq.items.Len())
|
|
copy(tempPQ, *tq.items)
|
|
|
|
for i := 0; i < len(items); i++ {
|
|
items[i] = heap.Pop(&tempPQ).(*TransferItem)
|
|
}
|
|
|
|
data, err := json.MarshalIndent(map[string]interface{}{
|
|
"items": items,
|
|
"stats": tq.stats,
|
|
"timestamp": time.Now(),
|
|
}, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to marshal queue state: %w", err)
|
|
}
|
|
|
|
if err := os.WriteFile(tq.config.PersistencePath, data, 0644); err != nil {
|
|
return fmt.Errorf("Failed to save queue state: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tq *TransferQueue) LoadState() error {
|
|
data, err := os.ReadFile(tq.config.PersistencePath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Failed to load queue state: %w", err)
|
|
}
|
|
|
|
var state struct {
|
|
Items []*TransferItem `json:"items"`
|
|
Stats *QueueStats `json:"stats"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &state); err != nil {
|
|
return fmt.Errorf("Failed to load queue state: %w", err)
|
|
}
|
|
|
|
tq.mu.Lock()
|
|
defer tq.mu.Unlock()
|
|
|
|
for _, item := range state.Items {
|
|
if item.Status == StatusPending || item.Status == StatusFailed {
|
|
heap.Push(tq.items, item)
|
|
}
|
|
}
|
|
|
|
if state.Stats != nil {
|
|
tq.stats = state.Stats
|
|
}
|
|
|
|
log.Printf("Loaded queue state: %d items restored from %v",
|
|
tq.items.Len(), state.Timestamp.Format(time.RFC3339))
|
|
return nil
|
|
}
|
|
|
|
func (tq *TransferQueue) GetStats() (int, int, int, int, int64) {
|
|
return tq.stats.GetStats()
|
|
}
|
|
|
|
func (tq *TransferQueue) GetQueueSize() int {
|
|
tq.mu.RLock()
|
|
defer tq.mu.RUnlock()
|
|
return tq.items.Len()
|
|
}
|