目录
1、生产者+消费者工程搭建2、订阅/发布模式2.1、消费者代码2.2、生产者代码2.3、测试3、LPUSH+BRPOP模式3.1、消费者代码3.2、生产者代码3.3、测试1、生产者+消费者工程搭建
创建两个SpringBoot工程,名称叫做producer
和consumer
,并且都引入相应的pom、配置yaml文件、配置redisConfig。
pom:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>mons</groupId><artifactId>commons-pool2</artifactId></dependency>
yaml配置:
server:port: xxxservlet:context-path: /xxxspring:#redis配置redis:# Redis数据库索引(默认为0)database: 1# 连接地址host: 127.0.0.1#端口号port: 6389##连接超时时间timeout: 3600ms#密码password:lettuce:pool:# 连接池最大连接数(使用负值表示没有限制)max-active: 8# 连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1ms# 连接池中的最大空闲连接max-idle: 8# 连接池中的最小空闲连接min-idle: 1#关闭超时shutdown-timeout: 500ms
配置redisConfig:
@Configurationpublic class RedisTemplateConfig {@Bean("redisTemplate")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();// 设置连接工厂template.setConnectionFactory(connectionFactory);// 设置序列化方式StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// key序列化template.setKeySerializer(stringRedisSerializer);// value序列化template.setValueSerializer(getJackson2JsonRedisSerializer());// Hash key序列化template.setHashKeySerializer(stringRedisSerializer);// Hash value序列化template.setHashValueSerializer(getJackson2JsonRedisSerializer());template.afterPropertiesSet();return template;}/*** 缓存管理器** @param redisConnectionFactory* @return*/@Beanpublic CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig();// 设置缓存管理器管理defaultCacheConfig = defaultCacheConfig// 缓存的默认过期时间:.entryTtl(Duration.ofSeconds(36000))// 设置 key为string序列化.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))// 设置value为json序列化.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJackson2JsonRedisSerializer()))// 不缓存空值.disableCachingNullValues();Set<String> cacheNames = new HashSet<>();// 对每个缓存空间应用不同的配置Map<String, RedisCacheConfiguration> configMap = new HashMap<>(8);RedisCacheManager cacheManager = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(defaultCacheConfig).initialCacheNames(cacheNames).withInitialCacheConfigurations(configMap).build();return cacheManager;}/*** 获取Jackson2JsonRedisSerializer序列化对象** @return o*/private Jackson2JsonRedisSerializer<Object> getJackson2JsonRedisSerializer() {/* 明文存取 */Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper om = new ObjectMapper();//指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);jackson2JsonRedisSerializer.setObjectMapper(om);return jackson2JsonRedisSerializer;}}
2、订阅/发布模式
生产者通过convertAndSend
方法发送消息,
消费者者需要配置Receiver
、Listener
和Adapter
对相应的频道进行监听,有消息时就会接收处理。
优点:
可以实现广播模式,一个消息可以发布到多个消费者。多频道订阅,一个消费者可以同时订阅多个频道。
缺点:
消息必须及时消费,不能做消息储存。
2.1、消费者代码
我们测试代码,设置两个频道:sms
和wx
启动类开启异步(@EnableAsync):
@EnableAsync@SpringBootApplicationpublic class RedisConsumerApplication {public static void main(String[] args) {SpringApplication.run(RedisConsumerApplication.class, args);}}
消息接收配置:
配置消息异步接收器 (RedisQueueReceiver):
@Component@Slf4jpublic class RedisQueueReceiver {/*** 接收wx消息,开启异步监听** @param message*/@Asyncpublic void wxMessage(String message) {log.info("消息接收:wx,消息内容为:" + message);}/*** 接收sms消息,开启异步监听** @param message*/@Asyncpublic void smsMessage(String message) {log.info("消息接收:sms,消息内容为:" + message);}}
配置sms类型消息监听器(RedisSmsQueueListener):
@Configurationpublic class RedisSmsQueueListener {/*** 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来** @param receiver* @return*/@Bean(name = "smsAdapter")public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");return adapter;}/*** 构建redis消息监听器容器** @param connectionFactory* @param smsAdapter 绑定指定的adapter* @return*/@Bean("smsContainer")public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter smsAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听// new PatternTopic()中的值与监听的生产者的channel一致container.addMessageListener(smsAdapter, new PatternTopic("sms"));return container;}}
配置wx类型消息监听器(RedisWxQueueListener):
@Configurationpublic class RedisWxQueueListener {/*** 系统消息适配器,如果有多个监听器使用改adapter,可以将adapter进行独立出来** @param receiver* @return*/@Bean(name = "wxAdapter")public MessageListenerAdapter adapter(RedisQueueReceiver receiver) {//指定RedisQueueReceiver类中回调接收消息的方法,监听器收到的所有内容,>RedisQueueReceiver的wxMessage()方法中进行处理MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "wxMessage");//adapter.afterPropertiesSet();return adapter;}/*** 构建redis消息监听器容器** @param connectionFactory* @param wxAdapter 绑定指定的adapter* @return*/@Bean("wxContainer")public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter wxAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// new PatternTopic()中的值与监听的生产者的channel一致// 一个频道可以有多个监听方法,多个频道可以同时被一个方法监听,比如下列情况:container.addMessageListener(wxAdapter, new PatternTopic("wx1"));container.addMessageListener(wxAdapter, new PatternTopic("wx2"));return container;}}
2.2、生产者代码
注意:生产者的中channel与监听器中的new PatternTopic("xxx")设置的xxx一致
@RestController@RequestMapping@Slf4jpublic class ProducerController {@Resourceprivate RedisTemplate redisTemplate;/*** 发送异步队列消息 —— 发布订阅模式* 类型为sms,频道为sms,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/sms")public Object sms() {String channel = "sms";String msg = "这里是 SMS";log.info("频道:{}发送内容:{}", channel, msg);// 生成者发送消息redisTemplate.convertAndSend(channel, msg);return "成功";}/*** 发送异步队列消息 —— 发布订阅模式* 类型为wx,频道为wx1,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/wx1")public Object wx1() {String channel = "wx1";String msg = "这里是 WX1";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.convertAndSend(channel, msg);return "成功";}/*** 发送异步队列消息 —— 发布订阅模式* 类型为wx,频道为wx2,要与消费者监听器中的new PatternTopic("xxx")设置的xxx一致** @return*/@GetMapping("/wx2")public Object wx2() {String channel = "wx2";String msg = "这里是 WX2";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.convertAndSend(channel, msg);return "成功";}}
2.3、测试
通过浏览器依次访问:sms
、wx1
、wx2
这三个接口,观察打印台:
3、LPUSH+BRPOP模式
优点:
可以做消息储存,直到消息被消费。
缺点:一个消息只能一个消费者被消费一次。
3.1、消费者代码
消费者创建Listener
,内部通过线程进行监听。
@Component@Slf4jpublic class RedisPushPopListener {@Resourceprivate RedisTemplate redisTemplate;/*** 启动时开启一个线程来消费消息,实际开发中可以实现线程池来进行处理*/@PostConstructpublic void init() {new Thread(() -> {while (true) {// 与生产者中的channel 需要一致String channel = "push-pop";Object pop = redisTemplate.opsForList().rightPop(channel, 1, TimeUnit.SECONDS);if (pop != null) {log.info("频道为:{},消息内容为:{}", channel, pop);}}}).start();}}
3.2、生产者代码
@RestController@RequestMapping@Slf4jpublic class ProducerController {@Resourceprivate RedisTemplate redisTemplate;/*** 发送异步队列消息 —— LPUSH+RPOP 模式* 频道为push-pop** @return*/@GetMapping("/push/pop")public Object pushPop() {String channel = "push-pop";String msg = "这里是 push-pop";log.info("频道:{}发送内容:{}", channel, msg);redisTemplate.opsForList().leftPush(channel, msg);// Object o = redisTemplate.opsForList().rightPop(channel);// log.info("接收:" + o);return "成功";}}
3.3、测试
通过浏览器访问:/push/pop
接口,观察打印台:
补充:
LPUSH+BRPOP模式
相比于订阅/发布模式
,可以实现消息的储存,我们先启动生产者发生消息,然后再启动消费者,此时消费者才会进行消息的消费。而订阅/发布模式
是必须生产者和消费者都在线。