diff options
| author | Bobby <[email protected]> | 2026-02-03 15:05:56 +0530 |
|---|---|---|
| committer | Bobby <[email protected]> | 2026-02-03 15:05:56 +0530 |
| commit | fb233706694e73774751f2481bf7b5df57b98fd9 (patch) | |
| tree | aa005e9fb59e9dcdb612fc02b170d9b169c973d5 /tasks | |
| parent | 8b31922b72cd2a3bb77b5b09aeebcad37e39d37b (diff) | |
| download | metachan-fb233706694e73774751f2481bf7b5df57b98fd9.tar.xz metachan-fb233706694e73774751f2481bf7b5df57b98fd9.zip | |
Implement AniFetch task for fetching anime mappings and trigger AniSync after completion
Diffstat (limited to 'tasks')
| -rw-r--r-- | tasks/anifetch.task.go | 143 | ||||
| -rw-r--r-- | tasks/anisync.task.go | 203 | ||||
| -rw-r--r-- | tasks/tasks.go | 35 |
3 files changed, 276 insertions, 105 deletions
diff --git a/tasks/anifetch.task.go b/tasks/anifetch.task.go new file mode 100644 index 0000000..894dd97 --- /dev/null +++ b/tasks/anifetch.task.go @@ -0,0 +1,143 @@ +package tasks + +import ( + "encoding/json" + "fmt" + "io" + "metachan/database" + "metachan/entities" + "metachan/types" + "metachan/utils/logger" + "metachan/utils/mappers" + "net/http" + + "gorm.io/gorm" +) + +const fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json" + +func AniFetch() error { + logger.Log("Starting Anime Fetch", logger.LogOptions{ + Level: logger.Info, + Prefix: "AniFetch", + }) + + response, err := http.Get(fribbURL) + if err != nil { + logger.Log(fmt.Sprintf("Anime Fetch failed: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "AniFetch", + }) + return err + } + defer response.Body.Close() + + body, err := io.ReadAll(response.Body) + if err != nil { + logger.Log(fmt.Sprintf("Failed to read response body: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "AniFetch", + }) + return err + } + + var mappings []types.AniSyncMapping + if err := json.Unmarshal(body, &mappings); err != nil { + logger.Log(fmt.Sprintf("Failed to unmarshal JSON: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "AniFetch", + }) + return err + } + + batchSize := 1000 + total := len(mappings) + + for i := 0; i < total; i += batchSize { + end := i + batchSize + if end > total { + end = total + } + + batch := mappings[i:end] + processBatch(batch) + logger.Log(fmt.Sprintf("Processed %d/%d mappings", end, total), logger.LogOptions{ + Level: logger.Info, + Prefix: "AniFetch", + }) + } + + logger.Log("Anime Fetch completed", logger.LogOptions{ + Level: logger.Success, + Prefix: "AniFetch", + }) + + return nil +} + +func processBatch(mappings []types.AniSyncMapping) { + for _, mapping := range mappings { + var composite *string + if mapping.MAL != 0 && mapping.Anilist != 0 { + comp := fmt.Sprintf("%d-%d", mapping.MAL, mapping.Anilist) + composite = &comp + } + + var entity entities.AnimeMapping + if err := database.DB.Where("mal_anilist_composite = ?", composite).First(&entity).Error; err != nil { + if err == gorm.ErrRecordNotFound { + newEntity := entities.AnimeMapping{ + AniDB: mapping.AniDB, + Anilist: mapping.Anilist, + AnimeCountdown: mapping.AnimeCountdown, + AnimePlanet: mappers.ForceString(mapping.AnimePlanet), + AniSearch: mapping.AniSearch, + IMDB: mapping.IMDB, + Kitsu: mapping.Kitsu, + LiveChart: mapping.LiveChart, + MAL: mapping.MAL, + NotifyMoe: mapping.NotifyMoe, + Simkl: mapping.Simkl, + TMDB: mappers.ForceInt(mapping.TMDB), + TVDB: mapping.TVDB, + Type: entities.MappingType(mapping.Type), + MALAnilistComposite: composite, + } + if err := database.DB.Create(&newEntity).Error; err != nil { + logger.Log(fmt.Sprintf("Unable to process mapping %v: %v", mapping, err), logger.LogOptions{ + Level: logger.Warn, + Prefix: "AniFetch", + }) + } + } else { + logger.Log(fmt.Sprintf("Error fetching entity: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "AniFetch", + }) + } + } else { + // Update existing entity + entity.AniDB = mapping.AniDB + entity.Anilist = mapping.Anilist + entity.AnimeCountdown = mapping.AnimeCountdown + entity.AnimePlanet = mappers.ForceString(mapping.AnimePlanet) + entity.AniSearch = mapping.AniSearch + entity.IMDB = mapping.IMDB + entity.Kitsu = mapping.Kitsu + entity.LiveChart = mapping.LiveChart + entity.MAL = mapping.MAL + entity.NotifyMoe = mapping.NotifyMoe + entity.Simkl = mapping.Simkl + entity.TMDB = mappers.ForceInt(mapping.TMDB) + entity.TVDB = mapping.TVDB + entity.Type = entities.MappingType(mapping.Type) + entity.MALAnilistComposite = composite + if err := database.DB.Save(&entity).Error; err != nil { + logger.Log(fmt.Sprintf("Unable to update mapping %v: %v", mapping, err), logger.LogOptions{ + Level: logger.Warn, + Prefix: "AniFetch", + }) + } + } + } +} diff --git a/tasks/anisync.task.go b/tasks/anisync.task.go index b189336..aa94c6e 100644 --- a/tasks/anisync.task.go +++ b/tasks/anisync.task.go @@ -1,73 +1,121 @@ package tasks import ( - "encoding/json" "fmt" - "io" "metachan/database" "metachan/entities" - "metachan/types" + "metachan/services/anime" "metachan/utils/logger" - "metachan/utils/mappers" - "net/http" - - "gorm.io/gorm" + "time" ) -const fribbURL = "https://raw.githubusercontent.com/Fribb/anime-lists/master/anime-list-full.json" - +// AniSync fetches full anime details for all anime in the database func AniSync() error { - logger.Log("Starting Anime Sync", logger.LogOptions{ + logger.Log("Starting Anime Sync - Fetching full anime details", logger.LogOptions{ Level: logger.Info, Prefix: "AniSync", }) - response, err := http.Get(fribbURL) - if err != nil { - logger.Log(fmt.Sprintf("Anime Sync failed: %v", err), logger.LogOptions{ + // Get all anime mappings + var mappings []entities.AnimeMapping + if err := database.DB.Find(&mappings).Error; err != nil { + logger.Log(fmt.Sprintf("Failed to fetch anime mappings: %v", err), logger.LogOptions{ Level: logger.Error, Prefix: "AniSync", }) return err } - defer response.Body.Close() - body, err := io.ReadAll(response.Body) - if err != nil { - logger.Log(fmt.Sprintf("Failed to read response body: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniSync", - }) - return err - } + total := len(mappings) + logger.Log(fmt.Sprintf("Found %d anime mappings", total), logger.LogOptions{ + Level: logger.Info, + Prefix: "AniSync", + }) - var mappings []types.AniSyncMapping - if err := json.Unmarshal(body, &mappings); err != nil { - logger.Log(fmt.Sprintf("Failed to unmarshal JSON: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniSync", - }) - return err + // Pre-count items needing sync for accurate ETA + itemsToSync := 0 + for _, mapping := range mappings { + if mapping.MAL == 0 { + continue + } + var existingAnime entities.Anime + err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error + if err == nil { + var episodeCount int64 + database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount) + if episodeCount > 0 { + continue + } + } + itemsToSync++ } - batchSize := 1000 - total := len(mappings) + logger.Log(fmt.Sprintf("Found %d anime to sync (%d already synced)", itemsToSync, total-itemsToSync), logger.LogOptions{ + Level: logger.Info, + Prefix: "AniSync", + }) + + animeService := anime.NewService() + synced := 0 + skipped := 0 + startTime := time.Now() + processed := 0 + + for _, mapping := range mappings { + // Skip if MAL ID is 0 (invalid) + if mapping.MAL == 0 { + skipped++ + continue + } + + // Check if anime already exists in DB + var existingAnime entities.Anime + err := database.DB.Where("mal_id = ?", mapping.MAL).First(&existingAnime).Error + + if err == nil { + // Check if anime has full details (has episodes) + var episodeCount int64 + database.DB.Model(&entities.AnimeSingleEpisode{}).Where("anime_id = ?", existingAnime.ID).Count(&episodeCount) + + if episodeCount > 0 { + skipped++ + continue + } + } - for i := 0; i < total; i += batchSize { - end := i + batchSize - if end > total { - end = total + // Calculate time remaining (after processing at least 10 items for accuracy) + timeRemaining := "" + if processed >= 10 { + elapsed := time.Since(startTime) + avgTimePerItem := elapsed / time.Duration(processed) + remainingItems := itemsToSync - processed + remaining := time.Duration(remainingItems) * avgTimePerItem + timeRemaining = fmt.Sprintf(" - ETA: %s", formatDuration(remaining)) } - batch := mappings[i:end] - processBatch(batch) - logger.Log(fmt.Sprintf("Processed %d/%d mappings", end, total), logger.LogOptions{ + // Fetch full anime details + logger.Log(fmt.Sprintf("[%d/%d] Synchronising MAL ID %d...%s", processed+1, itemsToSync, mapping.MAL, timeRemaining), logger.LogOptions{ Level: logger.Info, Prefix: "AniSync", }) + + _, err = animeService.GetAnimeDetailsWithSource(&mapping, "anisync") + if err != nil { + logger.Log(fmt.Sprintf("Failed to sync anime MAL ID %d: %v", mapping.MAL, err), logger.LogOptions{ + Level: logger.Warn, + Prefix: "AniSync", + }) + continue + } + + synced++ + processed++ + + // Sleep to respect rate limits (1 second between requests) + time.Sleep(1 * time.Second) } - logger.Log("Anime Sync completed", logger.LogOptions{ + logger.Log(fmt.Sprintf("Anime Sync completed: %d synced, %d skipped", synced, skipped), logger.LogOptions{ Level: logger.Success, Prefix: "AniSync", }) @@ -75,69 +123,20 @@ func AniSync() error { return nil } -func processBatch(mappings []types.AniSyncMapping) { - for _, mapping := range mappings { - var composite *string - if mapping.MAL != 0 && mapping.Anilist != 0 { - comp := fmt.Sprintf("%d-%d", mapping.MAL, mapping.Anilist) - composite = &comp - } +// formatDuration converts duration to human-readable format +func formatDuration(d time.Duration) string { + d = d.Round(time.Second) + h := d / time.Hour + d -= h * time.Hour + m := d / time.Minute + d -= m * time.Minute + s := d / time.Second - var entity entities.AnimeMapping - if err := database.DB.Where("mal_anilist_composite = ?", composite).First(&entity).Error; err != nil { - if err == gorm.ErrRecordNotFound { - newEntity := entities.AnimeMapping{ - AniDB: mapping.AniDB, - Anilist: mapping.Anilist, - AnimeCountdown: mapping.AnimeCountdown, - AnimePlanet: mappers.ForceString(mapping.AnimePlanet), - AniSearch: mapping.AniSearch, - IMDB: mapping.IMDB, - Kitsu: mapping.Kitsu, - LiveChart: mapping.LiveChart, - MAL: mapping.MAL, - NotifyMoe: mapping.NotifyMoe, - Simkl: mapping.Simkl, - TMDB: mappers.ForceInt(mapping.TMDB), - TVDB: mapping.TVDB, - Type: entities.MappingType(mapping.Type), - MALAnilistComposite: composite, - } - if err := database.DB.Create(&newEntity).Error; err != nil { - logger.Log(fmt.Sprintf("Unable to process mapping %v: %v", mapping, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "AniSync", - }) - } - } else { - logger.Log(fmt.Sprintf("Error fetching entity: %v", err), logger.LogOptions{ - Level: logger.Error, - Prefix: "AniSync", - }) - } - } else { - // Update existing entity - entity.AniDB = mapping.AniDB - entity.Anilist = mapping.Anilist - entity.AnimeCountdown = mapping.AnimeCountdown - entity.AnimePlanet = mappers.ForceString(mapping.AnimePlanet) - entity.AniSearch = mapping.AniSearch - entity.IMDB = mapping.IMDB - entity.Kitsu = mapping.Kitsu - entity.LiveChart = mapping.LiveChart - entity.MAL = mapping.MAL - entity.NotifyMoe = mapping.NotifyMoe - entity.Simkl = mapping.Simkl - entity.TMDB = mappers.ForceInt(mapping.TMDB) - entity.TVDB = mapping.TVDB - entity.Type = entities.MappingType(mapping.Type) - entity.MALAnilistComposite = composite - if err := database.DB.Save(&entity).Error; err != nil { - logger.Log(fmt.Sprintf("Unable to update mapping %v: %v", mapping, err), logger.LogOptions{ - Level: logger.Warn, - Prefix: "AniSync", - }) - } - } + if h > 0 { + return fmt.Sprintf("%dh %dm", h, m) + } + if m > 0 { + return fmt.Sprintf("%dm %ds", m, s) } + return fmt.Sprintf("%ds", s) } diff --git a/tasks/tasks.go b/tasks/tasks.go index b6b913a..ecf8d80 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -20,15 +20,44 @@ func init() { Database: database.DB, } - // Register AniSync task (weekly) + // Register AniFetch task (weekly) - fetches anime mappings from Fribb list err := GlobalTaskManager.RegisterTask(types.Task{ - Name: "AnimeSync", + Name: "AnimeFetch", Interval: 7 * 24 * time.Hour, + Execute: func() error { + // Run AniFetch first + if err := AniFetch(); err != nil { + return err + } + // After AniFetch completes, trigger AniSync in background + go func() { + if err := AniSync(); err != nil { + logger.Log(fmt.Sprintf("AniSync failed: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "TaskManager", + }) + } + }() + return nil + }, + }) + + if err != nil { + logger.Log(fmt.Sprintf("Failed to register AnimeFetch task: %v", err), logger.LogOptions{ + Level: logger.Error, + Prefix: "TaskManager", + }) + } + + // Register AnimeSync task (triggered automatically after AnimeFetch completes) + err = GlobalTaskManager.RegisterTask(types.Task{ + Name: "AnimeSync", + Interval: 0, // No scheduled interval - runs after AnimeFetch Execute: AniSync, }) if err != nil { - logger.Log(fmt.Sprintf("Failed to register AniSync task: %v", err), logger.LogOptions{ + logger.Log(fmt.Sprintf("Failed to register AnimeSync task: %v", err), logger.LogOptions{ Level: logger.Error, Prefix: "TaskManager", }) |
