Listening for Key Expiration Events in a Redis Cluster
By: Roy.Liu
Last updated: 2021-07-28
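This article supplements the previous one (Listening for Key Expiration Events on Standalone Redis). Production environments usually run Redis Cluster rather than a single standalone instance. If you reuse the standalone approach against a Redis Cluster, you will not receive every expired key, only some of them. The official Redis documentation explains why.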
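Official documentation: https://redis.io/topics/notifications/ The last part of that page covers Redis Cluster and explains why a listener for key expiration and similar events receives only part of them rather than all.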
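Keyspace events do not behave like the ordinary publish/subscribe messages we normally use: they are not broadcast to the other nodes of the cluster. Each node only emits events for the keys it stores, and at startup the application connects to just one of the cluster nodes. As a result, you only receive expiration events for keys stored on that node, and events from keys that expire on the other nodes never reach you. The solution is to listen on every node, which you have to implement yourself.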
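To make the "each node only holds part of the keys" point concrete, here is a minimal sketch of how Redis Cluster assigns a key to one of its 16384 hash slots (CRC16 of the key, modulo 16384, as described in the Redis Cluster specification). The class name KeySlotSketch and its method names are hypothetical and not part of the original article; the code only illustrates why different keys end up on different nodes, so a listener attached to a single node sees only that node's expirations.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the article's project.
public class KeySlotSketch {

    // CRC16-CCITT (XMODEM variant): polynomial 0x1021, initial value 0, no bit reflection.
    public static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    // Slot = CRC16(key) mod 16384. If the key contains a non-empty {hash tag},
    // only the text between the first '{' and the next '}' is hashed.
    public static int slotFor(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }

    public static void main(String[] args) {
        // Keys usually map to different slots, and slots are spread over the master nodes.
        for (String key : new String[] {"order:1001", "order:1002", "session:abc"}) {
            System.out.println(key + " -> slot " + slotFor(key));
        }
    }
}
```

Because each master owns a range of slots, keys with different slots are generally stored (and expire) on different masters, which is why the listener has to be attached to each master individually.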
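Preparation
The preparation is the same as in the previous article (Listening for Key Expiration Events on Standalone Redis), except that it applies to all nodes of the Redis cluster: the configuration of every node has to be modified to enable keyspace notifications (the notify-keyspace-events setting), as shown in the figure below:
Then restart the Redis cluster.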
Code changes
Add the class RedisClusterMessageListenerFactory, which builds a listener for every master node in the cluster:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.JedisShardInfo;

public class RedisClusterMessageListenerFactory
        implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    private static final Logger log = LoggerFactory.getLogger(RedisClusterMessageListenerFactory.class);

    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageListener messageListener;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                // Only master nodes are subscribed to.
                if (node.isMaster()) {
                    log.info("{}:{} is master, hash code is: {}", node.getHost(), node.getPort(), node.hashCode());
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        // Already registered for this node: skip it, but keep processing the other nodes
                        // (the original used "return" here, which stopped the whole loop).
                        continue;
                    }
                    // Dedicated connection factory for this node. The JedisShardInfo-based constructor
                    // is deprecated in newer Spring Data Redis releases.
                    JedisConnectionFactory factory = new JedisConnectionFactory(
                            new JedisShardInfo(node.getHost(), node.getPort()));
                    // Register one RedisMessageListenerContainer bean per master node.
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());
                    RedisMessageListenerContainer container = beanFactory
                            .getBean(containerBeanName, RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        continue;
                    }
                    // Subscribe to expiration events of database 0 on this node.
                    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
                    container.start();
                }
            }
        }
    }
}
The RedisClusterMessageListenerContainerConfig class loads the custom per-node listener factory defined above:
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;

// Only active when spring.redis.cluster.nodes is configured (non-empty).
@ConditionalOnExpression("!'${spring.redis.cluster.nodes:}'.isEmpty()")
@Configuration
public class RedisClusterMessageListenerContainerConfig {

    @Bean
    public RedisClusterMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
            RedisConnectionFactory redisConnectionFactory) {
        RedisClusterMessageListenerFactory beans = new RedisClusterMessageListenerFactory();
        beans.setBeanFactory(beanFactory);
        beans.setRedisConnectionFactory(redisConnectionFactory);
        return beans;
    }
}
The listener class KeyExpiredEventMessageListener is where the actual business logic would go:
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class KeyExpiredEventMessageListener implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(KeyExpiredEventMessageListener.class);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body is the name of the expired key; the channel is the event topic.
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String topic = new String(channel, StandardCharsets.UTF_8);
        String expireKey = new String(body, StandardCharsets.UTF_8);
        log.info("expireKey: {}, topic: {}", expireKey, topic);
    }
}
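The listener above receives the channel name as the topic, in the form __keyevent@<db>__:<event>. If the business logic needs the database index or the event type, the topic string can be split apart. The following is a minimal sketch using only the JDK; the class ExpiredEventChannelSketch and its methods are hypothetical helpers, not part of the original project.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the article's project.
public class ExpiredEventChannelSketch {

    // Matches keyevent channels such as "__keyevent@0__:expired".
    private static final Pattern KEYEVENT = Pattern.compile("^__keyevent@(\\d+)__:(.+)$");

    // Returns the database index encoded in the channel, or -1 if the channel
    // is not a keyevent channel.
    public static int databaseOf(String channel) {
        Matcher m = KEYEVENT.matcher(channel);
        return m.matches() ? Integer.parseInt(m.group(1)) : -1;
    }

    // Returns the event name (for example "expired"), or null if the channel
    // is not a keyevent channel.
    public static String eventOf(String channel) {
        Matcher m = KEYEVENT.matcher(channel);
        return m.matches() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        String topic = "__keyevent@0__:expired";
        System.out.println("db=" + databaseOf(topic) + ", event=" + eventOf(topic));
    }
}
```

Inside onMessage, such a helper could be called with the decoded topic string before dispatching to the business code.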
Output after running:
From:一号门