首頁 > 軟體

Springboot整合RabbitMQ實現傳送驗證碼的範例程式碼

2022-02-10 13:01:01

1. RabbitMQ的介紹

  • MQ全稱為Message Queue,即訊息佇列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協定)協定實現的訊息佇列,它是一種應用程式之間的通訊方法,訊息佇列在分散式系統開 發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
  • 開發中訊息佇列通常有如下應用場景:

1、任務非同步處理。 將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用程式的響應時間。

2、應用程式解耦合 MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行解耦合。並且有如下優點。

1.使得簡單,功能強大。

2.基於AMQP協定。

3.社群活躍,檔案完善。

4.高並行效能好,這主要得益於Erlang語言。

5.Spring Boot預設已整合RabbitMQ

  • 組成部分說明如下:
  • Broker:訊息佇列服務程序,此程序包括兩個部分:
  • Exchange和Queue。
  • Exchange:訊息佇列交換機,按一定的規則將訊息路由轉發到某個佇列,對訊息進行過慮。

Queue:訊息佇列,儲存訊息的佇列,訊息到達佇列並轉發給指定的消費方。

Producer:訊息生產者,即生產方使用者端,生產方使用者端將訊息傳送到MQ。

Consumer:訊息消費者,即消費方使用者端,接收MQ轉發的訊息。

訊息釋出接收流程:

-----傳送訊息-----

1、生產者和Broker建立TCP連線。

2、生產者和Broker建立通道。

3、生產者通過通道訊息傳送給Broker,由Exchange將訊息進行轉發。

4、Exchange將訊息轉發到指定的Queue(佇列)

----接收訊息-----

1、消費者和Broker建立TCP連線

2、消費者和Broker建立通道

3、消費者監聽指定的Queue(佇列)

4、當有訊息到達Queue時Broker預設將訊息推播給消費者。

5、消費者接收到訊息

