aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBobby <[email protected]>2026-02-09 15:47:18 +0530
committerBobby <[email protected]>2026-02-09 15:47:18 +0530
commit1716b3b21b9e9daeacf970006b5c3a24885a65bc (patch)
tree92c51bdcf5daad35543ccfb77f53532d651c7f18
parentdd17b2c4ccf8f2cfc38a02a15d33ba8b40b665ac (diff)
downloadmetachan-1716b3b21b9e9daeacf970006b5c3a24885a65bc.tar.xz
metachan-1716b3b21b9e9daeacf970006b5c3a24885a65bc.zip
Refactor task management: enhance task status handling and trigger dependent tasks; improve producer sync logic to skip recently updated producers
-rw-r--r--entities/mapping.go2
-rw-r--r--repositories/tasks.go7
-rw-r--r--router/router.go4
-rw-r--r--tasks/manager.go85
-rw-r--r--tasks/producersync.task.go11
5 files changed, 96 insertions, 13 deletions
diff --git a/entities/mapping.go b/entities/mapping.go
index 51e588a..eabf3d4 100644
--- a/entities/mapping.go
+++ b/entities/mapping.go
@@ -16,7 +16,7 @@ type Mapping struct {
IMDB string `json:"imdb,omitempty"`
Kitsu int `json:"kitsu,omitempty"`
LiveChart int `json:"live_chart,omitempty"`
- MAL int `json:"mal,omitempty"`
+ MAL int `gorm:"uniqueIndex" json:"mal,omitempty"`
NotifyMoe string `json:"notify_moe,omitempty"`
Simkl int `json:"simkl,omitempty"`
TMDB int `json:"tmdb,omitempty"`
diff --git a/repositories/tasks.go b/repositories/tasks.go
index 097186f..4df5aad 100644
--- a/repositories/tasks.go
+++ b/repositories/tasks.go
@@ -4,6 +4,8 @@ import (
"errors"
"metachan/entities"
"metachan/utils/logger"
+
+ "gorm.io/gorm/clause"
)
func GetTaskStatus(taskName string) (entities.TaskStatus, error) {
@@ -19,7 +21,10 @@ func GetTaskStatus(taskName string) (entities.TaskStatus, error) {
}
func SetTaskStatus(task *entities.TaskStatus) error {
- result := DB.Save(task)
+ result := DB.Clauses(clause.OnConflict{
+ Columns: []clause.Column{{Name: "task_name"}},
+ DoUpdates: clause.AssignmentColumns([]string{"is_completed", "last_run_at", "updated_at"}),
+ }).Create(task)
if result.Error != nil {
logger.Errorf("Task", "Failed to set task status for %s: %v", task.TaskName, result.Error)
diff --git a/router/router.go b/router/router.go
index 426934f..870992b 100644
--- a/router/router.go
+++ b/router/router.go
@@ -11,6 +11,10 @@ func Initialize(router *fiber.App) {
router.Get("/health", controllers.HealthStatus)
// Anime routes
+ animeRouter := router.Group("/anime")
+ animeRouter.Get("/:id", controllers.GetAnime)
+
+ // Anime routes
// animeRouter := router.Group("/a")
// animeRouter.Get("/genres", controllers.GetGenres)
// animeRouter.Get("/genres/:id", controllers.GetAnimeByGenre)
diff --git a/tasks/manager.go b/tasks/manager.go
index 9bcf4f5..ac85630 100644
--- a/tasks/manager.go
+++ b/tasks/manager.go
@@ -77,15 +77,23 @@ func (tm *TaskManager) StartTask(taskName string) {
return
}
+ if !shouldExec {
+ if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil && lastLog.Status == "success" {
+ repositories.SetTaskStatus(&entities.TaskStatus{
+ TaskName: taskName,
+ IsCompleted: true,
+ LastRunAt: lastLog.ExecutedAt,
+ })
+ }
+ }
+
doneChan := make(chan bool)
tm.Mutex.Lock()
tm.Done[taskName] = doneChan
tm.Mutex.Unlock()
go func() {
- // Execute immediately if due
if shouldExec {
- // Check dependencies before executing
if !tm.checkDependencies(task) {
logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
@@ -94,10 +102,15 @@ func (tm *TaskManager) StartTask(taskName string) {
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
+ repositories.SetTaskStatus(&entities.TaskStatus{
+ TaskName: taskName,
+ IsCompleted: true,
+ LastRunAt: time.Now(),
+ })
logger.Successf("TaskManager", "Task %s executed successfully", taskName)
+ tm.triggerDependentTasks(taskName)
}
} else {
- // Calculate time until next execution
var initialDelay time.Duration = task.Interval
if lastLog, err := repositories.GetLatestTaskLog(taskName); err == nil {
@@ -109,10 +122,8 @@ func (tm *TaskManager) StartTask(taskName string) {
logger.Infof("TaskManager", "Task %s will run in %v", taskName, initialDelay)
- // Wait for initial delay before first execution
select {
case <-time.After(initialDelay):
- // Check dependencies before executing
if !tm.checkDependencies(task) {
logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
@@ -121,30 +132,32 @@ func (tm *TaskManager) StartTask(taskName string) {
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
+ repositories.SetTaskStatus(&entities.TaskStatus{
+ TaskName: taskName,
+ IsCompleted: true,
+ LastRunAt: time.Now(),
+ })
logger.Successf("TaskManager", "Task %s executed successfully", taskName)
+ tm.triggerDependentTasks(taskName)
}
case <-doneChan:
return
}
}
- // Skip ticker creation for manual-only tasks (interval = 0)
if task.Interval == 0 {
logger.Debugf("TaskManager", "Task %s is manual-only (no scheduled interval)", taskName)
return
}
- // Create ticker for subsequent executions
ticker := time.NewTicker(task.Interval)
tm.Mutex.Lock()
tm.Tickers[taskName] = ticker
tm.Mutex.Unlock()
- // Regular ticker loop
for {
select {
case <-ticker.C:
- // Check dependencies before executing
if !tm.checkDependencies(task) {
logger.Warnf("TaskManager", "Task %s dependencies not met, skipping execution", taskName)
} else if err := task.Execute(); err != nil {
@@ -153,7 +166,13 @@ func (tm *TaskManager) StartTask(taskName string) {
} else {
task.LastRun = time.Now()
tm.logTaskExecution(taskName, "success", "Task executed successfully")
+ repositories.SetTaskStatus(&entities.TaskStatus{
+ TaskName: taskName,
+ IsCompleted: true,
+ LastRunAt: time.Now(),
+ })
logger.Successf("TaskManager", "Task %s executed successfully", taskName)
+ tm.triggerDependentTasks(taskName)
}
case <-doneChan:
ticker.Stop()
@@ -205,7 +224,6 @@ func (tm *TaskManager) StopAllTasks() {
}
}
-// checkDependencies verifies all task dependencies are complete
func (tm *TaskManager) checkDependencies(task types.Task) bool {
if len(task.Dependencies) == 0 {
return true
@@ -222,6 +240,51 @@ func (tm *TaskManager) checkDependencies(task types.Task) bool {
return true
}
+func (tm *TaskManager) triggerDependentTasks(completedTaskName string) {
+ tm.Mutex.Lock()
+ defer tm.Mutex.Unlock()
+
+ for taskName, task := range tm.Tasks {
+ hasDependency := false
+ for _, dep := range task.Dependencies {
+ if dep == completedTaskName {
+ hasDependency = true
+ break
+ }
+ }
+
+ if !hasDependency {
+ continue
+ }
+
+ tm.Mutex.Unlock()
+ allDependenciesMet := tm.checkDependencies(task)
+ tm.Mutex.Lock()
+
+ if allDependenciesMet {
+ logger.Infof("TaskManager", "All dependencies met for %s, triggering execution", taskName)
+ go func(name string, t types.Task) {
+ if err := t.Execute(); err != nil {
+ tm.logTaskExecution(name, "error", err.Error())
+ logger.Errorf("TaskManager", "Task %s execution failed: %v", name, err)
+ } else {
+ tm.Mutex.Lock()
+ t.LastRun = time.Now()
+ tm.Mutex.Unlock()
+ tm.logTaskExecution(name, "success", "Task executed successfully")
+ repositories.SetTaskStatus(&entities.TaskStatus{
+ TaskName: name,
+ IsCompleted: true,
+ LastRunAt: time.Now(),
+ })
+ logger.Successf("TaskManager", "Task %s executed successfully", name)
+ tm.triggerDependentTasks(name)
+ }
+ }(taskName, task)
+ }
+ }
+}
+
func (tm *TaskManager) GetTaskStatus(taskName string) *types.TaskStatus {
tm.Mutex.Lock()
task, registered := tm.Tasks[taskName]
@@ -256,7 +319,7 @@ func (tm *TaskManager) GetAllTaskStatuses() map[string]*types.TaskStatus {
statuses := make(map[string]*types.TaskStatus)
tm.Mutex.Lock()
for name := range tm.Tasks {
- tm.Mutex.Unlock() // temporarily unlock to avoid deadlock / get task status
+ tm.Mutex.Unlock()
statuses[name] = tm.GetTaskStatus(name)
tm.Mutex.Lock()
}
diff --git a/tasks/producersync.task.go b/tasks/producersync.task.go
index 8c8bb79..8c6588b 100644
--- a/tasks/producersync.task.go
+++ b/tasks/producersync.task.go
@@ -38,6 +38,17 @@ func ProducerSync() error {
imageMap := make(map[string]struct{})
for i, producerData := range batchData {
+ // Check if producer was updated within last 3 days - if so, skip detail fetch
+ var existingProducer entities.Producer
+ if err := repositories.DB.Where("mal_id = ?", producerData.MALID).First(&existingProducer).Error; err == nil {
+ // Producer exists, check if updated within last 3 days
+ threeDaysAgo := time.Now().Add(-3 * 24 * time.Hour)
+ if existingProducer.UpdatedAt.After(threeDaysAgo) {
+ logger.Debugf("ProducerSync", "Skipping producer %d (MAL ID: %d) - updated %v ago", i+1, producerData.MALID, time.Since(existingProducer.UpdatedAt).Round(time.Hour))
+ continue
+ }
+ }
+
producerDetail, err := jikan.GetProducerByID(producerData.MALID)
if err != nil {
logger.Warnf("ProducerSync", "Failed to fetch details for producer %d: %v", producerData.MALID, err)