我有一种情况,我想将图形结果写入CSV。这包括文件的创建,文件编写器的初始化(我正在使用此库),最后,在流完成之后,我想再次处置/关闭编写器。
理想情况下,我想将此逻辑封装在接收器中,但我想知道添加初始化和处理逻辑的最佳实践/挂钩。
给定任何类型的资源,而不仅仅是文件,它消耗数据元素并且还需要关闭:
type Data = ???
trait DataConsumer extends Function1[Data, Unit] with AutoCloseable
可以Sink
使用以下watchTermination
方法创建一个在完成后关闭使用者的Flow
:
def createDataConsumerSink(dataConsumer: DataConsumer) : Sink[Data,_] =
Flow[Data].watchTermination()( (_, f) => f foreach (_ => dataConsumer.close()))
.to(Sink.foreach[Data](dataConsumer.apply))
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句