aboutsummaryrefslogtreecommitdiff
path: root/tasks
diff options
context:
space:
mode:
Diffstat (limited to 'tasks')
-rw-r--r--tasks/anifetch.task.go121
-rw-r--r--tasks/anisync.task.go112
-rw-r--r--tasks/aniupdate.task.go223
-rw-r--r--tasks/genresync.task.go64
-rw-r--r--tasks/helpers.go12
-rw-r--r--tasks/manager.go155
-rw-r--r--tasks/producersync.task.go282
-rw-r--r--tasks/tasks.go38
8 files changed, 232 insertions, 775 deletions
diff --git a/tasks/anifetch.task.go b/tasks/anifetch.task.go
index 894dd97..8d38da8 100644
--- a/tasks/anifetch.task.go
+++ b/tasks/anifetch.task.go
@@ -4,53 +4,43 @@ import (
"encoding/json"
"fmt"
"io"
- "metachan/database"
"metachan/entities"
+ "metachan/enums"
+ "metachan/repositories"
"metachan/types"
"metachan/utils/logger"
"metachan/utils/mappers"
"net/http"
-
- "gorm.io/gorm"
)
-const fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json"
+const (
+ fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json"
+ batchSize = 1000
+)
func AniFetch() error {
- logger.Log("Starting Anime Fetch", logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniFetch",
- })
+ logger.Infof("AniFetch", "Starting Anime Fetch")
response, err := http.Get(fribbURL)
if err != nil {
- logger.Log(fmt.Sprintf("Anime Fetch failed: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AniFetch",
- })
+ logger.Errorf("AniFetch", "Anime Fetch failed: %v", err)
return err
}
+
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
- logger.Log(fmt.Sprintf("Failed to read response body: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AniFetch",
- })
+ logger.Errorf("AniFetch", "Failed to read response body: %v", err)
return err
}
- var mappings []types.AniSyncMapping
+ var mappings []types.MappingResponse
if err := json.Unmarshal(body, &mappings); err != nil {
- logger.Log(fmt.Sprintf("Failed to unmarshal JSON: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AniFetch",
- })
+ logger.Errorf("AniFetch", "Failed to unmarshal JSON: %v", err)
return err
}
- batchSize := 1000
total := len(mappings)
for i := 0; i < total; i += batchSize {
@@ -61,21 +51,15 @@ func AniFetch() error {
batch := mappings[i:end]
processBatch(batch)
- logger.Log(fmt.Sprintf("Processed %d/%d mappings", end, total), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniFetch",
- })
+ logger.Infof("AniFetch", "Processed %d/%d mappings", end, total)
}
- logger.Log("Anime Fetch completed", logger.LogOptions{
- Level: logger.Success,
- Prefix: "AniFetch",
- })
+ logger.Successf("AniFetch", "Anime Fetch completed")
return nil
}
-func processBatch(mappings []types.AniSyncMapping) {
+func processBatch(mappings []types.MappingResponse) {
for _, mapping := range mappings {
var composite *string
if mapping.MAL != 0 && mapping.Anilist != 0 {
@@ -83,61 +67,26 @@ func processBatch(mappings []types.AniSyncMapping) {
composite = &comp
}
- var entity entities.AnimeMapping
- if err := database.DB.Where("mal_anilist_composite = ?", composite).First(&entity).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- newEntity := entities.AnimeMapping{
- AniDB: mapping.AniDB,
- Anilist: mapping.Anilist,
- AnimeCountdown: mapping.AnimeCountdown,
- AnimePlanet: mappers.ForceString(mapping.AnimePlanet),
- AniSearch: mapping.AniSearch,
- IMDB: mapping.IMDB,
- Kitsu: mapping.Kitsu,
- LiveChart: mapping.LiveChart,
- MAL: mapping.MAL,
- NotifyMoe: mapping.NotifyMoe,
- Simkl: mapping.Simkl,
- TMDB: mappers.ForceInt(mapping.TMDB),
- TVDB: mapping.TVDB,
- Type: entities.MappingType(mapping.Type),
- MALAnilistComposite: composite,
- }
- if err := database.DB.Create(&newEntity).Error; err != nil {
- logger.Log(fmt.Sprintf("Unable to process mapping %v: %v", mapping, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "AniFetch",
- })
- }
- } else {
- logger.Log(fmt.Sprintf("Error fetching entity: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AniFetch",
- })
- }
- } else {
- // Update existing entity
- entity.AniDB = mapping.AniDB
- entity.Anilist = mapping.Anilist
- entity.AnimeCountdown = mapping.AnimeCountdown
- entity.AnimePlanet = mappers.ForceString(mapping.AnimePlanet)
- entity.AniSearch = mapping.AniSearch
- entity.IMDB = mapping.IMDB
- entity.Kitsu = mapping.Kitsu
- entity.LiveChart = mapping.LiveChart
- entity.MAL = mapping.MAL
- entity.NotifyMoe = mapping.NotifyMoe
- entity.Simkl = mapping.Simkl
- entity.TMDB = mappers.ForceInt(mapping.TMDB)
- entity.TVDB = mapping.TVDB
- entity.Type = entities.MappingType(mapping.Type)
- entity.MALAnilistComposite = composite
- if err := database.DB.Save(&entity).Error; err != nil {
- logger.Log(fmt.Sprintf("Unable to update mapping %v: %v", mapping, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "AniFetch",
- })
- }
+ entity := entities.Mapping{
+ AniDB: mapping.AniDB,
+ Anilist: mapping.Anilist,
+ AnimeCountdown: mapping.AnimeCountdown,
+ AnimePlanet: mappers.ForceString(mapping.AnimePlanet),
+ AniSearch: mapping.AniSearch,
+ IMDB: mapping.IMDB,
+ Kitsu: mapping.Kitsu,
+ LiveChart: mapping.LiveChart,
+ MAL: mapping.MAL,
+ NotifyMoe: mapping.NotifyMoe,
+ Simkl: mapping.Simkl,
+ TMDB: mappers.ForceInt(mapping.TMDB),
+ TVDB: mapping.TVDB,
+ Type: enums.MappingAnimeType(mapping.Type),
+ MALAnilistComposite: composite,
+ }
+
+ if err := repositories.CreateOrUpdateMapping(&entity); err != nil {
+ logger.Warnf("AniFetch", "Unable to process mapping %v: %v", mapping, err)
}
}
}
diff --git a/tasks/anisync.task.go b/tasks/anisync.task.go
index dc98d3e..0a346ac 100644
--- a/tasks/anisync.task.go
+++ b/tasks/anisync.task.go
@@ -1,145 +1,69 @@
package tasks
import (
- "fmt"
- "metachan/database"
- "metachan/entities"
- "metachan/services/anime"
+ "metachan/enums"
+ "metachan/repositories"
+ "metachan/services"
"metachan/utils/logger"
"time"
)
-// AniSync fetches full anime details for all anime in the database
func AniSync() error {
- logger.Log("Starting Anime Sync - Fetching full anime details", logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniSync",
- })
+ logger.Infof("AniSync", "Starting Anime Sync - Fetching full anime details")
- // Get all anime mappings
- var mappings []entities.AnimeMapping
- if err := database.DB.Find(&mappings).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to fetch anime mappings: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AniSync",
- })
+ mappings, err := repositories.GetAllMappings()
+ if err != nil {
+ logger.Errorf("AniSync", "Failed to fetch anime mappings: %v", err)
return err
}
total := len(mappings)
- logger.Log(fmt.Sprintf("Found %d anime mappings", total), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniSync",
- })
+ logger.Infof("AniSync", "Found %d anime mappings", total)
- // Pre-count items needing sync for accurate ETA
itemsToSync := 0
for _, mapping := range mappings {
if mapping.MAL == 0 {
continue
}
- var existingAnime entities.Anime
- err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error
+ _, err := repositories.GetAnime(enums.MAL, mapping.MAL)
if err == nil {
- var episodeCount int64
- database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount)
- if episodeCount > 0 {
- continue
- }
+ continue
}
itemsToSync++
}
- logger.Log(fmt.Sprintf("Found %d anime to sync (%d already synced)", itemsToSync, total-itemsToSync), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniSync",
- })
+ logger.Infof("AniSync", "Found %d anime to sync (%d already synced)", itemsToSync, total-itemsToSync)
- animeService := anime.NewService()
synced := 0
skipped := 0
startTime := time.Now()
processed := 0
for _, mapping := range mappings {
- // Skip if MAL ID is 0 (invalid)
if mapping.MAL == 0 {
skipped++
continue
}
- // Check if anime already exists in DB
- var existingAnime entities.Anime
- err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error
-
+ _, err := repositories.GetAnime(enums.MAL, mapping.MAL)
if err == nil {
- // Check if anime has full details (has episodes)
- var episodeCount int64
- database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount)
-
- if episodeCount > 0 {
- skipped++
- continue
- }
+ skipped++
+ continue
}
- // Calculate progress and ETA
- progress := float64(processed+1) / float64(itemsToSync) * 100
- eta := ""
- if processed >= 10 {
- elapsed := time.Since(startTime)
- avgTimePerItem := elapsed / time.Duration(processed)
- remainingItems := itemsToSync - processed
- remaining := time.Duration(remainingItems) * avgTimePerItem
- eta = formatDuration(remaining)
- } else {
- eta = "calculating..."
- }
+ progress, eta := calculateProgress(processed+1, itemsToSync, startTime)
- // Fetch full anime details
- logger.Log(fmt.Sprintf("[%d/%d] Synchronising MAL ID %d - %.1f%% | ETA: %s", processed+1, itemsToSync, mapping.MAL, progress, eta), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AniSync",
- })
+ logger.Infof("AniSync", "[%d/%d] Synchronising MAL ID %d - %.1f%% | ETA: %v", processed+1, itemsToSync, mapping.MAL, progress, eta)
- _, err = animeService.GetAnimeDetailsWithSource(&mapping, "anisync")
+ _, err = services.GetAnime(&mapping)
if err != nil {
- logger.Log(fmt.Sprintf("Failed to sync anime MAL ID %d: %v", mapping.MAL, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "AniSync",
- })
+ logger.Warnf("AniSync", "Failed to sync anime MAL ID %d: %v", mapping.MAL, err)
continue
}
synced++
processed++
-
- // Sleep to respect rate limits (1 second between requests)
- time.Sleep(1 * time.Second)
}
- logger.Log(fmt.Sprintf("Anime Sync completed: %d synced, %d skipped", synced, skipped), logger.LogOptions{
- Level: logger.Success,
- Prefix: "AniSync",
- })
-
return nil
}
-
-// formatDuration converts duration to human-readable format
-func formatDuration(d time.Duration) string {
- d = d.Round(time.Second)
- h := d / time.Hour
- d -= h * time.Hour
- m := d / time.Minute
- d -= m * time.Minute
- s := d / time.Second
-
- if h > 0 {
- return fmt.Sprintf("%dh %dm", h, m)
- }
- if m > 0 {
- return fmt.Sprintf("%dm %ds", m, s)
- }
- return fmt.Sprintf("%ds", s)
-}
diff --git a/tasks/aniupdate.task.go b/tasks/aniupdate.task.go
index 38cc77a..e6d63a0 100644
--- a/tasks/aniupdate.task.go
+++ b/tasks/aniupdate.task.go
@@ -3,157 +3,106 @@ package tasks
import (
"fmt"
"metachan/config"
- "metachan/database"
"metachan/entities"
- "metachan/services/anime"
- "metachan/types"
+ "metachan/enums"
+ "metachan/repositories"
+ "metachan/services"
"metachan/utils/logger"
"sync"
"time"
)
-// Constants for anime update task
const (
- // UpdaterSource identifies the source of the update request
UpdaterSource = "updater"
- // MaxConcurrentUpdates limits the number of concurrent anime updates
MaxConcurrentUpdates = 5
- // MaxConcurrentSQLiteUpdates limits concurrent updates for SQLite to prevent locks
MaxConcurrentSQLiteUpdates = 1
- // UpdateInterval defines how often an anime should be updated even without specific triggers
UpdateInterval = 6 * time.Hour
)
-// animeUpdateJob represents a single anime update job
type animeUpdateJob struct {
series entities.Anime
reason string
}
-// AnimeUpdate checks for airing anime that need to be updated
func AnimeUpdate() error {
- logger.Log("Starting Anime Update Task", logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
-
- // Find all currently airing anime
- var airingSeries []entities.Anime
- result := database.DB.
- Where("airing = ?", true).
- Preload("NextAiringEpisode").
- Preload("AiringSchedule").
- Find(&airingSeries)
-
- if result.Error != nil {
- logger.Log(fmt.Sprintf("Failed to fetch airing anime: %v", result.Error), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AnimeUpdate",
- })
- return result.Error
+ logger.Infof("AnimeUpdate", "Starting Anime Update Task")
+
+ airingSeries, err := repositories.GetAiringAnime()
+ if err != nil {
+ logger.Errorf("AnimeUpdate", "Failed to fetch airing anime: %v", err)
+ return err
}
- logger.Log(fmt.Sprintf("Found %d airing anime series", len(airingSeries)), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
+ logger.Infof("AnimeUpdate", "Found %d airing anime series", len(airingSeries))
- // Get current timestamp
currentTime := time.Now().Unix()
- // Log the current time for debugging
- logger.Log(fmt.Sprintf("Current timestamp: %d (%s)",
- currentTime, time.Unix(currentTime, 0).Format(time.RFC3339)), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "Current timestamp: %d (%s)", currentTime, time.Unix(currentTime, 0).Format(time.RFC3339))
- // Create a channel for jobs
jobs := make(chan animeUpdateJob, len(airingSeries))
- // Determine max concurrency based on database type
maxWorkers := MaxConcurrentUpdates
- if config.Config.DatabaseDriver == types.SQLite {
+ if config.Database.Driver == "sqlite" {
maxWorkers = MaxConcurrentSQLiteUpdates
- logger.Log(fmt.Sprintf("Using reduced concurrency (%d workers) for SQLite database",
- maxWorkers), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "Using reduced concurrency (%d workers) for SQLite database", maxWorkers)
}
- // Create a wait group to wait for all workers to finish
var wg sync.WaitGroup
- // Create workers
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
- animeService := anime.NewService()
- logger.Log(fmt.Sprintf("Started worker #%d", workerID), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "Started worker #%d", workerID)
- // Process jobs from the channel
for job := range jobs {
- updateAnime(animeService, job.series, job.reason)
+ updateAnime(job.series, job.reason)
}
}(i)
}
- // Queue updates for anime that need it
jobsQueued := 0
for _, series := range airingSeries {
- // Check if we need to update this anime
needsUpdate := false
reason := ""
- // Log details about this particular anime for debugging
- logger.Log(fmt.Sprintf("Checking anime: %s (ID: %d)",
- series.TitleRomaji, series.MALID), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ title := ""
+ if series.Title != nil {
+ if series.Title.Romaji != "" {
+ title = series.Title.Romaji
+ } else if series.Title.English != "" {
+ title = series.Title.English
+ }
+ }
+
+ logger.Debugf("AnimeUpdate", "Checking anime: %s (ID: %d)", title, series.MALID)
- // If there's no next airing episode data, we should update
- if series.NextAiringEpisode == nil || series.NextAiringEpisode.AiringAt == 0 {
+ if series.NextAiring == nil || series.NextAiring.AiringAt == 0 {
needsUpdate = true
reason = "missing next episode data"
- } else if int64(series.NextAiringEpisode.AiringAt) <= currentTime {
- // If the next episode should have aired already, update to get fresh data
+ } else if int64(series.NextAiring.AiringAt) <= currentTime {
needsUpdate = true
reason = "next episode already aired"
}
- // Check if the anime was last updated more than the update interval ago
if !needsUpdate && !series.LastUpdated.IsZero() && time.Since(series.LastUpdated) > UpdateInterval {
needsUpdate = true
reason = fmt.Sprintf("regular update (last updated %s ago)",
time.Since(series.LastUpdated).Round(time.Second))
}
- // Log update decision
if !needsUpdate {
- logger.Log(fmt.Sprintf("Skipping update for %s (ID: %d) - no update needed. Next airing at: %d",
- series.TitleRomaji, series.MALID, series.NextAiringEpisode.AiringAt), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "Skipping update for %s (ID: %d) - no update needed. Next airing at: %d",
+ title, series.MALID, series.NextAiring.AiringAt)
continue
}
- // Add the job to the queue
- logger.Log(fmt.Sprintf("Queueing update for %s (ID: %d) - Reason: %s",
- series.TitleRomaji, series.MALID, reason), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "Queueing update for %s (ID: %d) - Reason: %s",
+ title, series.MALID, reason)
jobs <- animeUpdateJob{
series: series,
@@ -162,132 +111,96 @@ func AnimeUpdate() error {
jobsQueued++
}
- // Close the job channel to signal workers that no more jobs are coming
close(jobs)
- // Wait for all workers to finish
wg.Wait()
- logger.Log(fmt.Sprintf("Anime Update Task Completed - Processed %d anime", jobsQueued), logger.LogOptions{
- Level: logger.Success,
- Prefix: "AnimeUpdate",
- })
+ logger.Successf("AnimeUpdate", "Anime Update Task Completed - Processed %d anime", jobsQueued)
return nil
}
-// updateAnime updates a single anime series
-func updateAnime(animeService *anime.Service, series entities.Anime, reason string) {
- title := series.TitleRomaji
- if series.TitleEnglish != "" {
- title = series.TitleEnglish
+func updateAnime(series entities.Anime, reason string) {
+ title := ""
+ if series.Title != nil {
+ if series.Title.English != "" {
+ title = series.Title.English
+ } else if series.Title.Romaji != "" {
+ title = series.Title.Romaji
+ }
}
- logger.Log(fmt.Sprintf("Updating anime: %s (MAL ID: %d) - %s", title, series.MALID, reason), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
+ logger.Infof("AnimeUpdate", "Updating anime: %s (MAL ID: %d) - %s", title, series.MALID, reason)
- // Get anime mapping for the service call
- mapping, err := database.GetAnimeMappingViaMALID(series.MALID)
+ mapping, err := repositories.GetAnimeMapping(enums.MAL, series.MALID)
if err != nil {
- logger.Log(fmt.Sprintf("Error getting anime mapping for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AnimeUpdate",
- })
+ logger.Errorf("AnimeUpdate", "Error getting anime mapping for %s (MAL ID: %d): %v", title, series.MALID, err)
return
}
- // Get updated anime data from API
- updatedAnime, err := animeService.GetAnimeDetailsWithSource(mapping, "mal")
+ updatedAnime, err := services.GetAnime(&mapping)
if err != nil {
- logger.Log(fmt.Sprintf("Error getting updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AnimeUpdate",
- })
+ logger.Errorf("AnimeUpdate", "Error getting updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err)
return
}
- logger.Log(fmt.Sprintf("Successfully updated anime: %s (MAL ID: %d)", title, series.MALID), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
+ logger.Successf("AnimeUpdate", "Successfully updated anime: %s (MAL ID: %d)", title, series.MALID)
- // Check if the updated anime data has significant changes that warrant saving
if shouldSaveUpdate(&series, updatedAnime) {
- // Check if anime is still airing
if updatedAnime.Status != "RELEASING" && updatedAnime.Status != "AIRING" {
- // Update the anime data to reflect that it's no longer airing
updatedAnime.Airing = false
}
- if err := database.SaveAnimeToDatabase(updatedAnime); err != nil {
- logger.Log(fmt.Sprintf("Error saving updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "AnimeUpdate",
- })
+ if err := repositories.CreateOrUpdateAnime(updatedAnime); err != nil {
+ logger.Errorf("AnimeUpdate", "Error saving updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err)
} else {
- logger.Log(fmt.Sprintf("Successfully saved updated data for %s (MAL ID: %d)", title, series.MALID), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
+ logger.Infof("AnimeUpdate", "Successfully saved updated data for %s (MAL ID: %d)", title, series.MALID)
if !updatedAnime.Airing {
- logger.Log(fmt.Sprintf("Anime %s (MAL ID: %d) is no longer airing. Status: %s", title, series.MALID, updatedAnime.Status), logger.LogOptions{
- Level: logger.Info,
- Prefix: "AnimeUpdate",
- })
+ logger.Infof("AnimeUpdate", "Anime %s (MAL ID: %d) is no longer airing. Status: %s", title, series.MALID, updatedAnime.Status)
}
}
} else {
- logger.Log(fmt.Sprintf("No significant changes detected for %s (MAL ID: %d), skipping database update", title, series.MALID), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "AnimeUpdate",
- })
+ logger.Debugf("AnimeUpdate", "No significant changes detected for %s (MAL ID: %d), skipping database update", title, series.MALID)
}
}
-// shouldSaveUpdate determines if the updated anime data has significant changes
-// that warrant saving it to the database
-func shouldSaveUpdate(oldAnime *entities.Anime, newAnime *types.Anime) bool {
+func shouldSaveUpdate(oldAnime *entities.Anime, newAnime *entities.Anime) bool {
if oldAnime == nil {
return true
}
- // Convert old anime to types.Anime for easier comparison
- oldAnimeConverted := database.ConvertToTypesAnime(oldAnime)
-
- // Check for changes in next airing episode
- oldNextEp := oldAnimeConverted.NextAiringEpisode
- newNextEp := newAnime.NextAiringEpisode
+ oldHasNext := oldAnime.NextAiring != nil && oldAnime.NextAiring.AiringAt > 0
+ newHasNext := newAnime.NextAiring != nil && newAnime.NextAiring.AiringAt > 0
- // If next episode timestamp or number changed
- if oldNextEp.AiringAt != newNextEp.AiringAt || oldNextEp.Episode != newNextEp.Episode {
+ if oldHasNext != newHasNext {
return true
}
- // Check if sub/dub count changed
- if oldAnimeConverted.Episodes.Subbed != newAnime.Episodes.Subbed ||
- oldAnimeConverted.Episodes.Dubbed != newAnime.Episodes.Dubbed {
+ if oldHasNext && newHasNext {
+ if oldAnime.NextAiring.AiringAt != newAnime.NextAiring.AiringAt ||
+ oldAnime.NextAiring.Episode != newAnime.NextAiring.Episode {
+ return true
+ }
+ }
+
+ if oldAnime.SubbedCount != newAnime.SubbedCount ||
+ oldAnime.DubbedCount != newAnime.DubbedCount {
return true
}
- // Check if airing status changed
- if oldAnimeConverted.Airing != newAnime.Airing ||
- oldAnimeConverted.Status != newAnime.Status {
+ if oldAnime.Airing != newAnime.Airing ||
+ oldAnime.Status != newAnime.Status {
return true
}
- // Check if the total episode count has changed
- if oldAnimeConverted.Episodes.Total != newAnime.Episodes.Total {
+ if oldAnime.TotalEpisodes != newAnime.TotalEpisodes {
return true
}
- // Check if number of episodes in the airing schedule changed
- if len(oldAnimeConverted.AiringSchedule) != len(newAnime.AiringSchedule) {
+ if len(oldAnime.Schedule) != len(newAnime.Schedule) {
return true
}
- // No significant changes detected
return false
}
diff --git a/tasks/genresync.task.go b/tasks/genresync.task.go
index e240833..22e2712 100644
--- a/tasks/genresync.task.go
+++ b/tasks/genresync.task.go
@@ -1,82 +1,36 @@
package tasks
import (
- "fmt"
- "metachan/database"
"metachan/entities"
+ "metachan/repositories"
"metachan/utils/api/jikan"
"metachan/utils/logger"
)
-// GenreSync synchronizes genre data from MAL via Jikan API
func GenreSync() error {
- logger.Log("Starting Genre Sync from MAL", logger.LogOptions{
- Level: logger.Info,
- Prefix: "GenreSync",
- })
+ logger.Infof("GenreSync", "Starting Genre Sync from MAL")
- // Create Jikan client
- client := jikan.NewJikanClient()
-
- // Wait for rate limit
- client.WaitForRateLimit()
-
- // Fetch genres from Jikan API
- genresResponse, err := client.GetAnimeGenres()
+ genresResponse, err := jikan.GetAnimeGenres()
if err != nil {
- logger.Log(fmt.Sprintf("Failed to fetch genres from MAL: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "GenreSync",
- })
+ logger.Errorf("GenreSync", "Failed to fetch genres from MAL: %v", err)
return err
}
- logger.Log(fmt.Sprintf("Fetched %d genres from MAL", len(genresResponse.Data)), logger.LogOptions{
- Level: logger.Info,
- Prefix: "GenreSync",
- })
+ logger.Infof("GenreSync", "Fetched %d genres from MAL", len(genresResponse.Data))
- // Update or create genres in database
for _, genre := range genresResponse.Data {
- // Create a genre entry with AnimeID = 0 to indicate it's a master genre
- genreEntity := entities.AnimeGenre{
- AnimeID: 0, // Master genre, not tied to specific anime
+ genreEntity := entities.Genre{
GenreID: genre.MALID,
Name: genre.Name,
URL: genre.URL,
Count: genre.Count,
}
- // Update or create
- var existing entities.AnimeGenre
- result := database.DB.Where("genre_id = ? AND anime_id = 0", genre.MALID).First(&existing)
-
- if result.Error == nil {
- // Update existing
- existing.Name = genre.Name
- existing.URL = genre.URL
- existing.Count = genre.Count
- if err := database.DB.Save(&existing).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to update genre %s: %v", genre.Name, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "GenreSync",
- })
- }
- } else {
- // Create new
- if err := database.DB.Create(&genreEntity).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to create genre %s: %v", genre.Name, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "GenreSync",
- })
- }
+ if err := repositories.CreateOrUpdateGenre(&genreEntity); err != nil {
+ logger.Warnf("GenreSync", "Failed to sync genre %s: %v", genre.Name, err)
}
}
- logger.Log("Genre Sync completed successfully", logger.LogOptions{
- Level: logger.Success,
- Prefix: "GenreSync",
- })
-
+ logger.Successf("GenreSync", "Genre Sync completed successfully. Synced %d genres", len(genresResponse.Data))
return nil
}
diff --git a/tasks/helpers.go b/tasks/helpers.go
new file mode 100644
index 0000000..b99a977
--- /dev/null
+++ b/tasks/helpers.go
@@ -0,0 +1,12 @@
+package tasks
+
+import "time"
+
+func calculateProgress(current, total int, startTime time.Time) (progress float64, eta time.Duration) {
+ progress = float64(current) / float64(total) * 100
+ elapsed := time.Since(startTime)
+ avgTimePerItem := elapsed / time.Duration(current)
+ remaining := total - current
+ eta = avgTimePerItem * time.Duration(remaining)
+ return progress, eta.Round(time.Second)
+}
diff --git a/tasks/manager.go b/tasks/manager.go
index af86b17..9bcf4f5 100644
--- a/tasks/manager.go
+++ b/tasks/manager.go
@@ -2,8 +2,8 @@ package tasks
import (
"fmt"
- "metachan/database"
"metachan/entities"
+ "metachan/repositories"
"metachan/types"
"metachan/utils/logger"
"sync"
@@ -13,11 +13,10 @@ import (
)
type TaskManager struct {
- Tasks map[string]types.Task
- Tickers map[string]*time.Ticker
- Done map[string]chan bool
- Mutex sync.Mutex
- Database *gorm.DB
+ Tasks map[string]types.Task
+ Tickers map[string]*time.Ticker
+ Done map[string]chan bool
+ Mutex sync.Mutex
}
func (tm *TaskManager) RegisterTask(task types.Task) error {
@@ -29,22 +28,17 @@ func (tm *TaskManager) RegisterTask(task types.Task) error {
}
tm.Tasks[task.Name] = task
- logger.Log(fmt.Sprintf("Task %s registered", task.Name), logger.LogOptions{
- Level: logger.Info,
- Prefix: "TaskManager",
- })
+ logger.Infof("TaskManager", "Task %s registered", task.Name)
return nil
}
func (tm *TaskManager) shouldExecuteTask(taskName string, interval time.Duration) (bool, error) {
- var lastLog entities.TaskLog
-
- if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&lastLog).Error; err != nil {
+ lastLog, err := repositories.GetLatestTaskLog(taskName)
+ if err != nil {
if err == gorm.ErrRecordNotFound {
return true, nil
}
-
return false, err
}
@@ -60,11 +54,8 @@ func (tm *TaskManager) logTaskExecution(taskName, status, message string) {
ExecutedAt: time.Now(),
}
- if err := tm.Database.Create(&logEntry).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to log task execution for %s: %v", taskName, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
+ if err := repositories.CreateTaskLog(&logEntry); err != nil {
+ logger.Warnf("TaskManager", "Failed to log task execution for %s: %v", taskName, err)
}
}
@@ -73,10 +64,7 @@ func (tm *TaskManager) StartTask(taskName string) {
task, exists := tm.Tasks[taskName]
tm.Mutex.Unlock()
if !exists {
- logger.Log(fmt.Sprintf("Task %s not found", taskName), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
+ logger.Warnf("TaskManager", "Task %s not found", taskName)
return
}
@@ -85,10 +73,7 @@ func (tm *TaskManager) StartTask(taskName string) {
shouldExec, err := tm.shouldExecuteTask(taskName, task.Interval)
if err != nil {
- logger.Log(fmt.Sprintf("Error checking execution condition for task %s: %v", taskName, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Error checking execution condition for task %s: %v", taskName, err)
return
}
@@ -102,81 +87,41 @@ func (tm *TaskManager) StartTask(taskName string) {
if shouldExec {
// Check dependencies before executing
if !tm.checkDependencies(task) {
- logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
+ logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
tm.logTaskExecution(taskName, "error", err.Error())
- logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err)
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
-
- // Mark task as complete
- if err := database.MarkTaskComplete(taskName); err != nil {
- logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
- }
-
- logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{
- Level: logger.Success,
- Prefix: "TaskManager",
- })
+ logger.Successf("TaskManager", "Task %s executed successfully", taskName)
}
} else {
// Calculate time until next execution
- var lastLog entities.TaskLog
var initialDelay time.Duration = task.Interval
- if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&lastLog).Error; err == nil {
+ if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil {
elapsed := time.Since(lastLog.ExecutedAt)
if elapsed < task.Interval {
initialDelay = task.Interval - elapsed
}
}
- logger.Log(fmt.Sprintf("Task %s will run in %v", taskName, initialDelay), logger.LogOptions{
- Level: logger.Info,
- Prefix: "TaskManager",
- })
+ logger.Infof("TaskManager", "Task %s will run in %v", taskName, initialDelay)
// Wait for initial delay before first execution
select {
case <-time.After(initialDelay):
// Check dependencies before executing
if !tm.checkDependencies(task) {
- logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
+ logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
tm.logTaskExecution(taskName, "error", err.Error())
- logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err)
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
-
- // Mark task as complete
- if err := database.MarkTaskComplete(taskName); err != nil {
- logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
- }
-
- logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{
- Level: logger.Success,
- Prefix: "TaskManager",
- })
+ logger.Successf("TaskManager", "Task %s executed successfully", taskName)
}
case <-doneChan:
return
@@ -185,10 +130,7 @@ func (tm *TaskManager) StartTask(taskName string) {
// Skip ticker creation for manual-only tasks (interval = 0)
if task.Interval == 0 {
- logger.Log(fmt.Sprintf("Task %s is manual-only (no scheduled interval)", taskName), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "TaskManager",
- })
+ logger.Debugf("TaskManager", "Task %s is manual-only (no scheduled interval)", taskName)
return
}
@@ -204,32 +146,14 @@ func (tm *TaskManager) StartTask(taskName string) {
case <-ticker.C:
// Check dependencies before executing
if !tm.checkDependencies(task) {
- logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
+ logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
tm.logTaskExecution(taskName, "error", err.Error())
- logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err)
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
-
- // Mark task as complete
- if err := database.MarkTaskComplete(taskName); err != nil {
- logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{
- Level: logger.Warn,
- Prefix: "TaskManager",
- })
- }
-
- logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{
- Level: logger.Success,
- Prefix: "TaskManager",
- })
+ logger.Successf("TaskManager", "Task %s executed successfully", taskName)
}
case <-doneChan:
ticker.Stop()
@@ -238,10 +162,7 @@ func (tm *TaskManager) StartTask(taskName string) {
}
}()
- logger.Log(fmt.Sprintf("Task %s scheduled with interval %v", taskName, task.Interval), logger.LogOptions{
- Level: logger.Info,
- Prefix: "TaskManager",
- })
+ logger.Infof("TaskManager", "Task %s scheduled with interval %v", taskName, task.Interval)
}
func (tm *TaskManager) StopTask(taskName string) {
@@ -252,10 +173,7 @@ func (tm *TaskManager) StopTask(taskName string) {
close(doneChan)
delete(tm.Done, taskName)
delete(tm.Tickers, taskName)
- logger.Log(fmt.Sprintf("Task %s stopped", taskName), logger.LogOptions{
- Level: logger.Info,
- Prefix: "TaskManager",
- })
+ logger.Infof("TaskManager", "Task %s stopped", taskName)
}
}
@@ -283,10 +201,7 @@ func (tm *TaskManager) StopAllTasks() {
ticker.Stop()
delete(tm.Tickers, name)
}
- logger.Log(fmt.Sprintf("Task %s stopped", name), logger.LogOptions{
- Level: logger.Info,
- Prefix: "TaskManager",
- })
+ logger.Infof("TaskManager", "Task %s stopped", name)
}
}
@@ -297,11 +212,9 @@ func (tm *TaskManager) checkDependencies(task types.Task) bool {
}
for _, depName := range task.Dependencies {
- if !database.IsTaskComplete(depName) {
- logger.Log(fmt.Sprintf("Dependency %s not completed for task %s", depName, task.Name), logger.LogOptions{
- Level: logger.Debug,
- Prefix: "TaskManager",
- })
+ taskStatus, err := repositories.GetTaskStatus(depName)
+ if err != nil || !taskStatus.IsCompleted {
+ logger.Debugf("TaskManager", "Dependency %s not completed for task %s", depName, task.Name)
return false
}
}
@@ -316,9 +229,8 @@ func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus {
tm.Mutex.Unlock()
var lastRun, nextRun *time.Time
- var logEntry entities.TaskLog
- if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&logEntry).Error; err == nil {
+ if logEntry, err := repositories.GetLatestTaskLog(taskName); err == nil {
lastRun = &logEntry.ExecutedAt
if logEntry.Status == "error" {
lastRun = nil
@@ -329,10 +241,7 @@ func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus {
nextRun = &next
}
} else if err != gorm.ErrRecordNotFound {
- logger.Log(fmt.Sprintf("Error fetching task log for %s: %v", taskName, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Error fetching task log for %s: %v", taskName, err)
}
return &types.TaskStatus{
diff --git a/tasks/producersync.task.go b/tasks/producersync.task.go
index e211eb3..59864ce 100644
--- a/tasks/producersync.task.go
+++ b/tasks/producersync.task.go
@@ -1,264 +1,78 @@
package tasks
import (
- "fmt"
- "metachan/database"
"metachan/entities"
+ "metachan/repositories"
"metachan/utils/api/jikan"
"metachan/utils/logger"
"time"
)
func ProducerSync() error {
- logger.Log("Starting producer sync (includes studios and licensors)...", logger.LogOptions{
- Level: logger.Info,
- Prefix: "ProducerSync",
- })
+ logger.Infof("ProducerSync", "Starting producer sync (includes studios and licensors)")
- client := jikan.NewJikanClient()
- page := 1
- totalFetched := 0
- var totalPages int
- var totalProducers int
- startTime := time.Now()
+ response, err := jikan.GetAnimeProducers()
+ if err != nil {
+ logger.Errorf("ProducerSync", "Failed to fetch producers: %v", err)
+ return err
+ }
- for {
- logger.Log(fmt.Sprintf("Fetching producers page %d...", page), logger.LogOptions{
- Level: logger.Info,
- Prefix: "ProducerSync",
- })
+ total := len(response.Data)
+ logger.Infof("ProducerSync", "Fetched %d producers from MAL", total)
+
+ startTime := time.Now()
- response, err := client.GetAnimeProducers(page)
+ for i, producerData := range response.Data {
+ producerDetail, err := jikan.GetProducerByID(producerData.MALID)
if err != nil {
- logger.Log(fmt.Sprintf("Failed to fetch producers page %d: %v", page, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- // If we fetched at least one page, continue with what we have
- if page > 1 {
- break
+ logger.Warnf("ProducerSync", "Failed to fetch details for producer %d: %v", producerData.MALID, err)
+ continue
+ }
+
+ var imageID *uint
+ if producerDetail.Data.Images.JPG.ImageURL != "" {
+ image := entities.SimpleImage{
+ ImageURL: producerDetail.Data.Images.JPG.ImageURL,
+ }
+ id, err := repositories.CreateOrUpdateSimpleImage(&image)
+ if err == nil {
+ imageID = &id
}
- return err
}
- if len(response.Data) == 0 {
- break
+ producer := entities.Producer{
+ MALID: producerDetail.Data.MALID,
+ URL: producerDetail.Data.URL,
+ Favorites: producerDetail.Data.Favorites,
+ Count: producerDetail.Data.Count,
+ Established: producerDetail.Data.Established,
+ About: producerDetail.Data.About,
+ ImageID: imageID,
}
- // Set total pages from first response
- if page == 1 {
- totalPages = response.Pagination.LastVisiblePage
- totalProducers = totalPages * len(response.Data)
- logger.Log(fmt.Sprintf("Total pages: %d, Estimated producers: %d", totalPages, totalProducers), logger.LogOptions{
- Level: logger.Info,
- Prefix: "ProducerSync",
+ for _, title := range producerDetail.Data.Titles {
+ producer.Titles = append(producer.Titles, entities.SimpleTitle{
+ Type: title.Type,
+ Title: title.Title,
})
}
- // Process each producer
- for _, producerData := range response.Data {
- // Check if producer already exists
- var existingProducer entities.Producer
- result := database.DB.Where("mal_id = ?", producerData.MALID).First(&existingProducer)
-
- if result.Error != nil {
- // Producer doesn't exist, create new
- producer := entities.Producer{
- MALID: producerData.MALID,
- URL: producerData.URL,
- Favorites: producerData.Favorites,
- Count: producerData.Count,
- Established: producerData.Established,
- About: producerData.About,
- }
-
- // Create producer in database
- if err := database.DB.Create(&producer).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to create producer %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- continue
- }
-
- // Add titles
- for _, title := range producerData.Titles {
- producerTitle := entities.ProducerTitle{
- ProducerID: producer.ID,
- Type: title.Type,
- Title: title.Title,
- }
- if err := database.DB.Create(&producerTitle).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to create producer title for %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- }
- }
-
- // Add image
- if producerData.Images.JPG.ImageURL != "" {
- producerImage := entities.ProducerImage{
- ProducerID: producer.ID,
- ImageURL: producerData.Images.JPG.ImageURL,
- }
- if err := database.DB.Create(&producerImage).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to create producer image for %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- }
- }
-
- // Fetch and add external URLs
- time.Sleep(350 * time.Millisecond) // Rate limiting
- externalResp, err := client.GetProducerExternal(producerData.MALID)
- if err != nil {
- logger.Log(fmt.Sprintf("Failed to fetch external URLs for producer %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- } else {
- for _, ext := range externalResp.Data {
- producerExt := entities.ProducerExternalURL{
- ProducerID: producer.ID,
- Name: ext.Name,
- URL: ext.URL,
- }
- if err := database.DB.Create(&producerExt).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to create external URL for producer %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- }
- }
- }
-
- // Get primary title (default or first available)
- primaryTitle := "Unknown"
- for _, title := range producerData.Titles {
- if title.Type == "Default" {
- primaryTitle = title.Title
- break
- }
- }
- if primaryTitle == "Unknown" && len(producerData.Titles) > 0 {
- primaryTitle = producerData.Titles[0].Title
- }
-
- logger.Log(fmt.Sprintf("Created producer: %s (ID: %d, Count: %d)", primaryTitle, producerData.MALID, producerData.Count), logger.LogOptions{
- Level: logger.Success,
- Prefix: "ProducerSync",
- })
- } else {
- // Producer exists, update it
- existingProducer.URL = producerData.URL
- existingProducer.Favorites = producerData.Favorites
- existingProducer.Count = producerData.Count
- existingProducer.Established = producerData.Established
- existingProducer.About = producerData.About
-
- if err := database.DB.Save(&existingProducer).Error; err != nil {
- logger.Log(fmt.Sprintf("Failed to update producer %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- continue
- }
-
- // Delete and recreate titles
- database.DB.Where("producer_id = ?", existingProducer.ID).Delete(&entities.ProducerTitle{})
- for _, title := range producerData.Titles {
- producerTitle := entities.ProducerTitle{
- ProducerID: existingProducer.ID,
- Type: title.Type,
- Title: title.Title,
- }
- database.DB.Create(&producerTitle)
- }
-
- // Update image
- var existingImage entities.ProducerImage
- if database.DB.Where("producer_id = ?", existingProducer.ID).First(&existingImage).Error == nil {
- existingImage.ImageURL = producerData.Images.JPG.ImageURL
- database.DB.Save(&existingImage)
- } else if producerData.Images.JPG.ImageURL != "" {
- producerImage := entities.ProducerImage{
- ProducerID: existingProducer.ID,
- ImageURL: producerData.Images.JPG.ImageURL,
- }
- database.DB.Create(&producerImage)
- }
-
- // Update external URLs
- time.Sleep(350 * time.Millisecond) // Rate limiting
- externalResp, err := client.GetProducerExternal(producerData.MALID)
- if err != nil {
- logger.Log(fmt.Sprintf("Failed to fetch external URLs for producer %d: %v", producerData.MALID, err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "ProducerSync",
- })
- } else {
- database.DB.Where("producer_id = ?", existingProducer.ID).Delete(&entities.ProducerExternalURL{})
- for _, ext := range externalResp.Data {
- producerExt := entities.ProducerExternalURL{
- ProducerID: existingProducer.ID,
- Name: ext.Name,
- URL: ext.URL,
- }
- database.DB.Create(&producerExt)
- }
- }
-
- primaryTitle := "Unknown"
- for _, title := range producerData.Titles {
- if title.Type == "Default" {
- primaryTitle = title.Title
- break
- }
- }
- if primaryTitle == "Unknown" && len(producerData.Titles) > 0 {
- primaryTitle = producerData.Titles[0].Title
- }
-
- logger.Log(fmt.Sprintf("Updated producer: %s (ID: %d, Count: %d)", primaryTitle, producerData.MALID, producerData.Count), logger.LogOptions{
- Level: logger.Success,
- Prefix: "ProducerSync",
- })
- }
-
- totalFetched++
-
- // Progress update every 10 producers
- if totalFetched%10 == 0 && totalProducers > 0 {
- progress := float64(totalFetched) / float64(totalProducers) * 100
- elapsed := time.Since(startTime)
- avgTimePerProducer := elapsed / time.Duration(totalFetched)
- remaining := totalProducers - totalFetched
- eta := avgTimePerProducer * time.Duration(remaining)
-
- logger.Log(fmt.Sprintf("Progress: %d/%d - %.1f%% | ETA: %v", totalFetched, totalProducers, progress, eta.Round(time.Second)), logger.LogOptions{
- Level: logger.Info,
- Prefix: "ProducerSync",
- })
- }
-
- time.Sleep(350 * time.Millisecond) // Rate limiting between producers
+ for _, ext := range producerDetail.Data.External {
+ producer.ExternalURLs = append(producer.ExternalURLs, entities.ExternalURL{
+ Name: ext.Name,
+ URL: ext.URL,
+ })
}
- // Check if there's more data
- if !response.Pagination.HasNextPage {
- break
+ if err := repositories.CreateOrUpdateProducer(&producer); err != nil {
+ logger.Warnf("ProducerSync", "Failed to sync producer %d: %v", producerData.MALID, err)
+ continue
}
- page++
- time.Sleep(1 * time.Second) // Additional delay between pages
+ progress, eta := calculateProgress(i+1, total, startTime)
+ logger.Infof("ProducerSync", "Progress: %d/%d (%.1f%%) | ETA: %v", i+1, total, progress, eta)
}
- logger.Log(fmt.Sprintf("Producer sync completed successfully. Total: %d producers", totalFetched), logger.LogOptions{
- Level: logger.Success,
- Prefix: "ProducerSync",
- })
-
+ logger.Successf("ProducerSync", "Producer sync completed. Total: %d producers", total)
return nil
}
diff --git a/tasks/tasks.go b/tasks/tasks.go
index 31d017d..8b19a19 100644
--- a/tasks/tasks.go
+++ b/tasks/tasks.go
@@ -1,9 +1,7 @@
package tasks
import (
- "fmt"
"metachan/config"
- "metachan/database"
"metachan/types"
"metachan/utils/logger"
"sync"
@@ -14,11 +12,10 @@ var GlobalTaskManager *TaskManager
func init() {
GlobalTaskManager = &TaskManager{
- Tasks: make(map[string]types.Task),
- Tickers: make(map[string]*time.Ticker),
- Done: make(map[string]chan bool),
- Mutex: sync.Mutex{},
- Database: database.DB,
+ Tasks: make(map[string]types.Task),
+ Tickers: make(map[string]*time.Ticker),
+ Done: make(map[string]chan bool),
+ Mutex: sync.Mutex{},
}
// Register ProducerSync task (every 7 days) - runs first to populate unified producer table
@@ -29,10 +26,7 @@ func init() {
})
if err != nil {
- logger.Log(fmt.Sprintf("Failed to register ProducerSync task: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Failed to register ProducerSync task: %v", err)
}
// Register GenreSync task (every 7 days)
@@ -43,10 +37,7 @@ func init() {
})
if err != nil {
- logger.Log(fmt.Sprintf("Failed to register GenreSync task: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Failed to register GenreSync task: %v", err)
}
// Register AniFetch task (weekly) - fetches anime mappings from Fribb list
@@ -59,14 +50,11 @@ func init() {
})
if err != nil {
- logger.Log(fmt.Sprintf("Failed to register AnimeFetch task: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Failed to register AnimeFetch task: %v", err)
}
// Register AnimeSync task (runs after AnimeFetch completes) - only if enabled in config
- if config.Config.AniSync {
+ if config.Sync.AniSync {
err = GlobalTaskManager.RegisterTask(types.Task{
Name: "AnimeSync",
Interval: 0, // Manual-only - waits for AnimeFetch dependency
@@ -75,10 +63,7 @@ func init() {
})
if err != nil {
- logger.Log(fmt.Sprintf("Failed to register AnimeSync task: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Failed to register AnimeSync task: %v", err)
}
}
@@ -90,9 +75,6 @@ func init() {
})
if err != nil {
- logger.Log(fmt.Sprintf("Failed to register AnimeUpdate task: %v", err), logger.LogOptions{
- Level: logger.Error,
- Prefix: "TaskManager",
- })
+ logger.Errorf("TaskManager", "Failed to register AnimeUpdate task: %v", err)
}
}