国外的做外包项目的网站,windows优化软件,无锡建设工程招标网,山西 网站制作这里介绍的源码主要是涉及springboot框架下的rabbitmq客户端代码#xff08;具体在springframework.amqp.rabbit包下#xff0c;区分一下不由springboot直接接管的spring-rabbit的内容#xff09;#xff0c;springboot基于RabbitMQ的Java客户端建立了简便易用的框架。 
sp…这里介绍的源码主要是涉及springboot框架下的rabbitmq客户端代码具体在springframework.amqp.rabbit包下区分一下不由springboot直接接管的spring-rabbit的内容springboot基于RabbitMQ的Java客户端建立了简便易用的框架。 
springboot的框架下相对更多地使用消费者Consumer和监听器Listener的概念这两个概念不注意区分容易混淆。默认情况下springboot中消费者为单线程串行消费的模型体现了队列的特性。 在springboot的框架下使用rabbitmq的一般步骤 
启动rabbitmq服务器springboot项目引入依赖配置信息有两种方式 配置文件配置配置类配置SimpleMessageListenerContainer 实现消息处理类ChannelAwareMessageListener处理业务逻辑或用RabbitListener注解 
这两种方式其实异曲同工RabbitListener的方式在实际使用时创建MessagingMessageListenerAdapter这个对象是ChannelAwareMessageListener接口的实现类实现了onMessage()方法这个方法利用了适配器模式能够调用注解标注的方法而实现ChannelAwareMessageListener的方式比较直白就是实现onMessage()方法 源码解析 
关于SimpleMessageListenerContainer 
SimpleMessageListenerContainer是在spring项目中使用RabbitMQ关键的类用来接收并处理消息的。阅读源码可以从这个类入手。 首先关注构造器需要传入ConnectionFactory用于获取连接这跟原生rabbitmq是一致的都从Connection连接开始。  关键属性 concurrentConsumers指定要创建的并发消费者的数量。默认值为1。建议增加并发使用者的数量以便扩展从队列传入的消息的消耗。但是请注意一旦注册了多个消费者将无法保证顺序。一般来说对于低容量队列坚持使用1个消费者。同时不能超过maxConcurrentConsumers(如果设置了)。 maxConcurrentConsumers设置消费者数量的上限。默认为concurrentConsumers。消费者可以根据需求增加但不会小于concurrentConsumers。 acknowledgeMode消息确认模式 // 自动确认消息
container.setAcknowledgeMode(AcknowledgeMode.NONE);
// 根据情况确认消息
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);绑定组件  设置消费者的Consumer_tag和Argumentscontainer.setConsumerTagStrategy可以设置消费者的 Consumer_tag container.setConsumerArguments可以设置消费者的 Arguments container.setConsumerTagStrategy(queue - order_queue_(count));
//设置消费者的Arguments
MapString, Object args  new HashMap();
args.put(module,订单模块);
args.put(fun,发送消息);
container.setConsumerArguments(args);spring的亮点在于用注解简化了很多代码操作其中最常用的当属RabbitListener 
RabbitListener(queues  {BiMqConstant.BI_QUEUE_NAME}, ackMode  MANUAL)
public void receiveMessage(String message, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
}从RabbitListener入手 
1、从spring开启RabbitMQ的注解模式EnableRabbit导入RabbitBootstrapConfiguration配置类。 
2、这个配置类定义了RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry两个bean。前者用来扫描加了RabbitListener 的类通过反射找到带注解的类再找到对应的方法存为handlerMethods。后者在注册终端后用于构建ListenerContainer继承了RabbitListener注解内的信息包括监听的队列和注解所在的类和方法。 
Target(ElementType.TYPE)
Retention(RetentionPolicy.RUNTIME)
Documented
Import(RabbitBootstrapConfiguration.class)
public interface EnableRabbit {
}3、RabbitListenerEndpointRegistry通过创建MethodRabbitListenerEndpoint对象和SimpleRabbitListenerContainerFactory工厂bean生成SimpleMessageListenerContainer对象。 
RabbitListenerAnnotationBeanPostProcessor中拥有注解信息如队列名以及被标注注解的方法所以endpoint的注册还是在processor类中 
processor中有注册员成员变量registrar的registerEndpoint()注册endpointregistrar有注册处registry成员变量注册利用registerListenerContainer()的createListenerContainer()注册container 
public class RabbitListenerEndpointRegistry  implements SmartLifecycle{private final MapString, MessageListenerContainer listenerContainers new ConcurrentHashMapString, MessageListenerContainer();//注册终端public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory? factory,boolean startImmediately) {String id  endpoint.getId();synchronized (this.listenerContainers) {//创建 listenerContainerMessageListenerContainer container  createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);……if (startImmediately) {startIfNecessary(container);}}}protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory? factory) {//调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器MessageListenerContainer listenerContainer  factory.createListenerContainer(endpoint);return listenerContainer;}4、SimpleMessageListenerContainer对象保存了要监听的队列名可以是configuration时set的也可以是RabbitListener中标注的创建了用于处理消息的MessagingMessageListenerAdapter实例实际上是一个listener 
public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint {......Overrideprotected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {Assert.state(this.messageHandlerMethodFactory ! null,Could not create message listener - MessageHandlerMethodFactory not set);MessagingMessageListenerAdapter messageListener  createMessageListenerInstance();messageListener.setHandlerMethod(configureListenerAdapter(messageListener));String replyToAddress  getDefaultReplyToAddress();if (replyToAddress ! null) {messageListener.setResponseAddress(replyToAddress);}MessageConverter messageConverter  container.getMessageConverter();if (messageConverter ! null) {messageListener.setMessageConverter(messageConverter);}if (getBeanResolver() ! null) {messageListener.setBeanResolver(getBeanResolver());}return messageListener;}protected MessagingMessageListenerAdapter createMessageListenerInstance() {return new MessagingMessageListenerAdapter(this.bean, this.method);}......
}5、SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer区分该类封装了BlockingQueueConsumer由于该类实现了Runnable接口可以视为一个线程任务放入线程池中执行有一个run()方法调用了receiveAndExecute()这个方法会获取BlockingQueueConsumer阻塞读取其消息一次获取多条完成消息读取。 
6、接着调用listener进行消息处理这里设置了代理最终会执行actualInvokeListener所谓实际被执行的listener溯源最终调用了listener.onMessage(message, channelToUse)。 
SimpleMessageListenerContainer {//接受并执行private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {//do接受并执行return doReceiveAndExecute(consumer);}//do接受并执行private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {Channel channel  consumer.getChannel();for (int i  0; i  this.txSize; i) {//txSize为一次事务接受的消息个数//读取消息这里阻塞的但是有一个超时时间。Message message  consumer.nextMessage(this.receiveTimeout);if (message  null) {//阻塞超时break;}try {executeListener(channel, message);//消息接收已完成现在开始处理消息。}catch (Exception e) {}}return consumer.commitIfNecessary(isChannelLocallyTransacted());}//处理消息开始。该方法在其父类中protected void executeListener(Channel channel, Message messageIn) throws Exception {try {Message message  messageIn;if (……) {//批处理信息这个不研究}else {invokeListener(channel, message);}}catch (Exception ex) {}}//在其父类中protected void invokeListener(Channel channel, Message message) throws Exception {//这里this.proxy.invokeListener最终会调用actualInvokeListener方法。this.proxy.invokeListener(channel, message);}//在其父类中protected void actualInvokeListener(Channel channel, Message message) throws Exception {Object listener  getMessageListener();if (listener instanceof ChannelAwareMessageListener) {doInvokeListener((ChannelAwareMessageListener) listener, channel, message);}else if (listener instanceof MessageListener) {//……doInvokeListener((MessageListener) listener, message)}else{//……}}    protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)throws Exception {Channel channelToUse  channel;try {listener.onMessage(message, channelToUse);}catch (Exception e) {throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);}}
}7、关于第6点根据这个listener实例的不同有两种处理方式 
如果是前面所说的实现ChannelAwareMessageListener就直接调用实现类的onMessage()。 
如果是RabbitListener注解不同在于MessagingMessageListenerAdapterChannelAwareMessageListener的实现类也是listen基于适配器模式持有RabbitListener注解的对象和方法adapter实例中有HandlerMethod属性加入到adapter类中HandlerMethod调用invoke()就能执行注解标注的方法。 
public class HandlerAdapter {private final InvocableHandlerMethod invokerHandlerMethod;private final DelegatingInvocableHandler delegatingHandler;public Object invoke(Message? message, Object... providedArgs) throws Exception                {if (this.invokerHandlerMethod ! null) {//InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。return this.invokerHandlerMethod.invoke(message, providedArgs);}else if (this.delegatingHandler.hasDefaultHandler()) {//……}else {//……}}
}
public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {private HandlerAdapter handlerMethod;
}现在就能把整个过程串起来了 关于关于endpoint和register 
Endpoint为终端像电脑、手机都是终端他们都可以接受外部信息并响应如手机来短信了就有提示。这里也用了终端的概念被RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应即接到消息就执行对应方法。 
registry姑且成为注册处用Map保存endpoint的id和对应的listenerContainer注册处registerListenerContainer()利用endpoint和factory实例创建container实际上是用了containerfactory的createListenerContainer(RabbitListenerEndpoint endpoint)方法 
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,ApplicationListenerContextRefreshedEvent {// 检查是否被注册过注册过就不能注册第二次// 调用createListenerContainer创建消息监听// 关于分组消费的我们不关心// 是否立即启动是的话同步调用startIfNecessary方法public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory? factory,boolean startImmediately) {Assert.notNull(endpoint, Endpoint must not be null);Assert.notNull(factory, Factory must not be null);String id  endpoint.getId();Assert.hasText(id, Endpoint id must not be empty);synchronized (this.listenerContainers) {Assert.state(!this.listenerContainers.containsKey(id),Another endpoint is already registered with id   id  );MessageListenerContainer container  createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if (StringUtils.hasText(endpoint.getGroup())  this.applicationContext ! null) {ListMessageListenerContainer containerGroup;if (this.applicationContext.containsBean(endpoint.getGroup())) {containerGroup  this.applicationContext.getBean(endpoint.getGroup(), List.class);}else {containerGroup  new ArrayListMessageListenerContainer();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}containerGroup.add(container);}if (startImmediately) {startIfNecessary(container);}}// 其实就是调用了RabbitListenerContainerFactory的createListenerContainer生成了一个MessageListenerContainer对象protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory? factory) {MessageListenerContainer listenerContainer  factory.createListenerContainer(endpoint);if (listenerContainer instanceof InitializingBean) {try {((InitializingBean) listenerContainer).afterPropertiesSet();}catch (Exception ex) {throw new BeanInitializationException(Failed to initialize message listener container, ex);}}int containerPhase  listenerContainer.getPhase();if (containerPhase  Integer.MAX_VALUE) {  // a custom phase valueif (this.phase  Integer.MAX_VALUE  this.phase ! containerPhase) {throw new IllegalStateException(Encountered phase mismatch between container factory definitions:  this.phase   vs   containerPhase);}this.phase  listenerContainer.getPhase();}return listenerContainer;}
}把endpoint内的信息全部注入到container里。 
Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {C instance  createContainerInstance();if (this.connectionFactory ! null) {instance.setConnectionFactory(this.connectionFactory);}if (this.errorHandler ! null) {instance.setErrorHandler(this.errorHandler);}if (this.messageConverter ! null) {instance.setMessageConverter(this.messageConverter);}if (this.acknowledgeMode ! null) {instance.setAcknowledgeMode(this.acknowledgeMode);}if (this.channelTransacted ! null) {instance.setChannelTransacted(this.channelTransacted);}if (this.autoStartup ! null) {instance.setAutoStartup(this.autoStartup);}if (this.phase ! null) {instance.setPhase(this.phase);}instance.setListenerId(endpoint.getId());// 最重要的一行endpoint.setupListenerContainer(instance);initializeContainer(instance);return instance;
}关于container和containerFactory 
containerFactory也能配置并发消费者等参数。 
Configuration
EnableAsync
public class ThreadPoolConfig { Bean(customContainerFactory) public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory  new SimpleRabbitListenerContainerFactory();factory.setConcurrentConsumers(10); //设置线程数factory.setMaxConcurrentConsumers(10); //最大线程数configurer.configure(factory, connectionFactory);return factory; }
}配置containerFactory能够创建container但一般不在配置类中手动创建。一般是在注解中标记然后让spring来生产container。 
RabbitListener(queuesdemo.queue,containerFactory  customContainerFactory)直接配置container效果是相同的同样可以设置队列并发消费者等。 细说上面第5步container内的操作。 container的启动入口是star()方法然后进入doStart()在该方法中会初始化consumerBlockingQueueConsumer每一个并发需要对应一个consumerconsumer的数量是根据前面所说的concurrentConsumers确定 consumer  new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
// 带有连接信息数据转换器确认模式预取值consumerArgs监听的队列可多个等信息传入区分一下consumer和listenerconsumer是接收消息的消费者listener是实际处理业务的执行者consumer接收的每个消息都需要调用listener内的onMessage()方法来处理实际业务。 
int newConsumers  initializeConsumers();然后将consumer封装成AsyncMessageProcessingConsumer线程任务类型然后就可以放入线程池中执行。 
AsyncMessageProcessingConsumer processor  new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);这里的线程池是SimpleAsyncTaskExecutor也可以自定义传入默认是不限制并发量的。每个container都有一个线程池线程不足以支持consumer并发时就会超时报错。 private Executor taskExecutor  new SimpleAsyncTaskExecutor();进入AsyncMessageProcessingConsumer这个Runnable类的run()方法如果consumer有监听的队列就初始化initialize并开启mainloop() if (this.consumer.getQueueCount()  1) {...
}
try {initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}
}initialize()会创建exchange、queue、bindings等实例设置Qos实现consumer与broker之间的对接完成消息的订阅并且会根据tag不同在每个BlockingQueueConsumer中再划分出internalConsumer再放入BlockingQueueConsumer的queue中逐一处理。 说明Qos流控指令包括prefetch-size、prefetch-count参数。 //该参数是设置在channel上的
int prefetchCount  1;
channel.basicQos(prefetchCount);broker的delivery指令在客户端会先打包成一个Envelope所以consumertag是对应consumer一个而deliveryTag是对应broker中的一条消息一个。 Envelope envelope  new Envelope(m.getDeliveryTag(),m.getRedelivered(),m.getExchange(),m.getRoutingKey());当然在broker执行delivery指令将消息推送到客户端Consumer之前还有channel一个BlockingQueueConsumer对应一个channel对应一个线程的调用。内部的consumer共用channelchannel会根据tag在dispatcher将消息推送至对应的consumer。 一个channel对应了多个consumer  多个AsyncMessageProcessingConsumer对应不同的线程来处理  一个container可能监听多个队列。   mainLoop()相较于如何利用consumer接收消息更侧重于最终的listener来进行业务处理。前面已经知道客户端会将消息存到Consume的queue中简单来说mainloop就是只要客户端正常启动就会无限循环来处理业务的它主要就是完成从queue中提取消息数据然后经过一系列操作最终传递给业务逻辑处理MessageListener中。 mainLoop()方法中就会从queue中提取消息根据**batchSize**确定每次提取消息数量最后回调MessageListener实现将消息传递到业务逻辑进行处理 多个AsyncMessageProcessingConsumer对应一个listener一个container对应一个listener即是一套处理业务共用一个线程池因为它们只是对应不同的并发 处理的业务逻辑应是相同的。   
增加RabbitMQ并发的方法 增加并发消费者数量。并保障能提供充足的线程资源虽然默认的线程池不设线程并发上线。示例Redis与RabbitMQ配合使用多线程多消费者处理消息_多线程 处理 rabbitmq消息-CSDN博客  在listener方法上加上Async()这样会在异步的子线程下执行如果提供线程池就能实现并发。示例线程池解决RabbitMQ消息堆积_rabbitmq线程池-CSDN博客  增大prefetchCountprefetchCount是BlockingQueueConsumer内部维护的一个阻塞队列LinkedBlockingQueue的大小其作用就是如果某个消费者队列阻塞就无法接收新的消息  配置container的自定义线程池但这个方法不推荐示例【RabbitMQ-9】自定义配置线程池线程池资源不足-MQ初始化队列MQ动态扩容影响 - 简书 (jianshu.com)  当并发量确实无法短时间内提高时也应尽可能提高消息队列的容量并开启持久化。如设置惰性队列。 RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘而在消费者消费到相应的消息时才会被加载到内存中它的一个重要的设计目标是能够支持更长的队列即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成消息堆积时惰性队列就很有必要了。 正常的队列会尽可能存储在内存中。