aboutsummaryrefslogtreecommitdiff
path: root/utils/concurrency/fetch.go
blob: c94069d735439d5e7ec1421bfec075e417453323 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package concurrency

import (
	"sync"
)

// Parallel runs every function in funcs on its own goroutine and blocks until
// all of them have returned.
//
// The returned slice has one entry per function, in argument order: entry i
// carries the value and error produced by funcs[i]. Failures are reported per
// entry; one function returning an error does not affect the others.
func Parallel[T any](funcs ...func() (T, error)) []ParallelResult[T] {
	out := make([]ParallelResult[T], len(funcs))

	var pending sync.WaitGroup
	pending.Add(len(funcs))

	for idx := range funcs {
		// Every goroutine writes only to its own slot, so no locking is needed.
		go func(slot *ParallelResult[T], call func() (T, error)) {
			defer pending.Done()
			slot.Value, slot.Error = call()
		}(&out[idx], funcs[idx])
	}

	pending.Wait()
	return out
}

// ParallelMap calls f once for every element of items, each call on its own
// goroutine, and blocks until all calls have returned.
//
// The returned slice is index-aligned with items: entry i carries the value
// and error that f returned for items[i]. There is no cap on how many calls
// run at the same time.
func ParallelMap[T any, R any](items []T, f func(T) (R, error)) []ParallelMapResult[R] {
	out := make([]ParallelMapResult[R], len(items))

	var pending sync.WaitGroup
	pending.Add(len(items))

	for pos := range items {
		// The element is passed by value so later changes to items cannot
		// affect a call that has already been launched.
		go func(slot *ParallelMapResult[R], in T) {
			defer pending.Done()
			slot.Value, slot.Error = f(in)
		}(&out[pos], items[pos])
	}

	pending.Wait()
	return out
}

// ParallelMapWithLimit calls f once for every element of items and blocks
// until all calls have returned, running at most limit calls at the same time.
//
// The returned slice is index-aligned with items: entry i carries the value
// and error that f returned for items[i].
//
// A limit below 1 is treated as 1, so the calls run one at a time. Without
// this, a limit of 0 would create an unbuffered semaphore that no worker could
// ever acquire (the function would never return), and a negative limit would
// make the channel allocation panic.
func ParallelMapWithLimit[T any, R any](items []T, limit int, f func(T) (R, error)) []ParallelMapResult[R] {
	results := make([]ParallelMapResult[R], len(items))

	if limit < 1 {
		limit = 1
	}

	// Buffered channel used as a counting semaphore: one token per running call.
	semaphore := make(chan struct{}, limit)
	var wg sync.WaitGroup

	for i, item := range items {
		// Take the token before starting the goroutine. This bounds the number
		// of live goroutines by limit instead of parking one goroutine per
		// item on the semaphore.
		semaphore <- struct{}{}
		wg.Add(1)

		go func(index int, element T) {
			defer func() {
				// Hand the token back so the loop can start the next call.
				<-semaphore
				wg.Done()
			}()

			value, err := f(element)
			// Each goroutine writes only to its own slot, so no locking is needed.
			results[index] = ParallelMapResult[R]{
				Value: value,
				Error: err,
			}
		}(i, item)
	}

	wg.Wait()
	return results
}