This commit is contained in:
forkfury 2026-02-25 21:56:05 -08:00 committed by GitHub
commit 776f7b279e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 0 deletions

View file

@ -19,6 +19,13 @@ package event
// JoinSubscriptions joins multiple subscriptions to be able to track them as
// one entity and collectively cancel them or consume any errors from them.
func JoinSubscriptions(subs ...Subscription) Subscription {
// Handle empty subscription list to avoid blocking forever
if len(subs) == 0 {
return NewSubscription(func(unsubbed <-chan struct{}) error {
<-unsubbed
return nil
})
}
return NewSubscription(func(unsubbed <-chan struct{}) error {
// Unsubscribe all subscriptions before returning
defer func() {

View file

@ -173,3 +173,30 @@ func TestMultisubFullUnsubscribe(t *testing.T) {
default:
}
}
func TestMultisubEmpty(t *testing.T) {
// Test that joining zero subscriptions doesn't block forever
sub := JoinSubscriptions()
if sub == nil {
t.Fatal("JoinSubscriptions() returned nil")
}
// Should be able to unsubscribe immediately without blocking
done := make(chan struct{})
go func() {
sub.Unsubscribe()
close(done)
}()
select {
case <-done:
// Success - unsubscribe completed
case <-time.After(100 * time.Millisecond):
t.Error("Unsubscribe blocked on empty subscription list")
}
// Error channel should be closed
select {
case <-sub.Err():
// Expected - channel is closed
default:
t.Error("error channel not closed after unsubscribe")
}
}