MqttConfig.java 8.35 KB
Newer Older
1
package share.system.mqtt;
YG8999 committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58


import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;


import javax.annotation.Resource;
import java.util.Date;

/**
 * mqtt连接信息
 *
 * @author : mwx
 * @date : 2022/12/28 13:39
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {

    @Value("${spring.mqtt.url}")
    private String url;

    @Value("${spring.mqtt.clientId}")
    private String clientId;

    @Value("${spring.mqtt.username}")
    private String userName;

    @Value("${spring.mqtt.password}")
    private String passWord;

    @Value("${spring.mqtt.completion-timeout}")
    private int completionTimeout;

    @Value("${spring.mqtt.defaultTopic}")
    private String defaultTopic;

YG8999 committed
59 60 61
    @Value("${spring.mqtt.is-subscribe-topic}")
    private boolean isSubscribeTopic;

YG8999 committed
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
    private SnowFlakeUtil snowFlakeUtil = new SnowFlakeUtil(0, 0);

    @Resource
    private MqttMessageHandler mqttMessageHandler;

    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // MQTT的连接设置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置连接的用户名
        mqttConnectOptions.setUserName(userName);
        // 设置连接的密码
        mqttConnectOptions.setPassword(passWord.toCharArray());
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
        // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
        mqttConnectOptions.setCleanSession(false);
        // 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883
        // 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222
        String[] urls = new String[]{url};
        mqttConnectOptions.setServerURIs(urls);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setAutomaticReconnect(true);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);
        mqttConnectOptions.setMaxInflight(1000000);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        //==========方案一(基本普通方案)==========
        //clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
        //踩坑记录:1.相同clientId相互挤掉线导致mqtt重复断线重连和消息无法正常消费
        //String clientIdStr = clientId + snowFlakeUtil.nextId();
        //MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());
//        //async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
        //messageHandler.setAsync(true);
        //messageHandler.setAsyncEvents(true);
        //messageHandler.setDefaultTopic(defaultTopic);
        //messageHandler.setDefaultQos(0);
        //return messageHandler;
        //==========方案二(并发生产消息方案)==========
        return new MultiMqttMessageHandler();
    }

    public MessageHandler create() {
        String clientIdStr = clientId + snowFlakeUtil.nextId();
        MyMqttPahoMessageHandler myMqttPahoMessageHandler = new MyMqttPahoMessageHandler(url, clientIdStr, mqttClientFactory());
        myMqttPahoMessageHandler.setAsync(true);
        myMqttPahoMessageHandler.setAsyncEvents(true);
        myMqttPahoMessageHandler.setDefaultTopic(defaultTopic);
        myMqttPahoMessageHandler.setDefaultQos(0);
        myMqttPahoMessageHandler.onInit();
        return myMqttPahoMessageHandler;
    }

    /**
     * 发消息通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 接收消息通道
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        //clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
        String serverIdStr = clientId + snowFlakeUtil.nextId();
        //动态设置adapter的的topic值
        String[] allTopics = defaultTopic.split(",");
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), allTopics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        //接收字节数组类型有效荷载
        defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter(defaultPahoMessageConverter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * 通过通道获取数据 订阅的数据
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
YG8999 committed
169 170 171 172 173
            // mqtt订阅消息由share_admin服务端处理
            if (isSubscribeTopic) {
                //接收到的消息由这个处理器进行处理
                mqttMessageHandler.handleMessage(message);
            }
YG8999 committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
        };
    }

    @EventListener(MqttConnectionFailedEvent.class)
    public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {
        log.error("mqttConnectionFailedEvent连接mqtt失败: " +
                        "date={}, hostUrl={}, username={}, error={}",
                new Date(), url, userName, event.getCause().getMessage());
    }

    @EventListener(MqttConnect.class)
    public void reSubscribed() {

    }

    @EventListener(MqttMessageSentEvent.class)
    public void mqttMessageSentEvent(MqttMessageSentEvent event) {
        log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString());
    }

//    @EventListener(MqttMessageDeliveredEvent.class)
//    public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {
//        //log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());
//    }

    @EventListener(MqttSubscribedEvent.class)
    public void mqttSubscribedEvent(MqttSubscribedEvent event) {
        log.info("mqttSubscribedEvent订阅成功信息: date={}, info={}", new Date(), event.toString());
    }


}