<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
首先在pom檔案裡引入mqtt的依賴設定
<!-- Eclipse Paho MQTT v3 client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
其次在springboot 的設定yml檔案,設定mqtt的服務設定
# MQTT settings bound to MqttProperties via the "spring.mqtt" prefix.
spring:
  mqtt:
    # Broker URI used both for the client and for the connect options.
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    # Left empty here; MqttConfiguration only applies credentials that are non-blank.
    username:
    password:
    # Topics subscribed to right after the client connects.
    topic: [/unify/test]
建立 MqttProperties 設定引數類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * MQTT connection settings bound from the {@code spring.mqtt} configuration prefix.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /** Broker URI. */
    private String url;

    /** Client identifier presented to the broker. */
    private String clientId;

    /** Optional user name. */
    private String username;

    /** Optional password. */
    private String password;

    /** Topics to subscribe to. */
    private String[] topic;
}
建立 MqttConfiguration 設定類
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.listener.MqttSubscribeListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableConfigurationProperties({MqttProperties.class}) public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class); @Autowired private MqttProperties mqttProperties; public MqttConfiguration() { } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()}); if (Func.isNotBlank(this.mqttProperties.getUrl())) { connectOptions.setUserName(this.mqttProperties.getUsername()); } if (Func.isNotBlank(this.mqttProperties.getPassword())) { connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray()); } connectOptions.setKeepAliveInterval(60); return connectOptions; } @Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId()); mqttClient.connect(options); for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) { mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; } }
建立訂閱事件類
import org.springframework.context.ApplicationEvent;

/**
 * Application event carrying one received MQTT message: the event source holds the payload
 * and {@link #getTopic()} the topic it arrived on.
 */
public class UWBMqttSubscribeEvent extends ApplicationEvent {

    /** Topic the message was received on. */
    private String topic;

    /**
     * @param topic  topic the message was received on
     * @param source message payload, stored as the event source
     */
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }

    /**
     * @return the topic the message was received on
     */
    public String getTopic() {
        return topic;
    }
}
建立訂閱事件監聽器
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;

