可观察到对不同线程上更改的队列没有反应

Amc_rtty

我有以下代码:

static void Main()
    {
        var holderQueue = new ConcurrentQueue<int>(GetInitialElements());

        Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
        var observableQueue = holderQueue.ToObservable();
        IScheduler newThreadScheduler = new NewThreadScheduler();

        IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();

        var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);

        var t = new TaskFactory();
        t.StartNew(() => addToQueueAction(holderQueue));

        using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
        {
            Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
            foreach(var item in currentQueue)
                Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);

            Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
        }))
        {
            Console.WriteLine("started observing queue");

            Console.ReadLine();
        }
    }

    private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
    {
        while(true)
        {
            var x = new Random().Next(1, 10);
            concurrentQueue.Enqueue(x);
            Console.WriteLine("added {0}", x);
            Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
            Thread.Sleep(1000);
        }
    }

    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));

        return items;
    }

目的如下:

holderQueue对象最初填充了几个元素(GetInitialElements),然后在其他线程上通过其他元素进行了更改(通过方法AddToQueue),并且可观察对象应该检测到此更改,并在其时间经过时做出相应的反应(因此每3秒)通过执行其预订中的方法。

简而言之,我期望的是使代码在Subscribe主体中每3秒执行一次,并向我展示队列中的更改(该更改在其他线程上进行了更改)。取而代之的是,Subscribe身体只执行一次。为什么?

谢谢

大卫·费弗(David Pfeffer)

ToObservable方法采用IEnumerable<T>并将其转换为可观察的。结果,它将占用您的并发队列并立即枚举它,遍历所有可用项目。您稍后修改队列以添加其他项目的事实,不会影响IEnumerable<T>从并发队列的GetEnumerator()实现返回的已枚举的事实

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章