我正在尝试实现Go中Redis电子书第6.4.1章中所述的先进先出任务队列。为了进行测试,我将CommandExecutor
接口传递给'worker'函数,如下所示:
package service
import (
"context"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const commandsQueue = "queuedCommands:"
var pool = redis.Pool{
MaxIdle: 50,
MaxActive: 1000,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", ":6379")
if err != nil {
logrus.WithError(err).Fatal("initialize Redis pool")
}
return conn, err
},
}
// CommandExecutor executes a command
type CommandExecutor interface {
Execute(string) error
}
func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {
rc := pool.Get()
defer rc.Close()
for {
select {
case <-ctx.Done():
done <- struct{}{}
return nil
default:
// If the commands queue does not exist, BLPOP blocks until another client
// performs an LPUSH or RPUSH against it. The timeout argument of zero is
// used to block indefinitely.
reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))
if err != nil {
logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)
return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)
}
if len(reply) < 2 {
logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
}
// BLPOP returns a two-element multi-bulk with the first element being the
// name of the key where an element was popped and the second element
// being the value of the popped element (cf. https://redis.io/commands/blpop#return-value)
if err := executor.Execute(reply[1]); err != nil {
return errors.Wrapf(err, "execute scheduled command: %s", reply[0])
}
done <- struct{}{}
}
}
}
我用此代码制作了一个小的示例存储库https://github.com/kurtpeek/process-queue,并尝试了单元测试。对于单元测试,我有两个相同的测试(具有不同的名称):
package service
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
func TestProcessQueue2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
其中CommandExecutorMock
使用产生moq
。如果我分别运行每个测试,则它们会通过:
~/g/s/g/k/process-queue> go test ./... -v -run TestProcessQueue2
=== RUN TestProcessQueue2
--- PASS: TestProcessQueue2 (0.00s)
PASS
ok github.com/kurtpeek/process-queue/service 0.243s
但是,如果我运行所有测试,则第二个超时:
~/g/s/g/k/process-queue>
go test ./... -v -timeout 10s
=== RUN TestProcessQueue
--- PASS: TestProcessQueue (0.00s)
=== RUN TestProcessQueue2
panic: test timed out after 10s
看来,当第二个测试运行时,在第一个测试中启动的goroutine仍在运行,并且正在BLPOP
从队列中读取命令,因此<-done
第二个测试中的行将无限期地阻塞。尽管调用cancel()
了第一个测试的父级上下文,但这仍然可行。
如何“隔离”这些测试,以便它们一起运行时都能通过?(我试图将-p 1
标志传递给go test
但无济于事)。
尽管在第一个测试的父上下文上调用了cancel()。
在写入done
和调用之间存在一段时间cancel()
,这意味着第一个测试可能(并且确实)进入了第二for/select
次迭代而不是退出<-ctx.Done()
。更具体地说,测试代码在取消之前包含2个断言:
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
只有这样才能defer cancel()
启动,这似乎太迟了,无法在首次执行例程中取消上下文。
如果您cancel()
在从读取之前移动了电话done
,则测试通过:
func TestProcessQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
executor := &CommandExecutorMock{
ExecuteFunc: func(string) error {
return nil
},
}
done := make(chan struct{})
go processQueue(ctx, done, executor)
rc := pool.Get()
defer rc.Close()
_, err := rc.Do("RPUSH", commandsQueue, "foobar")
require.NoError(t, err)
cancel() // note this change right here
<-done
assert.Exactly(t, 1, len(executor.ExecuteCalls()))
assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句