如何使用Java客户端同时发布MQTT消息?

三元组13

我正在尝试使用5个Java客户端同时发布MQTT消息,以便每个Java客户端将特定主题上的1000条消息同时发布到MQTT代理(HIVEMQ)

我打开了多个线程,每个线程创建一个mqtt客户端并使用ssl连接到代理,并尝试同时发布1000条消息,正在发送消息,但所有连接都无法成功代理,并且我一直在获取异常

Client is not connected (32104)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:199)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:575)
    at com.test.MqttPublishSample.publishMessages(MqttPublishSample.java:122)
    at com.test.MqttPublishSample.lambda$start$0(MqttPublishSample.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)
public class MqttPublishSample {

    public static void main(String... args) throws InterruptedException {

        new MqttPublishSample().start();

    }

  public void start() throws InterruptedException {


        for(int i=0;i<5;i++){

            new Thread(()->{
                MqttClient client = null;
                try {
                    client = obtainConnection();//code to obtain connection using MqttClient
                    publishMessages(client);//code to publish message using simple for loop 

                } catch (MqttException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }
public MqttClient obtainConnection() throws MqttException {
        String clientId = "sslTestClient"+ThreadLocalRandom.current().nextInt(0,5);
        MqttClient client = null;
        try {
            client = new MqttClient("ssl://localhost:8883", clientId, new MemoryPersistence());
        } catch (MqttException e) {
            e.printStackTrace();
        }

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName("user1");
        mqttConnectOptions.setPassword("pass1".toCharArray());
        try {
            mqttConnectOptions.setSocketFactory(getTruststoreFactory());
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("connecting...");
        client.connect(mqttConnectOptions);
        return client;
    }

我希望所有客户端都能成功连接到代理并发布消息,无一例外

萨米·塔赫里(Sami Tahri)

可能是您在线程上使用了相同的clientID,因此服务器将断开重复的连接。当您使用LocalThreadRandom时,有可能发生冲突(因为只有5个选择,所以冲突很大)。您可以使用generateClientId()提供的唯一标识符,也可以在线程之间共享一个跟踪它们的方法。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何在Mosquitto MQTT中向特定客户端发布消息

使用 pika(python 客户端)通过 RabbitMQ 发布/订阅 MQTT 消息

如何使用Java处理来自客户端的Websocket消息?

MqttNet - MQTT 客户端无法接收 MQTT 服务器发布的所有消息

如何在Python Azure IoT中心客户端SDK中为mqtt发布消息设置QoS级别0?

使用Java订阅服务器可以使用的CometD Java客户端发布消息

Python paho mqtt 客户端不会同时发布和订阅

Eclipse Paho MQTT 客户端在 Java 中使用 TLS

MQTT如何从客户端创建客户端?

如何开始使用MQTT Paho JavaScript客户端?

如何在Mosquitto MQTT中使用客户端ID?

如何在Django中使用Paho mqtt客户端?

如何使用TCP连接同时接受多个客户端?

如何使用npm发布客户端脚本?

使用Java的akka websocket,计算客户端数量,向客户端发送消息

在该通道中发布新消息时,如何向客户端发送Redis消息

MQTT发布在同一客户端中订阅

RabbitMQ Java客户端停止使用消息

使用Java中的JButton向客户端发送消息

如何使用消息队列处理特定的客户端/用户?

服务器如何使用Java将消息发送到客户端

如何使用Java RMI从服务器向客户端发送消息?

如何通过 mqtt 接收来自多个客户端的消息?

运行命令行mqtt客户端时如何在此mqtt消息中包含\ r?

NATS持久消息Java客户端

尝试使用Windows客户端应用程序将消息发布到Facebook

如何在客户端Java应用程序中使用客户端证书?

MQTT C ++客户端

Eclipse Paho Mqtt客户端是否保留所有已发布的已发布消息,直到代理在QoS-2下对其进行确认以进行传递为止?