Skip to content

Commit 1ca3420

Browse files
authored
Log dropped subscription events (#1271)
1 parent dfc32de commit 1ca3420

3 files changed

Lines changed: 48 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919

2020
- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Note that this is a breaking API change because the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259)
2121
- When using `(*Migrator[TTx]).Migrate` with a `TargetVersion` that's already applied, River now no-ops idempotently as a user convenience instead of returning an error. [PR #1260](https://github.com/riverqueue/river/pull/1260)
22+
- Log dropped job and queue subscription events at warn level when a subscriber buffer is full, including `event_kind` for easier diagnosis. [PR #1271](https://github.com/riverqueue/river/pull/1271)
2223

2324
### Fixed
2425

subscription_manager.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package river
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"sync"
78
"time"
89

@@ -79,13 +80,13 @@ func (sm *subscriptionManager) Start(ctx context.Context) error {
7980
// one has to be careful in tests.
8081
sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed")
8182
for updates := range sm.subscribeCh {
82-
sm.distributeJobUpdates(updates)
83+
sm.distributeJobUpdates(ctx, updates)
8384
}
8485

8586
return
8687

8788
case updates := <-sm.subscribeCh:
88-
sm.distributeJobUpdates(updates)
89+
sm.distributeJobUpdates(ctx, updates)
8990
}
9091
}
9192
}()
@@ -118,7 +119,7 @@ func (sm *subscriptionManager) safeDurationAverage(d time.Duration, n int) time.
118119
// Receives updates from the completer and prompts the client to update
119120
// statistics and distribute jobs into any listening subscriber channels.
120121
// (Subscriber channels are non-blocking so this should be quite fast.)
121-
func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.CompleterJobUpdated) {
122+
func (sm *subscriptionManager) distributeJobUpdates(ctx context.Context, updates []jobcompleter.CompleterJobUpdated) {
122123
func() {
123124
sm.statsMu.Lock()
124125
defer sm.statsMu.Unlock()
@@ -141,7 +142,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl
141142
}
142143

143144
for _, update := range updates {
144-
sm.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed)
145+
sm.distributeJobEvent(ctx, update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed)
145146
}
146147
}
147148

@@ -151,7 +152,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl
151152
// the queue.
152153
//
153154
// MUST be called with sm.mu already held.
154-
func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics, snoozed bool) {
155+
func (sm *subscriptionManager) distributeJobEvent(ctx context.Context, job *rivertype.JobRow, stats *JobStatistics, snoozed bool) {
155156
var event *Event
156157
if snoozed {
157158
event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
@@ -186,12 +187,19 @@ func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats *
186187
select {
187188
case sub.Chan <- event:
188189
default:
190+
sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer",
191+
slog.String("event_kind", string(event.Kind)),
192+
)
189193
}
190194
}
191195
}
192196
}
193197

194198
func (sm *subscriptionManager) distributeQueueEvent(event *Event) {
199+
sm.distributeQueueEventWithContext(context.Background(), event)
200+
}
201+
202+
func (sm *subscriptionManager) distributeQueueEventWithContext(ctx context.Context, event *Event) {
195203
sm.mu.Lock()
196204
defer sm.mu.Unlock()
197205

@@ -202,6 +210,9 @@ func (sm *subscriptionManager) distributeQueueEvent(event *Event) {
202210
select {
203211
case sub.Chan <- event:
204212
default:
213+
sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer",
214+
slog.String("event_kind", string(event.Kind)),
215+
)
205216
}
206217
}
207218
}

subscription_manager_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package river
22

33
import (
4+
"bytes"
45
"context"
6+
"log/slog"
7+
"strings"
58
"testing"
69
"time"
710

@@ -13,6 +16,7 @@ import (
1316
"github.com/riverqueue/river/riverdbtest"
1417
"github.com/riverqueue/river/riverdriver"
1518
"github.com/riverqueue/river/riverdriver/riverpgxv5"
19+
"github.com/riverqueue/river/rivershared/baseservice"
1620
"github.com/riverqueue/river/rivershared/riversharedtest"
1721
"github.com/riverqueue/river/rivershared/startstoptest"
1822
"github.com/riverqueue/river/rivershared/testfactory"
@@ -138,4 +142,31 @@ func Test_SubscriptionManager(t *testing.T) {
138142

139143
startstoptest.Stress(ctx, t, svc)
140144
})
145+
146+
t.Run("LogsDroppedQueueEvents", func(t *testing.T) {
147+
t.Parallel()
148+
149+
var logBuf bytes.Buffer
150+
151+
manager := newSubscriptionManager(&baseservice.Archetype{
152+
Logger: slog.New(slog.NewTextHandler(&logBuf, &slog.HandlerOptions{Level: slog.LevelWarn})),
153+
Time: riversharedtest.BaseServiceArchetype(t).Time,
154+
}, nil)
155+
156+
sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 1, Kinds: []EventKind{EventKindQueuePaused}})
157+
t.Cleanup(cancelSub)
158+
159+
manager.distributeQueueEventWithContext(ctx, &Event{
160+
Kind: EventKindQueuePaused,
161+
Queue: &rivertype.Queue{Name: "default"},
162+
})
163+
manager.distributeQueueEventWithContext(ctx, &Event{
164+
Kind: EventKindQueuePaused,
165+
Queue: &rivertype.Queue{Name: "default"},
166+
})
167+
168+
require.Len(t, sub, 1)
169+
require.Contains(t, strings.TrimSpace(logBuf.String()), "Subscription event dropped due to full buffer")
170+
require.Contains(t, logBuf.String(), "event_kind=queue_paused")
171+
})
141172
}

0 commit comments

Comments
 (0)