/**
 * Message listener that turns every received MQTT message into a
 * {@link UWBMqttSubscribeEvent} and publishes it through {@code SpringUtil}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and publishes it as an application event.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8; the single-argument String constructor would use the
        // platform default charset and could corrupt non-ASCII payloads.
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        SpringUtil.publishEvent(new UWBMqttSubscribeEvent(topic, content));
    }
}
建立 mqtt 訊息事件非同步處理監聽器(注意:需在啟動類或任一設定類加上 @EnableAsync,@Async 才會生效,否則事件仍會同步處理)
import com.baomidou.mybatisplus.core.toolkit.StringPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.config.MqttProperties; import org.springblade.ubw.event.UWBMqttSubscribeEvent; import org.springblade.ubw.service.MqttService; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import javax.annotation.Resource; import java.util.Arrays; import java.util.List; @Configuration public class MqttEventListener { private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class); @Resource private MqttProperties mqttProperties; @Resource private MqttService mqttService; private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; } @Async @EventListener(UWBMqttSubscribeEvent.class) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return; } mqttService.issue(topic,source); // log.info("mqtt接收到 通道 {} 的資訊為:{}",topic,source); } }
建立MqttService 資料處理服務類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.area.entity.WorkArea; import org.springblade.ubw.area.entity.WorkSite; import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo; import org.springblade.ubw.area.entity.WorkSitePassInfo; import org.springblade.ubw.area.service.WorkAreaService; import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService; import org.springblade.ubw.area.service.WorkSitePassInfoService; import org.springblade.ubw.area.service.WorkSiteService; import org.springblade.ubw.constant.UbwConstant; import org.springblade.ubw.history.entity.HistoryLocusInfo; import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo; import org.springblade.ubw.history.service.HistoryLocusInfoService; import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService; import org.springblade.ubw.loc.entity.LocStatusInfo; import org.springblade.ubw.loc.entity.LocStatusInfoHistory; import org.springblade.ubw.loc.service.LocStatusInfoHistoryService; import org.springblade.ubw.loc.service.LocStatusInfoService; import org.springblade.ubw.msg.entity.*; import org.springblade.ubw.msg.service.*; import org.springblade.ubw.system.entity.*; import org.springblade.ubw.system.service.*; import org.springblade.ubw.system.wrapper.MqttWrapper; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Service public class MqttService { private static final Logger log = LoggerFactory.getLogger(MqttService.class); @Resource private EmployeeAndDepartmentService employeeAndDepartmentService; @Resource private VehicleInfoService vehicleInfoService; @Resource private WorkSiteService workSiteService; @Resource private LocStatusInfoService locStatusInfoService; @Resource private LocStatusInfoHistoryService locStatusInfoHistoryService; @Resource private LocOverTimeSosAlarminfoService 
locOverTimeSosAlarminfoService; @Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService; @Resource private LocSosAlarminfoService locSosAlarminfoService; @Resource private AttendanceInfoService attendanceInfoService; @Resource private HistoryLocusInfoService historyLocusInfoService; @Resource private WorkSitePassInfoService workSitePassInfoService; @Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService; @Resource private TrAlertService trAlertService; @Resource private AddEvacuateInfoService addEvacuateInfoService; @Resource private CancelEvacuateInfoService cancelEvacuateInfoService; @Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService; @Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService; @Resource private LeaderEmployeeInfoService leaderEmployeeInfoService; @Resource private ElectricMsgInfoService electricMsgInfoService; @Resource private WorkAreaService workAreaService; @Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService; @Resource private SpecialWorksService specialWorksService; @Resource private AttendanceLocusInfoService attendanceLocusInfoService; @Resource private WorkTypeService workTypeService; @Resource private OfficePositionService officePositionService; @Resource private ClassTeamService classTeamService; /** * 方法描述: 訊息分發 * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch(topic){ case UbwConstant.TOPIC_EMP : //人員和部門資訊 employeeAndDepartmentService.saveBatch(source); break; case UbwConstant.TOPIC_VEHICLE : //車輛資訊 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break; case UbwConstant.TOPIC_WORK_SITE : //基站資訊 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite()); workSiteService.deleteAll(); 
workSiteService.saveBatch(workSites); break; case UbwConstant.TOPIC_LOC_STATUS: //井下車輛人員實時 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break; } locStatusInfoService.deleteAll(); //篩選入井人員列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人員歷史資料入庫 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break; case UbwConstant.TOPIC_LOC_OVER_TIME: //超時報警資訊 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_OVER_AREA: //超員報警資訊 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_SOS: //求救報警資訊 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break; case UbwConstant.TOPIC_ATTEND: //考勤資訊 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break; case UbwConstant.TOPIC_HISTORY_LOCUS: //精確軌跡資訊 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站經過資訊 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); break; case 
UbwConstant.TOPIC_ENV_MON: //環境監測資訊 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break; case UbwConstant.TOPIC_TR_ALERT: //環境監測報警資訊 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert()); trAlertService.saveBatch(trAlerts); break; case UbwConstant.TOPIC_ADD_EVA: //下發撤離資訊 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤離資訊 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break; case UbwConstant.TOPIC_WORK_SITE_NEI: //相鄰基站關係資訊 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break; case UbwConstant.TOPIC_LINK_MSG: //基站鏈路資訊 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break; case UbwConstant.TOPIC_LEADER_EMP: //帶班領導資訊 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo()); leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break; case UbwConstant.TOPIC_ELE_MSG: //低電報警資訊 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break; case UbwConstant.TOPIC_WORK_AREA: //區域資訊 workAreaService.deleteAll(); List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea()); 
workAreaService.saveBatch(workAreas); break; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //歷史超時報警資訊 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_SPECIAL_WORK: //特種人員預設線路資訊 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break; case UbwConstant.TOPIC_ATTEND_LOC: //歷史考勤軌跡資訊 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break; case UbwConstant.TOPIC_WORK_TYPE: //工種資訊 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType()); workTypeService.saveBatch(workTypes); break; case UbwConstant.TOPIC_OFFICE_POS: //職務資訊 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition()); officePositionService.saveBatch(officePositions); break; case UbwConstant.TOPIC_CLASS_TEAM: //班組資訊 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam()); classTeamService.saveBatch(classTeams); break; default : //可選 break; } } }
完結,小夥伴們,可以根據這個demo 改造自己的mqtt服務處理!!!
以上就是Springboot整合mqtt服務的範例程式碼的詳細內容,更多關於Springboot整合mqtt的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45