From 173214524f30d1c319acfa10dbe8a9e111034f5a Mon Sep 17 00:00:00 2001 From: Daniel Liu <139250065@qq.com> Date: Wed, 24 Sep 2025 07:48:23 +0800 Subject: [PATCH] event: fix Resubscribe deadlock when unsubscribing after inner sub ends #28359 (#1551) A goroutine is used to manage the lifetime of subscriptions managed by resubscriptions. When the subscription ends with no error, the resub goroutine ends as well. However, the resub goroutine needs to live long enough to read from the unsub channel. Otheriwse, an Unsubscribe call deadlocks when writing to the unsub channel. This is fixed by adding a buffer to the unsub channel. Co-authored-by: Inphi --- event/subscription.go | 2 +- event/subscription_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/event/subscription.go b/event/subscription.go index 1025254dc6..911f2dca69 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio backoffMax: backoffMax, fn: fn, err: make(chan error), - unsub: make(chan struct{}), + unsub: make(chan struct{}, 1), } go s.loop() return s diff --git a/event/subscription_test.go b/event/subscription_test.go index ba081705c4..743d0bf67d 100644 --- a/event/subscription_test.go +++ b/event/subscription_test.go @@ -154,3 +154,27 @@ func TestResubscribeWithErrorHandler(t *testing.T) { t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs) } } + +func TestResubscribeWithCompletedSubscription(t *testing.T) { + t.Parallel() + + quitProducerAck := make(chan struct{}) + quitProducer := make(chan struct{}) + + sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) { + return NewSubscription(func(unsubscribed <-chan struct{}) error { + select { + case <-quitProducer: + quitProducerAck <- struct{}{} + return nil + case <-unsubscribed: + return nil + } + }), nil + }) + + // Ensure producer has started and exited before Unsubscribe + close(quitProducer) + <-quitProducerAck + sub.Unsubscribe() +}