2. 搭建環境

  • 實現原理:
  • 在繫結(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將訊息傳送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,訊息將會被路由到對應的Queue中。
  • 多個消費者可以訂閱同一個Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。

2.1引入jar包

 <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
         <!--redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2.2生產者設定

2.2.1Rabbit設定類

package com.cui.user.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** rabbitmq設定類   設定交換機,訊息佇列,並且繫結交換機和queue
 * @Author  Cui
 * @Date 2020-4-9 14:55
 */
@Configuration
public class RabbitmqConfig {
	//佇列bean的名稱  cms  用來傳送簡訊驗證碼
	public static final String QUEUE_INFORM_CMS= "queue_inform_cms";
	//佇列bean的名稱  email  用來傳送郵件
	//public static final String QUEUE_INFORM_EMAIL= "queue_inform_email";
	//交換機的名稱
	public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform";
	//佇列的名稱
	@Value("${cxp.mq.queue}")
	public  String queue_cms_postpage_name;
	//routingKey
	@Value("${cxp.mq.routingKey}")
	public  String routingKey;
	/**
	 * 交換機設定使用direct型別
	 * @return the exchange
	 */
	@Bean(EXCHANGE_TOPIC_INFORM_)
	public Exchange EXCHANGE_TOPICS_INFORM() {
		//durable(true) 持久化,mq重啟之後交換機還在
		return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build();
	}
	//宣告佇列
	@Bean(QUEUE_INFORM_CMS)
	public Queue QUEUE_CMS_POSTPAGE() {
		Queue queue = new Queue(QUEUE_INFORM_CMS);
		return queue;
	 * 繫結佇列到交換機
	 *
	 * @param queue    the queue
	 * @param exchange the exchange
	 * @return the binding
	@Bean
	public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
}

2.2.2 application.yml檔案設定

server:
  port: ${PORT:8002}
spring:
  application:
    name: cxp-service-manage-user

#Redis設定
  redis:
    host: 127.0.0.1
    port: 6379
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 500
        min-idle: 0
    lettuce:
      shutdown-timeout: 0
  datasource:
    url: jdbc:mysql://localhost:3306/system_user?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    port: 5672
    username: guest
    password: guest
    virtualHost: /
cxp:
  mq:
    #cms使用者端監控的佇列名稱(不同的使用者端監控的佇列不能重複)
    queue: queue_inform_cms
    routingKey: inform.#.sms.#	#此routingKey郵件消費者和資訊消費者通用
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml
  type-aliases-package: com.cui.model.entity.user
mapper:
  mappers: com.cui.model.BaseMapper #通用基礎類別設定
  identity: mysql
pagehelper:
  helperDialect: mysql
  reasonable: true
  supportMethodsArguments: true
  params: count=countSql
eureka:
  client:
    registerWithEureka: true #服務註冊開關
    fetchRegistry: true #服務發現開關
    serviceUrl: #Eureka使用者端與Eureka伺服器端進行互動的地址,多箇中間用逗號分隔
      defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/}
  instance:
    prefer-ip-address:  true  #將自己的ip地址註冊到Eureka服務中
    ip-address: ${IP_ADDRESS:127.0.0.1}
    instance-id: ${spring.application.name}:${server.port} #指定範例id
ribbon:
  MaxAutoRetries: 2 #最大重試次數,當Eureka中可以找到服務,但是服務連不上時將會重試,如果eureka中找不到服務則直接走斷路器
  MaxAutoRetriesNextServer: 3 #切換範例的重試次數
  OkToRetryOnAllOperations: false  #對所有操作請求都進行重試,如果是get則可以,如果是post,put等操作沒有實現冪等的情況下是很危險的,所以設定為false
  ConnectTimeout: 5000  #請求連線的超時時間
  ReadTimeout: 6000 #請求處理的超時時間

2.3消費者設定

引入jar包,這裡需引入阿里雲通訊多的jar包和Redis的jar包

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--阿里雲通訊-->
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.4.0</version>
            <artifactId>aliyun-java-sdk-dysmsapi</artifactId>
            <version>1.0.0</version>

        <!-- 匯入Eureka使用者端的依賴 -->
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        <!-- feign相關依賴  -->
            <artifactId>spring-cloud-starter-openfeign</artifactId>
          <dependency>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <artifactId>spring-boot-starter-amqp</artifactId>

2.3.1 消費者設定類(同生產者)

package com.cui.sms.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/** rabbitmq設定類   設定交換機,訊息佇列,並且繫結交換機和queue
 * @Author Cui
 * @Date 2020-4-9 14:55
 */
@Configuration
public class RabbitmqConfig {
	//佇列bean的名稱  cms  用來傳送簡訊驗證碼
	public static final String QUEUE_INFORM_CMS= "queue_inform_cms";
	//佇列bean的名稱  email  用來傳送郵件
	//public static final String QUEUE_INFORM_EMAIL= "queue_inform_email";
	//交換機的名稱
	public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform";
	//佇列的名稱
	@Value("${cxp.mq.queue}")
	public  String queue_cms_postpage_name;
	//routingKey
	@Value("${cxp.mq.routingKey}")
	public  String routingKey;
	/**
	 * 交換機設定使用direct型別
	 * @return the exchange
	 */
	@Bean(EXCHANGE_TOPIC_INFORM_)
	public Exchange EXCHANGE_TOPICS_INFORM() {
		//durable(true) 持久化,mq重啟之後交換機還在
		return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build();
	}
	//宣告佇列
	@Bean(QUEUE_INFORM_CMS)
	public Queue QUEUE_CMS_POSTPAGE() {
		Queue queue = new Queue(QUEUE_INFORM_CMS);
		return queue;
	 * 繫結佇列到交換機
	 *
	 * @param queue    the queue
	 * @param exchange the exchange
	 * @return the binding
	@Bean
	public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
}

2.3.2 application.yml檔案設定

server:
  port: 8103
spring:
  application:
    name: cxp-manager-service-sms
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

#Redis設定
  redis:
    port: 6379
    password: 123456
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 500
        min-idle: 0
    lettuce:
      shutdown-timeout: 0
aliyun:
  sms:
    accessKeyId: XXXXXXXXXXXXXXXXXXXX
    accessKeySecret: XXXXXXXXXXXXXXXXXXXX
    template_code: XXXXXXXXXXX
    sign_name: XXXX
cxp:
  mq:
    #cms使用者端監控的佇列名稱(不同的使用者端監控的佇列不能重複)
    queue: queue_inform_cms
    routingKey: inform.sms	#此routingKey用來監聽資訊
eureka:
  client:
    registerWithEureka: true #服務註冊開關
    fetchRegistry: true #服務發現開關
    serviceUrl: #Eureka使用者端與Eureka伺服器端進行互動的地址,多箇中間用逗號分隔
      defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/}
  instance:
    prefer-ip-address:  true  #將自己的ip地址註冊到Eureka服務中
    ip-address: ${IP_ADDRESS:127.0.0.1}
    instance-id: ${spring.application.name}:${server.port} #指定範例id
ribbon:
  MaxAutoRetries: 2 #最大重試次數,當Eureka中可以找到服務,但是服務連不上時將會重試,如果eureka中找不到服務則直接走斷路器
  MaxAutoRetriesNextServer: 3 #切換範例的重試次數
  OkToRetryOnAllOperations: false  #對所有操作請求都進行重試,如果是get則可以,如果是post,put等操作沒有實現冪等的情況下是很危險的,所以設定為false
  ConnectTimeout: 5000  #請求連線的超時時間
  ReadTimeout: 6000 #請求處理的超時時間

3.寫傳送簡訊驗證碼的程式碼

3.1寫一個controller來呼叫傳送驗證碼的介面

/**
	 *  傳送簡訊驗證碼
	 * @param phone
	 * @return
	 */
	@ApiOperation(value = "傳送簡訊驗證碼",notes = "傳送簡訊驗證碼")
	@GetMapping("/sendSms")
	public ResponseResult sendSms(String phone){
		LOGGER.info("要傳送的手機號為:{}", phone);
		userService.sendSms(phone);
		return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo());
	}

3.2 生成驗證碼

後臺生成六位數的隨機驗證碼,並且將驗證碼存入Redis中,設定五分鐘的過期時間(用於使用者註冊時的校對),將驗證碼存到RabbitMQ中,當呼叫傳送介面時,生產端將資訊傳送到繫結的佇列中。

/**
	 * 向註冊使用者傳送傳送驗證碼
	 * @param phone  註冊的使用者的手機號
	 */
	@Override
	public void sendSms(String phone) {
		//1.生成六位隨機驗證碼
		Random random = new Random();//隨機函數
		int code = random.nextInt(999999);//設定亂數的最大值
		if(code<100000){  //如果驗證碼小於六位數,加100000保證驗證碼為6位數
			code+=100000;
		}
		//System.out.println("簡訊驗證碼:"+code);
		LOGGER.info("生成的簡訊驗證碼為:{{}}", code);
		//2.將驗證碼存入redis
		redisTemplate.boundValueOps("code_"+phone).set(code+"");
		redisTemplate.boundValueOps("code_"+phone).expire(5, TimeUnit.MINUTES);//設定驗證碼五分鐘到期
		//3.將驗證碼存入RabbitMQ
		Map<String,String> map = new HashMap<String, String>();
		map.put("phone", phone);
		map.put("code", code+"");
		//以json格式存到RabbitMQ訊息佇列中
		rabbitTemplate.convertAndSend(EXCHANGE_TOPIC_INFORM_, routingKey, JSON.toJSONString(map));
	}

3.3傳送簡訊驗證碼

在RabbitMQ的消費者端監聽簡訊的routingKey ,當收到生產端發來的訊息後,便會呼叫阿里雲通訊向用戶傳送簡訊

package com.cui.sms.mq;

