diff options
Diffstat (limited to 'tasks')
| -rw-r--r-- | tasks/anifetch.task.go | 121 | ||||
| -rw-r--r-- | tasks/anisync.task.go | 112 | ||||
| -rw-r--r-- | tasks/aniupdate.task.go | 223 | ||||
| -rw-r--r-- | tasks/genresync.task.go | 64 | ||||
| -rw-r--r-- | tasks/helpers.go | 12 | ||||
| -rw-r--r-- | tasks/manager.go | 155 | ||||
| -rw-r--r-- | tasks/producersync.task.go | 282 | ||||
| -rw-r--r-- | tasks/tasks.go | 38 |
8 files changed, 232 insertions, 775 deletions
diff --git a/tasks/anifetch.task.go b/tasks/anifetch.task.go index 894dd97..8d38da8 100644 --- a/tasks/anifetch.task.go +++ b/tasks/anifetch.task.go @@ -4,53 +4,43 @@ import ( "encoding/json" "fmt" "io" - "metachan/database" "metachan/entities" + "metachan/enums" + "metachan/repositories" "metachan/types" "metachan/utils/logger" "metachan/utils/mappers" "net/http" - - "gorm.io/gorm" ) -const fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json" +const ( + fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json" + batchSize = 1000 +) func AniFetch() error { - logger.Log("Starting Anime Fetch", logger.LogOptions{ - Level: logger.Info, - Prefix: "AniFetch", - }) + logger.Infof("AniFetch", "Starting Anime Fetch") response, err := http.Get(fribbURL) if err != nil { - logger.Log(fmt.Sprintf("Anime Fetch failed: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniFetch", - }) + logger.Errorf("AniFetch", "Anime Fetch failed: %v", err) return err } + defer response.Body.Close() body, err := io.ReadAll(response.Body) if err != nil { - logger.Log(fmt.Sprintf("Failed to read response body: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniFetch", - }) + logger.Errorf("AniFetch", "Failed to read response body: %v", err) return err } - var mappings []types.AniSyncMapping + var mappings []types.MappingResponse if err := json.Unmarshal(body, &mappings); err != nil { - logger.Log(fmt.Sprintf("Failed to unmarshal JSON: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniFetch", - }) + logger.Errorf("AniFetch", "Failed to unmarshal JSON: %v", err) return err } - batchSize := 1000 total := len(mappings) for i := 0; i < total; i += batchSize { @@ -61,21 +51,15 @@ func AniFetch() error { batch := mappings[i:end] processBatch(batch) - logger.Log(fmt.Sprintf("Processed %d/%d mappings", end, total), logger.LogOptions{ - Level: logger.Info, - Prefix: "AniFetch", - }) + logger.Infof("AniFetch", "Processed %d/%d mappings", end, total) } - logger.Log("Anime Fetch completed", logger.LogOptions{ - Level: logger.Success, - Prefix: "AniFetch", - }) + logger.Successf("AniFetch", "Anime Fetch completed") return nil } -func processBatch(mappings []types.AniSyncMapping) { +func processBatch(mappings []types.MappingResponse) { for _, mapping := range mappings { var composite *string if mapping.MAL != 0 && mapping.Anilist != 0 { @@ -83,61 +67,26 @@ func processBatch(mappings []types.AniSyncMapping) { composite = &comp } - var entity entities.AnimeMapping - if err := database.DB.Where("mal_anilist_composite = ?", composite).First(&entity).Error; err != nil { - if err == gorm.ErrRecordNotFound { - newEntity := entities.AnimeMapping{ - AniDB: mapping.AniDB, - Anilist: mapping.Anilist, - AnimeCountdown: mapping.AnimeCountdown, - AnimePlanet: mappers.ForceString(mapping.AnimePlanet), - AniSearch: mapping.AniSearch, - IMDB: mapping.IMDB, - Kitsu: mapping.Kitsu, - LiveChart: mapping.LiveChart, - MAL: mapping.MAL, - NotifyMoe: mapping.NotifyMoe, - Simkl: mapping.Simkl, - TMDB: mappers.ForceInt(mapping.TMDB), - TVDB: mapping.TVDB, - Type: entities.MappingType(mapping.Type), - MALAnilistComposite: composite, - } - if err := database.DB.Create(&newEntity).Error; err != nil { - logger.Log(fmt.Sprintf("Unable to process mapping %v: %v", mapping, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "AniFetch", - }) - } - } else { - logger.Log(fmt.Sprintf("Error fetching entity: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniFetch", - }) - } - } else { - // Update existing entity - entity.AniDB = mapping.AniDB - entity.Anilist = mapping.Anilist - entity.AnimeCountdown = mapping.AnimeCountdown - entity.AnimePlanet = mappers.ForceString(mapping.AnimePlanet) - entity.AniSearch = mapping.AniSearch - entity.IMDB = mapping.IMDB - entity.Kitsu = mapping.Kitsu - entity.LiveChart = mapping.LiveChart - entity.MAL = mapping.MAL - entity.NotifyMoe = mapping.NotifyMoe - entity.Simkl = mapping.Simkl - entity.TMDB = mappers.ForceInt(mapping.TMDB) - entity.TVDB = mapping.TVDB - entity.Type = entities.MappingType(mapping.Type) - entity.MALAnilistComposite = composite - if err := database.DB.Save(&entity).Error; err != nil { - logger.Log(fmt.Sprintf("Unable to update mapping %v: %v", mapping, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "AniFetch", - }) - } + entity := entities.Mapping{ + AniDB: mapping.AniDB, + Anilist: mapping.Anilist, + AnimeCountdown: mapping.AnimeCountdown, + AnimePlanet: mappers.ForceString(mapping.AnimePlanet), + AniSearch: mapping.AniSearch, + IMDB: mapping.IMDB, + Kitsu: mapping.Kitsu, + LiveChart: mapping.LiveChart, + MAL: mapping.MAL, + NotifyMoe: mapping.NotifyMoe, + Simkl: mapping.Simkl, + TMDB: mappers.ForceInt(mapping.TMDB), + TVDB: mapping.TVDB, + Type: enums.MappingAnimeType(mapping.Type), + MALAnilistComposite: composite, + } + + if err := repositories.CreateOrUpdateMapping(&entity); err != nil { + logger.Warnf("AniFetch", "Unable to process mapping %v: %v", mapping, err) } } } diff --git a/tasks/anisync.task.go b/tasks/anisync.task.go index dc98d3e..0a346ac 100644 --- a/tasks/anisync.task.go +++ b/tasks/anisync.task.go @@ -1,145 +1,69 @@ package tasks import ( - "fmt" - "metachan/database" - "metachan/entities" - "metachan/services/anime" + "metachan/enums" + "metachan/repositories" + "metachan/services" "metachan/utils/logger" "time" ) -// AniSync fetches full anime details for all anime in the database func AniSync() error { - logger.Log("Starting Anime Sync - Fetching full anime details", logger.LogOptions{ - Level: logger.Info, - Prefix: "AniSync", - }) + logger.Infof("AniSync", "Starting Anime Sync - Fetching full anime details") - // Get all anime mappings - var mappings []entities.AnimeMapping - if err := database.DB.Find(&mappings).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to fetch anime mappings: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniSync", - }) + mappings, err := repositories.GetAllMappings() + if err != nil { + logger.Errorf("AniSync", "Failed to fetch anime mappings: %v", err) return err } total := len(mappings) - logger.Log(fmt.Sprintf("Found %d anime mappings", total), logger.LogOptions{ - Level: logger.Info, - Prefix: "AniSync", - }) + logger.Infof("AniSync", "Found %d anime mappings", total) - // Pre-count items needing sync for accurate ETA itemsToSync := 0 for _, mapping := range mappings { if mapping.MAL == 0 { continue } - var existingAnime entities.Anime - err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error + _, err := repositories.GetAnime(enums.MAL, mapping.MAL) if err == nil { - var episodeCount int64 - database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount) - if episodeCount > 0 { - continue - } + continue } itemsToSync++ } - logger.Log(fmt.Sprintf("Found %d anime to sync (%d already synced)", itemsToSync, total-itemsToSync), logger.LogOptions{ - Level: logger.Info, - Prefix: "AniSync", - }) + logger.Infof("AniSync", "Found %d anime to sync (%d already synced)", itemsToSync, total-itemsToSync) - animeService := anime.NewService() synced := 0 skipped := 0 startTime := time.Now() processed := 0 for _, mapping := range mappings { - // Skip if MAL ID is 0 (invalid) if mapping.MAL == 0 { skipped++ continue } - // Check if anime already exists in DB - var existingAnime entities.Anime - err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error - + _, err := repositories.GetAnime(enums.MAL, mapping.MAL) if err == nil { - // Check if anime has full details (has episodes) - var episodeCount int64 - database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount) - - if episodeCount > 0 { - skipped++ - continue - } + skipped++ + continue } - // Calculate progress and ETA - progress := float64(processed+1) / float64(itemsToSync) * 100 - eta := "" - if processed >= 10 { - elapsed := time.Since(startTime) - avgTimePerItem := elapsed / time.Duration(processed) - remainingItems := itemsToSync - processed - remaining := time.Duration(remainingItems) * avgTimePerItem - eta = formatDuration(remaining) - } else { - eta = "calculating..." - } + progress, eta := calculateProgress(processed+1, itemsToSync, startTime) - // Fetch full anime details - logger.Log(fmt.Sprintf("[%d/%d] Synchronising MAL ID %d - %.1f%% | ETA: %s", processed+1, itemsToSync, mapping.MAL, progress, eta), logger.LogOptions{ - Level: logger.Info, - Prefix: "AniSync", - }) + logger.Infof("AniSync", "[%d/%d] Synchronising MAL ID %d - %.1f%% | ETA: %v", processed+1, itemsToSync, mapping.MAL, progress, eta) - _, err = animeService.GetAnimeDetailsWithSource(&mapping, "anisync") + _, err = services.GetAnime(&mapping) if err != nil { - logger.Log(fmt.Sprintf("Failed to sync anime MAL ID %d: %v", mapping.MAL, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "AniSync", - }) + logger.Warnf("AniSync", "Failed to sync anime MAL ID %d: %v", mapping.MAL, err) continue } synced++ processed++ - - // Sleep to respect rate limits (1 second between requests) - time.Sleep(1 * time.Second) } - logger.Log(fmt.Sprintf("Anime Sync completed: %d synced, %d skipped", synced, skipped), logger.LogOptions{ - Level: logger.Success, - Prefix: "AniSync", - }) - return nil } - -// formatDuration converts duration to human-readable format -func formatDuration(d time.Duration) string { - d = d.Round(time.Second) - h := d / time.Hour - d -= h * time.Hour - m := d / time.Minute - d -= m * time.Minute - s := d / time.Second - - if h > 0 { - return fmt.Sprintf("%dh %dm", h, m) - } - if m > 0 { - return fmt.Sprintf("%dm %ds", m, s) - } - return fmt.Sprintf("%ds", s) -} diff --git a/tasks/aniupdate.task.go b/tasks/aniupdate.task.go index 38cc77a..e6d63a0 100644 --- a/tasks/aniupdate.task.go +++ b/tasks/aniupdate.task.go @@ -3,157 +3,106 @@ package tasks import ( "fmt" "metachan/config" - "metachan/database" "metachan/entities" - "metachan/services/anime" - "metachan/types" + "metachan/enums" + "metachan/repositories" + "metachan/services" "metachan/utils/logger" "sync" "time" ) -// Constants for anime update task const ( - // UpdaterSource identifies the source of the update request UpdaterSource = "updater" - // MaxConcurrentUpdates limits the number of concurrent anime updates MaxConcurrentUpdates = 5 - // MaxConcurrentSQLiteUpdates limits concurrent updates for SQLite to prevent locks MaxConcurrentSQLiteUpdates = 1 - // UpdateInterval defines how often an anime should be updated even without specific triggers UpdateInterval = 6 * time.Hour ) -// animeUpdateJob represents a single anime update job type animeUpdateJob struct { series entities.Anime reason string } -// AnimeUpdate checks for airing anime that need to be updated func AnimeUpdate() error { - logger.Log("Starting Anime Update Task", logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) - - // Find all currently airing anime - var airingSeries []entities.Anime - result := database.DB. - Where("airing = ?", true). - Preload("NextAiringEpisode"). - Preload("AiringSchedule"). - Find(&airingSeries) - - if result.Error != nil { - logger.Log(fmt.Sprintf("Failed to fetch airing anime: %v", result.Error), logger.LogOptions{ - Level: logger.Error, - Prefix: "AnimeUpdate", - }) - return result.Error + logger.Infof("AnimeUpdate", "Starting Anime Update Task") + + airingSeries, err := repositories.GetAiringAnime() + if err != nil { + logger.Errorf("AnimeUpdate", "Failed to fetch airing anime: %v", err) + return err } - logger.Log(fmt.Sprintf("Found %d airing anime series", len(airingSeries)), logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) + logger.Infof("AnimeUpdate", "Found %d airing anime series", len(airingSeries)) - // Get current timestamp currentTime := time.Now().Unix() - // Log the current time for debugging - logger.Log(fmt.Sprintf("Current timestamp: %d (%s)", - currentTime, time.Unix(currentTime, 0).Format(time.RFC3339)), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "Current timestamp: %d (%s)", currentTime, time.Unix(currentTime, 0).Format(time.RFC3339)) - // Create a channel for jobs jobs := make(chan animeUpdateJob, len(airingSeries)) - // Determine max concurrency based on database type maxWorkers := MaxConcurrentUpdates - if config.Config.DatabaseDriver == types.SQLite { + if config.Database.Driver == "sqlite" { maxWorkers = MaxConcurrentSQLiteUpdates - logger.Log(fmt.Sprintf("Using reduced concurrency (%d workers) for SQLite database", - maxWorkers), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "Using reduced concurrency (%d workers) for SQLite database", maxWorkers) } - // Create a wait group to wait for all workers to finish var wg sync.WaitGroup - // Create workers for i := 0; i < maxWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() - animeService := anime.NewService() - logger.Log(fmt.Sprintf("Started worker #%d", workerID), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "Started worker #%d", workerID) - // Process jobs from the channel for job := range jobs { - updateAnime(animeService, job.series, job.reason) + updateAnime(job.series, job.reason) } }(i) } - // Queue updates for anime that need it jobsQueued := 0 for _, series := range airingSeries { - // Check if we need to update this anime needsUpdate := false reason := "" - // Log details about this particular anime for debugging - logger.Log(fmt.Sprintf("Checking anime: %s (ID: %d)", - series.TitleRomaji, series.MALID), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + title := "" + if series.Title != nil { + if series.Title.Romaji != "" { + title = series.Title.Romaji + } else if series.Title.English != "" { + title = series.Title.English + } + } + + logger.Debugf("AnimeUpdate", "Checking anime: %s (ID: %d)", title, series.MALID) - // If there's no next airing episode data, we should update - if series.NextAiringEpisode == nil || series.NextAiringEpisode.AiringAt == 0 { + if series.NextAiring == nil || series.NextAiring.AiringAt == 0 { needsUpdate = true reason = "missing next episode data" - } else if int64(series.NextAiringEpisode.AiringAt) <= currentTime { - // If the next episode should have aired already, update to get fresh data + } else if int64(series.NextAiring.AiringAt) <= currentTime { needsUpdate = true reason = "next episode already aired" } - // Check if the anime was last updated more than the update interval ago if !needsUpdate && !series.LastUpdated.IsZero() && time.Since(series.LastUpdated) > UpdateInterval { needsUpdate = true reason = fmt.Sprintf("regular update (last updated %s ago)", time.Since(series.LastUpdated).Round(time.Second)) } - // Log update decision if !needsUpdate { - logger.Log(fmt.Sprintf("Skipping update for %s (ID: %d) - no update needed. Next airing at: %d", - series.TitleRomaji, series.MALID, series.NextAiringEpisode.AiringAt), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "Skipping update for %s (ID: %d) - no update needed. Next airing at: %d", + title, series.MALID, series.NextAiring.AiringAt) continue } - // Add the job to the queue - logger.Log(fmt.Sprintf("Queueing update for %s (ID: %d) - Reason: %s", - series.TitleRomaji, series.MALID, reason), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "Queueing update for %s (ID: %d) - Reason: %s", + title, series.MALID, reason) jobs <- animeUpdateJob{ series: series, @@ -162,132 +111,96 @@ func AnimeUpdate() error { jobsQueued++ } - // Close the job channel to signal workers that no more jobs are coming close(jobs) - // Wait for all workers to finish wg.Wait() - logger.Log(fmt.Sprintf("Anime Update Task Completed - Processed %d anime", jobsQueued), logger.LogOptions{ - Level: logger.Success, - Prefix: "AnimeUpdate", - }) + logger.Successf("AnimeUpdate", "Anime Update Task Completed - Processed %d anime", jobsQueued) return nil } -// updateAnime updates a single anime series -func updateAnime(animeService *anime.Service, series entities.Anime, reason string) { - title := series.TitleRomaji - if series.TitleEnglish != "" { - title = series.TitleEnglish +func updateAnime(series entities.Anime, reason string) { + title := "" + if series.Title != nil { + if series.Title.English != "" { + title = series.Title.English + } else if series.Title.Romaji != "" { + title = series.Title.Romaji + } } - logger.Log(fmt.Sprintf("Updating anime: %s (MAL ID: %d) - %s", title, series.MALID, reason), logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) + logger.Infof("AnimeUpdate", "Updating anime: %s (MAL ID: %d) - %s", title, series.MALID, reason) - // Get anime mapping for the service call - mapping, err := database.GetAnimeMappingViaMALID(series.MALID) + mapping, err := repositories.GetAnimeMapping(enums.MAL, series.MALID) if err != nil { - logger.Log(fmt.Sprintf("Error getting anime mapping for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AnimeUpdate", - }) + logger.Errorf("AnimeUpdate", "Error getting anime mapping for %s (MAL ID: %d): %v", title, series.MALID, err) return } - // Get updated anime data from API - updatedAnime, err := animeService.GetAnimeDetailsWithSource(mapping, "mal") + updatedAnime, err := services.GetAnime(&mapping) if err != nil { - logger.Log(fmt.Sprintf("Error getting updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AnimeUpdate", - }) + logger.Errorf("AnimeUpdate", "Error getting updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err) return } - logger.Log(fmt.Sprintf("Successfully updated anime: %s (MAL ID: %d)", title, series.MALID), logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) + logger.Successf("AnimeUpdate", "Successfully updated anime: %s (MAL ID: %d)", title, series.MALID) - // Check if the updated anime data has significant changes that warrant saving if shouldSaveUpdate(&series, updatedAnime) { - // Check if anime is still airing if updatedAnime.Status != "RELEASING" && updatedAnime.Status != "AIRING" { - // Update the anime data to reflect that it's no longer airing updatedAnime.Airing = false } - if err := database.SaveAnimeToDatabase(updatedAnime); err != nil { - logger.Log(fmt.Sprintf("Error saving updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AnimeUpdate", - }) + if err := repositories.CreateOrUpdateAnime(updatedAnime); err != nil { + logger.Errorf("AnimeUpdate", "Error saving updated anime data for %s (MAL ID: %d): %v", title, series.MALID, err) } else { - logger.Log(fmt.Sprintf("Successfully saved updated data for %s (MAL ID: %d)", title, series.MALID), logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) + logger.Infof("AnimeUpdate", "Successfully saved updated data for %s (MAL ID: %d)", title, series.MALID) if !updatedAnime.Airing { - logger.Log(fmt.Sprintf("Anime %s (MAL ID: %d) is no longer airing. Status: %s", title, series.MALID, updatedAnime.Status), logger.LogOptions{ - Level: logger.Info, - Prefix: "AnimeUpdate", - }) + logger.Infof("AnimeUpdate", "Anime %s (MAL ID: %d) is no longer airing. Status: %s", title, series.MALID, updatedAnime.Status) } } } else { - logger.Log(fmt.Sprintf("No significant changes detected for %s (MAL ID: %d), skipping database update", title, series.MALID), logger.LogOptions{ - Level: logger.Debug, - Prefix: "AnimeUpdate", - }) + logger.Debugf("AnimeUpdate", "No significant changes detected for %s (MAL ID: %d), skipping database update", title, series.MALID) } } -// shouldSaveUpdate determines if the updated anime data has significant changes -// that warrant saving it to the database -func shouldSaveUpdate(oldAnime *entities.Anime, newAnime *types.Anime) bool { +func shouldSaveUpdate(oldAnime *entities.Anime, newAnime *entities.Anime) bool { if oldAnime == nil { return true } - // Convert old anime to types.Anime for easier comparison - oldAnimeConverted := database.ConvertToTypesAnime(oldAnime) - - // Check for changes in next airing episode - oldNextEp := oldAnimeConverted.NextAiringEpisode - newNextEp := newAnime.NextAiringEpisode + oldHasNext := oldAnime.NextAiring != nil && oldAnime.NextAiring.AiringAt > 0 + newHasNext := newAnime.NextAiring != nil && newAnime.NextAiring.AiringAt > 0 - // If next episode timestamp or number changed - if oldNextEp.AiringAt != newNextEp.AiringAt || oldNextEp.Episode != newNextEp.Episode { + if oldHasNext != newHasNext { return true } - // Check if sub/dub count changed - if oldAnimeConverted.Episodes.Subbed != newAnime.Episodes.Subbed || - oldAnimeConverted.Episodes.Dubbed != newAnime.Episodes.Dubbed { + if oldHasNext && newHasNext { + if oldAnime.NextAiring.AiringAt != newAnime.NextAiring.AiringAt || + oldAnime.NextAiring.Episode != newAnime.NextAiring.Episode { + return true + } + } + + if oldAnime.SubbedCount != newAnime.SubbedCount || + oldAnime.DubbedCount != newAnime.DubbedCount { return true } - // Check if airing status changed - if oldAnimeConverted.Airing != newAnime.Airing || - oldAnimeConverted.Status != newAnime.Status { + if oldAnime.Airing != newAnime.Airing || + oldAnime.Status != newAnime.Status { return true } - // Check if the total episode count has changed - if oldAnimeConverted.Episodes.Total != newAnime.Episodes.Total { + if oldAnime.TotalEpisodes != newAnime.TotalEpisodes { return true } - // Check if number of episodes in the airing schedule changed - if len(oldAnimeConverted.AiringSchedule) != len(newAnime.AiringSchedule) { + if len(oldAnime.Schedule) != len(newAnime.Schedule) { return true } - // No significant changes detected return false } diff --git a/tasks/genresync.task.go b/tasks/genresync.task.go index e240833..22e2712 100644 --- a/tasks/genresync.task.go +++ b/tasks/genresync.task.go @@ -1,82 +1,36 @@ package tasks import ( - "fmt" - "metachan/database" "metachan/entities" + "metachan/repositories" "metachan/utils/api/jikan" "metachan/utils/logger" ) -// GenreSync synchronizes genre data from MAL via Jikan API func GenreSync() error { - logger.Log("Starting Genre Sync from MAL", logger.LogOptions{ - Level: logger.Info, - Prefix: "GenreSync", - }) + logger.Infof("GenreSync", "Starting Genre Sync from MAL") - // Create Jikan client - client := jikan.NewJikanClient() - - // Wait for rate limit - client.WaitForRateLimit() - - // Fetch genres from Jikan API - genresResponse, err := client.GetAnimeGenres() + genresResponse, err := jikan.GetAnimeGenres() if err != nil { - logger.Log(fmt.Sprintf("Failed to fetch genres from MAL: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "GenreSync", - }) + logger.Errorf("GenreSync", "Failed to fetch genres from MAL: %v", err) return err } - logger.Log(fmt.Sprintf("Fetched %d genres from MAL", len(genresResponse.Data)), logger.LogOptions{ - Level: logger.Info, - Prefix: "GenreSync", - }) + logger.Infof("GenreSync", "Fetched %d genres from MAL", len(genresResponse.Data)) - // Update or create genres in database for _, genre := range genresResponse.Data { - // Create a genre entry with AnimeID = 0 to indicate it's a master genre - genreEntity := entities.AnimeGenre{ - AnimeID: 0, // Master genre, not tied to specific anime + genreEntity := entities.Genre{ GenreID: genre.MALID, Name: genre.Name, URL: genre.URL, Count: genre.Count, } - // Update or create - var existing entities.AnimeGenre - result := database.DB.Where("genre_id = ? AND anime_id = 0", genre.MALID).First(&existing) - - if result.Error == nil { - // Update existing - existing.Name = genre.Name - existing.URL = genre.URL - existing.Count = genre.Count - if err := database.DB.Save(&existing).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to update genre %s: %v", genre.Name, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "GenreSync", - }) - } - } else { - // Create new - if err := database.DB.Create(&genreEntity).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to create genre %s: %v", genre.Name, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "GenreSync", - }) - } + if err := repositories.CreateOrUpdateGenre(&genreEntity); err != nil { + logger.Warnf("GenreSync", "Failed to sync genre %s: %v", genre.Name, err) } } - logger.Log("Genre Sync completed successfully", logger.LogOptions{ - Level: logger.Success, - Prefix: "GenreSync", - }) - + logger.Successf("GenreSync", "Genre Sync completed successfully. Synced %d genres", len(genresResponse.Data)) return nil } diff --git a/tasks/helpers.go b/tasks/helpers.go new file mode 100644 index 0000000..b99a977 --- /dev/null +++ b/tasks/helpers.go @@ -0,0 +1,12 @@ +package tasks + +import "time" + +func calculateProgress(current, total int, startTime time.Time) (progress float64, eta time.Duration) { + progress = float64(current) / float64(total) * 100 + elapsed := time.Since(startTime) + avgTimePerItem := elapsed / time.Duration(current) + remaining := total - current + eta = avgTimePerItem * time.Duration(remaining) + return progress, eta.Round(time.Second) +} diff --git a/tasks/manager.go b/tasks/manager.go index af86b17..9bcf4f5 100644 --- a/tasks/manager.go +++ b/tasks/manager.go @@ -2,8 +2,8 @@ package tasks import ( "fmt" - "metachan/database" "metachan/entities" + "metachan/repositories" "metachan/types" "metachan/utils/logger" "sync" @@ -13,11 +13,10 @@ import ( ) type TaskManager struct { - Tasks map[string]types.Task - Tickers map[string]*time.Ticker - Done map[string]chan bool - Mutex sync.Mutex - Database *gorm.DB + Tasks map[string]types.Task + Tickers map[string]*time.Ticker + Done map[string]chan bool + Mutex sync.Mutex } func (tm *TaskManager) RegisterTask(task types.Task) error { @@ -29,22 +28,17 @@ func (tm *TaskManager) RegisterTask(task types.Task) error { } tm.Tasks[task.Name] = task - logger.Log(fmt.Sprintf("Task %s registered", task.Name), logger.LogOptions{ - Level: logger.Info, - Prefix: "TaskManager", - }) + logger.Infof("TaskManager", "Task %s registered", task.Name) return nil } func (tm *TaskManager) shouldExecuteTask(taskName string, interval time.Duration) (bool, error) { - var lastLog entities.TaskLog - - if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&lastLog).Error; err != nil { + lastLog, err := repositories.GetLatestTaskLog(taskName) + if err != nil { if err == gorm.ErrRecordNotFound { return true, nil } - return false, err } @@ -60,11 +54,8 @@ func (tm *TaskManager) logTaskExecution(taskName, status, message string) { ExecutedAt: time.Now(), } - if err := tm.Database.Create(&logEntry).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to log task execution for %s: %v", taskName, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) + if err := repositories.CreateTaskLog(&logEntry); err != nil { + logger.Warnf("TaskManager", "Failed to log task execution for %s: %v", taskName, err) } } @@ -73,10 +64,7 @@ func (tm *TaskManager) StartTask(taskName string) { task, exists := tm.Tasks[taskName] tm.Mutex.Unlock() if !exists { - logger.Log(fmt.Sprintf("Task %s not found", taskName), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) + logger.Warnf("TaskManager", "Task %s not found", taskName) return } @@ -85,10 +73,7 @@ func (tm *TaskManager) StartTask(taskName string) { shouldExec, err := tm.shouldExecuteTask(taskName, task.Interval) if err != nil { - logger.Log(fmt.Sprintf("Error checking execution condition for task %s: %v", taskName, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Error checking execution condition for task %s: %v", taskName, err) return } @@ -102,81 +87,41 @@ func (tm *TaskManager) StartTask(taskName string) { if shouldExec { // Check dependencies before executing if !tm.checkDependencies(task) { - logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) + logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { tm.logTaskExecution(taskName, "error", err.Error()) - logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err) } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") - - // Mark task as complete - if err := database.MarkTaskComplete(taskName); err != nil { - logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) - } - - logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{ - Level: logger.Success, - Prefix: "TaskManager", - }) + logger.Successf("TaskManager", "Task %s executed successfully", taskName) } } else { // Calculate time until next execution - var lastLog entities.TaskLog var initialDelay time.Duration = task.Interval - if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&lastLog).Error; err == nil { + if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil { elapsed := time.Since(lastLog.ExecutedAt) if elapsed < task.Interval { initialDelay = task.Interval - elapsed } } - logger.Log(fmt.Sprintf("Task %s will run in %v", taskName, initialDelay), logger.LogOptions{ - Level: logger.Info, - Prefix: "TaskManager", - }) + logger.Infof("TaskManager", "Task %s will run in %v", taskName, initialDelay) // Wait for initial delay before first execution select { case <-time.After(initialDelay): // Check dependencies before executing if !tm.checkDependencies(task) { - logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) + logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { tm.logTaskExecution(taskName, "error", err.Error()) - logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err) } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") - - // Mark task as complete - if err := database.MarkTaskComplete(taskName); err != nil { - logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) - } - - logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{ - Level: logger.Success, - Prefix: "TaskManager", - }) + logger.Successf("TaskManager", "Task %s executed successfully", taskName) } case <-doneChan: return @@ -185,10 +130,7 @@ func (tm *TaskManager) StartTask(taskName string) { // Skip ticker creation for manual-only tasks (interval = 0) if task.Interval == 0 { - logger.Log(fmt.Sprintf("Task %s is manual-only (no scheduled interval)", taskName), logger.LogOptions{ - Level: logger.Debug, - Prefix: "TaskManager", - }) + logger.Debugf("TaskManager", "Task %s is manual-only (no scheduled interval)", taskName) return } @@ -204,32 +146,14 @@ func (tm *TaskManager) StartTask(taskName string) { case <-ticker.C: // Check dependencies before executing if !tm.checkDependencies(task) { - logger.Log(fmt.Sprintf("Task %s dependencies not met, skipping execution", taskName), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) + logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { tm.logTaskExecution(taskName, "error", err.Error()) - logger.Log(fmt.Sprintf("Task %s execution failed: %v", taskName, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Task %s execution failed: %v", taskName, err) } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") - - // Mark task as complete - if err := database.MarkTaskComplete(taskName); err != nil { - logger.Log(fmt.Sprintf("Failed to mark task %s as complete: %v", taskName, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "TaskManager", - }) - } - - logger.Log(fmt.Sprintf("Task %s executed successfully", taskName), logger.LogOptions{ - Level: logger.Success, - Prefix: "TaskManager", - }) + logger.Successf("TaskManager", "Task %s executed successfully", taskName) } case <-doneChan: ticker.Stop() @@ -238,10 +162,7 @@ func (tm *TaskManager) StartTask(taskName string) { } }() - logger.Log(fmt.Sprintf("Task %s scheduled with interval %v", taskName, task.Interval), logger.LogOptions{ - Level: logger.Info, - Prefix: "TaskManager", - }) + logger.Infof("TaskManager", "Task %s scheduled with interval %v", taskName, task.Interval) } func (tm *TaskManager) StopTask(taskName string) { @@ -252,10 +173,7 @@ func (tm *TaskManager) StopTask(taskName string) { close(doneChan) delete(tm.Done, taskName) delete(tm.Tickers, taskName) - logger.Log(fmt.Sprintf("Task %s stopped", taskName), logger.LogOptions{ - Level: logger.Info, - Prefix: "TaskManager", - }) + logger.Infof("TaskManager", "Task %s stopped", taskName) } } @@ -283,10 +201,7 @@ func (tm *TaskManager) StopAllTasks() { ticker.Stop() delete(tm.Tickers, name) } - logger.Log(fmt.Sprintf("Task %s stopped", name), logger.LogOptions{ - Level: logger.Info, - Prefix: "TaskManager", - }) + logger.Infof("TaskManager", "Task %s stopped", name) } } @@ -297,11 +212,9 @@ func (tm *TaskManager) checkDependencies(task types.Task) bool { } for _, depName := range task.Dependencies { - if !database.IsTaskComplete(depName) { - logger.Log(fmt.Sprintf("Dependency %s not completed for task %s", depName, task.Name), logger.LogOptions{ - Level: logger.Debug, - Prefix: "TaskManager", - }) + taskStatus, err := repositories.GetTaskStatus(depName) + if err != nil || !taskStatus.IsCompleted { + logger.Debugf("TaskManager", "Dependency %s not completed for task %s", depName, task.Name) return false } } @@ -316,9 +229,8 @@ func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus { tm.Mutex.Unlock() var lastRun, nextRun *time.Time - var logEntry entities.TaskLog - if err := tm.Database.Where("task_name = ?", taskName).Order("executed_at desc").First(&logEntry).Error; err == nil { + if logEntry, err := repositories.GetLatestTaskLog(taskName); err == nil { lastRun = &logEntry.ExecutedAt if logEntry.Status == "error" { lastRun = nil @@ -329,10 +241,7 @@ func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus { nextRun = &next } } else if err != gorm.ErrRecordNotFound { - logger.Log(fmt.Sprintf("Error fetching task log for %s: %v", taskName, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Error fetching task log for %s: %v", taskName, err) } return &types.TaskStatus{ diff --git a/tasks/producersync.task.go b/tasks/producersync.task.go index e211eb3..59864ce 100644 --- a/tasks/producersync.task.go +++ b/tasks/producersync.task.go @@ -1,264 +1,78 @@ package tasks import ( - "fmt" - "metachan/database" "metachan/entities" + "metachan/repositories" "metachan/utils/api/jikan" "metachan/utils/logger" "time" ) func ProducerSync() error { - logger.Log("Starting producer sync (includes studios and licensors)...", logger.LogOptions{ - Level: logger.Info, - Prefix: "ProducerSync", - }) + logger.Infof("ProducerSync", "Starting producer sync (includes studios and licensors)") - client := jikan.NewJikanClient() - page := 1 - totalFetched := 0 - var totalPages int - var totalProducers int - startTime := time.Now() + response, err := jikan.GetAnimeProducers() + if err != nil { + logger.Errorf("ProducerSync", "Failed to fetch producers: %v", err) + return err + } - for { - logger.Log(fmt.Sprintf("Fetching producers page %d...", page), logger.LogOptions{ - Level: logger.Info, - Prefix: "ProducerSync", - }) + total := len(response.Data) + logger.Infof("ProducerSync", "Fetched %d producers from MAL", total) + + startTime := time.Now() - response, err := client.GetAnimeProducers(page) + for i, producerData := range response.Data { + producerDetail, err := jikan.GetProducerByID(producerData.MALID) if err != nil { - logger.Log(fmt.Sprintf("Failed to fetch producers page %d: %v", page, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - // If we fetched at least one page, continue with what we have - if page > 1 { - break + logger.Warnf("ProducerSync", "Failed to fetch details for producer %d: %v", producerData.MALID, err) + continue + } + + var imageID *uint + if producerDetail.Data.Images.JPG.ImageURL != "" { + image := entities.SimpleImage{ + ImageURL: producerDetail.Data.Images.JPG.ImageURL, + } + id, err := repositories.CreateOrUpdateSimpleImage(&image) + if err == nil { + imageID = &id } - return err } - if len(response.Data) == 0 { - break + producer := entities.Producer{ + MALID: producerDetail.Data.MALID, + URL: producerDetail.Data.URL, + Favorites: producerDetail.Data.Favorites, + Count: producerDetail.Data.Count, + Established: producerDetail.Data.Established, + About: producerDetail.Data.About, + ImageID: imageID, } - // Set total pages from first response - if page == 1 { - totalPages = response.Pagination.LastVisiblePage - totalProducers = totalPages * len(response.Data) - logger.Log(fmt.Sprintf("Total pages: %d, Estimated producers: %d", totalPages, totalProducers), logger.LogOptions{ - Level: logger.Info, - Prefix: "ProducerSync", + for _, title := range producerDetail.Data.Titles { + producer.Titles = append(producer.Titles, entities.SimpleTitle{ + Type: title.Type, + Title: title.Title, }) } - // Process each producer - for _, producerData := range response.Data { - // Check if producer already exists - var existingProducer entities.Producer - result := database.DB.Where("mal_id = ?", producerData.MALID).First(&existingProducer) - - if result.Error != nil { - // Producer doesn't exist, create new - producer := entities.Producer{ - MALID: producerData.MALID, - URL: producerData.URL, - Favorites: producerData.Favorites, - Count: producerData.Count, - Established: producerData.Established, - About: producerData.About, - } - - // Create producer in database - if err := database.DB.Create(&producer).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to create producer %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - continue - } - - // Add titles - for _, title := range producerData.Titles { - producerTitle := entities.ProducerTitle{ - ProducerID: producer.ID, - Type: title.Type, - Title: title.Title, - } - if err := database.DB.Create(&producerTitle).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to create producer title for %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - } - } - - // Add image - if producerData.Images.JPG.ImageURL != "" { - producerImage := entities.ProducerImage{ - ProducerID: producer.ID, - ImageURL: producerData.Images.JPG.ImageURL, - } - if err := database.DB.Create(&producerImage).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to create producer image for %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - } - } - - // Fetch and add external URLs - time.Sleep(350 * time.Millisecond) // Rate limiting - externalResp, err := client.GetProducerExternal(producerData.MALID) - if err != nil { - logger.Log(fmt.Sprintf("Failed to fetch external URLs for producer %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - } else { - for _, ext := range externalResp.Data { - producerExt := entities.ProducerExternalURL{ - ProducerID: producer.ID, - Name: ext.Name, - URL: ext.URL, - } - if err := database.DB.Create(&producerExt).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to create external URL for producer %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - } - } - } - - // Get primary title (default or first available) - primaryTitle := "Unknown" - for _, title := range producerData.Titles { - if title.Type == "Default" { - primaryTitle = title.Title - break - } - } - if primaryTitle == "Unknown" && len(producerData.Titles) > 0 { - primaryTitle = producerData.Titles[0].Title - } - - logger.Log(fmt.Sprintf("Created producer: %s (ID: %d, Count: %d)", primaryTitle, producerData.MALID, producerData.Count), logger.LogOptions{ - Level: logger.Success, - Prefix: "ProducerSync", - }) - } else { - // Producer exists, update it - existingProducer.URL = producerData.URL - existingProducer.Favorites = producerData.Favorites - existingProducer.Count = producerData.Count - existingProducer.Established = producerData.Established - existingProducer.About = producerData.About - - if err := database.DB.Save(&existingProducer).Error; err != nil { - logger.Log(fmt.Sprintf("Failed to update producer %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - continue - } - - // Delete and recreate titles - database.DB.Where("producer_id = ?", existingProducer.ID).Delete(&entities.ProducerTitle{}) - for _, title := range producerData.Titles { - producerTitle := entities.ProducerTitle{ - ProducerID: existingProducer.ID, - Type: title.Type, - Title: title.Title, - } - database.DB.Create(&producerTitle) - } - - // Update image - var existingImage entities.ProducerImage - if database.DB.Where("producer_id = ?", existingProducer.ID).First(&existingImage).Error == nil { - existingImage.ImageURL = producerData.Images.JPG.ImageURL - database.DB.Save(&existingImage) - } else if producerData.Images.JPG.ImageURL != "" { - producerImage := entities.ProducerImage{ - ProducerID: existingProducer.ID, - ImageURL: producerData.Images.JPG.ImageURL, - } - database.DB.Create(&producerImage) - } - - // Update external URLs - time.Sleep(350 * time.Millisecond) // Rate limiting - externalResp, err := client.GetProducerExternal(producerData.MALID) - if err != nil { - logger.Log(fmt.Sprintf("Failed to fetch external URLs for producer %d: %v", producerData.MALID, err), logger.LogOptions{ - Level: logger.Error, - Prefix: "ProducerSync", - }) - } else { - database.DB.Where("producer_id = ?", existingProducer.ID).Delete(&entities.ProducerExternalURL{}) - for _, ext := range externalResp.Data { - producerExt := entities.ProducerExternalURL{ - ProducerID: existingProducer.ID, - Name: ext.Name, - URL: ext.URL, - } - database.DB.Create(&producerExt) - } - } - - primaryTitle := "Unknown" - for _, title := range producerData.Titles { - if title.Type == "Default" { - primaryTitle = title.Title - break - } - } - if primaryTitle == "Unknown" && len(producerData.Titles) > 0 { - primaryTitle = producerData.Titles[0].Title - } - - logger.Log(fmt.Sprintf("Updated producer: %s (ID: %d, Count: %d)", primaryTitle, producerData.MALID, producerData.Count), logger.LogOptions{ - Level: logger.Success, - Prefix: "ProducerSync", - }) - } - - totalFetched++ - - // Progress update every 10 producers - if totalFetched%10 == 0 && totalProducers > 0 { - progress := float64(totalFetched) / float64(totalProducers) * 100 - elapsed := time.Since(startTime) - avgTimePerProducer := elapsed / time.Duration(totalFetched) - remaining := totalProducers - totalFetched - eta := avgTimePerProducer * time.Duration(remaining) - - logger.Log(fmt.Sprintf("Progress: %d/%d - %.1f%% | ETA: %v", totalFetched, totalProducers, progress, eta.Round(time.Second)), logger.LogOptions{ - Level: logger.Info, - Prefix: "ProducerSync", - }) - } - - time.Sleep(350 * time.Millisecond) // Rate limiting between producers + for _, ext := range producerDetail.Data.External { + producer.ExternalURLs = append(producer.ExternalURLs, entities.ExternalURL{ + Name: ext.Name, + URL: ext.URL, + }) } - // Check if there's more data - if !response.Pagination.HasNextPage { - break + if err := repositories.CreateOrUpdateProducer(&producer); err != nil { + logger.Warnf("ProducerSync", "Failed to sync producer %d: %v", producerData.MALID, err) + continue } - page++ - time.Sleep(1 * time.Second) // Additional delay between pages + progress, eta := calculateProgress(i+1, total, startTime) + logger.Infof("ProducerSync", "Progress: %d/%d (%.1f%%) | ETA: %v", i+1, total, progress, eta) } - logger.Log(fmt.Sprintf("Producer sync completed successfully. Total: %d producers", totalFetched), logger.LogOptions{ - Level: logger.Success, - Prefix: "ProducerSync", - }) - + logger.Successf("ProducerSync", "Producer sync completed. Total: %d producers", total) return nil } diff --git a/tasks/tasks.go b/tasks/tasks.go index 31d017d..8b19a19 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -1,9 +1,7 @@ package tasks import ( - "fmt" "metachan/config" - "metachan/database" "metachan/types" "metachan/utils/logger" "sync" @@ -14,11 +12,10 @@ var GlobalTaskManager *TaskManager func init() { GlobalTaskManager = &TaskManager{ - Tasks: make(map[string]types.Task), - Tickers: make(map[string]*time.Ticker), - Done: make(map[string]chan bool), - Mutex: sync.Mutex{}, - Database: database.DB, + Tasks: make(map[string]types.Task), + Tickers: make(map[string]*time.Ticker), + Done: make(map[string]chan bool), + Mutex: sync.Mutex{}, } // Register ProducerSync task (every 7 days) - runs first to populate unified producer table @@ -29,10 +26,7 @@ func init() { }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register ProducerSync task: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Failed to register ProducerSync task: %v", err) } // Register GenreSync task (every 7 days) @@ -43,10 +37,7 @@ func init() { }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register GenreSync task: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Failed to register GenreSync task: %v", err) } // Register AniFetch task (weekly) - fetches anime mappings from Fribb list @@ -59,14 +50,11 @@ func init() { }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register AnimeFetch task: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Failed to register AnimeFetch task: %v", err) } // Register AnimeSync task (runs after AnimeFetch completes) - only if enabled in config - if config.Config.AniSync { + if config.Sync.AniSync { err = GlobalTaskManager.RegisterTask(types.Task{ Name: "AnimeSync", Interval: 0, // Manual-only - waits for AnimeFetch dependency @@ -75,10 +63,7 @@ func init() { }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register AnimeSync task: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Failed to register AnimeSync task: %v", err) } } @@ -90,9 +75,6 @@ func init() { }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register AnimeUpdate task: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "TaskManager", - }) + logger.Errorf("TaskManager", "Failed to register AnimeUpdate task: %v", err) } } |
