.setContent(smsContent);
smsMessageVo.setCreateDate(new Date());
smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType());
smsMessageVo.setTemplateId(1);
//先把待發送的短信存入數據庫
SmsQueue smsQueue = new SmsQueue();
BeanUtils.買粉絲pyProperties(smsQueue, smsMessageVo);
smsQueueService.addSmsQueue(smsQueue);
//異步發送短信到redis隊列
sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo);
//Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一樣123456789101112131415161718192021
4.監聽消息
//SmsMessageDelegateListener.java@Component("smsMessageDelegateListener")public class SmsMessageDelegateListener {
@Autowired
private SmsQueueService smsQueueService; //監聽Redis消息
public void handleMessage(Serializable message){ if(message instanceof SmsMessageVo){
SmsMessageVo messageVo = (SmsMessageVo) message; //發送短信
SmsSender smsSender = SmsSenderFactory.buildEMaySender();
smsSender.setMobile(messageVo.getMobile());
smsSender.setContent(messageVo.getContent()); boolean sendSucc = false; //判斷短信類型
//驗證碼短信
if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType()){
sendSucc = smsSender.send();
} if(!sendSucc){ return;
} // 異步更新短信表狀態為發送成功
final Integer smsId = messageVo.getSmsId();
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() { public void run() {
SmsQueue smsQueue = new SmsQueue();
smsQueue.setSmsId(smsId);
smsQueue.setStatus((byte)SmsSendStatus.SEND.getType());
smsQueue.setProcessTime(new Date());
smsQueueService.updateSmsQueue(smsQueue);
}
});
}
}
}123456789101112131415161718192021222324252627282930313233343536373839404142434445
4.總結
下面使用一張流程圖,來總結Redis消息隊列和短信服務。
閱讀全文
1、有效期:限時活動、拼團。。。
2、超時處理:取消超時未支付訂單、超時自動確認收貨。。。
4、重試:網絡異常重試、打車派單、依賴條件未滿足重試。。。
5、定時任務:智能設備定時啟動。。。
1、RabbitMQ
1)簡介:基于AMQP協議,使用Erlang編寫,實現了一個Broker框架
a、Broker:接收和分發消息的代理服務器
b、Virtual Host:虛擬主機之間相互隔離,可理解為一個虛擬主機對應一個消息服務
c、Exchange:交換機,消息發送到指定虛擬機的交換機上
d、Binding:交換機與隊列綁定,并通過路由策略和routingKey將消息投遞到一個或多個隊列中
e、Queue:存放消息的隊列,FIFO,可持久化
f、Channel:信道,消費者通過信道消費消息,一個TCP連接上可同時創建成百上千個信道,作為消息隔離
2)延遲隊列實現:RabbitMQ的延遲隊列基于消息的存活時間TTL(Time To Live)和死信交換機DLE(Dead Letter Exchanges)實現
a、TTL:RabbitMQ支持對隊列和消息各自設置存活時間,取二者中較小的值,即隊列無消費者連接或消息在隊列中一直未被消費的過期時間
b、DLE:過期的消息通過綁定的死信交換機,路由到指定的死信隊列,消費者實際上消費的是死信隊列上的消息
3)缺點:
a、配置麻煩,額外增加一個死信交換機和一個死信隊列的配置
b、脆弱性,配置錯誤或者生產者消費者連接的隊列錯誤都有可能造成延遲失效
2、RocketMQ
1)簡介:來源于阿里,目前為Apache頂級開源項目,使用Java編寫,基于長輪詢的拉取方式,支持事務消息,并解決了順序消息和海量堆積的問題
a、Broker:存放Topic并根據讀取Procer的提交日志,將邏輯上的一個Topic分多個Queue存儲,每個Queue上存儲消息在提交日志上的位置
b、Name Server:無狀態的節點,維護Topic與Broker的對應關系以及Broker的主從關系
2)延遲隊列實現:RocketMQ發送延時消息時先把消息按照延遲時間段發送到指定的隊列中(rocketmq把每種延遲時間段的消息都存放到同一個隊列中),然后通過一個定時器進行輪訓這些隊列,查看消息是否到期,如果到期就把這個消息發送到指定topic的隊列中
3)缺點:延遲時間粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
3、Kafka
1)簡介:來源于Linkedin,目前為Apache頂級開源項目,使用Scala和Java編寫,基于zookeeper協調的分布式、流處理的日志系統,升級版為Jafka
2)延遲隊列實現:Kafka支持延時生產、延時拉取、延時刪除等,其基于時間輪和JDK的DelayQueue實現
a、時間輪(TimingWheel):是一個存儲定時任務的環形隊列,底層采用數組實現,數組中的每個元素可以存放一個定時任務列表
b、定時任務列表(TimerTaskList):是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項
c、定時任務項(TimerTaskEntry):封裝了真正的定時任務TimerTask
d、層級時間輪:當任務的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中,類似于鐘表就是一個三級時間輪
e、JDK DelayQueue:存儲TimerTaskList,并根據其expiration來推進時間輪的時間,每推進一次除執行相應任務列表外,層級時間輪也會進行相應調整
3)缺點:
a、延遲精度取決于時間格設置
b、延遲任務除由超時觸發還可能被外部事件觸發而執行
4、ActiveMQ
1)簡介:基于JMS協議,Java編寫的Apache頂級開源項目,支持點對點和發布訂閱兩種模式。
a、點對點(point-to-point):消息發送到指定的隊列,每條消息只有一個消費者能夠消費,基于拉模型
b、發布訂閱(publish/subscribe):消息發送到主題Topic上,每條消息會被訂閱該Topic的所有消費者各自消費,基于推模型
2)延遲隊列實現:需要延遲的消息會先存儲在JobStore中,通過異步線程任務JobScheler將到達投遞時間的消息投遞到相應隊列上
a、Broker Filter:Broker中定義了一系列BrokerFilter的子類構成攔截器鏈,按順序對消息進行相應處理
b、ScheleBroker:當消息中指定了延遲相關屬性,并且jobId為空時,會生成調度任務存儲到JobStore中,此時消息不會進入到隊列
c、JobStore:基于BTree存儲,key為任務執行的時間戳,