diff --git a/CHANGELOG.md b/CHANGELOG.md index f52c6b08..6d734eb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) - When using `(*Migrator[TTx]).Migrate` with a `TargetVersion` that's already applied, River now no-ops idempotently instead of returning an error as a user convenience. [PR #1260](https://github.com/riverqueue/river/pull/1260) +- Log dropped job and queue subscription events at warn level when a subscriber buffer is full, including `event_kind` for easier diagnosis. [PR #1271](https://github.com/riverqueue/river/pull/1271) ### Fixed diff --git a/subscription_manager.go b/subscription_manager.go index 3afda983..c74ef4d4 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -3,6 +3,7 @@ package river import ( "context" "fmt" + "log/slog" "sync" "time" @@ -79,13 +80,13 @@ func (sm *subscriptionManager) Start(ctx context.Context) error { // one has to be careful in tests. sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed") for updates := range sm.subscribeCh { - sm.distributeJobUpdates(updates) + sm.distributeJobUpdates(ctx, updates) } return case updates := <-sm.subscribeCh: - sm.distributeJobUpdates(updates) + sm.distributeJobUpdates(ctx, updates) } } }() @@ -118,7 +119,7 @@ func (sm *subscriptionManager) safeDurationAverage(d time.Duration, n int) time. // Receives updates from the completer and prompts the client to update // statistics and distribute jobs into any listening subscriber channels. // (Subscriber channels are non-blocking so this should be quite fast.) -func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.CompleterJobUpdated) { +func (sm *subscriptionManager) distributeJobUpdates(ctx context.Context, updates []jobcompleter.CompleterJobUpdated) { func() { sm.statsMu.Lock() defer sm.statsMu.Unlock() @@ -141,7 +142,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl } for _, update := range updates { - sm.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed) + sm.distributeJobEvent(ctx, update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed) } } @@ -151,7 +152,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl // the queue. // // MUST be called with sm.mu already held. -func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics, snoozed bool) { +func (sm *subscriptionManager) distributeJobEvent(ctx context.Context, job *rivertype.JobRow, stats *JobStatistics, snoozed bool) { var event *Event if snoozed { event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats} @@ -186,12 +187,19 @@ func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats * select { case sub.Chan <- event: default: + sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer", + slog.String("event_kind", string(event.Kind)), + ) } } } } func (sm *subscriptionManager) distributeQueueEvent(event *Event) { + sm.distributeQueueEventWithContext(context.Background(), event) +} + +func (sm *subscriptionManager) distributeQueueEventWithContext(ctx context.Context, event *Event) { sm.mu.Lock() defer sm.mu.Unlock() @@ -202,6 +210,9 @@ func (sm *subscriptionManager) distributeQueueEvent(event *Event) { select { case sub.Chan <- event: default: + sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer", + slog.String("event_kind", string(event.Kind)), + ) } } } diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 1d667ad5..c8a04509 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -1,7 +1,10 @@ package river import ( + "bytes" "context" + "log/slog" + "strings" "testing" "time" @@ -13,6 +16,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" @@ -138,4 +142,31 @@ func Test_SubscriptionManager(t *testing.T) { startstoptest.Stress(ctx, t, svc) }) + + t.Run("LogsDroppedQueueEvents", func(t *testing.T) { + t.Parallel() + + var logBuf bytes.Buffer + + manager := newSubscriptionManager(&baseservice.Archetype{ + Logger: slog.New(slog.NewTextHandler(&logBuf, &slog.HandlerOptions{Level: slog.LevelWarn})), + Time: riversharedtest.BaseServiceArchetype(t).Time, + }, nil) + + sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 1, Kinds: []EventKind{EventKindQueuePaused}}) + t.Cleanup(cancelSub) + + manager.distributeQueueEventWithContext(ctx, &Event{ + Kind: EventKindQueuePaused, + Queue: &rivertype.Queue{Name: "default"}, + }) + manager.distributeQueueEventWithContext(ctx, &Event{ + Kind: EventKindQueuePaused, + Queue: &rivertype.Queue{Name: "default"}, + }) + + require.Len(t, sub, 1) + require.Contains(t, strings.TrimSpace(logBuf.String()), "Subscription event dropped due to full buffer") + require.Contains(t, logBuf.String(), "event_kind=queue_paused") + }) }