Commit dc720283 by YG8999

设备操作消息队列监控

parent 5385434a
......@@ -20,6 +20,11 @@ public class MqttConstants {
public static final String MQTT_DEVICE_LAST_TIME_KEY = "MQTT_DEVICE_LAST_TIME_KEY:";
/**
* 记录最后每个门店设备消息发送最后时间redis
*/
public static final String MQTT_STORE_LAST_TIME_KEY = "MQTT_STORE_LAST_TIME_KEY:";
/**
* 动作: 请求端为req
*/
public static final String MQTT_AC_REQ = "req";
......
package share.quartz.task;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONException;
import cn.hutool.json.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
......@@ -23,10 +26,12 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import share.common.constant.MqttConstants;
import share.common.core.redis.RedisUtil;
import share.common.enums.*;
import share.common.exception.base.BaseException;
import share.system.domain.*;
import share.system.domain.vo.MqttxVo;
import share.system.mapper.SConsumerMapper;
import share.system.service.*;
import share.system.service.impl.SOrderServiceImpl;
......@@ -298,4 +303,42 @@ public class RedisTask {
DatePattern.NORM_DATETIME_PATTERN), "1", 10L);
}
/**
* 设备消息发送队列失败中断监控
* 删除中断消息,继续发送设备消息
*/
public void monitorDeviceQueue() {
Set<String> keys = redisTemplate.keys(MqttConstants.MQTT_STORE_LAST_TIME_KEY + "*");
if (keys.size() == 0) {
return;
}
Date nowDate = DateUtil.date();
keys.stream().forEach(key -> {
String value = redisUtil.get(key);
if (StrUtil.isNotEmpty(value)) {
JSONObject jsonObject = new JSONObject(value);
Date sendTime = jsonObject.getDate("sendTime");
Long storeId = jsonObject.getLong("storeId");
if (sendTime != null) {
long betweenDay = DateUtil.between(sendTime, nowDate, DateUnit.SECOND);
if (betweenDay > 15L) {
String key2 = StrUtil.concat(true, MqttConstants.MQTT_REDIS_KEY,
StrUtil.toString(storeId));
if (redisUtil.getListSize(key2) > 0) {
// 删除失败消息redis记录
redisUtil.getRightPop(key2, 10L);
// 获取当前需要发送的消息
Object data = redisUtil.getIndex(key2, -1);
if (null != data) {
MqttxVo vo = BeanUtil.toBean(data, MqttxVo.class);
// 发送新的消息
deviceOpService.sendMqtt(vo, storeId);
}
}
}
}
}
});
}
}
......@@ -108,6 +108,6 @@ public interface DeviceOpService {
* mqtt 消息发送
* @param mqttxVo 消息内容
*/
void sendMqtt(MqttxVo mqttxVo);
void sendMqtt(MqttxVo mqttxVo, Long storeId);
}
......@@ -3,6 +3,8 @@ package share.system.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -95,16 +97,26 @@ public class DeviceOpServiceImpl implements DeviceOpService {
mqttxVo.setSendSmsResult(0);
redisUtil.lPush(key, mqttxVo);
// 发送mqtt消息
this.sendMqtt(mqttxVo);
this.sendMqtt(mqttxVo, room.getStoreId());
}
}
}
public void sendMqtt(MqttxVo mqttxVo) {
/**
* 设备操作消息发送(开门、通电、断电)
* @param mqttxVo
*/
@Override
public void sendMqtt(MqttxVo mqttxVo, Long storeId) {
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 记录当前设备最后发送时间
redisUtil.set(MqttConstants.MQTT_DEVICE_LAST_TIME_KEY + mqttxVo.getDevId(), DateUtil.now());
// 记录当前门店最后发送设备时间
JSONObject jsonObject = new JSONObject();
jsonObject.set("storeId", storeId);
jsonObject.set("sendTime", DateUtil.date());
redisUtil.set(MqttConstants.MQTT_STORE_LAST_TIME_KEY + storeId, JSONUtil.toJsonStr(jsonObject));
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, mqttxVo.getPhone());
}
......
......@@ -732,7 +732,7 @@ public class MqttxServiceImpl implements MqttxService {
// 重新发送mqtt消息
// 异步执行, 间隔5秒
// 延时执行操作
this.supplyAsync(5L, vo);
this.supplyAsync(5L, vo, room.getStoreId());
} else if (type == 2) {
// 成功,发送下一条消息,发送mqtt消息
LambdaQueryWrapper<Device> query2Wrapper = new LambdaQueryWrapper();
......@@ -749,7 +749,7 @@ public class MqttxServiceImpl implements MqttxService {
}
System.out.println("测试设备消息间隔:"+ l);
// 异步执行
this.supplyAsync(l, vo);
this.supplyAsync(l, vo, room.getStoreId());
}
}
......@@ -757,13 +757,13 @@ public class MqttxServiceImpl implements MqttxService {
}
}
private void supplyAsync(Long m, MqttxVo vo) {
private void supplyAsync(Long m, MqttxVo vo, Long storeId) {
CompletableFuture.supplyAsync(() -> {
// 延时执行操作
try {
Thread.sleep(m * 1000);
// 获取mqtt的topic、payload
deviceOpService.sendMqtt(vo);
deviceOpService.sendMqtt(vo, storeId);
} catch (InterruptedException e) {
e.printStackTrace();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment