diff options
| author | Bobby <[email protected]> | 2026-02-09 15:47:18 +0530 |
|---|---|---|
| committer | Bobby <[email protected]> | 2026-02-09 15:47:18 +0530 |
| commit | 1716b3b21b9e9daeacf970006b5c3a24885a65bc (patch) | |
| tree | 92c51bdcf5daad35543ccfb77f53532d651c7f18 | |
| parent | dd17b2c4ccf8f2cfc38a02a15d33ba8b40b665ac (diff) | |
| download | metachan-1716b3b21b9e9daeacf970006b5c3a24885a65bc.tar.xz metachan-1716b3b21b9e9daeacf970006b5c3a24885a65bc.zip | |
Refactor task management: enhance task status handling and trigger dependent tasks; improve producer sync logic to skip recently updated producers
| -rw-r--r-- | entities/mapping.go | 2 | ||||
| -rw-r--r-- | repositories/tasks.go | 7 | ||||
| -rw-r--r-- | router/router.go | 4 | ||||
| -rw-r--r-- | tasks/manager.go | 85 | ||||
| -rw-r--r-- | tasks/producersync.task.go | 11 |
5 files changed, 96 insertions, 13 deletions
diff --git a/entities/mapping.go b/entities/mapping.go index 51e588a..eabf3d4 100644 --- a/entities/mapping.go +++ b/entities/mapping.go @@ -16,7 +16,7 @@ type Mapping struct { IMDB string `json:"imdb,omitempty"` Kitsu int `json:"kitsu,omitempty"` LiveChart int `json:"live_chart,omitempty"` - MAL int `json:"mal,omitempty"` + MAL int `gorm:"uniqueIndex" json:"mal,omitempty"` NotifyMoe string `json:"notify_moe,omitempty"` Simkl int `json:"simkl,omitempty"` TMDB int `json:"tmdb,omitempty"` diff --git a/repositories/tasks.go b/repositories/tasks.go index 097186f..4df5aad 100644 --- a/repositories/tasks.go +++ b/repositories/tasks.go @@ -4,6 +4,8 @@ import ( "errors" "metachan/entities" "metachan/utils/logger" + + "gorm.io/gorm/clause" ) func GetTaskStatus(taskName string) (entities.TaskStatus, error) { @@ -19,7 +21,10 @@ func GetTaskStatus(taskName string) (entities.TaskStatus, error) { } func SetTaskStatus(task *entities.TaskStatus) error { - result := DB.Save(task) + result := DB.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "task_name"}}, + DoUpdates: clause.AssignmentColumns([]string{"is_completed", "last_run_at", "updated_at"}), + }).Create(task) if result.Error != nil { logger.Errorf("Task", "Failed to set task status for %s: %v", task.TaskName, result.Error) diff --git a/router/router.go b/router/router.go index 426934f..870992b 100644 --- a/router/router.go +++ b/router/router.go @@ -11,6 +11,10 @@ func Initialize(router *fiber.App) { router.Get("/health", controllers.HealthStatus) // Anime routes + animeRouter := router.Group("/anime") + animeRouter.Get("/:id", controllers.GetAnime) + + // Anime routes // animeRouter := router.Group("/a") // animeRouter.Get("/genres", controllers.GetGenres) // animeRouter.Get("/genres/:id", controllers.GetAnimeByGenre) diff --git a/tasks/manager.go b/tasks/manager.go index 9bcf4f5..ac85630 100644 --- a/tasks/manager.go +++ b/tasks/manager.go @@ -77,15 +77,23 @@ func (tm *TaskManager) StartTask(taskName string) { return } + if !shouldExec { + if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil && lastLog.Status == "success" { + repositories.SetTaskStatus(&entities.TaskStatus{ + TaskName: taskName, + IsCompleted: true, + LastRunAt: lastLog.ExecutedAt, + }) + } + } + doneChan := make(chan bool) tm.Mutex.Lock() tm.Done[taskName] = doneChan tm.Mutex.Unlock() go func() { - // Execute immediately if due if shouldExec { - // Check dependencies before executing if !tm.checkDependencies(task) { logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { @@ -94,10 +102,15 @@ func (tm *TaskManager) StartTask(taskName string) { } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") + repositories.SetTaskStatus(&entities.TaskStatus{ + TaskName: taskName, + IsCompleted: true, + LastRunAt: time.Now(), + }) logger.Successf("TaskManager", "Task %s executed successfully", taskName) + tm.triggerDependentTasks(taskName) } } else { - // Calculate time until next execution var initialDelay time.Duration = task.Interval if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil { @@ -109,10 +122,8 @@ func (tm *TaskManager) StartTask(taskName string) { logger.Infof("TaskManager", "Task %s will run in %v", taskName, initialDelay) - // Wait for initial delay before first execution select { case <-time.After(initialDelay): - // Check dependencies before executing if !tm.checkDependencies(task) { logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { @@ -121,30 +132,32 @@ func (tm *TaskManager) StartTask(taskName string) { } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") + repositories.SetTaskStatus(&entities.TaskStatus{ + TaskName: taskName, + IsCompleted: true, + LastRunAt: time.Now(), + }) logger.Successf("TaskManager", "Task %s executed successfully", taskName) + tm.triggerDependentTasks(taskName) } case <-doneChan: return } } - // Skip ticker creation for manual-only tasks (interval = 0) if task.Interval == 0 { logger.Debugf("TaskManager", "Task %s is manual-only (no scheduled interval)", taskName) return } - // Create ticker for subsequent executions ticker := time.NewTicker(task.Interval) tm.Mutex.Lock() tm.Tickers[taskName] = ticker tm.Mutex.Unlock() - // Regular ticker loop for { select { case <-ticker.C: - // Check dependencies before executing if !tm.checkDependencies(task) { logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName) } else if err := task.Execute(); err != nil { @@ -153,7 +166,13 @@ func (tm *TaskManager) StartTask(taskName string) { } else { task.LastRun = time.Now() tm.logTaskExecution(taskName, "success", "Task executed successfully") + repositories.SetTaskStatus(&entities.TaskStatus{ + TaskName: taskName, + IsCompleted: true, + LastRunAt: time.Now(), + }) logger.Successf("TaskManager", "Task %s executed successfully", taskName) + tm.triggerDependentTasks(taskName) } case <-doneChan: ticker.Stop() @@ -205,7 +224,6 @@ func (tm *TaskManager) StopAllTasks() { } } -// checkDependencies verifies all task dependencies are complete func (tm *TaskManager) checkDependencies(task types.Task) bool { if len(task.Dependencies) == 0 { return true @@ -222,6 +240,51 @@ func (tm *TaskManager) checkDependencies(task types.Task) bool { return true } +func (tm *TaskManager) triggerDependentTasks(completedTaskName string) { + tm.Mutex.Lock() + defer tm.Mutex.Unlock() + + for taskName, task := range tm.Tasks { + hasDependency := false + for _, dep := range task.Dependencies { + if dep == completedTaskName { + hasDependency = true + break + } + } + + if !hasDependency { + continue + } + + tm.Mutex.Unlock() + allDependenciesMet := tm.checkDependencies(task) + tm.Mutex.Lock() + + if allDependenciesMet { + logger.Infof("TaskManager", "All dependencies met for %s, triggering execution", taskName) + go func(name string, t types.Task) { + if err := t.Execute(); err != nil { + tm.logTaskExecution(name, "error", err.Error()) + logger.Errorf("TaskManager", "Task %s execution failed: %v", name, err) + } else { + tm.Mutex.Lock() + t.LastRun = time.Now() + tm.Mutex.Unlock() + tm.logTaskExecution(name, "success", "Task executed successfully") + repositories.SetTaskStatus(&entities.TaskStatus{ + TaskName: name, + IsCompleted: true, + LastRunAt: time.Now(), + }) + logger.Successf("TaskManager", "Task %s executed successfully", name) + tm.triggerDependentTasks(name) + } + }(taskName, task) + } + } +} + func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus { tm.Mutex.Lock() task, registered := tm.Tasks[taskName] @@ -256,7 +319,7 @@ func (tm *TaskManager) GetAllTaskStatuses() map[string]*types.TaskStatus { statuses := make(map[string]*types.TaskStatus) tm.Mutex.Lock() for name := range tm.Tasks { - tm.Mutex.Unlock() // temporarily unlock to avoid deadlock / get task status + tm.Mutex.Unlock() statuses[name] = tm.GetTaskStatus(name) tm.Mutex.Lock() } diff --git a/tasks/producersync.task.go b/tasks/producersync.task.go index 8c8bb79..8c6588b 100644 --- a/tasks/producersync.task.go +++ b/tasks/producersync.task.go @@ -38,6 +38,17 @@ func ProducerSync() error { imageMap := make(map[string]struct{}) for i, producerData := range batchData { + // Check if producer was updated within last 3 days - if so, skip detail fetch + var existingProducer entities.Producer + if err := repositories.DB.Where("mal_id = ?", producerData.MALID).First(&existingProducer).Error; err == nil { + // Producer exists, check if updated within last 3 days + threeDaysAgo := time.Now().Add(-3 * 24 * time.Hour) + if existingProducer.UpdatedAt.After(threeDaysAgo) { + logger.Debugf("ProducerSync", "Skipping producer %d (MAL ID: %d) - updated %v ago", i+1, producerData.MALID, time.Since(existingProducer.UpdatedAt).Round(time.Hour)) + continue + } + } + producerDetail, err := jikan.GetProducerByID(producerData.MALID) if err != nil { logger.Warnf("ProducerSync", "Failed to fetch details for producer %d: %v", producerData.MALID, err) |
