我是使用 Storm 的新手,我刚刚开始了数据架构师培训课程,正是在这种情况下,我面临着今天给您带来的问题。
我通过名为 CurrentPriceSpout 的 KafkaSpout 从 kakfa 接收消息。到目前为止,一切正常。然后,在我的 CurrentPriceBolt 中,我重新发布了一个元组,以便使用 EsCurrentPriceBolt 在 ElasticSearch 中写入我的数据。问题就在这里。我不能直接将我的数据写入 ElasticSearch,它只有在我删除我的拓扑时才会写入。
是否有可以通过检索确认来强制写入元组的 Storm 参数?
我尝试添加选项“.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)”,元组在 ElasticSearch 中写得很好,但没有得到承认。所以 Storm 无限期地重写它们。
感谢您的帮助蒂埃里
我设法找到了我的问题的答案。主要问题是 ES 的设计目的不是像研究项目中生成的那样摄取尽可能少的数据。ES默认批量写入1000条数据。在这个项目中,我每 30 秒生成一个数据,或者每 500 分钟(或 8h20)生成一批 1000。
所以我详细审查了我的拓扑结构并使用了以下选项:
现在它是这样的:
...
...
public class App
{
...
...
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
...
...
StormTopology topology = topologyBuilder.createTopology(); // je crée ma topologie Storm
String topologyName = properties.getProperty("storm.topology.name"); // je nomme ma topologie
StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology); // je démarre ma topologie sur mon cluster storm
System.out.println( "Topology on remote cluster : Started!" );
}
private static Config getTopologyConfig(Properties properties)
{
Config stormConfig = new Config();
stormConfig.put("topology.workers", Integer.parseInt(properties.getProperty("topology.workers")));
stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
stormConfig.put("topology.message.timeout.secs", Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
stormConfig.put("topology.transfer.batch.size", Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
stormConfig.put("topology.producer.batch.size", Integer.parseInt(properties.getProperty("topology.producer.batch.size")));
return stormConfig;
}
...
...
...
}
现在它起作用了!!!
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句