Commit 38a82faf by YG8999

设备操作,redis队列下发消息

parent b560c24b
......@@ -7,7 +7,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
......@@ -17,7 +20,9 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import share.common.constant.CacheConstants;
import share.common.constant.MqttConstants;
import share.common.core.domain.AjaxResult;
import share.common.core.redis.RedisUtil;
import share.common.utils.StringUtils;
import share.system.domain.SysCache;
......@@ -32,6 +37,8 @@ public class CacheController
{
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedisUtil redisUtil;
private final static List<SysCache> caches = new ArrayList<SysCache>();
{
......@@ -42,6 +49,8 @@ public class CacheController
caches.add(new SysCache(CacheConstants.REPEAT_SUBMIT_KEY, "防重提交"));
caches.add(new SysCache(CacheConstants.RATE_LIMIT_KEY, "限流处理"));
caches.add(new SysCache(CacheConstants.PWD_ERR_CNT_KEY, "密码错误次数"));
caches.add(new SysCache(MqttConstants.MQTT_REDIS_KEY, "设备下发消息队列"));
}
@PreAuthorize("@ss.hasPermi('monitor:cache:list')")
......@@ -87,7 +96,13 @@ public class CacheController
@GetMapping("/getValue/{cacheName}/{cacheKey}")
public AjaxResult getCacheValue(@PathVariable String cacheName, @PathVariable String cacheKey)
{
String cacheValue = redisTemplate.opsForValue().get(cacheKey);
String cacheValue = "";
if (MqttConstants.MQTT_REDIS_KEY.equals(cacheName)) {
List<Object> list = redisUtil.lRange(cacheKey, 0, -1);
cacheValue = JSONUtil.toJsonStr(list);
} else {
cacheValue = redisTemplate.opsForValue().get(cacheKey);
}
SysCache sysCache = new SysCache(cacheName, cacheKey, cacheValue);
return AjaxResult.success(sysCache);
}
......
......@@ -2,6 +2,8 @@ package share.web.controller.system;
import java.util.List;
import javax.servlet.http.HttpServletResponse;
import cn.hutool.core.util.StrUtil;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
......@@ -15,6 +17,7 @@ import org.springframework.web.bind.annotation.RestController;
import share.common.annotation.Log;
import share.common.core.controller.BaseController;
import share.common.core.domain.AjaxResult;
import share.common.core.domain.entity.SysUser;
import share.common.enums.BusinessType;
import share.common.utils.SecurityUtils;
import share.system.domain.DeviceDto;
......@@ -22,11 +25,9 @@ import share.system.mqtt.MqttGatewayComponent;
import share.system.domain.Device;
import share.system.domain.vo.DeviceParamVo;
import share.system.domain.vo.MqttxVo;
import share.system.service.DeviceLogService;
import share.system.service.DeviceService;
import share.system.service.*;
import share.common.utils.poi.ExcelUtil;
import share.common.core.page.TableDataInfo;
import share.system.service.MqttxService;
/**
* 设备信息Controller
......@@ -41,11 +42,9 @@ public class DeviceController extends BaseController
@Autowired
private DeviceService deviceService;
@Autowired
private MqttxService mqttxService;
@Autowired
private DeviceLogService deviceLogService;
private DeviceOpService deviceOpService;
@Autowired
private MqttGatewayComponent mqttGatewayComponent;
private ISysUserService sysUserService;
/**
* 查询设备信息列表
......@@ -123,14 +122,13 @@ public class DeviceController extends BaseController
@PostMapping(value = "/openOrClose")
public AjaxResult openOrClose(@RequestBody DeviceParamVo deviceParam)
{
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.openOrCloseDevice(deviceParam.getDevId(),
SecurityUtils.getUsername(), deviceParam.getOpType());
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
int result = deviceLogService.addDeviceLog(mqttxVo, SecurityUtils.getUsername());
return toAjax(result);
Long userId = SecurityUtils.getUserId();
if (userId != null) {
SysUser sysUser = sysUserService.selectUserById(userId);
deviceParam.setPhone(sysUser.getPhonenumber());
deviceOpService.openOrCloseDev(deviceParam);
}
return toAjax(true);
}
/**
......
......@@ -10,6 +10,16 @@ package share.common.constant;
public class MqttConstants {
/**
* 设备消息发送redis缓存前缀
*/
public static final String MQTT_REDIS_KEY = "MQTT_REDIS_KEY:";
/**
* 记录最后设备消息发送时间redis
*/
public static final String MQTT_DEVICE_LAST_TIME_KEY = "MQTT_DEVICE_LAST_TIME_KEY:";
/**
* 动作: 请求端为req
*/
public static final String MQTT_AC_REQ = "req";
......
......@@ -227,6 +227,18 @@ public class RedisUtil {
}
/**
* 列表添加右边添加
* @param k string key
* @param v Object v
* @author Mr.Zhang
* @since 2020-04-13
*/
public void rPush(String k, Object v) {
ListOperations<String, Object> list = redisTemplate.opsForList();
list.rightPush(k, v);
}
/**
* 从右边拿出来一个
* @param k string key
* @param t Long 超时秒数
......@@ -236,6 +248,28 @@ public class RedisUtil {
}
/**
* 某一个信息获取
* @param k string key
* @param l long l
* @return List<Object>
*/
public Object getIndex(String k, long l) {
ListOperations<String, Object> list = redisTemplate.opsForList();
return list.index(k, l);
}
/**
* 设置list中指定下标的值,采用干的是替换规则, 最左边的下标为0;-1表示最右边的一个
*
* @param k 主键
* @param index 下标
* @param v 值
*/
public void set(String k, Integer index, Object v) {
redisTemplate.opsForList().set(k, index, v);
}
/**
* 列表获取数量
* @param k string key
* @return Long
......
......@@ -57,6 +57,9 @@ public class DeviceLog extends BaseEntity
@Excel(name = "描述")
private String description;
@Excel(name = "重新发送次数")
private Integer afreshNum;
/**
* 房间名称
*/
......
......@@ -35,4 +35,19 @@ public class MqttxVo extends BaseEntity {
/** 消息描述:开门、开灯、网关设备绑定等 */
private String mqttDescribe;
/** 消息操作类型 */
private String opType;
/**
* 重新发送次数
*/
private Integer refreshNum;
/**
* 是否需要发送短信及结果:0-不需要,1-需要,2-发送成功,4-发送失败
*/
private Integer sendSmsResult;
private String phone;
}
......@@ -71,4 +71,7 @@ public interface DeviceLogMapper extends BaseMapper<DeviceLog>
*/
List<DeviceLog> selectListByMaxId(@Param("devIds") List<String> devIds, @Param("startDate") Date startDate);
DeviceLog selectDeviceLogOneBySeqMax(String seq);
int updateDeviceById(DeviceLog deviceLog);
}
......@@ -12,6 +12,12 @@ import share.system.domain.vo.MqttxVo;
public interface DeviceOpService {
/**
* 管理系统开门、通电、断电
* @param deviceParam
*/
void openOrCloseDev(DeviceParamVo deviceParam);
/**
* 小程序订单开门、通电
* @param deviceParam
*/
......@@ -98,4 +104,10 @@ public interface DeviceOpService {
*/
void deleteDevicePassword(Long roomId, String param, String phone, String opType);
/**
* mqtt 消息发送
* @param mqttxVo 消息内容
*/
void sendMqtt(MqttxVo mqttxVo);
}
......@@ -73,4 +73,12 @@ public interface DeviceService extends IService<Device>
List<Device> selectDeviceListByIds(List<Long> collect);
List<Device> selectDeviceByRoomId(Long roomId);
/**
* 查询设备信息
*
* @param devId 设备devId
* @return 设备信息
*/
Device selectDeviceByDevId(String devId);
}
......@@ -151,6 +151,7 @@ public class DeviceLogServiceImpl extends ServiceImpl<DeviceLogMapper, DeviceLog
deviceLog.setMqttDescribe(mqttxVo.getMqttDescribe());
deviceLog.setCreateTime(DateUtils.getNowDate());
deviceLog.setCreateBy(userNmae);
deviceLog.setAfreshNum(mqttxVo.getRefreshNum());
return deviceLogMapper.insertDeviceLog(deviceLog);
}
}
package share.system.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import share.common.constant.Constants;
import share.common.constant.MqttConstants;
import share.common.core.redis.RedisUtil;
import share.common.enums.DeviceType;
import share.common.enums.OpTypeEnum;
import share.common.utils.SecurityUtils;
......@@ -44,6 +50,83 @@ public class DeviceOpServiceImpl implements DeviceOpService {
private DeviceLogService deviceLogService;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private RedisUtil redisUtil;
/**
* 设备消息redis缓存
* @param devId 设备id
* @param mqttxVo 消息内容
*/
private void setRedisCache(String devId, MqttxVo mqttxVo) {
// 设备信息
LambdaQueryWrapper<Device> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(Device::getDevId, devId);
Device device = deviceMapper.selectOne(queryWrapper);
// 房间信息
LambdaQueryWrapper<SRoom> queryRoomWrapper = new LambdaQueryWrapper();
queryRoomWrapper.eq(SRoom::getId, device.getRoomId());
SRoom room = roomMapper.selectOne(queryRoomWrapper);
if (room != null) {
String key = StrUtil.concat(true, MqttConstants.MQTT_REDIS_KEY,
StrUtil.toString(room.getStoreId()));
Long size = redisUtil.getListSize(key);
if (size > 0) {
// 如果存在数据
// 加入redis缓存队列
boolean isExist = Boolean.FALSE;
List<Object> list = redisUtil.lRange(key, 0, -1);
for (Object o : list) {
MqttxVo vo = BeanUtil.toBean(o, MqttxVo.class);
if (vo.getSeq().equals(mqttxVo.getSeq())) {
isExist = Boolean.TRUE;
}
}
if (!isExist) {
// 如果redis队列不存在此消息,添加队列
// 加入发送mqtt消息队列
mqttxVo.setRefreshNum(0);
mqttxVo.setSendSmsResult(0);
redisUtil.lPush(key, mqttxVo);
}
} else {
// 加入发送mqtt消息队列
mqttxVo.setRefreshNum(0);
mqttxVo.setSendSmsResult(0);
redisUtil.lPush(key, mqttxVo);
// 发送mqtt消息
this.sendMqtt(mqttxVo);
}
}
}
public void sendMqtt(MqttxVo mqttxVo) {
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 记录当前设备最后发送时间
redisUtil.set(MqttConstants.MQTT_DEVICE_LAST_TIME_KEY + mqttxVo.getDevId(), DateUtil.now());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, mqttxVo.getPhone());
}
/**
* 管理系统开门、通电、断电
* @param deviceParam
*/
@Override
public void openOrCloseDev(DeviceParamVo deviceParam) {
Device device = deviceMapper.selectDeviceByDevId(deviceParam.getDevId());
if (device != null) {
if (DeviceType.DEVICE_CCEE.getCode().equals(device.getDevType())) {
// 门锁
this.deviceOpInit(device.getDevId(), deviceParam.getPhone(), deviceParam.getOpType(), true, 2L);
}
if (DeviceType.DEVICE_0001.getCode().equals(device.getDevType())) {
// 取电开关
this.deviceOpInit(device.getDevId(), deviceParam.getPhone(), deviceParam.getOpType(), true, 2L);
}
}
}
/**
* 小程序订单开门
......@@ -62,7 +145,7 @@ public class DeviceOpServiceImpl implements DeviceOpService {
}
if (DeviceType.DEVICE_0001.getCode().equals(device.getDevType())) {
// 取电开关
this.deviceOpInit(device.getDevId(), deviceParam.getPhone(), OpTypeEnum.GET_ELECTRIC.getCode(), true, 5L);
this.deviceOpInit(device.getDevId(), deviceParam.getPhone(), OpTypeEnum.GET_ELECTRIC.getCode(), true, 2L);
}
}
......@@ -87,7 +170,7 @@ public class DeviceOpServiceImpl implements DeviceOpService {
}
if (DeviceType.DEVICE_0001.getCode().equals(device.getDevType())) {
// 取电开关
this.deviceOpInit(device.getDevId(), phone, OpTypeEnum.GET_ELECTRIC.getCode(), true, 5L);
this.deviceOpInit(device.getDevId(), phone, OpTypeEnum.GET_ELECTRIC.getCode(), true, 2L);
}
}
......@@ -103,11 +186,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
Thread.sleep(m * 1000);
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.openOrCloseDevice(devId, phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
logger.info("延时执行开门、取电操作完成!");
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(devId, mqttxVo);
} catch (InterruptedException e) {
e.printStackTrace();
}
......@@ -116,10 +197,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
} else {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.openOrCloseDevice(devId, phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(devId, mqttxVo);
}
}
......@@ -132,10 +212,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
public void clearGatewayDevice(String devId, String userName) {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.clearGatewayDevice(devId);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, userName);
mqttxVo.setPhone(userName);
// 设置消息队列
this.setRedisCache(devId, mqttxVo);
}
/**
......@@ -147,10 +226,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
public void addGatewayDevice(String devId, String userName) {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.addGatewayDevice(devId);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, userName);
mqttxVo.setPhone(userName);
// 设置消息队列
this.setRedisCache(devId, mqttxVo);
}
/**
......@@ -175,10 +253,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
for (Device device : list) {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.actionExecute(device.getDevId(), phone, content, startTime, endTime, number);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
}
}
}
......@@ -211,10 +288,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
for (Device device : list) {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.actionExecute(device.getDevId(), phone, content, startTime, endTime, number);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
}
logger.info("延时执行语音播报完成!");
} catch (InterruptedException e) {
......@@ -255,10 +331,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
Thread.sleep(m * 1000);
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.openOrCloseDevice(device.getDevId(), phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
logger.info("延时执行开锁/关锁、取电/断电完成!");
} catch (InterruptedException e) {
e.printStackTrace();
......@@ -268,10 +343,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
} else {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.openOrCloseDevice(device.getDevId(), phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
}
});
}
......@@ -299,10 +373,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
list.stream().forEach(device -> {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.setOpenPassword(device.getDevId(), param, startTime, endTime, phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
});
}
}
......@@ -326,10 +399,9 @@ public class DeviceOpServiceImpl implements DeviceOpService {
list.stream().forEach(device -> {
// 获取mqtt的topic、payload
MqttxVo mqttxVo = mqttxService.deleteOpenPassword(device.getDevId(), param, null, null, phone, opType);
// 发送mqtt消息
mqttGatewayComponent.sendToMqtt(mqttxVo.getTopic(), 0, mqttxVo.getPayload());
// 写日志记录
deviceLogService.addDeviceLog(mqttxVo, phone);
mqttxVo.setPhone(phone);
// 设置消息队列
this.setRedisCache(device.getDevId(), mqttxVo);
});
}
}
......
......@@ -125,6 +125,13 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> impleme
}
@Override
public Device selectDeviceByDevId(String devId) {
LambdaQueryWrapper<Device> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(Device::getDevId, devId);
return deviceMapper.selectOne(queryWrapper);
}
@Override
public List<Device> notRoomIdList(Device device) {
SRoom room = roomMapper.selectSRoomById(device.getRoomId());
if (room != null) {
......
package share.system.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
......@@ -11,14 +13,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import share.common.constant.MqttConstants;
import share.common.core.redis.RedisUtil;
import share.common.enums.*;
import share.common.utils.DateUtils;
import share.system.domain.*;
import share.system.domain.vo.MqttxVo;
import share.system.mapper.DeviceGatewayMapper;
import share.system.mapper.DeviceLogMapper;
import share.system.mapper.DeviceMapper;
import share.system.mapper.DeviceStatusLogMapper;
import share.system.mapper.*;
import share.system.service.DeviceOpService;
import share.system.service.DeviceStatusLogService;
import share.system.service.MqttxService;
......@@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
......@@ -45,9 +47,15 @@ public class MqttxServiceImpl implements MqttxService {
@Autowired
private DeviceLogMapper deviceLogMapper;
@Autowired
private SRoomMapper roomMapper;
@Autowired
private DeviceStatusLogService deviceStatusLogService;
@Value("${mqtt.device-op-last-time}")
private Integer deviceOpLastTime;
@Autowired
private RedisUtil redisUtil;
@Autowired
private DeviceOpService deviceOpService;
/**
* 清除网关锁id列表
......@@ -567,77 +575,82 @@ public class MqttxServiceImpl implements MqttxService {
// 设备信息
List<Device> list = new ArrayList<>();
for (Object o : array) {
// 默认离线
String status = DeviceStatusEnum.DEVICE_OFFLINE.getCode();
JSONArray jsonArray = JSONUtil.parseArray(o);
String devId = jsonArray.getStr(0);
// 当前数据库设备信息
Device dev = map.get(devId);
String statusStr = jsonArray.getStr(3);
// 十进制转二进制
int decimal = Integer.parseInt(statusStr);
String binary = Integer.toBinaryString(decimal);
String[] binaryArray = binary.split("(?<=\\G.)");
if(binaryArray.length > 0) {
String bit0 = binaryArray[0];
if ("1".equals(bit0)) {
if(binaryArray.length > 2) {
String bit2 = binaryArray[2];
if ("1".equals(bit2)) {
status = DeviceStatusEnum.DEVICE_ENERGIZE.getCode();
} else {
status = DeviceStatusEnum.DEVICE_OUTAGE.getCode();
}
} else if (dev != null){
// 设备类型判断
if (DeviceType.DEVICE_0001.getCode().equals(dev.getDevType())) {
status = DeviceStatusEnum.DEVICE_OUTAGE.getCode();
} else if(DeviceType.DEVICE_CCEE.getCode().equals(dev.getDevType())) {
status = DeviceStatusEnum.DEVICE_ONLINE.getCode();
try {
// 默认离线
String status = DeviceStatusEnum.DEVICE_OFFLINE.getCode();
JSONArray jsonArray = JSONUtil.parseArray(o);
String devId = jsonArray.getStr(0);
// 当前数据库设备信息
Device dev = map.get(devId);
String statusStr = jsonArray.getStr(3);
// 十进制转二进制
int decimal = Integer.parseInt(statusStr);
String binary = Integer.toBinaryString(decimal);
String[] binaryArray = binary.split("(?<=\\G.)");
if (binaryArray.length > 0) {
String bit0 = binaryArray[0];
if ("1".equals(bit0)) {
if (binaryArray.length > 2) {
String bit2 = binaryArray[2];
if ("1".equals(bit2)) {
status = DeviceStatusEnum.DEVICE_ENERGIZE.getCode();
} else {
status = DeviceStatusEnum.DEVICE_OUTAGE.getCode();
}
} else if (dev != null) {
// 设备类型判断
if (DeviceType.DEVICE_0001.getCode().equals(dev.getDevType())) {
status = DeviceStatusEnum.DEVICE_OUTAGE.getCode();
} else if (DeviceType.DEVICE_CCEE.getCode().equals(dev.getDevType())) {
status = DeviceStatusEnum.DEVICE_ONLINE.getCode();
}
}
}
}
}
if (dev != null && StrUtil.isNotEmpty(dev.getStatus()) && !status.equals(dev.getStatus())) {
DeviceStatusLog deviceStatusLog = new DeviceStatusLog();
deviceStatusLog.setDevId(devId);
deviceStatusLog.setDevMac(dev.getDevMac());
deviceStatusLog.setRoomId(dev.getRoomId());
deviceStatusLog.setSignalValue(jsonArray.getStr(1));
deviceStatusLog.setVoltage(jsonArray.getStr(2));
deviceStatusLog.setStatus(status);
deviceStatusLog.setPreviousStatus(dev.getStatus());
deviceStatusLog.setCreateTime(DateUtils.getNowDate());
deviceStatusLog.setIsSendSms(YesNoEnum.no.getIndex());
deviceStatusLog.setIsAbnormal(YesNoEnum.no.getIndex());
if (DeviceStatusEnum.DEVICE_OFFLINE.getCode().equals(status)) {
// 变更设备为离线状态
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
} else {
DeviceLog deviceLog = deviceLogMap.get(devId);
if (deviceLog != null) {
deviceStatusLog.setOperateLogId(deviceLog.getId());
if (DeviceType.DEVICE_0001.getCode().equals(dev.getDevType())) {
if (!DeviceStatusEnum.getNameStr(status).equals(deviceLog.getMqttDescribe())) {
// 平台3分钟内最新操作类型与设备变更状态不匹配
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
if (dev != null && StrUtil.isNotEmpty(dev.getStatus()) && !status.equals(dev.getStatus())) {
DeviceStatusLog deviceStatusLog = new DeviceStatusLog();
deviceStatusLog.setDevId(devId);
deviceStatusLog.setDevMac(dev.getDevMac());
deviceStatusLog.setRoomId(dev.getRoomId());
deviceStatusLog.setSignalValue(jsonArray.getStr(1));
deviceStatusLog.setVoltage(jsonArray.getStr(2));
deviceStatusLog.setStatus(status);
deviceStatusLog.setPreviousStatus(dev.getStatus());
deviceStatusLog.setCreateTime(DateUtils.getNowDate());
deviceStatusLog.setIsSendSms(YesNoEnum.no.getIndex());
deviceStatusLog.setIsAbnormal(YesNoEnum.no.getIndex());
if (DeviceStatusEnum.DEVICE_OFFLINE.getCode().equals(status)) {
// 变更设备为离线状态
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
} else {
DeviceLog deviceLog = deviceLogMap.get(devId);
if (deviceLog != null) {
deviceStatusLog.setOperateLogId(deviceLog.getId());
if (DeviceType.DEVICE_0001.getCode().equals(dev.getDevType())) {
if (!DeviceStatusEnum.getNameStr(status).equals(deviceLog.getMqttDescribe())) {
// 平台3分钟内最新操作类型与设备变更状态不匹配
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
}
}
} else {
// 异常设备状态变更,非平台操作变更
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
}
} else {
// 异常设备状态变更,非平台操作变更
deviceStatusLog.setIsAbnormal(YesNoEnum.yes.getIndex());
}
statusLogs.add(deviceStatusLog);
}
statusLogs.add(deviceStatusLog);
// 更新设备信息
Device device = new Device();
device.setDevId(devId);
device.setSignalValue(jsonArray.getStr(1));
device.setVoltage(jsonArray.getStr(2));
device.setStatus(status);
device.setUpdateTime(DateUtils.getNowDate());
list.add(device);
} catch (Exception e) {
// 解析异常跳出循环
continue;
}
// 更新设备信息
Device device = new Device();
device.setDevId(devId);
device.setSignalValue(jsonArray.getStr(1));
device.setVoltage(jsonArray.getStr(2));
device.setStatus(status);
device.setUpdateTime(DateUtils.getNowDate());
list.add(device);
}
// 插入设备变更记录
deviceStatusLogService.saveBatch(statusLogs);
......@@ -658,15 +671,105 @@ public class MqttxServiceImpl implements MqttxService {
if (json.size() > 0) {
JSONArray array = json.getJSONArray("params");
if (array != null && array.size() > 0) {
DeviceLog deviceLog = new DeviceLog();
String seq = json.getStr("seq");
deviceLog.setSeq(seq);
deviceLog.setRemark(payload);
deviceLog.setResult(array.getStr(1));
return 0 < deviceLogMapper.updateBySeq(deviceLog);
String result = array.getStr(1);
DeviceLog deviceLog = deviceLogMapper.selectDeviceLogOneBySeqMax(seq);
// 状态10代表网关接到消息响应,不做处理;另外还会有一条操作结果响应数据
if (deviceLog != null && !"10".equals(result)) {
if ("0".equals(result)) {
// 成功
this.sendMqttTopic(deviceLog.getDevId(), 2);
} else {
// 失败
if (deviceLog.getAfreshNum() != null && deviceLog.getAfreshNum() < 3) {
this.sendMqttTopic(deviceLog.getDevId(), 1);
} else {
// 失败3次后不重复发送,执行下一条
this.sendMqttTopic(deviceLog.getDevId(), 2);
}
}
deviceLog.setRemark(payload);
deviceLog.setResult(result);
deviceLog.setUpdateTime(DateUtil.date());
return 0 < deviceLogMapper.updateDeviceById(deviceLog);
}
}
}
return false;
}
/**
* mqtt监控返回消息调用
* @param devId 设备id
* @param type 类型:1-失败,2-成功
*/
private void sendMqttTopic(String devId, Integer type) {
LambdaQueryWrapper<Device> queryWrapper = new LambdaQueryWrapper();
queryWrapper.eq(Device::getDevId, devId);
Device device = deviceMapper.selectOne(queryWrapper);
if (device != null) {
LambdaQueryWrapper<SRoom> queryRoomWrapper = new LambdaQueryWrapper();
queryRoomWrapper.eq(SRoom::getId, device.getRoomId());
SRoom room = roomMapper.selectOne(queryRoomWrapper);
if (room != null) {
String key = StrUtil.concat(true, MqttConstants.MQTT_REDIS_KEY,
StrUtil.toString(room.getStoreId()));
// 如果成功,先删除当前数据缓存
if (type == 2) {
// 发送成功或者失败超过3次,删除当前消息,发送下一条
redisUtil.getRightPop(key, 10L);
}
// 获取当前需要发送的消息
Object data = redisUtil.getIndex(key, -1);
if (null != data) {
MqttxVo vo = BeanUtil.toBean(data, MqttxVo.class);
if (type == 1) {
// 失败重新发送,
// 更新发送次数,并修改重新加入队列
vo.setRefreshNum(vo.getRefreshNum() + 1);
vo.setSendSmsResult(0);
redisUtil.set(key, -1, vo);
// 重新发送mqtt消息
// 异步执行, 间隔5秒
// 延时执行操作
this.supplyAsync(5L, vo);
} else if (type == 2) {
// 成功,发送下一条消息,发送mqtt消息
LambdaQueryWrapper<Device> query2Wrapper = new LambdaQueryWrapper();
query2Wrapper.eq(Device::getDevId, vo.getDevId());
Device dev = deviceMapper.selectOne(query2Wrapper);
// 先验证相同设备是否6秒内连续发送
long l = 1L;
String s = redisUtil.get(MqttConstants.MQTT_DEVICE_LAST_TIME_KEY + vo.getDevId());
if (StrUtil.isNotEmpty(s) && DeviceType.DEVICE_CCEE.getCode().equals(dev.getDevType())) {
long betweenDay = DateUtil.between(DateUtil.parseDateTime(s), DateUtil.date(), DateUnit.SECOND);
if (betweenDay < 10L) {
l = 10L - betweenDay;
}
}
System.out.println("测试设备消息间隔:"+ l);
// 异步执行
this.supplyAsync(l, vo);
}
}
}
}
}
private void supplyAsync(Long m, MqttxVo vo) {
CompletableFuture.supplyAsync(() -> {
// 延时执行操作
try {
Thread.sleep(m * 1000);
// 获取mqtt的topic、payload
deviceOpService.sendMqtt(vo);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result: devId=" + vo.getDevId();
});
}
}
......@@ -20,12 +20,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<result property="remark" column="remark" />
<result property="result" column="result" />
<result property="description" column="description" />
<result property="afreshNum" column="afresh_num" />
</resultMap>
<sql id="selectDeviceLogVo">
select s1.id, s1.dev_mac, s1.dev_id, s1.seq, s1.mqtt_type, s1.mqtt_describe, s1.payload, s1.topic,
s1.create_by, s1.create_time,
s1.update_by, s1.update_time, s1.remark, s1.`result`, s1.description
s1.update_by, s1.update_time, s1.remark, s1.`result`, s1.description, s1.afresh_num
from s_device_log s1
left join s_device s2 on s1.dev_id = s2.dev_id
left join s_room s3 on s2.room_id = s3.id
......@@ -48,7 +49,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
t1.update_time,
t1.remark,
t1.`result`,
t1.description
t1.description, t1.afresh_num
from s_device_log t1
join s_device t2 on t1.dev_id = t2.dev_id
join s_room t3 on t2.room_id = t3.id
......@@ -66,6 +67,26 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</where>
order by create_time desc
</select>
<select id="selectDeviceLogOneBySeqMax" parameterType="String" resultMap="DeviceLogResult">
select
t1.id,
t1.dev_mac,
t1.dev_id,
t1.seq,
t1.mqtt_type,
t1.mqtt_describe,
t1.payload,
t1.topic,
t1.create_by,
t1.create_time,
t1.update_by,
t1.update_time,
t1.remark,
t1.`result`,
t1.description, t1.afresh_num
from s_device_log t1 where seq = #{seq} order by t1.afresh_num desc limit 0,1
</select>
<select id="selectDeviceLogById" parameterType="Long" resultMap="DeviceLogResult">
<include refid="selectDeviceLogVo"/>
......@@ -74,7 +95,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<select id="selectListByMaxId" resultMap="DeviceLogResult">
SELECT t1.id, t1.dev_mac, t1.dev_id, t1.seq, t1.mqtt_type, t1.mqtt_describe, t1.payload, t1.topic,
t1.create_by, t1.create_time, t1.update_by, t1.update_time, t1.remark,
t1.`result`, t1.description
t1.`result`, t1.description, t1.afresh_num
FROM s_device_log t1
JOIN (
SELECT MAX(id) AS max_id
......@@ -106,6 +127,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="remark != null">remark,</if>
<if test="result != null">`result`,</if>
<if test="description != null">description,</if>
<if test="afreshNum != null">afresh_num,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="devMac != null">#{devMac},</if>
......@@ -122,6 +144,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="remark != null">#{remark},</if>
<if test="result != null">#{result},</if>
<if test="description != null">#{description},</if>
<if test="afreshNum != null">#{afreshNum},</if>
</trim>
</insert>
......@@ -142,6 +165,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="remark != null">remark = #{remark},</if>
<if test="result != null">`result` = #{result},</if>
<if test="description != null">description = #{description},</if>
<if test="afreshNum != null">afresh_num = #{afreshNum},</if>
</trim>
where id = #{id}
</update>
......@@ -151,6 +175,12 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
where seq = #{seq}
</update>
<update id="updateDeviceById" parameterType="DeviceLog">
update s_device_log
set remark = #{remark},`result` = #{result}, update_time = #{updateTime}
where id = #{id}
</update>
<delete id="deleteDeviceLogById" parameterType="Long">
delete from s_device_log where id = #{id}
</delete>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment