mosquitto发送接收离线消息(fusesource客户端java版)

MQTT协议就不多说了,百度一下很多,官网 mqtt.org上也有很多说明. 今天记录的是在物联网设备上连接mosquitto, 发布消息。 JAVA写的应用程序订阅设备发送过来的消息。就本身应用来说是很简单的,与通常用的MQ没多大差别。我关注的重点是 mosquitto 对离线消息的处理。通常网上的例子是没有这些的. 看了下官网文档。还有fusesource客户端mqtt-client API,发现是可以很简单实现的。记录下过程。

1. 修改配置文件 mosquitto.conf
程序代码 程序代码

persistence true
persistence_file mosquitto.db
persistence_location d:/tmp/mosquitto
max_queued_messages 100000


2. 客户端publish消息时需要设置clientID以及clean session设置为false
程序代码 程序代码

public static void main(String[] args) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
        
        mqtt.setCleanSession(false);
        mqtt.setClientId("AH-JAVA-CLIENT-PUB-001");
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        
        CountDownLatch count = new CountDownLatch(88);

        for(int i=0; i<88; i++) {
            String message = "Message_Test_" + i;            
            connection.publish(MQTTConstants.TOPIC_001, message.getBytes(), QoS.AT_LEAST_ONCE, false);
            System.out.println("publish: " + message);
            count.countDown();
            //Thread.sleep(10000);
        }  
        count.await();

        connection.disconnect();
    }


3. 客户端订阅消息时需要设置clientID以及clean session设置为false
程序代码 程序代码

public static void main(String[] args) throws Exception {
          MQTT mqtt = new MQTT();
        
          mqtt.setClientId("AH-JAVA-CLIENT-100");
          mqtt.setCleanSession(false);
        
          mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
          BlockingConnection connection = mqtt.blockingConnection();
          connection.connect();

          Topic[] topics = {new Topic(MQTTConstants.TOPIC_001, QoS.AT_LEAST_ONCE), new Topic("$SYS/broker/clients/total", QoS.AT_LEAST_ONCE)};
          connection.subscribe(topics);
          int i = 0;
          while(true){
              Message message = connection.receive();
              if(message!=null){
                  System.out.println("Received messages. topic: " + message.getTopic() + "---" + new String(message.getPayload()));
                  message.ack();
                  i++;
                  if (i>= 50) {
                      break;
                  }
              }
              
          }
    }


用的JAVA写的测试,但真实场景设备那边是C语言的API, 应该类似处理就可以。java, pom包如下:
程序代码 程序代码

<dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>



除非申明,文章均为一号门原创,转载请注明本文地址,谢谢!
[本日志由 yihaomen 于 2019-05-10 04:58 PM 编辑]
文章来自: 本站原创
引用通告: 查看所有引用 | 我要引用此文章
Tags: MQTT mosquitto fusesource
相关日志:
评论: 0 | 引用: 0 | 查看次数: -
发表评论
昵 称:
密 码: 游客发言不需要密码.
内 容:
验证码: 验证码
选 项:
虽然发表评论不用注册,但是为了保护您的发言权,建议您注册帐号.