这么建设新的网站,免费微网站开发平台,用php做网站的方法,北京户外广告公司排名本文章通过MQ队列来实现秒杀场景 
整体的设计如下图#xff0c;整个流程中对于发送发MQ失败和发送到死信队列的数据未做后续处理 1、首先先创建MQ的配置文件 
Configuration
public class RabbitConfig  {public static final String DEAD_LETTER_EXCHANGE  deadLetterE…本文章通过MQ队列来实现秒杀场景 
整体的设计如下图整个流程中对于发送发MQ失败和发送到死信队列的数据未做后续处理 1、首先先创建MQ的配置文件 
Configuration
public class RabbitConfig  {public static final String DEAD_LETTER_EXCHANGE  deadLetterExchange;public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY  dead.#;public static final String DEAD_LETTER_QUEUEA_NAME  deadQueue;Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate ConnectionFactory connectionFactory;Beanpublic TopicExchange topicExchange(){return new TopicExchange(seckill_topic,true,false);}// 声明死信ExchangeBean(deadLetterExchange)public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}Bean(seckillQueue)public Queue seckillQueue(){MapString,Object args  new HashMap();args.put(x-dead-letter-exchange, DEAD_LETTER_EXCHANGE);//  x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put(x-dead-letter-routing-key, DEAD_LETTER_QUEUEA_ROUTING_KEY);return QueueBuilder.durable(seckillQueue).withArguments(args).build();}Bean(deadQueue)public Queue binding(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}Beanpublic Binding bindingExchange(){return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with(seckill.#);}// 声明死信队列绑定关系Beanpublic Binding deadLetterBinding(Qualifier(deadQueue) Queue queue,Qualifier(deadLetterExchange) DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}//配置会覆盖yml的重试次数//RabbitMQ监听容器/*Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory  new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置并发factory.setConcurrentConsumers(1);SimpleMessageListenerContainer snew SimpleMessageListenerContainer();//最大并发factory.setMaxConcurrentConsumers(1);//消息接收——手动确认factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置超时factory.setReceiveTimeout(2000L);//设置重试间隔factory.setFailedDeclarationRetryInterval(3000L);//监听自定义格式转换//factory.setMessageConverter(jsonMessageConverter);return factory;}*/
} 
2、配置yml文件 
spring:redis:database: 0host: xxxport: 6379password: xxxtimeout: 60jedis:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0rabbitmq:username: adminpassword: adminvirtual-host: /host: xxxxport: 12345publisher-confirms: truepublisher-returns: truetemplate:mandatory: truelistener:simple:concurrency: 1max-concurrency: 3# 消费者预取1条数据到内存默认为250条prefetch: 1# 确定机制acknowledge-mode: manualretry:enabled: true #是否支持重试max-attempts: 2# 重试间隔(ms)initial-interval: 5000 
这里有一点需要注意的是在做死信队列的时候如果Config文件中配置了监听容器在yml文件中的一些属性要在容器里面进行配置当时测试重试的时候发现没有在Config文件中配置只在yml文件中配置了重试次数结果会无限期的重试MQ的默认方式就是无限期的重试所以这点很容易踩坑 
3、实现交换机的ACK实现 RabbitTemplate.ConfirmCallback接口 
Component
public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {Autowiredprivate RabbitMessageMapper rabbitMessageMapper;Autowiredprivate RabbitTemplate rabbitTemplate;//注入//PostConstruct注解会在Component、Autowired注解完成后再执行PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){RabbitMessage rabbitMessage  new RabbitMessage();rabbitMessage.setUniqueKey(correlationData.getId().toString());rabbitMessage.setSuccessFlag(N);rabbitMessageMapper.updateSuccessFlag(rabbitMessage);System.out.println(失败原因cause);}}
} 
4、实现队列的ACK实现 RabbitTemplate.ReturnCallback 
Component
public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {Autowiredprivate RabbitTemplate rabbitTemplate;//注入//PostConstruct注解会在Component、Autowired注解完成后再执行PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);}Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(消息主体 messagemessage);System.out.println(应答码 replyCode: replyCode);System.out.println(原因描述 replyTextreplyText);System.out.println(交换机 exchangeexchange);System.out.println(消息使用的路由键 routingKeyroutingKey);}
} 
5、消费者方面实现 ChannelAwareMessageListener 接口 
Component
public class AckListener implements ChannelAwareMessageListener {Autowiredprivate RabbitMqService rabbitMqService;RabbitListener(queues  seckillQueue)Overridepublic void onMessage(Message messagex, Channel channel) throws Exception {try {String result  new String(messagex.getBody(),utf-8);rabbitMqService.receive(result);channel.basicAck(messagex.getMessageProperties().getDeliveryTag(), false);}catch (Exception exception){channel.basicNack(messagex.getMessageProperties().getDeliveryTag(), false, false);}}
}