半异步代码逻辑

Fomo Dong:

我正在努力寻找一个将同步流与异步行为混合在一起的可行设计。

我有4个组成部分:

  1. 播种机
  2. 工人
  3. 发行人
  4. 更新器

我唯一的限制是,一旦Seeder播种数据,就必须将其阻塞,直到Updater无法完全完成所有任务的处理为止。前3个组件可以很容易地实现同步,但是Updater必须并行工作,否则将花费很多时间才能完成任务。

因此,流程为:

Seeder -> Worker -> Publisher -> Updater --> Seeder -> Worker -> Publisher -> Updater ...

并且该流必须永远旋转。

播种和更新是针对数据库的。不幸的是,这个特定的数据库不允许进行其他设计。

我最好的方法是sync.WaitGroup用于同步Updater goroutine,并使其他所有事物保持同步状态。通过通道将数据提供给更新程序。

这是一个简化的代码(没有错误,没有太多逻辑)。

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := <- c

                // this is the updater
            }(&wg)
        }

        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

结果是代码一直工作到直到某个周期停止并且永远不会前进。我玩了很多变量,加载了100行而不是10k,结果没有太大不同。

我还尝试传递包含通道的结构并异步运行所有内容,但是我什至更难以确定Updater何时完成,以便可以解除对种子的阻止。

任何指针表示赞赏。

佐德:

很难说,因为您的代码无法编译和运行,并且不清楚如何使用c。至少可以确定一件事:wg应该通过引用而不是通过值传递(sync.WaitGroup具有nocopy注释)。然后,假设您使用c将值发送到更新程序。但是您不提供他们的代码,所以我只能猜测。例如,假设进行调度,以使前9个goroutine占用了通道中的所有内容;然后,最后一个例程将永远被阻塞,并且永远不会释放WaitGroup。在这种情况下,一种简单的解决方案是在最外层for循环的每次迭代中创建一个新通道(将第3行向下移动两行),并在调用之前立即关闭cwg.Wait()您的更新程序必须能够处理来自关闭频道读取

[编辑]我想您正在寻找的是这样的:

package main

import (
    "fmt"
    "sync"
)

// Result is a type
type Result struct {
    I int
}

// Seeder is a function
func Seeder() []int {
    fmt.Println("Seeding")
    return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
}

// Worker is a function
func Worker(data []int) []int {
    return data
}

// Publisher is a function
func Publisher(data []int) []Result {
    var r []Result
    for i := 0; i < len(data); i++ {
        r = append(r, Result{I: data[i]})
    }
    return r
}

func updater(c chan Result, wg *sync.WaitGroup) {
    for _ = range c {
        // update here
        wg.Done()
    }
}

func main() {
    var wg sync.WaitGroup

    c := make(chan Result, 100)
    for i := 0; i < 10; i++ {
        go updater(c, &wg)
    }

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        wg.Add(len(results))
        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章