Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
G
gxpt_ht
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
pseer
gxpt_ht
Commits
adea1d3d
Commit
adea1d3d
authored
Nov 07, 2023
by
YG8999
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
设备网关、mqtt
parent
971f0d8c
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
756 additions
and
34 deletions
+756
-34
MqttConstants.java
...on/src/main/java/share/common/constant/MqttConstants.java
+21
-25
MqttOpType.java
...e-common/src/main/java/share/common/enums/MqttOpType.java
+42
-0
MqttOpenType.java
...common/src/main/java/share/common/enums/MqttOpenType.java
+1
-1
MqttReportType.java
...mmon/src/main/java/share/common/enums/MqttReportType.java
+64
-0
pom.xml
share-framework/pom.xml
+22
-0
MqttConfig.java
...mework/src/main/java/share/framework/mqtt/MqttConfig.java
+199
-0
MqttGatewayComponent.java
.../main/java/share/framework/mqtt/MqttGatewayComponent.java
+47
-0
MqttMessageHandler.java
...rc/main/java/share/framework/mqtt/MqttMessageHandler.java
+52
-0
MqttTopicHandler.java
.../src/main/java/share/framework/mqtt/MqttTopicHandler.java
+49
-0
MultiMqttMessageHandler.java
...in/java/share/framework/mqtt/MultiMqttMessageHandler.java
+82
-0
MyMqttPahoMessageHandler.java
...n/java/share/framework/mqtt/MyMqttPahoMessageHandler.java
+41
-0
SnowFlakeUtil.java
...ork/src/main/java/share/framework/mqtt/SnowFlakeUtil.java
+79
-0
MqttxService.java
...stem/src/main/java/share/system/service/MqttxService.java
+50
-1
MqttxServiceImpl.java
...main/java/share/system/service/impl/MqttxServiceImpl.java
+0
-0
DeviceGatewayMapper.xml
.../src/main/resources/mapper/system/DeviceGatewayMapper.xml
+3
-3
DeviceMapper.xml
...-system/src/main/resources/mapper/system/DeviceMapper.xml
+4
-4
No files found.
share-common/src/main/java/share/common/constant/MqttConstants.java
View file @
adea1d3d
package
share
.
common
.
constant
;
import
cn.hutool.core.collection.CollUtil
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* @className: share.common.constant.MqttConstants
...
...
@@ -24,16 +20,6 @@ public class MqttConstants {
public
static
final
String
MQTT_AC_RESP
=
"resp"
;
/**
* 操作命令: locklist, 网关设备列表操作
*/
public
static
final
String
MQTT_OP_LOCKLIST
=
"locklist"
;
/**
* 操作命令: opencloselock, 锁开门操作
*/
public
static
final
String
MQTT_OP_OPENCLOSELOCK
=
"opencloselock"
;
/**
* 数据流转方式,s2d:服务向设备请求
*/
public
static
final
String
MQTT_MODE_S2D
=
"s2d"
;
...
...
@@ -72,29 +58,39 @@ public class MqttConstants {
public
static
final
String
MQTT_DESCRIBE_DEVICE_ELECTRICITY_INTAKE
=
"取电"
;
/**
*
topic: 清除网关锁id列表
*
mqtt消息描述: 远程下发密码
*/
public
static
final
String
MQTT_
GATEWAY_CLEAR
=
"ydlink/{}/thing/property/set
"
;
public
static
final
String
MQTT_
DESCRIBE_DEVICE_PASSWORD
=
"远程下发密码
"
;
/**
*
topic: 下发组号、锁id/取电开关id列表
*
mqtt消息描述: 配置断电时间
*/
public
static
final
String
MQTT_
GATEWAY_ADD
=
"ydlink/{}/thing/property/set
"
;
public
static
final
String
MQTT_
DESCRIBE_CONFIG_POWEROFFDELAY
=
"配置断电时间
"
;
/**
*
topic: 下发组号、锁id/取电开关id列表
*
mqtt消息描述: 远程语音播报
*/
public
static
final
String
MQTT_DE
VICE_OPEN
=
"ydlink/{}/thing/action/execute
"
;
public
static
final
String
MQTT_DE
SCRIBE_ACTION_EXECUTE
=
"远程语音播报
"
;
/**
*
topic: 解析网关下属锁id信息: 锁id、信号值、电量、锁状态
*
mqtt消息描述: 远程下发卡片
*/
public
static
final
String
MQTT_
GATEWAY_BATCH_REPORT
=
"/thing/data/batch_report
"
;
public
static
final
String
MQTT_
DESCRIBE_DEVICE_IDCARD
=
"远程下发卡片
"
;
/**
*
设备上报订阅主题后缀集合
*
topic: 下发组号、锁id/取电开关id列表、清除网关锁id列表、配置从设备
*/
public
static
final
List
<
String
>
MQTT_TOPIC_REPORT_LIST
=
CollUtil
.
newArrayList
(
"/thing/data/batch_report"
);
public
static
final
String
MQTT_TOPIC_PROPERTY_SET
=
"ydlink/{}/thing/property/set"
;
/**
* topic: 远程开门、关门、取电、断电、语音播报
*/
public
static
final
String
MQTT_TOPIC_ACTION_EXECUTE
=
"ydlink/{}/thing/action/execute"
;
/**
* topic: 远程下发密码/卡片
*/
public
static
final
String
MQTT_DEVICE_SET_PASSWORD
=
"ydlink/{}/thing/data/batch"
;
}
share-common/src/main/java/share/common/enums/MqttOpType.java
0 → 100644
View file @
adea1d3d
package
share
.
common
.
enums
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
/**
* @className: share.common.enums.MqttOpenType
* @description: 操作类型
* @author: lwj
* @create: 2023-11-04 15:35
*/
public
enum
MqttOpType
{
MQTT_OP_OPENCLOSELOCK
(
"opencloselock"
,
"开关锁"
),
MQTT_OP_LOCKLIST
(
"locklist"
,
"网关设备列表"
),
PASSWORD
(
"password"
,
"远程下发密码"
),
IDCARD
(
"iccard"
,
"远程下发卡片"
),
CONFIG
(
"config"
,
"配置从设备"
),
VOICE
(
"voice"
,
"远程语音播报"
)
;
private
String
code
;
private
String
name
;
MqttOpType
(
String
code
,
String
name
)
{
this
.
code
=
code
;
this
.
name
=
name
;
}
//code转字符串1,2,3,4
public
static
String
getCodeList
()
{
List
<
String
>
list
=
Arrays
.
stream
(
MqttOpType
.
values
()).
map
(
MqttOpType:
:
getCode
).
collect
(
Collectors
.
toList
());
return
String
.
join
(
","
,
list
);
}
public
String
getCode
()
{
return
code
;
}
public
String
getName
()
{
return
name
;
}
}
share-common/src/main/java/share/common/enums/MqttOpenType.java
View file @
adea1d3d
...
...
@@ -27,7 +27,7 @@ public enum MqttOpenType {
//code转字符串1,2,3,4
public
static
String
getCodeList
()
{
List
<
String
>
list
=
Arrays
.
stream
(
StoreType
.
values
()).
map
(
Store
Type:
:
getCode
).
collect
(
Collectors
.
toList
());
List
<
String
>
list
=
Arrays
.
stream
(
MqttOpenType
.
values
()).
map
(
MqttOpen
Type:
:
getCode
).
collect
(
Collectors
.
toList
());
return
String
.
join
(
","
,
list
);
}
public
String
getCode
()
{
...
...
share-common/src/main/java/share/common/enums/MqttReportType.java
0 → 100644
View file @
adea1d3d
package
share
.
common
.
enums
;
import
cn.hutool.core.util.StrUtil
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.stream.Collectors
;
/**
* @className: share.common.enums.MqttReportType
* @description: mqtt 上报主题后缀
* @author: lwj
* @create: 2023-11-04 15:35
*/
public
enum
MqttReportType
{
// topic: 解析网关下属锁id信息: 锁id、信号值、电量、锁状态
MQTT_GATEWAY_BATCH_REPORT
(
"batch_report"
,
"解析网关下属锁id信息"
,
"/thing/data/batch_report"
),
// topic: 下发组号、锁id/取电开关id列表 回复主题
MQTT_GATEWAY_PROPERTY_SET
(
"property_set"
,
"下发组号、锁id/取电开关id信息回复主题"
,
"/thing/property/set"
)
;
private
String
code
;
private
String
name
;
private
String
topic
;
MqttReportType
(
String
code
,
String
name
,
String
topic
)
{
this
.
code
=
code
;
this
.
name
=
name
;
this
.
topic
=
topic
;
}
//code转字符串
public
static
String
getCodeList
()
{
List
<
String
>
list
=
Arrays
.
stream
(
MqttReportType
.
values
()).
map
(
MqttReportType:
:
getCode
).
collect
(
Collectors
.
toList
());
return
String
.
join
(
","
,
list
);
}
//topic转字符串
public
static
List
<
String
>
getTopicList
()
{
List
<
String
>
list
=
Arrays
.
stream
(
MqttReportType
.
values
()).
map
(
MqttReportType:
:
getTopic
).
collect
(
Collectors
.
toList
());
return
list
;
}
public
static
String
getTopicStr
(
String
code
)
{
if
(
StrUtil
.
isNotBlank
(
code
))
{
for
(
MqttReportType
type
:
MqttReportType
.
values
())
{
if
(
type
.
code
.
compareTo
(
code
)==
0
)
{
return
type
.
name
;
}
}
}
return
"0"
;
}
public
String
getCode
()
{
return
code
;
}
public
String
getName
()
{
return
name
;
}
public
String
getTopic
()
{
return
topic
;
}
}
share-framework/pom.xml
View file @
adea1d3d
...
...
@@ -59,6 +59,27 @@
<artifactId>
share-system
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-integration
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-stream
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-mqtt
</artifactId>
</dependency>
<dependency>
<groupId>
cn.hutool
</groupId>
<artifactId>
hutool-all
</artifactId>
<version>
5.8.16
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
share-framework/src/main/java/share/framework/mqtt/MqttConfig.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
lombok.extern.slf4j.Slf4j
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.context.event.EventListener
;
import
org.springframework.integration.annotation.IntegrationComponentScan
;
import
org.springframework.integration.annotation.ServiceActivator
;
import
org.springframework.integration.channel.DirectChannel
;
import
org.springframework.integration.core.MessageProducer
;
import
org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
;
import
org.springframework.integration.mqtt.core.MqttPahoClientFactory
;
import
org.springframework.integration.mqtt.event.MqttConnectionFailedEvent
;
import
org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent
;
import
org.springframework.integration.mqtt.event.MqttMessageSentEvent
;
import
org.springframework.integration.mqtt.event.MqttSubscribedEvent
;
import
org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
;
import
org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
;
import
org.springframework.messaging.MessageChannel
;
import
org.springframework.messaging.MessageHandler
;
import
javax.annotation.Resource
;
import
java.util.Date
;
/**
* mqtt连接信息
*
* @author : mwx
* @date : 2022/12/28 13:39
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public
class
MqttConfig
{
@Value
(
"${spring.mqtt.url}"
)
private
String
url
;
@Value
(
"${spring.mqtt.clientId}"
)
private
String
clientId
;
@Value
(
"${spring.mqtt.username}"
)
private
String
userName
;
@Value
(
"${spring.mqtt.password}"
)
private
String
passWord
;
@Value
(
"${spring.mqtt.completion-timeout}"
)
private
int
completionTimeout
;
@Value
(
"${spring.mqtt.defaultTopic}"
)
private
String
defaultTopic
;
private
SnowFlakeUtil
snowFlakeUtil
=
new
SnowFlakeUtil
(
0
,
0
);
@Resource
private
MqttMessageHandler
mqttMessageHandler
;
@Bean
public
MqttConnectOptions
getMqttConnectOptions
()
{
// MQTT的连接设置
MqttConnectOptions
mqttConnectOptions
=
new
MqttConnectOptions
();
// 设置连接的用户名
mqttConnectOptions
.
setUserName
(
userName
);
// 设置连接的密码
mqttConnectOptions
.
setPassword
(
passWord
.
toCharArray
());
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
mqttConnectOptions
.
setCleanSession
(
false
);
// 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883
// 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222
String
[]
urls
=
new
String
[]{
url
};
mqttConnectOptions
.
setServerURIs
(
urls
);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions
.
setKeepAliveInterval
(
20
);
mqttConnectOptions
.
setAutomaticReconnect
(
true
);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);
mqttConnectOptions
.
setMaxInflight
(
1000000
);
return
mqttConnectOptions
;
}
@Bean
public
MqttPahoClientFactory
mqttClientFactory
()
{
DefaultMqttPahoClientFactory
factory
=
new
DefaultMqttPahoClientFactory
();
factory
.
setConnectionOptions
(
getMqttConnectOptions
());
return
factory
;
}
@Bean
@ServiceActivator
(
inputChannel
=
"mqttOutboundChannel"
)
public
MessageHandler
mqttOutbound
()
{
//==========方案一(基本普通方案)==========
//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
//踩坑记录:1.相同clientId相互挤掉线导致mqtt重复断线重连和消息无法正常消费
//String clientIdStr = clientId + snowFlakeUtil.nextId();
//MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());
// //async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
//messageHandler.setAsync(true);
//messageHandler.setAsyncEvents(true);
//messageHandler.setDefaultTopic(defaultTopic);
//messageHandler.setDefaultQos(0);
//return messageHandler;
//==========方案二(并发生产消息方案)==========
return
new
MultiMqttMessageHandler
();
}
public
MessageHandler
create
()
{
String
clientIdStr
=
clientId
+
snowFlakeUtil
.
nextId
();
MyMqttPahoMessageHandler
myMqttPahoMessageHandler
=
new
MyMqttPahoMessageHandler
(
url
,
clientIdStr
,
mqttClientFactory
());
myMqttPahoMessageHandler
.
setAsync
(
true
);
myMqttPahoMessageHandler
.
setAsyncEvents
(
true
);
myMqttPahoMessageHandler
.
setDefaultTopic
(
defaultTopic
);
myMqttPahoMessageHandler
.
setDefaultQos
(
0
);
myMqttPahoMessageHandler
.
onInit
();
return
myMqttPahoMessageHandler
;
}
/**
* 发消息通道
*/
@Bean
public
MessageChannel
mqttOutboundChannel
()
{
return
new
DirectChannel
();
}
/**
* 接收消息通道
*/
@Bean
public
MessageChannel
mqttInputChannel
()
{
return
new
DirectChannel
();
}
@Bean
public
MessageProducer
inbound
()
{
//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线
String
serverIdStr
=
clientId
+
snowFlakeUtil
.
nextId
();
//动态设置adapter的的topic值
String
[]
allTopics
=
defaultTopic
.
split
(
","
);
MqttPahoMessageDrivenChannelAdapter
adapter
=
new
MqttPahoMessageDrivenChannelAdapter
(
serverIdStr
,
mqttClientFactory
(),
allTopics
);
adapter
.
setCompletionTimeout
(
completionTimeout
);
DefaultPahoMessageConverter
defaultPahoMessageConverter
=
new
DefaultPahoMessageConverter
();
//接收字节数组类型有效荷载
defaultPahoMessageConverter
.
setPayloadAsBytes
(
true
);
adapter
.
setConverter
(
defaultPahoMessageConverter
);
adapter
.
setQos
(
2
);
adapter
.
setOutputChannel
(
mqttInputChannel
());
return
adapter
;
}
/**
* 通过通道获取数据 订阅的数据
*/
@Bean
@ServiceActivator
(
inputChannel
=
"mqttInputChannel"
)
public
MessageHandler
handler
()
{
return
message
->
{
//接收到的消息由这个处理器进行处理
mqttMessageHandler
.
handleMessage
(
message
);
};
}
@EventListener
(
MqttConnectionFailedEvent
.
class
)
public
void
mqttConnectionFailedEvent
(
MqttConnectionFailedEvent
event
)
{
log
.
error
(
"mqttConnectionFailedEvent连接mqtt失败: "
+
"date={}, hostUrl={}, username={}, error={}"
,
new
Date
(),
url
,
userName
,
event
.
getCause
().
getMessage
());
}
@EventListener
(
MqttConnect
.
class
)
public
void
reSubscribed
()
{
}
@EventListener
(
MqttMessageSentEvent
.
class
)
public
void
mqttMessageSentEvent
(
MqttMessageSentEvent
event
)
{
log
.
info
(
"mqttMessageSentEvent发送信息: date={}, info={}"
,
new
Date
(),
event
.
toString
());
}
// @EventListener(MqttMessageDeliveredEvent.class)
// public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {
// //log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());
// }
@EventListener
(
MqttSubscribedEvent
.
class
)
public
void
mqttSubscribedEvent
(
MqttSubscribedEvent
event
)
{
log
.
info
(
"mqttSubscribedEvent订阅成功信息: date={}, info={}"
,
new
Date
(),
event
.
toString
());
}
}
share-framework/src/main/java/share/framework/mqtt/MqttGatewayComponent.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
org.springframework.integration.annotation.MessagingGateway
;
import
org.springframework.integration.mqtt.support.MqttHeaders
;
import
org.springframework.messaging.handler.annotation.Header
;
import
org.springframework.stereotype.Component
;
/**
* mqtt发送网关
*
* @author : mwx
* @date : 2022/12/28 14:02
*/
@Component
@MessagingGateway
(
defaultRequestChannel
=
"mqttOutboundChannel"
)
public
interface
MqttGatewayComponent
{
/**
* 发送消息至指定topic
*
* @param payload 消息内容
* @param topic topic
* @see
*/
@Deprecated
void
sendToMqtt
(
String
payload
,
@Header
(
MqttHeaders
.
TOPIC
)
String
topic
);
/**
* 发送字节数组
*
* @param payload 消息内容
* @param topic topic
* @see
*/
void
sendToMqtt
(
byte
[]
payload
,
@Header
(
MqttHeaders
.
TOPIC
)
String
topic
);
/**
* 发送消息至指定topic 能指定qos
*
* @param payload 消息内容
* @param topic topic
* @param qos 值为 0 1 2
*/
void
sendToMqtt
(
@Header
(
MqttHeaders
.
TOPIC
)
String
topic
,
@Header
(
MqttHeaders
.
QOS
)
int
qos
,
String
payload
);
}
share-framework/src/main/java/share/framework/mqtt/MqttMessageHandler.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
cn.hutool.core.util.ObjectUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.integration.mqtt.support.MqttHeaders
;
import
org.springframework.messaging.Message
;
import
org.springframework.messaging.MessageHandler
;
import
org.springframework.messaging.MessagingException
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
/**
* mqtt topic中转
*
* @author : mwx
* @date : 2022/12/28 15:02
*/
@Slf4j
@Component
public
class
MqttMessageHandler
implements
MessageHandler
{
@Resource
private
MqttTopicHandler
topicHandler
;
/**
*
*消息会在这进行对topic的处理
* <p>重连后</p>
* */
@Override
public
void
handleMessage
(
Message
<?>
message
)
throws
MessagingException
{
//从这里来对消息进行分topic处理
String
topic
=
message
.
getHeaders
().
get
(
MqttHeaders
.
RECEIVED_TOPIC
).
toString
();
boolean
exist
=
topic
.
startsWith
(
"ydlink"
);
if
(
exist
==
false
)
{
log
.
info
(
"当前topic:{}未注册!不做处理!"
,
topic
);
return
;
}
//MqttTopicHandler topicHandler = SpringUtil.getBean(mqttTopicEnum.getTopicHandlerBeanName(), TopicHandler.class);
if
(
ObjectUtil
.
isNull
(
topicHandler
))
{
log
.
warn
(
"消息处理器未被实现,无法消费消息!"
);
return
;
}
// topic消费处理
topicHandler
.
handler
(
message
);
}
}
share-framework/src/main/java/share/framework/mqtt/MqttTopicHandler.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
cn.hutool.core.lang.Assert
;
import
cn.hutool.core.util.ObjectUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.integration.mqtt.support.MqttHeaders
;
import
org.springframework.messaging.Message
;
import
org.springframework.stereotype.Service
;
/**
* 消息处理器
*
* @author : mwx
* @date : 2022/12/28 15:00
*/
@Slf4j
@Service
public
class
MqttTopicHandler
{
/**
* 这一步主要是对消息的有效荷载进行非空校验以及日志打印
*/
public
void
handler
(
Message
<?>
message
)
{
String
topic
=
message
.
getHeaders
().
get
(
MqttHeaders
.
RECEIVED_TOPIC
).
toString
();
log
.
info
(
"topic:{}-->message received!"
,
topic
);
String
payload
=
new
String
((
byte
[])
message
.
getPayload
());
Assert
.
notNull
(
payload
,
"接收到的消息为空!"
);
dispose
(
payload
,
topic
);
}
/**
* 对mqtt传来的消息有效荷载进行数据处理
*
*/
private
void
dispose
(
String
payload
,
String
topic
)
{
if
(!
ObjectUtil
.
isNull
(
payload
)){
try
{
log
.
info
(
"来自topic:{}的消息处理完毕!"
,
topic
);
log
.
info
(
"消息内容:{}"
,
payload
);
}
catch
(
Exception
e
)
{
log
.
error
(
"来自topic:{}的消息处理异常!原因:{}"
,
topic
,
e
);
}
}
}
}
share-framework/src/main/java/share/framework/mqtt/MultiMqttMessageHandler.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.Lifecycle
;
import
org.springframework.integration.handler.AbstractMessageHandler
;
import
org.springframework.messaging.Message
;
import
org.springframework.messaging.MessageHandler
;
import
org.springframework.stereotype.Component
;
import
java.util.Map
;
import
java.util.Random
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.atomic.AtomicBoolean
;
/**
* mqtt并发生产消息优化项配置
*
* <p>通知实例化多个消息处理器并随机取实现负载均衡产生消息</p>
*
* @author : mwx
* @date : 2023/1/13 11:11
*/
@Component
@Slf4j
public
class
MultiMqttMessageHandler
extends
AbstractMessageHandler
implements
Lifecycle
{
private
final
AtomicBoolean
running
=
new
AtomicBoolean
();
private
volatile
Map
<
Integer
,
MessageHandler
>
mqttHandlerMap
;
/**
* 模拟3个Client端
*/
private
Integer
handlerCount
=
3
;
@Autowired
private
MqttConfig
senderConfig
;
@Override
public
void
start
()
{
if
(!
this
.
running
.
getAndSet
(
true
))
{
doStart
();
}
}
private
void
doStart
()
{
mqttHandlerMap
=
new
ConcurrentHashMap
<>();
for
(
int
i
=
0
;
i
<
handlerCount
;
i
++)
{
mqttHandlerMap
.
put
(
i
,
senderConfig
.
create
());
}
}
@Override
public
void
stop
()
{
if
(
this
.
running
.
getAndSet
(
false
))
{
doStop
();
}
}
private
void
doStop
()
{
for
(
Map
.
Entry
<
Integer
,
MessageHandler
>
e
:
mqttHandlerMap
.
entrySet
())
{
MessageHandler
handler
=
e
.
getValue
();
((
MyMqttPahoMessageHandler
)
handler
).
doStop
();
}
}
@Override
public
boolean
isRunning
()
{
return
this
.
running
.
get
();
}
@Override
protected
void
handleMessageInternal
(
Message
<?>
message
)
{
//负载均衡
Random
random
=
new
Random
();
MyMqttPahoMessageHandler
messageHandler
=
(
MyMqttPahoMessageHandler
)
mqttHandlerMap
.
get
(
random
.
nextInt
(
handlerCount
));
log
.
info
(
"开始处理信息:{}"
,
message
.
toString
());
messageHandler
.
handleMessageInternal
(
message
);
}
}
share-framework/src/main/java/share/framework/mqtt/MyMqttPahoMessageHandler.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
org.springframework.integration.mqtt.core.MqttPahoClientFactory
;
import
org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler
;
import
org.springframework.messaging.Message
;
/**
* mqtt并发生产消息优化配置项
*
*
* @author : mwx
* @date : 2023/1/13 11:08
*/
public
class
MyMqttPahoMessageHandler
extends
MqttPahoMessageHandler
{
public
MyMqttPahoMessageHandler
(
String
url
,
String
clientId
,
MqttPahoClientFactory
clientFactory
)
{
super
(
url
,
clientId
,
clientFactory
);
}
@Override
public
void
doStop
()
{
super
.
doStop
();
}
@Override
public
void
handleMessageInternal
(
Message
<?>
message
)
{
super
.
handleMessageInternal
(
message
);
}
@Override
public
void
onInit
()
{
try
{
super
.
onInit
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
share-framework/src/main/java/share/framework/mqtt/SnowFlakeUtil.java
0 → 100644
View file @
adea1d3d
package
share
.
framework
.
mqtt
;
import
cn.hutool.core.lang.Singleton
;
/**
* 雪花算法工具类
*/
public
class
SnowFlakeUtil
{
private
static
final
long
START_STMP
=
1420041600000L
;
private
static
final
long
SEQUENCE_BIT
=
9L
;
private
static
final
long
MACHINE_BIT
=
2L
;
private
static
final
long
DATACENTER_BIT
=
2L
;
private
static
final
long
MAX_SEQUENCE
=
511L
;
private
static
final
long
MAX_MACHINE_NUM
=
3L
;
private
static
final
long
MAX_DATACENTER_NUM
=
3L
;
private
static
final
long
MACHINE_LEFT
=
9L
;
private
static
final
long
DATACENTER_LEFT
=
11L
;
private
static
final
long
TIMESTMP_LEFT
=
13L
;
private
long
datacenterId
;
private
long
machineId
;
private
long
sequence
=
0L
;
private
long
lastStmp
=
-
1L
;
public
SnowFlakeUtil
(
long
datacenterId
,
long
machineId
)
{
if
(
datacenterId
<=
3L
&&
datacenterId
>=
0L
)
{
if
(
machineId
<=
3L
&&
machineId
>=
0L
)
{
this
.
datacenterId
=
datacenterId
;
this
.
machineId
=
machineId
;
}
else
{
throw
new
IllegalArgumentException
(
"machineId can't be greater than MAX_MACHINE_NUM or less than 0"
);
}
}
else
{
throw
new
IllegalArgumentException
(
"datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"
);
}
}
public
synchronized
long
nextId
()
{
long
currStmp
=
this
.
getNewstmp
();
if
(
currStmp
<
this
.
lastStmp
)
{
throw
new
RuntimeException
(
"Clock moved backwards. Refusing to generate id"
);
}
else
{
if
(
currStmp
==
this
.
lastStmp
)
{
this
.
sequence
=
this
.
sequence
+
1L
&
511L
;
if
(
this
.
sequence
==
0L
)
{
currStmp
=
this
.
getNextMill
();
}
}
else
{
this
.
sequence
=
0L
;
}
this
.
lastStmp
=
currStmp
;
return
currStmp
-
1420041600000L
<<
13
|
this
.
datacenterId
<<
11
|
this
.
machineId
<<
9
|
this
.
sequence
;
}
}
private
long
getNextMill
()
{
long
mill
;
for
(
mill
=
this
.
getNewstmp
();
mill
<=
this
.
lastStmp
;
mill
=
this
.
getNewstmp
())
{
}
return
mill
;
}
private
long
getNewstmp
()
{
return
System
.
currentTimeMillis
();
}
public
static
Long
getDefaultSnowFlakeId
()
{
return
Singleton
.
get
(
SnowFlakeUtil
.
class
,
new
Object
[]{
1L
,
1L
}).
nextId
();
}
public
static
void
main
(
String
[]
args
)
{
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
System
.
out
.
println
(
getDefaultSnowFlakeId
());
System
.
out
.
println
(
getDefaultSnowFlakeId
().
toString
().
length
());
}
}
}
share-system/src/main/java/share/system/service/MqttxService.java
View file @
adea1d3d
...
...
@@ -2,6 +2,8 @@ package share.system.service;
import
share.system.domain.vo.MqttxVo
;
import
java.util.Map
;
/**
* @className: share.system.service.MqttxService
* @description: MQTT 通讯服务接口
...
...
@@ -34,10 +36,57 @@ public interface MqttxService {
/**
* 开锁/关锁、取电/断电
* @param devId 门锁设备dev_id
* @param phone
开锁
用户
* @param phone
操作
用户
* @param opType 操作类型:10:开门,20:取电,30:锁门,40:断电
* @return
*/
MqttxVo
openOrCloseDevice
(
String
devId
,
String
phone
,
String
opType
);
/**
* 远程下发密码/卡片
* @param devId 门锁设备dev_id
* @param param 设置参数:密码、卡号、指纹
* @param startTime 生效时间:2020-04-06 10:00:00(格式)
* @param endTime 失效时间:2020-04-06 12:00:00(格式)
* @param phone 操作用户
* @param opType 操作类型:10:密码,20:卡片,30:指纹
* @return
*/
MqttxVo
setOpenPassword
(
String
devId
,
String
param
,
String
startTime
,
String
endTime
,
String
phone
,
String
opType
);
/**
* 远程删除密码/卡片
* @param devId 门锁设备dev_id
* @param param 设置参数:密码、卡号、指纹
* @param startTime 生效时间:2020-04-06 10:00:00(格式)
* @param endTime 失效时间:2020-04-06 12:00:00(格式)
* @param phone 操作用户
* @param opType 操作类型:10:密码,20:卡片,30:指纹
* @return
*/
MqttxVo
deleteOpenPassword
(
String
devId
,
String
param
,
String
startTime
,
String
endTime
,
String
phone
,
String
opType
);
/**
* 配置从设备信息
* @param devId 设备dev_id
* @param phone 操作用户
* @param opType 操作类型:poweroffdelay:配置断电时间
* @param params 配置参数
* @return
*/
MqttxVo
configDevice
(
String
devId
,
String
phone
,
String
opType
,
Map
<
String
,
String
>
params
);
/**
* 远程语音播报
* @param devId 设备dev_id
* @param phone 操作用户
* @param content 语音内容
* @param startTime 开始时间
* @param endTime 结束时间
* @param number 播报次数
* @return
*/
MqttxVo
actionExecute
(
String
devId
,
String
phone
,
String
content
,
String
startTime
,
String
endTime
,
String
number
);
}
share-system/src/main/java/share/system/service/impl/MqttxServiceImpl.java
View file @
adea1d3d
This diff is collapsed.
Click to expand it.
share-system/src/main/resources/mapper/system/DeviceGatewayMapper.xml
View file @
adea1d3d
...
...
@@ -39,7 +39,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devPsw != null and devPsw != ''"
>
and dev_psw = #{devPsw}
</if>
<if
test=
"devVer != null and devVer != ''"
>
and dev_ver = #{devVer}
</if>
<if
test=
"devType != null and devType != ''"
>
and dev_type = #{devType}
</if>
<if
test=
"group != null and group != ''"
>
and
group
= #{group}
</if>
<if
test=
"group != null and group != ''"
>
and
`group`
= #{group}
</if>
<if
test=
"status != null and status != ''"
>
and status = #{status}
</if>
<if
test=
"mqttIp != null and mqttIp != ''"
>
and mqtt_iP = #{mqttIp}
</if>
<if
test=
"mqttPort != null and mqttPort != ''"
>
and mqtt_port = #{mqttPort}
</if>
...
...
@@ -67,7 +67,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devPsw != null"
>
dev_psw,
</if>
<if
test=
"devVer != null"
>
dev_ver,
</if>
<if
test=
"devType != null"
>
dev_type,
</if>
<if
test=
"group != null"
>
group
,
</if>
<if
test=
"group != null"
>
`group`
,
</if>
<if
test=
"status != null"
>
status,
</if>
<if
test=
"mqttIp != null"
>
mqtt_iP,
</if>
<if
test=
"mqttPort != null"
>
mqtt_port,
</if>
...
...
@@ -111,7 +111,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devPsw != null"
>
dev_psw = #{devPsw},
</if>
<if
test=
"devVer != null"
>
dev_ver = #{devVer},
</if>
<if
test=
"devType != null"
>
dev_type = #{devType},
</if>
<if
test=
"group != null"
>
group
= #{group},
</if>
<if
test=
"group != null"
>
`group`
= #{group},
</if>
<if
test=
"status != null"
>
status = #{status},
</if>
<if
test=
"mqttIp != null"
>
mqtt_iP = #{mqttIp},
</if>
<if
test=
"mqttPort != null"
>
mqtt_port = #{mqttPort},
</if>
...
...
share-system/src/main/resources/mapper/system/DeviceMapper.xml
View file @
adea1d3d
...
...
@@ -42,7 +42,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devType != null and devType != ''"
>
and dev_type = #{devType}
</if>
<if
test=
"projtId != null and projtId != ''"
>
and projt_id = #{projtId}
</if>
<if
test=
"projtPsw != null and projtPsw != ''"
>
and projt_psw = #{projtPsw}
</if>
<if
test=
"group != null and group != ''"
>
and
group
= #{group}
</if>
<if
test=
"group != null and group != ''"
>
and
`group`
= #{group}
</if>
<if
test=
"status != null and status != ''"
>
and status = #{status}
</if>
<if
test=
"devPosition != null and devPosition != ''"
>
and dev_position = #{devPosition}
</if>
<if
test=
"gatewayId != null and gatewayId != ''"
>
and gateway_id = #{gatewayId}
</if>
...
...
@@ -71,7 +71,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devType != null"
>
dev_type,
</if>
<if
test=
"projtId != null"
>
projt_id,
</if>
<if
test=
"projtPsw != null"
>
projt_psw,
</if>
<if
test=
"group != null"
>
group
,
</if>
<if
test=
"group != null"
>
`group`
,
</if>
<if
test=
"status != null"
>
status,
</if>
<if
test=
"createBy != null"
>
create_by,
</if>
<if
test=
"createTime != null"
>
create_time,
</if>
...
...
@@ -117,7 +117,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devType != null"
>
dev_type = #{devType},
</if>
<if
test=
"projtId != null"
>
projt_id = #{projtId},
</if>
<if
test=
"projtPsw != null"
>
projt_psw = #{projtPsw},
</if>
<if
test=
"group != null"
>
group
= #{group},
</if>
<if
test=
"group != null"
>
`group`
= #{group},
</if>
<if
test=
"status != null"
>
status = #{status},
</if>
<if
test=
"createBy != null"
>
create_by = #{createBy},
</if>
<if
test=
"createTime != null"
>
create_time = #{createTime},
</if>
...
...
@@ -143,7 +143,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if
test=
"devType != null"
>
dev_type = #{devType},
</if>
<if
test=
"projtId != null"
>
projt_id = #{projtId},
</if>
<if
test=
"projtPsw != null"
>
projt_psw = #{projtPsw},
</if>
<if
test=
"group != null"
>
group
= #{group},
</if>
<if
test=
"group != null"
>
`group`
= #{group},
</if>
<if
test=
"status != null"
>
status = #{status},
</if>
<if
test=
"createBy != null"
>
create_by = #{createBy},
</if>
<if
test=
"createTime != null"
>
create_time = #{createTime},
</if>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment