我正在使用Spark流计算唯一用户。我使用updateStateByKey
,所以我需要配置一个检查点目录。在启动应用程序时,我还从检查点加载了数据,如doc中的示例所示:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
这里的问题是,如果更改了我的代码,那么我将重新部署该代码,无论更改了多少代码,都会加载检查点吗?或者,我需要使用自己的逻辑来持久化数据并在下一次运行中加载它们。
如果我使用自己的逻辑来保存和加载DStream,那么如果应用程序在失败时重新启动,那么从检查点目录和我自己的数据库中加载的数据都不会吗?
该检查点本身包括您的元数据,rdd,dag甚至您的逻辑。如果您更改逻辑并尝试从最后一个检查点运行它,则很有可能会遇到异常。如果要使用自己的逻辑将数据保存为检查点的某个位置,则可能需要执行spark操作以将检查点数据推送到任何数据库,在下一次运行中,将检查点数据作为初始RDD加载(以防正在使用updateStateByKey API)并继续您的逻辑。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句