600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > Redis实现消息队列(订阅/发布模式 LPUSH+BRPOP)

Redis实现消息队列(订阅/发布模式 LPUSH+BRPOP)

时间:2020-05-27 03:52:18

相关推荐

Redis实现消息队列(订阅/发布模式  LPUSH+BRPOP)

目录

1、生产者+消费者工程搭建2、订阅/发布模式2.1、消费者代码2.2、生产者代码2.3、测试3、LPUSH+BRPOP模式3.1、消费者代码3.2、生产者代码3.3、测试

1、生产者+消费者工程搭建

创建两个SpringBoot工程,名称叫做producerconsumer,并且都引入相应的pom、配置yaml文件、配置redisConfig。

pom:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>mons</groupId><artifactId>commons-pool2</artifactId></dependency>

yaml配置:

server:port: xxxservlet:context-path: /xxxspring:#redis配置redis:# Redis数据库索引(默认为0)database: 1# 连接地址host: 127.0.0.1#端口号port: 6389##连接超时时间timeout: 3600ms#密码password:lettuce:pool:# 连接池最大连接数(使用负值表示没有限制)max-active: 8# 连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1ms# 连接池中的最大空闲连接max-idle: 8# 连接池中的最小空闲连接min-idle: 1#关闭超时shutdown-timeout: 500ms

配置redisConfig:

