Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
718d69c12a | ||
|
|
0a349cc406 | ||
| a10688279f | |||
| 4d73ce25c2 | |||
| 899ca31bb3 | |||
|
|
c9d6900d48 | ||
|
|
8223a3ac99 | ||
|
|
99594597db |
37
.gitea/workflows/build.yml
Normal file
37
.gitea/workflows/build.yml
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
name: Build Go Binaries
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
runs-on: docker
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version: 1.23
|
||||||
|
|
||||||
|
- name: Cross-Compile
|
||||||
|
run: |
|
||||||
|
mkdir -p dist
|
||||||
|
for GOOS in linux windows darwin; do
|
||||||
|
for GOARCH in arm64 amd64; do
|
||||||
|
OUTPUT="dist/myapp_${GOOS}_${GOARCH}"
|
||||||
|
echo "Building ${OUTPUT}"
|
||||||
|
if [ "$GOOS" = "windows" ]; then
|
||||||
|
OUTPUT="${OUTPUT}.exe"
|
||||||
|
fi
|
||||||
|
GOOS=$GOOS GOARCH=$GOARCH go build -o $OUTPUT ./cmd/main
|
||||||
|
done
|
||||||
|
done
|
||||||
|
|
||||||
|
- name: Upload Artifact
|
||||||
|
uses: actions/upload-artifact@v4
|
||||||
|
with:
|
||||||
|
name: flo_download-binaries
|
||||||
|
path: dist
|
||||||
164
CLAUDE.md
164
CLAUDE.md
@ -4,31 +4,98 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
|
|||||||
|
|
||||||
## Project Overview
|
## Project Overview
|
||||||
|
|
||||||
This is a Go-based M3U8 downloader that parses HLS (HTTP Live Streaming) playlists to extract video and audio stream metadata. The end goal of this project is to have a listening REST API take in m3u8 urls, parse them, and eventually send to a conversion service.
|
This is a Go-based HLS (HTTP Live Streaming) recorder that monitors M3U8 playlists and downloads video segments in real-time with automatic NAS transfer capabilities. The program takes a master M3U8 playlist URL, parses all available stream variants (different qualities/bitrates), continuously monitors each variant's chunklist for new segments, downloads them locally, and optionally transfers them to network storage for long-term archival.
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
The project follows a clean separation of concerns:
|
The project follows a modular architecture with clear separation of concerns:
|
||||||
|
|
||||||
- **main.go**: Entry point that demonstrates usage of the media package
|
- **cmd/**: Entry points for different execution modes
|
||||||
- **media/**: Core package containing M3U8 parsing logic
|
- **main/main.go**: Primary CLI entry point with URL input and event naming
|
||||||
- **types.go**: Contains the main parsing logic and data structures (`StreamSet`, `VideoURL`, `AudioURL`)
|
- **downloader/download.go**: Core download orchestration logic with transfer service integration
|
||||||
- **utils.go**: Utility functions for parsing attributes and resolution calculations
|
- **processor/process.go**: Alternative processing entry point
|
||||||
|
- **pkg/**: Core packages containing the application logic
|
||||||
|
- **media/**: HLS streaming and download logic
|
||||||
|
- **stream.go**: Stream variant parsing and downloading orchestration (`GetAllVariants`, `VariantDownloader`)
|
||||||
|
- **playlist.go**: M3U8 playlist loading and parsing (`LoadMediaPlaylist`)
|
||||||
|
- **segment.go**: Individual segment downloading logic (`DownloadSegment`, `SegmentJob`)
|
||||||
|
- **manifest.go**: Manifest generation and segment tracking (`ManifestWriter`, `ManifestItem`)
|
||||||
|
- **transfer/**: NAS transfer system (complete implementation available)
|
||||||
|
- **service.go**: Transfer service orchestration
|
||||||
|
- **watcher.go**: File system monitoring for new downloads
|
||||||
|
- **queue.go**: Priority queue with worker pool management
|
||||||
|
- **nas.go**: NAS file transfer with retry logic
|
||||||
|
- **cleanup.go**: Local file cleanup after successful transfer
|
||||||
|
- **types.go**: Transfer system data structures
|
||||||
|
- **constants/constants.go**: Configuration constants (paths, timeouts, NAS settings)
|
||||||
|
- **httpClient/error.go**: HTTP error handling utilities
|
||||||
|
|
||||||
The `GetStreamMetadata()` function is the main entry point that:
|
## Core Functionality
|
||||||
1. Fetches the M3U8 master playlist via HTTP
|
|
||||||
2. Parses the content line by line
|
### Download Workflow
|
||||||
3. Extracts video streams (`#EXT-X-STREAM-INF`) and audio streams (`#EXT-X-MEDIA`)
|
1. **Parse Master Playlist**: `GetAllVariants()` fetches and parses the master M3U8 to extract all stream variants with different qualities/bitrates
|
||||||
4. Returns a `StreamSet` containing all parsed metadata
|
2. **Concurrent Monitoring**: Each variant gets its own goroutine running `VariantDownloader()` that continuously polls for playlist updates
|
||||||
|
3. **Segment Detection**: When new segments appear in a variant's playlist, they are queued for download
|
||||||
|
4. **Parallel Downloads**: Segments are downloaded concurrently with configurable worker pools and retry logic
|
||||||
|
5. **Quality Organization**: Downloaded segments are organized by resolution (1080p, 720p, etc.) in separate directories
|
||||||
|
6. **Manifest Generation**: `ManifestWriter` tracks all downloaded segments with sequence numbers and resolutions
|
||||||
|
|
||||||
|
### NAS Transfer Workflow (Optional)
|
||||||
|
1. **File Watching**: `FileWatcher` monitors download directories for new `.ts` files
|
||||||
|
2. **Transfer Queuing**: New files are added to a priority queue after a settling delay
|
||||||
|
3. **Background Transfer**: Worker pool transfers files to NAS with retry logic and verification
|
||||||
|
4. **Local Cleanup**: Successfully transferred files are automatically cleaned up locally
|
||||||
|
5. **State Persistence**: Queue state is persisted to survive crashes and restarts
|
||||||
|
|
||||||
|
## Key Data Structures
|
||||||
|
|
||||||
|
- `StreamVariant`: Represents a stream quality variant with URL, bandwidth, resolution, output directory, and manifest writer
|
||||||
|
- `SegmentJob`: Represents a segment download task with URI, sequence number, and variant info
|
||||||
|
- `ManifestWriter`: Tracks downloaded segments and generates JSON manifests
|
||||||
|
- `ManifestItem`: Individual segment record with sequence number and resolution
|
||||||
|
- `TransferItem`: Transfer queue item with source, destination, retry count, and status
|
||||||
|
- `TransferService`: Orchestrates file watching, queuing, transfer, and cleanup
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Key configuration is managed in `pkg/constants/constants.go`:
|
||||||
|
|
||||||
|
### Core Settings
|
||||||
|
- `WorkerCount`: Number of concurrent segment downloaders per variant (4)
|
||||||
|
- `RefreshDelay`: How often to check for playlist updates (3 seconds)
|
||||||
|
- `LocalOutputDirPath`: Base directory for local downloads (`./data/`)
|
||||||
|
- `ManifestPath`: Directory for manifest JSON files (`./data`)
|
||||||
|
|
||||||
|
### HTTP Settings
|
||||||
|
- `HTTPUserAgent`: User agent string for HTTP requests
|
||||||
|
- `REFERRER`: Referer header for HTTP requests
|
||||||
|
|
||||||
|
### NAS Transfer Settings
|
||||||
|
- `EnableNASTransfer`: Enable/disable automatic NAS transfer (true)
|
||||||
|
- `NASOutputPath`: UNC path to NAS storage (`\\HomeLabNAS\dci\streams`)
|
||||||
|
- `NASUsername`/`NASPassword`: NAS credentials (empty for current user)
|
||||||
|
- `TransferWorkerCount`: Concurrent transfer workers (2)
|
||||||
|
- `TransferRetryLimit`: Max retry attempts per file (3)
|
||||||
|
- `TransferTimeout`: Timeout per file transfer (30 seconds)
|
||||||
|
- `FileSettlingDelay`: Wait before queuing new files (5 seconds)
|
||||||
|
- `PersistencePath`: Transfer queue state file (`./data/transfer_queue.json`)
|
||||||
|
|
||||||
|
### Cleanup Settings
|
||||||
|
- `CleanupAfterTransfer`: Delete local files after NAS transfer (true)
|
||||||
|
- `CleanupBatchSize`: Files processed per cleanup batch (10)
|
||||||
|
- `RetainLocalHours`: Hours to keep local files (0 = immediate cleanup)
|
||||||
|
|
||||||
## Common Development Commands
|
## Common Development Commands
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Build the project
|
# Build the main application
|
||||||
go build -o m3u8-downloader
|
go build -o stream-recorder ./cmd/main
|
||||||
|
|
||||||
# Run the project
|
# Run with URL prompt
|
||||||
go run main.go
|
go run ./cmd/main/main.go
|
||||||
|
|
||||||
|
# Run with command line arguments
|
||||||
|
go run ./cmd/main/main.go -url="https://example.com/playlist.m3u8" -event="my-event" -debug=true
|
||||||
|
|
||||||
# Run with module support
|
# Run with module support
|
||||||
go mod tidy
|
go mod tidy
|
||||||
@ -40,12 +107,69 @@ go test ./...
|
|||||||
go fmt ./...
|
go fmt ./...
|
||||||
```
|
```
|
||||||
|
|
||||||
## Key Data Structures
|
## Command Line Options
|
||||||
|
|
||||||
- `StreamSet`: Root structure containing playlist URL and all streams
|
- `-url`: M3U8 playlist URL (if not provided, prompts for input)
|
||||||
- `VideoURL`: Represents video stream with bandwidth, codecs, resolution, frame rate
|
- `-event`: Event name for organizing downloads (defaults to current date)
|
||||||
- `AudioURL`: Represents audio stream with media type, group ID, name, and selection flags
|
- `-debug`: Debug mode (only downloads 1080p variant for easier testing)
|
||||||
|
|
||||||
|
## Monitoring and Downloads
|
||||||
|
|
||||||
|
The application implements comprehensive real-time stream monitoring:
|
||||||
|
|
||||||
|
### Download Features
|
||||||
|
- **Continuous Polling**: Each variant playlist is checked every 3 seconds for new segments
|
||||||
|
- **Deduplication**: Uses segment URIs and sequence numbers to avoid re-downloading
|
||||||
|
- **Graceful Shutdown**: Responds to SIGINT/SIGTERM signals for clean exit
|
||||||
|
- **Error Resilience**: Retries failed downloads and handles HTTP 403 errors specially
|
||||||
|
- **Quality Detection**: Automatically determines resolution from bandwidth or explicit resolution data
|
||||||
|
- **Context Cancellation**: Proper timeout and cancellation handling for clean shutdowns
|
||||||
|
|
||||||
|
### Transfer Features (when enabled)
|
||||||
|
- **Real-time Transfer**: Files are transferred to NAS as soon as they're downloaded
|
||||||
|
- **Queue Persistence**: Transfer queue survives application restarts
|
||||||
|
- **Retry Logic**: Failed transfers are retried with exponential backoff
|
||||||
|
- **Verification**: File sizes are verified after transfer
|
||||||
|
- **Automatic Cleanup**: Local files are removed after successful NAS transfer
|
||||||
|
- **Statistics Reporting**: Transfer progress and statistics are logged regularly
|
||||||
|
|
||||||
|
### Manifest Generation
|
||||||
|
- **Segment Tracking**: All downloaded segments are tracked with sequence numbers
|
||||||
|
- **Resolution Mapping**: Segments are associated with their quality variants
|
||||||
|
- **JSON Output**: Manifest files are generated as sorted JSON arrays for easy processing
|
||||||
|
|
||||||
## Error Handling
|
## Error Handling
|
||||||
|
|
||||||
The current implementation uses `panic()` for error handling. When extending functionality, consider implementing proper error handling with returned error values following Go conventions.
|
The implementation uses proper Go error handling patterns:
|
||||||
|
- **Custom HTTP Errors**: Structured error types for HTTP failures
|
||||||
|
- **Context-Aware Cancellation**: Proper handling of shutdown scenarios
|
||||||
|
- **Retry Logic**: Exponential backoff for transient failures
|
||||||
|
- **Logging**: Clear status indicators (✓ for success, ✗ for failure)
|
||||||
|
- **Graceful Degradation**: Transfer service failures don't stop downloads
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
- `github.com/grafov/m3u8`: M3U8 playlist parsing
|
||||||
|
- `github.com/fsnotify/fsnotify`: File system event monitoring for NAS transfers
|
||||||
|
|
||||||
|
## Data Organization
|
||||||
|
|
||||||
|
Downloaded files are organized as:
|
||||||
|
```
|
||||||
|
./data/
|
||||||
|
├── {event-name}.json # Manifest file
|
||||||
|
├── {event-name}/ # Event-specific directory
|
||||||
|
│ ├── 1080p/ # High quality segments
|
||||||
|
│ ├── 720p/ # Medium quality segments
|
||||||
|
│ └── 480p/ # Lower quality segments
|
||||||
|
└── transfer_queue.json # Transfer queue state
|
||||||
|
```
|
||||||
|
|
||||||
|
NAS files mirror the local structure:
|
||||||
|
```
|
||||||
|
\\HomeLabNAS\dci\streams\
|
||||||
|
└── {event-name}/
|
||||||
|
├── 1080p/
|
||||||
|
├── 720p/
|
||||||
|
└── 480p/
|
||||||
|
```
|
||||||
BIN
bin/flo_download
Normal file
BIN
bin/flo_download
Normal file
Binary file not shown.
BIN
bin/flo_download.exe
Normal file
BIN
bin/flo_download.exe
Normal file
Binary file not shown.
@ -1,4 +1,4 @@
|
|||||||
package main
|
package downloader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -13,7 +13,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func Download(masterURL string, eventName string, debug bool) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ func main() {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var transferService *transfer.TransferService
|
var transferService *transfer.TransferService
|
||||||
if constants.EnableNASTransfer {
|
if constants.EnableNASTransfer {
|
||||||
ts, err := transfer.NewTrasferService(constants.NASPath)
|
ts, err := transfer.NewTrasferService(constants.NASOutputPath, eventName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Failed to create transfer service: %v", err)
|
log.Printf("Failed to create transfer service: %v", err)
|
||||||
log.Println("Continuing without transfer service...")
|
log.Println("Continuing without transfer service...")
|
||||||
@ -46,7 +46,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
variants, err := media.GetAllVariants(constants.MasterURL)
|
manifestWriter := media.NewManifestWriter(eventName)
|
||||||
|
|
||||||
|
variants, err := media.GetAllVariants(masterURL, constants.LocalOutputDirPath+"/"+eventName, manifestWriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to get variants: %v", err)
|
log.Fatalf("Failed to get variants: %v", err)
|
||||||
}
|
}
|
||||||
@ -54,11 +56,19 @@ func main() {
|
|||||||
|
|
||||||
sem := make(chan struct{}, constants.WorkerCount*len(variants))
|
sem := make(chan struct{}, constants.WorkerCount*len(variants))
|
||||||
|
|
||||||
|
manifest := media.NewManifestWriter(eventName)
|
||||||
|
|
||||||
for _, variant := range variants {
|
for _, variant := range variants {
|
||||||
|
// Debug mode only tracks one variant for easier debugging
|
||||||
|
if debug {
|
||||||
|
if variant.Resolution != "1080p" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(v *media.StreamVariant) {
|
go func(v *media.StreamVariant) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
media.VariantDownloader(ctx, v, sem)
|
media.VariantDownloader(ctx, v, sem, manifest)
|
||||||
}(variant)
|
}(variant)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,4 +82,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Println("All Services shut down.")
|
log.Println("All Services shut down.")
|
||||||
|
|
||||||
|
manifestWriter.WriteManifest()
|
||||||
|
log.Println("Manifest written.")
|
||||||
}
|
}
|
||||||
44
cmd/main/main.go
Normal file
44
cmd/main/main.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"m3u8-downloader/cmd/downloader"
|
||||||
|
"m3u8-downloader/cmd/processor"
|
||||||
|
"m3u8-downloader/cmd/transfer"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
url := flag.String("url", "", "M3U8 playlist URL")
|
||||||
|
eventName := flag.String("event", time.Now().Format("2006-01-02"), "Event name")
|
||||||
|
debug := flag.Bool("debug", false, "Enable debug mode")
|
||||||
|
transferOnly := flag.Bool("transfer", false, "Transfer-only mode: transfer existing files without downloading")
|
||||||
|
processOnly := flag.Bool("process", false, "Process-only mode: process existing files without downloading")
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
if *transferOnly {
|
||||||
|
transfer.RunTransferOnly(*eventName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if *processOnly {
|
||||||
|
processor.Process(*eventName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if *url == "" {
|
||||||
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
fmt.Print("Enter M3U8 playlist URL: ")
|
||||||
|
inputUrl, _ := reader.ReadString('\n')
|
||||||
|
inputUrl = strings.TrimSpace(inputUrl)
|
||||||
|
downloader.Download(inputUrl, *eventName, *debug)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
downloader.Download(*url, *eventName, *debug)
|
||||||
|
}
|
||||||
@ -1 +0,0 @@
|
|||||||
package proc
|
|
||||||
18
cmd/processor/process.go
Normal file
18
cmd/processor/process.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package processor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"m3u8-downloader/pkg/processing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Process(eventName string) {
|
||||||
|
//log.Printf("Starting processing for event: %s", eventName)
|
||||||
|
ps, err := processing.NewProcessingService(eventName)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create processing service: %v", err)
|
||||||
|
}
|
||||||
|
if err := ps.Start(context.Background()); err != nil {
|
||||||
|
log.Fatalf("Failed to run processing service: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
63
cmd/transfer/transfer.go
Normal file
63
cmd/transfer/transfer.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package transfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"m3u8-downloader/pkg/constants"
|
||||||
|
"m3u8-downloader/pkg/transfer"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RunTransferOnly(eventName string) {
|
||||||
|
log.Printf("Starting transfer-only mode for event: %s", eventName)
|
||||||
|
|
||||||
|
// Setup context and signal handling
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
go func() {
|
||||||
|
<-sigChan
|
||||||
|
log.Println("Shutting down transfer service...")
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Check if NAS transfer is enabled
|
||||||
|
if !constants.EnableNASTransfer {
|
||||||
|
log.Fatal("NAS transfer is disabled in constants. Please enable it to use transfer-only mode.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify local event directory exists
|
||||||
|
localEventPath := constants.LocalOutputDirPath + "/" + eventName
|
||||||
|
if _, err := os.Stat(localEventPath); os.IsNotExist(err) {
|
||||||
|
log.Fatalf("Local event directory does not exist: %s", localEventPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create transfer service
|
||||||
|
transferService, err := transfer.NewTrasferService(constants.NASOutputPath, eventName)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create transfer service: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find and queue existing files
|
||||||
|
if err := transferService.QueueExistingFiles(localEventPath); err != nil {
|
||||||
|
log.Fatalf("Failed to queue existing files: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start transfer service
|
||||||
|
log.Println("Starting transfer service...")
|
||||||
|
if err := transferService.Start(ctx); err != nil && err != context.Canceled {
|
||||||
|
log.Printf("Transfer service error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Graceful shutdown
|
||||||
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer shutdownCancel()
|
||||||
|
transferService.Shutdown(shutdownCtx)
|
||||||
|
|
||||||
|
log.Println("Transfer-only mode completed.")
|
||||||
|
}
|
||||||
6
go.sum
Normal file
6
go.sum
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||||
|
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||||
|
github.com/grafov/m3u8 v0.12.1 h1:DuP1uA1kvRRmGNAZ0m+ObLv1dvrfNO0TPx0c/enNk0s=
|
||||||
|
github.com/grafov/m3u8 v0.12.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080=
|
||||||
|
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||||
|
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
@ -3,27 +3,32 @@ package constants
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MasterURL = "https://live-fastly.flosports.tv/streams/mr159021-260419/playlist.m3u8?token=st%3D1753571418%7Eexp%3D1753571448%7Eacl%3D%2Fstreams%2Fmr159021-260419%2Fplaylist.m3u8%7Edata%3Dssai%3A0%3BuserId%3A14025903%3BstreamId%3A260419%3BmediaPackageRegion%3Afalse%3BdvrMinutes%3A360%3BtokenId%3Abadd289a-ade5-48fe-852f-7dbd1d57aca8%3Bpv%3A86400%7Ehmac2%3D8de65c26b185084a6be77e788cb0ba41be5fcac3ab86159b06f7572ca925d77ba7bd182124af2a432953d4223548f198742d1a238e937d875976cd42fe549838&mid_origin=media_store&keyName=FLOSPORTS_TOKEN_KEY_2023-08-02&streamCode=mr159021-260419"
|
|
||||||
WorkerCount = 4
|
WorkerCount = 4
|
||||||
RefreshDelay = 3 * time.Second
|
RefreshDelay = 3 * time.Second
|
||||||
|
|
||||||
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
|
HTTPUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
|
||||||
REFERRER = "https://www.flomarching.com"
|
REFERRER = "https://www.flomarching.com"
|
||||||
OutputDirPath = "./data/flo_radio"
|
LocalOutputDirPath = "../data/"
|
||||||
|
|
||||||
EnableNASTransfer = true
|
EnableNASTransfer = false
|
||||||
NASPath = "\\\\HomeLabNAS\\dci\\streams\\2025_Atlanta"
|
NASOutputPath = "\\\\HomeLabNAS\\dci\\streams"
|
||||||
NASUsername = ""
|
NASUsername = "NASAdmin"
|
||||||
NASPassword = ""
|
NASPassword = "s3tkY6tzA&KN6M"
|
||||||
TransferWorkerCount = 2
|
TransferWorkerCount = 2
|
||||||
TransferRetryLimit = 3
|
TransferRetryLimit = 3
|
||||||
TransferTimeout = 30 * time.Second
|
TransferTimeout = 30 * time.Second
|
||||||
FileSettlingDelay = 5 * time.Second
|
FileSettlingDelay = 5 * time.Second
|
||||||
PersistencePath = "./data/transfer_queue.json"
|
PersistencePath = "../data/transfer_queue.json"
|
||||||
TransferQueueSize = 1000
|
TransferQueueSize = 100000
|
||||||
BatchSize = 10
|
BatchSize = 1000
|
||||||
|
ManifestPath = "../data"
|
||||||
|
|
||||||
CleanupAfterTransfer = true
|
CleanupAfterTransfer = false
|
||||||
CleanupBatchSize = 10
|
CleanupBatchSize = 1000
|
||||||
RetainLocalHours = 0
|
RetainLocalHours = 0
|
||||||
|
|
||||||
|
ProcessOutputPath = "../out"
|
||||||
|
AutoProcess = true
|
||||||
|
ProcessingEnabled = true
|
||||||
|
ProcessWorkerCount = 2
|
||||||
)
|
)
|
||||||
|
|||||||
78
pkg/media/manifest.go
Normal file
78
pkg/media/manifest.go
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
package media
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"m3u8-downloader/pkg/constants"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ManifestWriter struct {
|
||||||
|
ManifestPath string
|
||||||
|
Segments []ManifestItem
|
||||||
|
Index map[string]*ManifestItem
|
||||||
|
}
|
||||||
|
|
||||||
|
type ManifestItem struct {
|
||||||
|
SeqNo string `json:"seqNo"`
|
||||||
|
Resolution string `json:"resolution"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManifestWriter(eventName string) *ManifestWriter {
|
||||||
|
return &ManifestWriter{
|
||||||
|
ManifestPath: constants.ManifestPath + "/" + eventName + ".json",
|
||||||
|
Segments: make([]ManifestItem, 0),
|
||||||
|
Index: make(map[string]*ManifestItem),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ManifestWriter) AddOrUpdateSegment(seqNo string, resolution string) {
|
||||||
|
if m.Index == nil {
|
||||||
|
m.Index = make(map[string]*ManifestItem)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Segments == nil {
|
||||||
|
m.Segments = make([]ManifestItem, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if existing, ok := m.Index[seqNo]; ok {
|
||||||
|
if resolution > existing.Resolution {
|
||||||
|
existing.Resolution = resolution
|
||||||
|
}
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
item := ManifestItem{
|
||||||
|
SeqNo: seqNo,
|
||||||
|
Resolution: resolution,
|
||||||
|
}
|
||||||
|
m.Segments = append(m.Segments, item)
|
||||||
|
m.Index[seqNo] = &item
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ManifestWriter) WriteManifest() {
|
||||||
|
sort.Slice(m.Segments, func(i, j int) bool {
|
||||||
|
return m.Segments[i].SeqNo < m.Segments[j].SeqNo
|
||||||
|
})
|
||||||
|
|
||||||
|
data, err := json.MarshalIndent(m.Segments, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to marshal manifest: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
file, err := os.Create(m.ManifestPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to create manifest file: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
_, err = file.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to write manifest file: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ package media
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/grafov/m3u8"
|
"github.com/grafov/m3u8"
|
||||||
"log"
|
"log"
|
||||||
@ -21,6 +22,7 @@ type StreamVariant struct {
|
|||||||
ID int
|
ID int
|
||||||
Resolution string
|
Resolution string
|
||||||
OutputDir string
|
OutputDir string
|
||||||
|
Writer *ManifestWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractResolution(variant *m3u8.Variant) string {
|
func extractResolution(variant *m3u8.Variant) string {
|
||||||
@ -44,7 +46,7 @@ func extractResolution(variant *m3u8.Variant) string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
|
func GetAllVariants(masterURL string, outputDir string, writer *ManifestWriter) ([]*StreamVariant, error) {
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
req, _ := http.NewRequest("GET", masterURL, nil)
|
req, _ := http.NewRequest("GET", masterURL, nil)
|
||||||
req.Header.Set("User-Agent", constants.HTTPUserAgent)
|
req.Header.Set("User-Agent", constants.HTTPUserAgent)
|
||||||
@ -69,7 +71,8 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
|
|||||||
BaseURL: base,
|
BaseURL: base,
|
||||||
ID: 0,
|
ID: 0,
|
||||||
Resolution: "unknown",
|
Resolution: "unknown",
|
||||||
OutputDir: path.Join(constants.NASPath, "unknown"),
|
OutputDir: path.Join(outputDir, "unknown"),
|
||||||
|
Writer: writer,
|
||||||
}}, nil
|
}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,7 +86,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
|
|||||||
vURL, _ := url.Parse(v.URI)
|
vURL, _ := url.Parse(v.URI)
|
||||||
fullURL := base.ResolveReference(vURL).String()
|
fullURL := base.ResolveReference(vURL).String()
|
||||||
resolution := extractResolution(v)
|
resolution := extractResolution(v)
|
||||||
outputDir := path.Join(constants.NASPath, resolution)
|
outputDir := path.Join(outputDir, resolution)
|
||||||
variants = append(variants, &StreamVariant{
|
variants = append(variants, &StreamVariant{
|
||||||
URL: fullURL,
|
URL: fullURL,
|
||||||
Bandwidth: v.Bandwidth,
|
Bandwidth: v.Bandwidth,
|
||||||
@ -96,7 +99,7 @@ func GetAllVariants(masterURL string) ([]*StreamVariant, error) {
|
|||||||
return variants, nil
|
return variants, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}) {
|
func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan struct{}, manifest *ManifestWriter) {
|
||||||
log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth)
|
log.Printf("Starting %s variant downloader (bandwidth: %d)", variant.Resolution, variant.Bandwidth)
|
||||||
ticker := time.NewTicker(constants.RefreshDelay)
|
ticker := time.NewTicker(constants.RefreshDelay)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
@ -142,9 +145,18 @@ func VariantDownloader(ctx context.Context, variant *StreamVariant, sem chan str
|
|||||||
|
|
||||||
err := DownloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir)
|
err := DownloadSegment(ctx, client, j.AbsoluteURL(), j.Variant.OutputDir)
|
||||||
name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key())))
|
name := strings.TrimSuffix(path.Base(j.Key()), path.Ext(path.Base(j.Key())))
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name)
|
log.Printf("✓ %s downloaded segment %s", j.Variant.Resolution, name)
|
||||||
} else if httpClient.IsHTTPStatus(err, 403) {
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
// Suppress log: shutdown in progress
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if httpClient.IsHTTPStatus(err, 403) {
|
||||||
log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name)
|
log.Printf("✗ %s failed to download segment %s (403)", j.Variant.Resolution, name)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err)
|
log.Printf("✗ %s failed to download segment %s: %v", j.Variant.Resolution, name, err)
|
||||||
|
|||||||
12
pkg/nas/config.go
Normal file
12
pkg/nas/config.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package nas
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type NASConfig struct {
|
||||||
|
Path string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
Timeout time.Duration
|
||||||
|
RetryLimit int
|
||||||
|
VerifySize bool
|
||||||
|
}
|
||||||
205
pkg/nas/nas.go
Normal file
205
pkg/nas/nas.go
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
package nas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NASService struct {
|
||||||
|
Config NASConfig
|
||||||
|
connected bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNASService(config NASConfig) *NASService {
|
||||||
|
nt := &NASService{
|
||||||
|
Config: config,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Establish network connection with credentials before accessing the path
|
||||||
|
if err := nt.EstablishConnection(); err != nil {
|
||||||
|
log.Fatalf("Failed to establish network connection to %s: %v", nt.Config.Path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := nt.EnsureDirectoryExists(nt.Config.Path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create directory %s: %v", nt.Config.Path, err)
|
||||||
|
}
|
||||||
|
return nt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) CopyFile(ctx context.Context, srcPath, destPath string) error {
|
||||||
|
src, err := os.Open(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to open source file: %w", err)
|
||||||
|
}
|
||||||
|
defer src.Close()
|
||||||
|
|
||||||
|
dest, err := os.Create(destPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create destination file: %w", err)
|
||||||
|
}
|
||||||
|
defer dest.Close()
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, err := io.Copy(dest, src)
|
||||||
|
done <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dest.Sync()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) VerifyTransfer(srcPath, destPath string) error {
|
||||||
|
srcInfo, err := os.Stat(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to stat source file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(destPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to stat destination file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if srcInfo.Size() != destInfo.Size() {
|
||||||
|
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) EnsureDirectoryExists(path string) error {
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
return fmt.Errorf("Failed to create directory: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) EstablishConnection() error {
|
||||||
|
// Extract the network path (\\server\share) from the full path
|
||||||
|
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
||||||
|
if networkPath == "" {
|
||||||
|
// Local path, no authentication needed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Establishing network connection to %s with user %s", networkPath, nt.Config.Username)
|
||||||
|
|
||||||
|
// Use Windows net use command to establish authenticated connection
|
||||||
|
var cmd *exec.Cmd
|
||||||
|
if nt.Config.Username != "" && nt.Config.Password != "" {
|
||||||
|
cmd = exec.Command("net", "use", networkPath, nt.Config.Password, "/user:"+nt.Config.Username, "/persistent:no")
|
||||||
|
} else {
|
||||||
|
cmd = exec.Command("net", "use", networkPath, "/persistent:no")
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to establish network connection: %w\nOutput: %s", err, string(output))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Network connection established successfully")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) ExtractNetworkPath(fullPath string) string {
|
||||||
|
// Extract \\server\share from paths like \\server\share\folder\subfolder
|
||||||
|
if !strings.HasPrefix(fullPath, "\\\\") {
|
||||||
|
return "" // Not a UNC path
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.Split(fullPath[2:], "\\") // Remove leading \\
|
||||||
|
if len(parts) < 2 {
|
||||||
|
return "" // Invalid UNC path
|
||||||
|
}
|
||||||
|
|
||||||
|
return "\\\\" + parts[0] + "\\" + parts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) TestConnection() error {
|
||||||
|
testFile := filepath.Join(nt.Config.Path, ".connection_test")
|
||||||
|
|
||||||
|
f, err := os.Create(testFile)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create test file: %w", err)
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
os.Remove(testFile)
|
||||||
|
|
||||||
|
nt.connected = true
|
||||||
|
log.Printf("Connected to NAS at %s", nt.Config.Path)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nt *NASService) IsConnected() bool {
|
||||||
|
return nt.connected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disconnect removes the network connection
|
||||||
|
func (nt *NASService) Disconnect() error {
|
||||||
|
networkPath := nt.ExtractNetworkPath(nt.Config.Path)
|
||||||
|
if networkPath == "" {
|
||||||
|
return nil // Local path, nothing to disconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := exec.Command("net", "use", networkPath, "/delete")
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Warning: failed to disconnect from %s: %v\nOutput: %s", networkPath, err, string(output))
|
||||||
|
// Don't return error since this is cleanup
|
||||||
|
} else {
|
||||||
|
log.Printf("Disconnected from network path: %s", networkPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
nt.connected = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FileExists checks if a file already exists on the NAS and optionally verifies size
|
||||||
|
func (nt *NASService) FileExists(destinationPath string, expectedSize int64) (bool, error) {
|
||||||
|
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return false, nil // File doesn't exist, no error
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("failed to stat NAS file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// File exists, check size if expected size is provided
|
||||||
|
if expectedSize > 0 && destInfo.Size() != expectedSize {
|
||||||
|
log.Printf("NAS file size mismatch for %s: expected=%d, actual=%d",
|
||||||
|
fullDestPath, expectedSize, destInfo.Size())
|
||||||
|
return false, nil // File exists but wrong size, treat as not existing
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFileSize returns the size of a file on the NAS
|
||||||
|
func (nt *NASService) GetFileSize(destinationPath string) (int64, error) {
|
||||||
|
fullDestPath := filepath.Join(nt.Config.Path, destinationPath)
|
||||||
|
|
||||||
|
destInfo, err := os.Stat(fullDestPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to stat NAS file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return destInfo.Size(), nil
|
||||||
|
}
|
||||||
7
pkg/processing/segment.go
Normal file
7
pkg/processing/segment.go
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
type SegmentInfo struct {
|
||||||
|
Name string
|
||||||
|
SeqNo int
|
||||||
|
Resolution string
|
||||||
|
}
|
||||||
238
pkg/processing/service.go
Normal file
238
pkg/processing/service.go
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"m3u8-downloader/pkg/constants"
|
||||||
|
"m3u8-downloader/pkg/nas"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ProcessingService struct {
|
||||||
|
config *ProcessConfig
|
||||||
|
nas *nas.NASService
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProcessingService(eventName string) (*ProcessingService, error) {
|
||||||
|
config := &ProcessConfig{
|
||||||
|
WorkerCount: constants.ProcessWorkerCount,
|
||||||
|
SourcePath: constants.NASOutputPath + "/" + eventName,
|
||||||
|
DestinationPath: constants.ProcessOutputPath,
|
||||||
|
Enabled: constants.ProcessingEnabled,
|
||||||
|
EventName: eventName,
|
||||||
|
}
|
||||||
|
|
||||||
|
nasConfig := &nas.NASConfig{
|
||||||
|
Path: constants.NASOutputPath,
|
||||||
|
Username: constants.NASUsername,
|
||||||
|
Password: constants.NASPassword,
|
||||||
|
Timeout: constants.TransferTimeout,
|
||||||
|
RetryLimit: constants.TransferRetryLimit,
|
||||||
|
VerifySize: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
nasService := nas.NewNASService(*nasConfig)
|
||||||
|
|
||||||
|
if err := nasService.TestConnection(); err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ProcessingService{
|
||||||
|
config: config,
|
||||||
|
nas: nasService,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) GetSegmentInfo() (map[int]string, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) GetEventDirs() ([]string, error) {
|
||||||
|
if ps.config.EventName == "" {
|
||||||
|
dirs, err := os.ReadDir(ps.config.SourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to read directory: %w", err)
|
||||||
|
}
|
||||||
|
var eventDirs []string
|
||||||
|
for _, dir := range dirs {
|
||||||
|
if dir.IsDir() {
|
||||||
|
eventDirs = append(eventDirs, dir.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return eventDirs, nil
|
||||||
|
} else {
|
||||||
|
return []string{ps.config.EventName}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) Start(ctx context.Context) error {
|
||||||
|
if !ps.config.Enabled {
|
||||||
|
log.Println("Processing service disabled")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if ps.config.EventName == "" {
|
||||||
|
events, err := ps.GetEventDirs()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to get event directories: %w", err)
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
return fmt.Errorf("No events found")
|
||||||
|
}
|
||||||
|
if len(events) > 1 {
|
||||||
|
fmt.Println("Multiple events found, please select one:")
|
||||||
|
for i, event := range events {
|
||||||
|
fmt.Printf("%d. %s\n", i+1, event)
|
||||||
|
}
|
||||||
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
input, _ := reader.ReadString('\n')
|
||||||
|
input = strings.TrimSpace(input)
|
||||||
|
index, err := strconv.Atoi(input)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to parse input: %w", err)
|
||||||
|
}
|
||||||
|
if index < 1 || index > len(events) {
|
||||||
|
return fmt.Errorf("Invalid input")
|
||||||
|
}
|
||||||
|
ps.config.EventName = events[index-1]
|
||||||
|
} else {
|
||||||
|
ps.config.EventName = events[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Get all present resolutions
|
||||||
|
dirs, err := ps.GetResolutions()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to get resolutions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
//Spawn a worker per resolution
|
||||||
|
ch := make(chan SegmentInfo, 100)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, resolution := range dirs {
|
||||||
|
wg.Add(1)
|
||||||
|
go ps.ParseResolutionDirectory(resolution, ch, &wg)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
segments, err := ps.AggregateSegmentInfo(ch)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to aggregate segment info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ps.WriteConcatFile(segments)
|
||||||
|
|
||||||
|
//Feed info to ffmpeg to stitch files together
|
||||||
|
concatErr := ps.RunFFmpeg(ps.config.SourcePath, ps.config.DestinationPath)
|
||||||
|
if concatErr != nil {
|
||||||
|
return concatErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) GetResolutions() ([]string, error) {
|
||||||
|
dirs, err := os.ReadDir(ps.config.SourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to read source directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
re := regexp.MustCompile(`^\d+p$`)
|
||||||
|
|
||||||
|
var resolutions []string
|
||||||
|
for _, dir := range dirs {
|
||||||
|
if dir.IsDir() && re.MatchString(dir.Name()) {
|
||||||
|
resolutions = append(resolutions, dir.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resolutions, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) ParseResolutionDirectory(resolution string, ch chan<- SegmentInfo, wg *sync.WaitGroup) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
files, err := os.ReadDir(ps.config.SourcePath + "/" + resolution)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to read resolution directory: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range files {
|
||||||
|
if !file.IsDir() {
|
||||||
|
no, err := strconv.Atoi(file.Name()[7:11])
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to parse segment number: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ch <- SegmentInfo{
|
||||||
|
Name: file.Name(),
|
||||||
|
SeqNo: no,
|
||||||
|
Resolution: resolution,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) AggregateSegmentInfo(ch <-chan SegmentInfo) (map[int]SegmentInfo, error) {
|
||||||
|
segmentMap := make(map[int]SegmentInfo)
|
||||||
|
|
||||||
|
rank := map[string]int{
|
||||||
|
"1080p": 1,
|
||||||
|
"720p": 2,
|
||||||
|
"540p": 3,
|
||||||
|
"480p": 4,
|
||||||
|
"450p": 5,
|
||||||
|
"360p": 6,
|
||||||
|
"270p": 7,
|
||||||
|
"240p": 8,
|
||||||
|
}
|
||||||
|
|
||||||
|
for segment := range ch {
|
||||||
|
current, exists := segmentMap[segment.SeqNo]
|
||||||
|
if !exists || rank[segment.Resolution] > rank[current.Resolution] {
|
||||||
|
segmentMap[segment.SeqNo] = segment
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return segmentMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) WriteConcatFile(segmentMap map[int]SegmentInfo) error {
|
||||||
|
f, err := os.Create(ps.config.DestinationPath + "/concat.txt")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create concat file: %w", err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
for _, segment := range segmentMap {
|
||||||
|
_, err = f.WriteString("file '" + ps.config.SourcePath + "/" + segment.Resolution + "/" + segment.Name + "'\n")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to write to concat file: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *ProcessingService) RunFFmpeg(inputPath, outputPath string) error {
|
||||||
|
cmd := exec.Command("ffmpeg", "-f", "concat", "-safe", "0", "-i", inputPath, "-c", "copy", outputPath)
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
err := cmd.Run()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to run ffmpeg: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
13
pkg/processing/types.go
Normal file
13
pkg/processing/types.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package processing
|
||||||
|
|
||||||
|
type ProcessJob struct {
|
||||||
|
EventName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProcessConfig struct {
|
||||||
|
WorkerCount int
|
||||||
|
SourcePath string
|
||||||
|
DestinationPath string
|
||||||
|
Enabled bool
|
||||||
|
EventName string
|
||||||
|
}
|
||||||
@ -69,7 +69,6 @@ func (cs *CleanupService) ExecuteCleanup(ctx context.Context) error {
|
|||||||
if batchSize > len(cs.pendingFiles) {
|
if batchSize > len(cs.pendingFiles) {
|
||||||
batchSize = len(cs.pendingFiles)
|
batchSize = len(cs.pendingFiles)
|
||||||
}
|
}
|
||||||
cs.mu.Unlock()
|
|
||||||
|
|
||||||
log.Printf("Executing cleanup batch (size: %d)", batchSize)
|
log.Printf("Executing cleanup batch (size: %d)", batchSize)
|
||||||
|
|
||||||
|
|||||||
@ -3,39 +3,28 @@ package transfer
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
|
"m3u8-downloader/pkg/nas"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NASTransfer struct {
|
func TransferFile(nt *nas.NASService, ctx context.Context, item *TransferItem) error {
|
||||||
config NASConfig
|
destPath := filepath.Join(nt.Config.Path, item.DestinationPath)
|
||||||
connected bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewNASTransfer(config NASConfig) *NASTransfer {
|
|
||||||
return &NASTransfer{
|
|
||||||
config: config,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) error {
|
|
||||||
destPath := filepath.Join(nt.config.Path, item.DestinationPath)
|
|
||||||
|
|
||||||
destDir := filepath.Dir(destPath)
|
destDir := filepath.Dir(destPath)
|
||||||
if err := nt.ensureDirectoryExists(destDir); err != nil {
|
if err := nt.EnsureDirectoryExists(destDir); err != nil {
|
||||||
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
return fmt.Errorf("Failed to create directory %s: %w", destDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
transferCtx, cancel := context.WithTimeout(ctx, nt.config.Timeout)
|
transferCtx, cancel := context.WithTimeout(ctx, nt.Config.Timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
if err := nt.copyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
if err := nt.CopyFile(transferCtx, item.SourcePath, destPath); err != nil {
|
||||||
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
return fmt.Errorf("Failed to copy file %s to %s: %w", item.SourcePath, destPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if nt.config.VerifySize {
|
if nt.Config.VerifySize {
|
||||||
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
if err := nt.VerifyTransfer(item.SourcePath, destPath); err != nil {
|
||||||
os.Remove(destPath)
|
os.Remove(destPath)
|
||||||
return fmt.Errorf("Failed to verify transfer: %w", err)
|
return fmt.Errorf("Failed to verify transfer: %w", err)
|
||||||
@ -46,79 +35,3 @@ func (nt *NASTransfer) TransferFile(ctx context.Context, item *TransferItem) err
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nt *NASTransfer) copyFile(ctx context.Context, srcPath, destPath string) error {
|
|
||||||
src, err := os.Open(srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to open source file: %w", err)
|
|
||||||
}
|
|
||||||
defer src.Close()
|
|
||||||
|
|
||||||
dest, err := os.Create(destPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create destination file: %w", err)
|
|
||||||
}
|
|
||||||
defer dest.Close()
|
|
||||||
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
_, err := io.Copy(dest, src)
|
|
||||||
done <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case err := <-done:
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return dest.Sync()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASTransfer) VerifyTransfer(srcPath, destPath string) error {
|
|
||||||
srcInfo, err := os.Stat(srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to stat source file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
destInfo, err := os.Stat(destPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to stat destination file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if srcInfo.Size() != destInfo.Size() {
|
|
||||||
return fmt.Errorf("size mismatch: source=%d, dest=%d", srcInfo.Size(), destInfo.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASTransfer) ensureDirectoryExists(path string) error {
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
return fmt.Errorf("Failed to create directory: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASTransfer) TestConnection() error {
|
|
||||||
testFile := filepath.Join(nt.config.Path, ".connection_test")
|
|
||||||
|
|
||||||
f, err := os.Create(testFile)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create test file: %w", err)
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
|
|
||||||
os.Remove(testFile)
|
|
||||||
|
|
||||||
nt.connected = true
|
|
||||||
log.Printf("Connected to NAS at %s", nt.config.Path)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (nt *NASTransfer) IsConnected() bool {
|
|
||||||
return nt.connected
|
|
||||||
}
|
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"m3u8-downloader/pkg/nas"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -15,7 +16,7 @@ type TransferQueue struct {
|
|||||||
config QueueConfig
|
config QueueConfig
|
||||||
items *PriorityQueue
|
items *PriorityQueue
|
||||||
stats *QueueStats
|
stats *QueueStats
|
||||||
nasTransfer *NASTransfer
|
nasService *nas.NASService
|
||||||
cleanup *CleanupService
|
cleanup *CleanupService
|
||||||
workers []chan TransferItem
|
workers []chan TransferItem
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@ -48,7 +49,7 @@ func (pq *PriorityQueue) Pop() interface{} {
|
|||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *CleanupService) *TransferQueue {
|
func NewTransferQueue(config QueueConfig, nasTransfer *nas.NASService, cleanup *CleanupService) *TransferQueue {
|
||||||
pq := &PriorityQueue{}
|
pq := &PriorityQueue{}
|
||||||
heap.Init(pq)
|
heap.Init(pq)
|
||||||
|
|
||||||
@ -56,7 +57,7 @@ func NewTransferQueue(config QueueConfig, nasTransfer *NASTransfer, cleanup *Cle
|
|||||||
config: config,
|
config: config,
|
||||||
items: pq,
|
items: pq,
|
||||||
stats: &QueueStats{},
|
stats: &QueueStats{},
|
||||||
nasTransfer: nasTransfer,
|
nasService: nasTransfer,
|
||||||
cleanup: cleanup,
|
cleanup: cleanup,
|
||||||
workers: make([]chan TransferItem, config.WorkerCount),
|
workers: make([]chan TransferItem, config.WorkerCount),
|
||||||
}
|
}
|
||||||
@ -145,6 +146,24 @@ func (tq *TransferQueue) dispatchWork() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
||||||
|
// Check if file already exists on NAS before attempting transfer
|
||||||
|
if exists, err := tq.nasService.FileExists(item.DestinationPath, item.FileSize); err != nil {
|
||||||
|
log.Printf("Failed to check if file exists on NAS for %s: %v", item.SourcePath, err)
|
||||||
|
// Continue with transfer attempt on error
|
||||||
|
} else if exists {
|
||||||
|
log.Printf("File already exists on NAS, skipping transfer: %s", item.SourcePath)
|
||||||
|
item.Status = StatusCompleted
|
||||||
|
tq.stats.IncrementCompleted(item.FileSize)
|
||||||
|
|
||||||
|
// Schedule for cleanup
|
||||||
|
if tq.cleanup != nil {
|
||||||
|
if err := tq.cleanup.ScheduleCleanup(item.SourcePath); err != nil {
|
||||||
|
log.Printf("Failed to schedule cleanup for existing file %s: %v", item.SourcePath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
maxRetries := 3
|
maxRetries := 3
|
||||||
|
|
||||||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||||
@ -160,7 +179,7 @@ func (tq *TransferQueue) processItem(ctx context.Context, item TransferItem) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := tq.nasTransfer.TransferFile(ctx, &item)
|
err := TransferFile(tq.nasService, ctx, &item)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
item.Status = StatusCompleted
|
item.Status = StatusCompleted
|
||||||
tq.stats.IncrementCompleted(item.FileSize)
|
tq.stats.IncrementCompleted(item.FileSize)
|
||||||
|
|||||||
@ -5,6 +5,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"m3u8-downloader/pkg/constants"
|
"m3u8-downloader/pkg/constants"
|
||||||
|
nas2 "m3u8-downloader/pkg/nas"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -12,21 +16,21 @@ import (
|
|||||||
type TransferService struct {
|
type TransferService struct {
|
||||||
watcher *FileWatcher
|
watcher *FileWatcher
|
||||||
queue *TransferQueue
|
queue *TransferQueue
|
||||||
nas *NASTransfer
|
nas *nas2.NASService
|
||||||
cleanup *CleanupService
|
cleanup *CleanupService
|
||||||
stats *QueueStats
|
stats *QueueStats
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTrasferService(outputDir string) (*TransferService, error) {
|
func NewTrasferService(outputDir string, eventName string) (*TransferService, error) {
|
||||||
nasConfig := NASConfig{
|
nasConfig := nas2.NASConfig{
|
||||||
Path: constants.NASPath,
|
Path: outputDir,
|
||||||
Username: constants.NASUsername,
|
Username: constants.NASUsername,
|
||||||
Password: constants.NASPassword,
|
Password: constants.NASPassword,
|
||||||
Timeout: constants.TransferTimeout,
|
Timeout: constants.TransferTimeout,
|
||||||
RetryLimit: constants.TransferRetryLimit,
|
RetryLimit: constants.TransferRetryLimit,
|
||||||
VerifySize: true,
|
VerifySize: true,
|
||||||
}
|
}
|
||||||
nas := NewNASTransfer(nasConfig)
|
nas := nas2.NewNASService(nasConfig)
|
||||||
|
|
||||||
if err := nas.TestConnection(); err != nil {
|
if err := nas.TestConnection(); err != nil {
|
||||||
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
return nil, fmt.Errorf("Failed to connect to NAS: %w", err)
|
||||||
@ -34,9 +38,9 @@ func NewTrasferService(outputDir string) (*TransferService, error) {
|
|||||||
|
|
||||||
cleanupConfig := CleanupConfig{
|
cleanupConfig := CleanupConfig{
|
||||||
Enabled: constants.CleanupAfterTransfer,
|
Enabled: constants.CleanupAfterTransfer,
|
||||||
RetentionPeriod: constants.RetainLocalHours,
|
RetentionPeriod: time.Duration(constants.RetainLocalHours) * time.Hour,
|
||||||
BatchSize: constants.CleanupBatchSize,
|
BatchSize: constants.CleanupBatchSize,
|
||||||
CheckInterval: constants.FileSettlingDelay,
|
CheckInterval: 1 * time.Minute,
|
||||||
}
|
}
|
||||||
cleanup := NewCleanupService(cleanupConfig)
|
cleanup := NewCleanupService(cleanupConfig)
|
||||||
|
|
||||||
@ -48,7 +52,13 @@ func NewTrasferService(outputDir string) (*TransferService, error) {
|
|||||||
}
|
}
|
||||||
queue := NewTransferQueue(queueConfig, nas, cleanup)
|
queue := NewTransferQueue(queueConfig, nas, cleanup)
|
||||||
|
|
||||||
watcher, err := NewFileWatcher(outputDir, queue)
|
// Create local output directory if it doesn't exist
|
||||||
|
localOutputPath := constants.LocalOutputDirPath + "/" + eventName
|
||||||
|
if err := os.MkdirAll(localOutputPath, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to create local output directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher, err := NewFileWatcher(localOutputPath, queue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to create file watcher: %w", err)
|
return nil, fmt.Errorf("Failed to create file watcher: %w", err)
|
||||||
}
|
}
|
||||||
@ -130,7 +140,111 @@ func (ts *TransferService) Shutdown(ctx context.Context) error {
|
|||||||
return fmt.Errorf("Failed to force cleanup: %w", err)
|
return fmt.Errorf("Failed to force cleanup: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Disconnect from NAS
|
||||||
|
if err := ts.nas.Disconnect(); err != nil {
|
||||||
|
log.Printf("Warning: failed to disconnect from NAS: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.Println("Transfer service shut down")
|
log.Println("Transfer service shut down")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueExistingFiles scans a directory for .ts files and queues them for transfer
|
||||||
|
func (ts *TransferService) QueueExistingFiles(localEventPath string) error {
|
||||||
|
log.Printf("Scanning for existing files in: %s", localEventPath)
|
||||||
|
|
||||||
|
var fileCount, alreadyTransferred, scheduledForCleanup int
|
||||||
|
|
||||||
|
// Extract event name from path for NAS destination
|
||||||
|
eventName := filepath.Base(localEventPath)
|
||||||
|
|
||||||
|
err := filepath.Walk(localEventPath, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error accessing path %s: %v", path, err)
|
||||||
|
return nil // Continue walking
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only process .ts files
|
||||||
|
if !info.IsDir() && strings.HasSuffix(strings.ToLower(info.Name()), ".ts") {
|
||||||
|
// Extract resolution from directory path
|
||||||
|
resolution := ts.extractResolutionFromPath(path)
|
||||||
|
|
||||||
|
// Get relative path from event directory
|
||||||
|
relPath, err := filepath.Rel(localEventPath, path)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to get relative path for %s: %v", path, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build NAS destination path (eventName/relPath)
|
||||||
|
nasDestPath := filepath.Join(eventName, relPath)
|
||||||
|
|
||||||
|
// Check if file already exists on NAS with matching size
|
||||||
|
exists, err := ts.nas.FileExists(nasDestPath, info.Size())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to check NAS file existence for %s: %v", path, err)
|
||||||
|
// Continue with transfer attempt on error
|
||||||
|
} else if exists {
|
||||||
|
log.Printf("File already exists on NAS: %s (%s, %d bytes)", path, resolution, info.Size())
|
||||||
|
alreadyTransferred++
|
||||||
|
|
||||||
|
// Schedule for cleanup if cleanup is enabled
|
||||||
|
if constants.CleanupAfterTransfer {
|
||||||
|
if err := ts.cleanup.ScheduleCleanup(path); err != nil {
|
||||||
|
log.Printf("Failed to schedule cleanup for already-transferred file %s: %v", path, err)
|
||||||
|
} else {
|
||||||
|
scheduledForCleanup++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil // Skip queuing this file
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create transfer item
|
||||||
|
item := TransferItem{
|
||||||
|
ID: ts.generateTransferID(),
|
||||||
|
SourcePath: path,
|
||||||
|
DestinationPath: nasDestPath,
|
||||||
|
Resolution: resolution,
|
||||||
|
Timestamp: info.ModTime(),
|
||||||
|
Status: StatusPending,
|
||||||
|
FileSize: info.Size(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to queue
|
||||||
|
if err := ts.queue.Add(item); err != nil {
|
||||||
|
log.Printf("Failed to queue file %s: %v", path, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Queued file: %s (%s, %d bytes)", path, resolution, info.Size())
|
||||||
|
fileCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to walk directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("File scan completed - Queued: %d, Already transferred: %d, Scheduled for cleanup: %d",
|
||||||
|
fileCount, alreadyTransferred, scheduledForCleanup)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *TransferService) extractResolutionFromPath(filePath string) string {
|
||||||
|
dir := filepath.Dir(filePath)
|
||||||
|
parts := strings.Split(dir, string(filepath.Separator))
|
||||||
|
|
||||||
|
for _, part := range parts {
|
||||||
|
if strings.HasSuffix(part, "p") {
|
||||||
|
return part
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *TransferService) generateTransferID() string {
|
||||||
|
return fmt.Sprintf("transfer_existing_%d", time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|||||||
@ -44,15 +44,6 @@ func (s TransferStatus) String() string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type NASConfig struct {
|
|
||||||
Path string
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
Timeout time.Duration
|
|
||||||
RetryLimit int
|
|
||||||
VerifySize bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type QueueConfig struct {
|
type QueueConfig struct {
|
||||||
WorkerCount int
|
WorkerCount int
|
||||||
PersistencePath string
|
PersistencePath string
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"m3u8-downloader/pkg/constants"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -32,7 +33,7 @@ func NewFileWatcher(outputDir string, queue *TransferQueue) (*FileWatcher, error
|
|||||||
outputDir: outputDir,
|
outputDir: outputDir,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
watcher: watcher,
|
watcher: watcher,
|
||||||
settingDelay: time.Second,
|
settingDelay: constants.FileSettlingDelay,
|
||||||
pendingFiles: make(map[string]*time.Timer),
|
pendingFiles: make(map[string]*time.Timer),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user