使用BLPOP处理Redis队列会导致单元测试中出现竞争状况吗?

库尔特·皮克(Kurt Peek):

我正在尝试实现Go中Redis电子书第6.4.1章中所述的先进先出任务队列为了进行测试,我将CommandExecutor接口传递给'worker'函数,如下所示:

package service

import (
    "context"

    "github.com/gomodule/redigo/redis"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
)

const commandsQueue = "queuedCommands:"

var pool = redis.Pool{
    MaxIdle:   50,
    MaxActive: 1000,
    Dial: func() (redis.Conn, error) {
        conn, err := redis.Dial("tcp", ":6379")
        if err != nil {
            logrus.WithError(err).Fatal("initialize Redis pool")
        }
        return conn, err
    },
}

// CommandExecutor executes a command
type CommandExecutor interface {
    Execute(string) error
}

func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {
    rc := pool.Get()
    defer rc.Close()

    for {
        select {
        case <-ctx.Done():
            done <- struct{}{}
            return nil
        default:
            // If the commands queue does not exist, BLPOP blocks until another client
            // performs an LPUSH or RPUSH against it. The timeout argument of zero is
            // used to block indefinitely.
            reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))
            if err != nil {
                logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)
                return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)
            }

            if len(reply) < 2 {
                logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
                return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
            }

            // BLPOP returns a two-element multi-bulk with the first element being the
            // name of the key where an element was popped and the second element
            // being the value of the popped element (cf. https://redis.io/commands/blpop#return-value)
            if err := executor.Execute(reply[1]); err != nil {
                return errors.Wrapf(err, "execute scheduled command: %s", reply[0])
            }
            done <- struct{}{}
        }
    }
}

用此代码制作了一个小的示例存储库https://github.com/kurtpeek/process-queue,并尝试了单元测试。对于单元测试,我有两个相同的测试(具有不同的名称):

package service

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestProcessQueue(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

func TestProcessQueue2(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

其中CommandExecutorMock使用产生moq如果我分别运行每个测试,则它们会通过:

~/g/s/g/k/process-queue> go test ./... -v -run TestProcessQueue2
=== RUN   TestProcessQueue2
--- PASS: TestProcessQueue2 (0.00s)
PASS
ok      github.com/kurtpeek/process-queue/service   0.243s

但是,如果我运行所有测试,则第二个超时:

~/g/s/g/k/process-queue> 
go test ./... -v -timeout 10s
=== RUN   TestProcessQueue
--- PASS: TestProcessQueue (0.00s)
=== RUN   TestProcessQueue2
panic: test timed out after 10s

看来,当第二个测试运行时,在第一个测试中启动的goroutine仍在运行,并且正在BLPOP从队列中读取命令,因此<-done第二个测试中的行将无限期地阻塞。尽管调用cancel()了第一个测试的父级上下文,但这仍然可行

如何“隔离”这些测试,以便它们一起运行时都能通过?(我试图将-p 1标志传递go test但无济于事)。

尤里·费多罗夫(Yury Fedorov):

尽管在第一个测试的父上下文上调用了cancel()。

在写入done和调用之间存在一段时间cancel(),这意味着第一个测试可能(并且确实)进入了第二for/select次迭代而不是退出<-ctx.Done()更具体地说,测试代码在取消之前包含2个断言:

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)

只有这样才能defer cancel()启动,这似乎太迟了,无法在首次执行例程中取消上下文。

如果您cancel()在从读取之前移动了电话done,则测试通过:

func TestProcessQueue(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    cancel() // note this change right here
    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

使用@Configuration时运行单元测试时出现NoSuchBeanDefinitionException

单元测试值得吗?

穿线导致竞争状况

如何避免在https春季安全单元测试中出现302响应?

此代码会导致套接字io中出现竞争情况吗?

运行tslint时在角度单元测试中出现“未使用的表达式,预期的赋值或函数调用”

如何使用laravel队列邮件编写单元测试?

为什么在多线程Scala中出现竞争状况?

使用组件的@ViewChild {阅读:ElementRef}导致单元测试失败

volatile是避免C#中出现竞争状况的好习惯吗?

包含主项目中的文件时,单元测试项目中出现错误

单元测试-带有超级测试的玩笑会导致超时

Javascript等待/异步-这会导致竞争状况吗?

使用Gradle任务运行单元测试时出现NoClassDefFoundError

多线程导致竞争状况

在角度分量法的单元测试中出现错误?

JavaScript事件处理程序与异步数据存储区API配合使用会导致竞争状况

在虚拟函数周围使用#ifdef预处理器会导致在与库链接的程序中出现运行时错误

运行Django单元测试会导致South迁移重复表

Python多重处理无法在单元测试中使用

使用BigDecimal会导致计算中出现轻微错误

如何在单元测试中使用Redis?

避免在Erlang中出现竞争状况

在AngularJS中使用回调而不是Promise会导致单元测试中的行为不同。为什么?

使用Fakes进行单元测试会产生IConvertible异常

如何使用 NUnit 通过异步单元测试来测试竞争条件?

使用 redis mock 进行 Redis 单元测试

使用 Amplify UI 组件时如何防止单元测试中出现“'amplify-authenticator' is not a known element”错误

使用对象更新状态导致单元测试无限运行