@Configurationpublic class RedisTemplateConfig {@Bean("redisTemplate")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();// 设置连接工厂template.setConnectionFactory(connectionFactory);// 设置序列化方式StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// key序列化template.setKeySerializer(stringRedisSerializer);// value序列化template.setValueSerializer(getJackson2JsonRedisSerializer());// Hash key序列化template.setHashKeySerializer(stringRedisSerializer);// Hash value序列化template.setHashValueSerializer(getJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}/*** 缓存管理器** @param redisConnectionFactory* @return*/@Beanpublic CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig();// 设置缓存管理器管理defaultCacheConfig = defaultCacheConfig// 缓存的默认过期时间:.entryTtl(Duration.ofSeconds(36000))// 设置 key为string序列化.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))// 设置value为json序列化.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJackson2JsonRedisSerializer()))// 不缓存空值.disableCachingNullValues();Set<String> cacheNames = new HashSet<>();// 对每个缓存空间应用不同的配置Map<String, RedisCacheConfiguration> configMap = new HashMap<>(8);RedisCacheManager cacheManager = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(defaultCacheConfig).initialCacheNames(cacheNames).withInitialCacheConfigurations(configMap).build();return cacheManager;}/*** 获取Jackson2JsonRedisSerializer序列化对象** @return o*/private Jackson2JsonRedisSerializer<Object> getJackson2JsonRedisSerializer() {/* 明文存取 */Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper om = new ObjectMapper();//指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);jackson2JsonRedisSerializer.setObjectMapper(om);return jackson2JsonRedisSerializer;}}

2、订阅/发布模式

生产者通过convertAndSend方法发送消息,

消费者者需要配置ReceiverListenerAdapter对相应的频道进行监听,有消息时就会接收处理。

优点:

可以实现广播模式,一个消息可以发布到多个消费者。多频道订阅,一个消费者可以同时订阅多个频道。

缺点:

消息必须及时消费,不能做消息储存。

2.1、消费者代码

我们测试代码,设置两个频道:smswx

启动类开启异步(@EnableAsync):

@EnableAsync@SpringBootApplicationpublic class RedisConsumerApplication {public static void main(String[] args) {SpringApplication.run(RedisConsumerApplication.class, args);}}

消息接收配置:

配置消息异步接收器 (RedisQueueReceiver):

@Component@Slf4jpublic class RedisQueueReceiver {/*** 接收wx消息,开启异步监听** @param message*/@Asyncpublic void wxMessage(String message) {log.info("消息接收:wx,消息内容为:" + message);}/*** 接收sms消息,开启异步监听** @param message*/@Asyncpublic void smsMessage(String message) {log.info("消息接收:sms,消息内容为:" + message);}}

配置sms类型消息监听器(RedisSmsQueueListener):

@Configurationpublic class RedisSmsQueueListener {/*** 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来** @param receiver* @return*/@Bean(name = "smsAdapter")public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");return adapter;}/*** 构建redis消息监听器容器** @param connectionFactory* @param smsAdapter 绑定指定的adapter* @return*/@Bean("smsContainer")public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter smsAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听// new PatternTopic()中的值与监听的生产者的channel一致container.addMessageListener(smsAdapter, new PatternTopic("sms"));return container;}}

配置wx类型消息监听器(RedisWxQueueListener):

@Configurationpublic class RedisWxQueueListener {/*** 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来** @param receiver* @return*/@Bean(name = "wxAdapter")public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "wxMessage");//adapter.afterPropertiesSet();return adapter;}/*** 构建redis消息监听器容器** @param connectionFactory* @param wxAdapter 绑定指定的adapter* @return*/@Bean("wxContainer")public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter wxAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// new PatternTopic()中的值与监听的生产者的channel一致// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听,比如下列情况:container.addMessageListener(wxAdapter, new PatternTopic("wx1"));container.addMessageListener(wxAdapter, new PatternTopic("wx2"));return container;}}

2.2、生产者代码

注意:生产者的中channel与监听器中的new PatternTopic("xxx")设置的xxx一致

@RestController@RequestMapping@Slf4jpublic class ProducerController {@Resourceprivate RedisTemplate redisTemplate;/*** 发送异步队列消息 —— 发布订阅模式* 类型为sms,频道为sms,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/sms")public Object sms() {String channel = "sms";String msg = "这里是 SMS";log.info("频道:{}发送内容:{}", channel, msg);// 生成者发送消息redisTemplate.convertAndSend(channel, msg);return "成功";}/*** 发送异步队列消息 —— 发布订阅模式* 类型为wx,频道为wx1,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/wx1")public Object wx1() {String channel = "wx1";String msg = "这里是 WX1";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.convertAndSend(channel, msg);return "成功";}/*** 发送异步队列消息 —— 发布订阅模式* 类型为wx,频道为wx2,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/wx2")public Object wx2() {String channel = "wx2";String msg = "这里是 WX2";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.convertAndSend(channel, msg);return "成功";}}

2.3、测试

通过浏览器依次访问:smswx1wx2这三个接口,观察打印台:

3、LPUSH+BRPOP模式

优点:

可以做消息储存,直到消息被消费。

缺点:一个消息只能一个消费者被消费一次。

3.1、消费者代码

消费者创建Listener,内部通过线程进行监听。

@Component@Slf4jpublic class RedisPushPopListener {@Resourceprivate RedisTemplate redisTemplate;/*** 启动时开启一个线程来消费消息,实际开发中可以实现线程池来进行处理*/@PostConstructpublic void init() {new Thread(() -> {while (true) {// 与生产者中的channel 需要一致String channel = "push-pop";Object pop = redisTemplate.opsForList().rightPop(channel, 1, TimeUnit.SECONDS);if (pop != null) {log.info("频道为:{},消息内容为:{}", channel, pop);}}}).start();}}

3.2、生产者代码

@RestController@RequestMapping@Slf4jpublic class ProducerController {@Resourceprivate RedisTemplate redisTemplate;/*** 发送异步队列消息 —— LPUSH+RPOP 模式* 频道为push-pop** @return*/@GetMapping("/push/pop")public Object pushPop() {String channel = "push-pop";String msg = "这里是 push-pop";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.opsForList().leftPush(channel, msg);// Object o = redisTemplate.opsForList().rightPop(channel);// log.info("接收:" + o);return "成功";}}

3.3、测试

通过浏览器访问:/push/pop接口,观察打印台:

补充:LPUSH+BRPOP模式相比于订阅/发布模式,可以实现消息的储存,我们先启动生产者发生消息,然后再启动消费者,此时消费者才会进行消息的消费。而订阅/发布模式是必须生产者和消费者都在线。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。