我有一个Action<Action> operation
做一些长期的工作,并通过提供的Action
参数发送心跳。在调用它之前,我想设置一个IObservable<Unit>
每次操作发送心跳时都会生成Next元素的元素。我怎么做?
创建IObservable
是没有那么复杂,但我猜的概念最简单的方法将被从事件创建它,但可惜你不能创建一个方法体事件(除非你知道一种方法)。我可以使用Observable.FromEvent(Action<Action> addHandler, Action<Action> removeHandler)
,但是这需要难看的封闭操作来将方法调用转发到(请参见下面的示例)。
有没有更优雅的方式?
Action forward = null;
Action sendHeartbeat = () => { if (forward != null) forward(); };
//ugly, since it does not scale to multiple observers:
IObservable<Unit> heartbeatObs =
Observable.FromEvent(handler => {
forward = handler;
}, _ => { forward = null; });
operation(sendHeartbeat);
即使我没有使用零维前向动作,而是使用它们的集合,这也很难看,因为我将再次从头实现+=
and-=
运算符EventHandlers
。
不确定我是否完全理解您的问题,但是如果有,您可以执行类似的操作,该操作使用aSubject
来传递心跳。您可以在名称空间中找到主题System.Reactive.Subjects
。
// set up a subject
Subject<Unit> heartbeatSubject=new Subject<Unit>();
// set up the heartbeataction to push units down the subject
Action sendHeartbeat = () => { heartbeatSubject.OnNext(Unit.Default); };
// use the heartbeat subject somehow.
heartbeatSubject.Subscribe(_=>Console.WriteLine("Operation produced a heartbeat"));
// start the operation
operation(sendHeartbeat);
编辑
如您的评论中所述,根据官方的RX系列,最好避免使用主题,但演示/示例代码除外。我刚刚使用Observable.Create进行了尝试,哪个更好?
Action<Action> longRunningAction=(
hb => { for(int i=0;i<100;i++) { hb(); Thread.Sleep(1000);}});
var ob=Observable.Create<Unit>(
(IObserver<Unit> observer) =>
{
longRunningAction(()=>observer.OnNext(Unit.Default));
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
ob.Subscribe(_=>Console.WriteLine("Received a heartbeat"));
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句