我有以下代码:
static void Main()
{
var holderQueue = new ConcurrentQueue<int>(GetInitialElements());
Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
var observableQueue = holderQueue.ToObservable();
IScheduler newThreadScheduler = new NewThreadScheduler();
IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();
var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);
var t = new TaskFactory();
t.StartNew(() => addToQueueAction(holderQueue));
using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
foreach(var item in currentQueue)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
}))
{
Console.WriteLine("started observing queue");
Console.ReadLine();
}
}
private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
{
while(true)
{
var x = new Random().Next(1, 10);
concurrentQueue.Enqueue(x);
Console.WriteLine("added {0}", x);
Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
目的如下:
该holderQueue
对象最初填充了几个元素(GetInitialElements
),然后在其他线程上通过其他元素进行了更改(通过方法AddToQueue
),并且可观察对象应该检测到此更改,并在其时间经过时做出相应的反应(因此每3秒)通过执行其预订中的方法。
简而言之,我期望的是使代码在Subscribe
主体中每3秒执行一次,并向我展示队列中的更改(该更改在其他线程上进行了更改)。取而代之的是,Subscribe
身体只执行一次。为什么?
谢谢
该ToObservable
方法采用IEnumerable<T>
并将其转换为可观察的。结果,它将占用您的并发队列并立即枚举它,遍历所有可用项目。您稍后修改队列以添加其他项目的事实,不会影响IEnumerable<T>
从并发队列的GetEnumerator()
实现返回的已枚举的事实。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句