Sending data from one goroutine to multiple other goroutines

sontags

In a project the program receives data via websocket. This data needs to be processed by n algorithms. The number of algorithms can change dynamically.

My attempt is to create some pub/sub pattern where subscriptions can be started and canceled on the fly. Turns out that this is a bit more challenging than expected.

Here's what I came up with (which is based on https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):

package pubsub

import (
    "context"
    "sync"
    "time"
)

type Pubsub struct {
    sync.RWMutex
    subs   []*Subsciption
    closed bool
}

func New() *Pubsub {
    ps := &Pubsub{}
    ps.subs = []*Subsciption{}
    return ps
}

func (ps *Pubsub) Publish(msg interface{}) {
    ps.RLock()
    defer ps.RUnlock()

    if ps.closed {
        return
    }

    for _, sub := range ps.subs {
        // ISSUE1: These goroutines apparently do not exit properly... 
        go func(ch chan interface{}) {
            ch <- msg
        }(sub.Data)
    }
}

func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
    ps.Lock()
    defer ps.Unlock()

    // prep channel
    ctx, cancel := context.WithCancel(context.Background())
    sub := &Subsciption{
        Data:   make(chan interface{}, 1),
        cancel: cancel,
        ps:     ps,
    }

    // prep subsciption
    ps.subs = append(ps.subs, sub)
    return ctx, sub, nil
}

func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
    ps.Lock()
    defer ps.Unlock()

    found := false
    index := 0
    for i, sub := range ps.subs {
        if sub == s {
            index = i
            found = true
        }
    }
    if found {
        s.cancel()
        ps.subs[index] = ps.subs[len(ps.subs)-1]
        ps.subs = ps.subs[:len(ps.subs)-1]

        // ISSUE2: close the channel async with a delay to ensure
        // nothing will be written to the channel anymore
        // via a pending goroutine from Publish()
        go func(ch chan interface{}) {
            time.Sleep(500 * time.Millisecond)
            close(ch)
        }(s.Data)
    }
    return found
}

func (ps *Pubsub) Close() {
    ps.Lock()
    defer ps.Unlock()

    if !ps.closed {
        ps.closed = true
        for _, sub := range ps.subs {
            sub.cancel()

            // ISSUE2: close the channel async with a delay to ensure
            // nothing will be written to the channel anymore
            // via a pending goroutine from Publish()
            go func(ch chan interface{}) {
                time.Sleep(500 * time.Millisecond)
                close(ch)
            }(sub.Data)
        }
    }
}

type Subsciption struct {
    Data   chan interface{}
    cancel func()
    ps     *Pubsub
}

func (s *Subsciption) Unsubscribe() {
    s.ps.unsubscribe(s)
}

As mentioned in the comments there are (at least) two issues with this:

ISSUE1:

After running an implementation of this for a while, I get a few errors of this kind:

goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb

Without really understanding this it appears to me that the goroutines created in Publish() do accumulate/leak. Is this correct and what am I doing wrong here?

ISSUE2:

When I end a subscription via Unsubscribe(), it can happen that Publish() tries to write to a closed channel and panics. To mitigate this I have created a goroutine that closes the channel after a delay. This feels really off-best-practice, but I was not able to find a proper solution. What would be a deterministic way to do this?

Here's a little playground for you to test with: https://play.golang.org/p/K-L8vLjt7_9

icza

Before diving into your solution and its issues, let me recommend again another Broker approach presented in this answer: How to broadcast message using channel

Now on to your solution.


Whenever you launch a goroutine, always think about how it will end, and make sure it does end if the goroutine is not supposed to run for the lifetime of your app.

// ISSUE1: These goroutines apparently do not exit properly... 
go func(ch chan interface{}) {
    ch <- msg
}(sub.Data)

This goroutine tries to send a value on ch. This may be a blocking operation: it will block if ch's buffer is full and there is no ready receiver on ch. This is out of the control of the launched goroutine, and also out of the control of the pubsub package. This may be fine in some cases, but this already places a burden on the users of the package. Try to avoid these. Try to create APIs that are easy to use and hard to misuse.

Also, launching a goroutine just to send a value on a channel is a waste of resources (goroutines are cheap and light, but you shouldn't spawn them when you can avoid it).

You do it because you don't want to get blocked. To avoid blocking, you may use a buffered channel with a "reasonably" high buffer. Yes, this doesn't solve the blocking issue, it only helps with "slow" clients receiving from the channel.

To "truly" avoid blocking without launching a goroutine, you may use non-blocking send:

select {
case ch <- msg:
default:
    // ch's buffer is full, we cannot deliver now
}

If send on ch can proceed, it will happen. If not, the default branch is chosen immediately. You have to decide what to do then. Is it acceptable to "lose" a message? Is it acceptable to wait for some time until "giving up"? Or is it acceptable to launch a goroutine to do this (but then you'll be back at what we're trying to fix here)? Or is it acceptable to get blocked until the client can receive from the channel...
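As a runnable illustration of the non-blocking send, here is a small sketch. The helper name trySend is an assumption of mine, and dropping the message is just one of the possible policies listed above:

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send of msg on ch.
// It reports whether the value was handed over or buffered.
func trySend(ch chan interface{}, msg interface{}) bool {
    select {
    case ch <- msg:
        return true
    default:
        // ch's buffer is full and no receiver is ready: give up immediately.
        return false
    }
}

func main() {
    ch := make(chan interface{}, 1)

    fmt.Println(trySend(ch, "first"))  // true: the buffer has room
    fmt.Println(trySend(ch, "second")) // false: the buffer is full, message dropped
    fmt.Println(<-ch)                  // first
}
```

Returning a bool leaves the decision about a dropped message (log it, count it, retry later) to the caller.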

If you choose a reasonably high buffer and still encounter a situation where it gets full, it may be acceptable to block until the client can advance and receive from the channel. If it can't, then your whole app might be in an unacceptable state, and it might be acceptable to "hang" or "crash".

// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
    time.Sleep(500 * time.Millisecond)
    close(ch)
}(s.Data)

Closing a channel is a signal to the receiver(s) that no more values will be sent on the channel. So it should always be the sender's job (and responsibility) to close the channel. By launching a goroutine to close the channel, you "hand" that job and responsibility to another "entity" (a goroutine) that is not synchronized with the sender. This may easily end up in a panic (sending on a closed channel is a runtime panic, for other axioms see How does a non initialized channel behave?). Don't do that.

Yes, this was necessary because you launched goroutines to send. If you don't do that, then you may close "in-place", without launching a goroutine, because then the sender and closer will be the same entity: the Pubsub itself, whose sending and closing operations are protected by a mutex. So solving the first issue solves the second naturally.
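Here is a minimal sketch of how that could look. It is a simplification of the code in the question, not a drop-in replacement: it keeps subscribers in a map, omits the context and the Subsciption type, and drops messages for full subscribers, all of which are my assumptions:

```go
package main

import (
    "fmt"
    "sync"
)

type Pubsub struct {
    mu     sync.RWMutex
    subs   map[chan interface{}]struct{}
    closed bool
}

func New() *Pubsub {
    return &Pubsub{subs: make(map[chan interface{}]struct{})}
}

// Subscribe registers a new channel with the given buffer size.
func (ps *Pubsub) Subscribe(size int) chan interface{} {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    ch := make(chan interface{}, size)
    if ps.closed {
        close(ch) // nothing will ever be published
        return ch
    }
    ps.subs[ch] = struct{}{}
    return ch
}

// Publish sends in place, without goroutines, and returns how many
// subscribers were too slow to take the message.
func (ps *Pubsub) Publish(msg interface{}) (dropped int) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    for ch := range ps.subs {
        select {
        case ch <- msg:
        default:
            dropped++
        }
    }
    return dropped
}

// Unsubscribe removes ch and closes it right away. This is safe because
// no Publish can be sending while the write lock is held.
func (ps *Pubsub) Unsubscribe(ch chan interface{}) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    if _, ok := ps.subs[ch]; ok {
        delete(ps.subs, ch)
        close(ch)
    }
}

// Close closes all subscriber channels in place.
func (ps *Pubsub) Close() {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    ps.closed = true
    for ch := range ps.subs {
        delete(ps.subs, ch)
        close(ch)
    }
}

func main() {
    ps := New()
    a, b := ps.Subscribe(1), ps.Subscribe(1)

    ps.Publish("hello")
    fmt.Println(<-a, <-b) // hello hello

    ps.Unsubscribe(a)
    ps.Publish("world")
    _, ok := <-a
    fmt.Println(ok, <-b) // false world

    ps.Close()
}
```

Because the send never blocks, Publish() holds the read lock only briefly, so Unsubscribe() and Close() are not held up by slow subscribers.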

In general, if there are multiple senders for a channel, then closing the channel must be coordinated. There must be a single entity (often not any of the senders) that waits for all senders to finish, typically using a sync.WaitGroup, and then that single entity can close the channel safely. See Closing channel of unknown length.