import com.alibaba.fastjson.JSON;
import com.aliyuncs.CommonResponse;
import com.cui.sms.utils.SmsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Author Cui
 * @Date 2020-4-9 15:40
 * 監聽MQ,傳送簡訊驗證碼
 */
@Component
public class SmsMessageConsumer {
	private static final Logger LOGGER = LoggerFactory.getLogger(SmsMessageConsumer.class);
	@Autowired
	private SmsUtil smsUtil;
	@Value("${aliyun.sms.template_code}")
	private String templateCode;
	@Value("${aliyun.sms.param}")
	private String param;  //簡訊引數
	@RabbitListener(queues = {"${cxp.mq.queue}"})
	public void onMessage(Message message) {
		String jsonString= new String(message.getBody());//得到mq中存入的json格式的訊息
		Map<String,String> map = JSON.parseObject(jsonString, Map.class);//將json格式轉換為Map格式
		String phone = map.get("phone");//mq中存入的手機號
		String code = map.get("code");//mq中存入的驗證碼
		//System.out.println("手機號"+phone+"驗證碼"+code);
		LOGGER.info("傳送的手機號為:{} ,傳送的驗證碼為 :{}",phone, code);
		//呼叫阿里雲通訊
		CommonResponse commonResponse = smsUtil.sendSms(phone, templateCode, param.replace("[value]", code));
	}
}

3.4 實現驗證碼的校對

使用者收到驗證碼並且填寫完相應的資訊後,點選註冊,將自己的資訊傳送到後臺,後臺收到資訊後,取出存在Redis中的驗證碼,和使用者的驗證碼進行比較,然後將結果返回給前端。程式碼如下所示:

@PostMapping("/save")
	@ApiOperation(value = "新增使用者",notes = "新增使用者")
	public ResponseResult add(@RequestBody User user, String smsCode){
		LOGGER.info("新增的使用者的資訊為:{},使用者收到的驗證碼為:{}", user.toString(),smsCode);
		//對使用者密碼進行加密後在存入資料庫
		BCryptPasswordEncoder encoder = new BCryptPasswordEncoder();
		String newPassword = encoder.encode(user.getPassword());
		user.setPassword(newPassword);
		userService.add(user,smsCode );
		return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo());
	}
/**
	 * 使用者註冊
	 * @param user  使用者物件資訊
	 * @param smsCode  簡訊驗證碼
	 */
	@Override
	public void add(User user, String smsCode) {
		//獲取系統驗證碼
		String sysCode = (String) redisTemplate.boundValueOps("code_" + user.getPhone()).get();
		//比較簡訊驗證碼
		LOGGER.info("從Redis中取到的簡訊驗證碼為:{{}}",smsCode+"  使用者收到的的簡訊驗證碼為:{{}}",smsCode);
		if(sysCode==null||"".equals(smsCode)){
			throw new RuntimeException("驗證碼未傳送或已過期!請稍後重試");
		}
		if(!smsCode.equals(sysCode)){
			throw new RuntimeException("驗證碼不正確,請重新輸入!");
		}
		if(user.getUsername()==null){
			user.setUsername(user.getPhone());
		}
		User searchUser = new User();
		//將使用者傳來的手機號傳給searchUser,去查詢資料庫中是否存在該手機號
		searchUser.setPhone(user.getPhone());
		if(userDao.selectCount(searchUser)>0){
			throw  new RuntimeException("該手機號已被註冊!");
		}
		//設定user的其他引數
		user.setCreated(new Date());
		user.setUpdated(new Date());
		user.setPoints(0);//積分初始值為0
		user.setStatus("1");//狀態1
		user.setIsEmailCheck("0");//郵箱認證
		user.setIsMobileCheck("1");//手機認證
		userDao.insert(user);
	}

到此這篇關於Springboot整合RabbitMQ實現傳送驗證碼的功能的文章就介紹到這了,更多相關Springboot整合RabbitMQ傳送驗證碼內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com