当我将元组发送到 ElasticSearch 时,为什么我的风暴拓扑没有确认

他们的人

我是使用 Storm 的新手,我刚刚开始了数据架构师培训课程,正是在这种情况下,我面临着今天给您带来的问题。

我通过名为 CurrentPriceSpout 的 KafkaSpout 从 kakfa 接收消息。到目前为止,一切正常。然后,在我的 CurrentPriceBolt 中,我重新发布了一个元组,以便使用 EsCurrentPriceBolt 在 ElasticSearch 中写入我的数据。问题就在这里。我不能直接将我的数据写入 ElasticSearch,它只有在我删除我的拓扑时才会写入。

是否有可以通过检索确认来强制写入元组的 Storm 参数?

我尝试添加选项“.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)”,元组在 ElasticSearch 中写得很好,但没有得到承认。所以 Storm 无限期地重写它们。

感谢您的帮助蒂埃里

他们的人

我设法找到了我的问题的答案。主要问题是 ES 的设计目的不是像研究项目中生成的那样摄取尽可能少的数据。ES默认批量写入1000条数据。在这个项目中,我每 30 秒生成一个数据,或者每 500 分钟(或 8h20)生成一批 1000。

所以我详细审查了我的拓扑结构并使用了以下选项:

  • es.batch.size.entries: 1
  • es.storm.bolt.flush.entries.size: 1
  • 拓扑.生产者.批次.大小:1
  • 拓扑.传输.批次.大小:1

现在它是这样的:

...
...

public class App 
{
    ...    
    ...    

    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
    {
        ...
        ...

        StormTopology topology  = topologyBuilder.createTopology();                 // je crée ma topologie Storm
        String topologyName     = properties.getProperty("storm.topology.name");    // je nomme ma topologie
        StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology);               // je démarre ma topologie sur mon cluster storm
        System.out.println( "Topology on remote cluster : Started!" );              
    }


    private static Config getTopologyConfig(Properties properties)
    {
        Config stormConfig = new Config();
        stormConfig.put("topology.workers",                 Integer.parseInt(properties.getProperty("topology.workers")));
        stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
        stormConfig.put("topology.message.timeout.secs",    Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
        stormConfig.put("topology.transfer.batch.size",     Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
        stormConfig.put("topology.producer.batch.size",     Integer.parseInt(properties.getProperty("topology.producer.batch.size")));      
        return stormConfig;
    }

    ...    
    ...    
    ...    
}

现在它起作用了!!!

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

为什么在使用 spring-data-elasticsearch 时将 HEAD 请求发送到我的索引

为什么当我使用AJAX中的POST将数据发送到PHP时,它说变量为null?

React中的formData,当我将formdata发送到后端Express时为空

当我将JSON发送到MVC控制器时,模型未绑定

当我尝试搜索时,为什么Chrome会将我发送到Google主页?

当我旋转屏幕时,我从Activity发送到Fragment的捆绑包仍然有效。为什么?

为什么我的formdata.append没有将键值对发送到服务器?

为什么我的PrintWriter没有将数据发送到服务器?

为什么我的 ElasticSearch 查询没有获取任何记录?

将 SolrInputDocument 发送到 ElasticSearch

当我将json发送到spring控制器时,415不支持的媒体类型

当我使用 Django Jinjna 模板将“schemaname.tablename”发送到 id 字段时,Bootstrap 模式弹出中断

当我们将停止发送到 pid 1 时,docker 如何处理子进程

当我单击一个按钮时,它没有通过 ajax jquery 将数据发送到 changePassword.php

当我尝试发送到服务器时,为什么我的吸气剂返回null?

当我们对片段使用依赖注入时,将数据从片段发送到另一个

当我的esc键不起作用时,如何将ESC信号发送到vim?

当我发送到帖子时,jQuery ajax没有填充选择

使用Chrome插件Sense在ElasticSearch中进行搜索时,为什么我没有得到预期的结果?

当我的表单中有 onsubmit 事件时,为什么我的 HTML 表单无法将用户输入正确发送到 Google 表格?

如果logstash将数据发送到elasticsearch的速度快于其索引速度,那会发生什么?

将数据发送到我的母版页

我如何将类型发送到功能

我可以将参数从MvxSplashScreenActivity发送到MvxViewModel吗?

我无法将帖子发送到Django Rest Framework

为什么我的Dependency属性将null发送到我的视图模型?

当我想从http重定向到https时,将其发送到其他网站

当我单击打印时发送到打印机的数据

当我尝试在我的服务器上使用它时,为什么该命令不会发送到我的 DM