-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathexample_test.go
More file actions
118 lines (106 loc) · 3.21 KB
/
Copy pathexample_test.go
File metadata and controls
118 lines (106 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package workerpool_test
import (
"context"
"fmt"
"os"
"runtime"
"github.com/cilium/workerpool"
)
// IsPrime reports whether n is prime, using trial division.
//
// Values of n below 2 are never prime. While searching for a divisor, ctx is
// polled each time the candidate divisor is a multiple of 10000; if ctx is
// done at one of those polls the search is abandoned and IsPrime returns
// false. A false result obtained with a canceled context therefore does not
// prove that n is composite.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	// Bound the loop with p <= n/p rather than p*p <= n. Both conditions are
	// equivalent for positive integers, but p*p overflows int64 once n gets
	// close to the maximum int64 value; the wrapped (negative) product would
	// keep the loop running far past the square root of n.
	for p := int64(2); p <= n/p; p++ {
		// Check for cancellation periodically (every 10000 iterations) so the
		// select does not dominate the cost of the inner loop.
		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}
// Example walks through the basic lifecycle of a worker pool: submitting
// tasks, draining the pool to collect results, and closing it.
func Example() {
	pool := workerpool.New(runtime.NumCPU())
	// Closing is deferred so the pool is cleaned up on the early returns
	// below (e.g., when Submit fails). Close sends cancellation to running
	// tasks and waits for them to complete. Calling Close more than once is
	// safe; later calls return [ErrClosed], which is ignored here.
	defer func() { _ = pool.Close() }()

	for idx, candidate := 0, int64(1_000_000_000_000_000_000); idx < 100; idx, candidate = idx+1, candidate+1 {
		// The task announces the number under test and reports it when it
		// turns out to be prime. It never fails.
		check := func(ctx context.Context) error {
			fmt.Println("isprime", candidate)
			if IsPrime(ctx, candidate) {
				fmt.Println(candidate, "is prime!")
			}
			return nil
		}

		// Submit hands the task to the pool and blocks while no worker is
		// available to pick it up. It fails when the pool is closed
		// ([ErrClosed]), being drained ([ErrDraining]), or the parent context
		// is done ([context.Canceled]).
		if err := pool.Submit(fmt.Sprintf("task #%d", idx), check); err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain stops the pool from accepting new tasks and blocks until every
	// submitted task has completed.
	results, err := pool.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Walking the results is worthwhile when tasks may return non-nil errors.
	for _, res := range results {
		// Err yields the error the task returned after execution.
		taskErr := res.Err()
		if taskErr == nil {
			continue
		}
		fmt.Println("task", res, "failed:", taskErr)
	}

	// Close explicitly so that its error can be checked. The deferred Close
	// still runs afterwards but only returns [ErrClosed].
	err = pool.Close()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
// ExampleWithResultCallback shows how a result callback lets each task result
// be handled as soon as it is available, instead of accumulating results.
func ExampleWithResultCallback() {
	// report logs the outcome of a single task: successes go to stdout,
	// failures to stderr, both with the task's duration.
	report := func(r workerpool.Result) {
		err := r.Err()
		if err == nil {
			fmt.Printf("task %s completed in %s\n", r, r.Duration())
			return
		}
		fmt.Fprintf(os.Stderr, "task %s failed after %s: %v\n", r, r.Duration(), err)
	}

	pool := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(report))
	// Deferred so the pool is also closed on the early return below; the
	// error is deliberately ignored here because Close is checked explicitly
	// at the end of the function.
	defer func() { _ = pool.Close() }()

	for idx, candidate := 0, int64(1_000_000_000_000_000_000); idx < 100; idx, candidate = idx+1, candidate+1 {
		check := func(ctx context.Context) error {
			if IsPrime(ctx, candidate) {
				fmt.Println(candidate, "is prime!")
			}
			return nil
		}
		if err := pool.Submit(fmt.Sprintf("task #%d", idx), check); err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Close waits for all in-flight tasks to complete before returning,
	// which ensures every callback invocation has finished.
	err := pool.Close()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}