在项目中我们经常使用spring-data-redis来操作Redis,它封装了客户端来与Redis服务器进行各种命令操作。由于最近用到了Redis Cluster集群功能,这里就分析总结一下Jedis cluster集群初始化主要过程及源码。
环境
jar版本: spring-data-redis-1.8.4-RELEASE.jar、jedis-2.9.0.jar
测试环境: Redis 3.2.8,八个集群节点
applicationContext-redis-cluster.xml 配置文件:
Jedis集群类说明
Jedis与Redis集群交互时,涉及的类可以分为两类,分别如下:
1、Redis集群信息配置类:
类名 | 说明 |
---|---|
redis.clients.jedis.JedisPoolConfig | 保存Jedis连接池配置信息 |
org.springframework.data.redis.connection.RedisNode | 保存Redis集群节点信息 |
org.springframework.data.redis.connection.RedisClusterConfiguration | 保存Redis集群配置信息 |
org.springframework.data.redis.connection.jedis.JedisConnectionFactory | Jedis连接工厂,负责创建JedisCluster集群操作类,获取Redis连接对象 |
org.springframework.data.redis.connection.jedis.JedisClusterConnection | 在JedisCluster基础上实现,根据key类型使用具体的Jedis类与Redis进行交互 |
2、Redis集群信息操作类:
类名 | 说明 |
---|---|
redis.clients.jedis.JedisCluster | 扩展了BinaryJedisCluster类,负责与Redis集群进行String类型的key交互 |
redis.clients.jedis.BinaryJedisCluster | JedisCluster的父类,负责与Redis集群进行byte[]类型的key交互 |
redis.clients.jedis.JedisSlotBasedConnectionHandler | JedisClusterConnectionHandler类的子类,负责根据key的slot值获取Redis连接 |
redis.clients.jedis.JedisClusterConnectionHandler | 一个抽象类,负责初始化、重建、重置Redis slot槽缓存 |
redis.clients.jedis.JedisClusterInfoCache | Redis slot缓存类,负责保存、重建和自动发现Redis slot槽与集群节点的关系 |
Jedis集群初始化流程
集群初始化入口:JedisConnectionFactory类
从上面的配置文件applicationContext-redis-cluster.xml中我们声明了JedisConnectionFactory这个类:
这个类是用来创建、管理和销毁Jedis与Redis集群的连接的。由于我们在Spring配置文件中声明了这个类,因此当应用启动时,Spring会自动加载该类,Jedis集群信息初始化的动作也由此开始。该类初始化的方法代码如下:
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory { private JedisPoolConfig poolConfig = new JedisPoolConfig(); private RedisClusterConfiguration clusterConfig; public JedisConnectionFactory(RedisClusterConfiguration clusterConfig, JedisPoolConfig poolConfig) { this.clusterConfig = clusterConfig; this.poolConfig = poolConfig; } /* * (non-Javadoc) * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ public void afterPropertiesSet() { if (shardInfo == null) { shardInfo = new JedisShardInfo(hostName, port); if (StringUtils.hasLength(password)) { shardInfo.setPassword(password); } if (timeout > 0) { setTimeoutOn(shardInfo, timeout); } } if (usePool && clusterConfig == null) { this.pool = createPool(); } //如果集群配置信息不为空,则创建JedisCluster对象 if (clusterConfig != null) { this.cluster = createCluster(); } }}
在上面的配置文件中,我们使用构造函数注入的方式初始化了JedisConnectionFactory,由于该类实现了InitializingBean接口,因此在它被初始化之后会调用afterPropertiesSet()方法,在该方法中会根据clusterConfig集群配置信息是否为空来创建JedisCluster对象。createCluster()代码定义如下:
private JedisCluster createCluster() { JedisCluster cluster = createCluster(this.clusterConfig, this.poolConfig); this.clusterCommandExecutor = new ClusterCommandExecutor( new JedisClusterConnection.JedisClusterTopologyProvider(cluster), new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster), EXCEPTION_TRANSLATION); return cluster; } /** * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}. * * @param clusterConfig must not be {@literal null}. * @param poolConfig can be {@literal null}. * @return * @since 1.7 */ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) { Assert.notNull(clusterConfig, "Cluster configuration must not be null!"); SethostAndPort = new HashSet (); for (RedisNode node : clusterConfig.getClusterNodes()) { hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())); } int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects().intValue() : 5; return StringUtils.hasText(getPassword()) ? new JedisCluster(hostAndPort, timeout, timeout, redirects, password, poolConfig) : new JedisCluster(hostAndPort, timeout, redirects, poolConfig); }
上面的代码调用了JedisCluster的构造函数来创建JedisCluster对象,JedisCluster使用super关键字调用父类的构造函数:
public JedisCluster(SetjedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, timeout, maxAttempts, poolConfig); }
BinaryJedisCluster构造函数:
public BinaryJedisCluster(SetjedisClusterNode, int timeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) { this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout); this.maxAttempts = maxAttempts; }
集群信息获取:JedisClusterInfoCache类
初始化流程到这里,主要的部分就要浮出水面了。在BinaryJedisCluster类的构造函数中初始化了JedisSlotBasedConnectionHandler类,该类的出现说明Jedis要开始获取Redis集群的slot槽和Redis集群节点信息了,该类也是使用super关键字调用父类构造函数来初始化的,它的父类JedisClusterConnectionHandler构造函数如下:
public JedisClusterConnectionHandler(Setnodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password); //这里是关键 initializeSlotsCache(nodes, poolConfig, password); }
JedisClusterConnectionHandler类的构造函数中创建了JedisClusterInfoCache对象,并调用initializeSlotsCache()方法对Redis集群信息进行初始化。该类的主要方法如下:
public Jedis getConnectionFromNode(HostAndPort node) { return cache.setupNodeIfNotExist(node).getResource(); } public MapgetNodes() { return cache.getNodes(); } private void initializeSlotsCache(Set startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } public void renewSlotCache() { cache.renewClusterSlots(null); } public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); } @Override public void close() { cache.reset(); }
可以看到,该类主要还是调用JedisClusterInfoCache对象的方法来完成slot的相关操作。因此我们重点看一下JedisClusterInfoCache类。
JedisClusterInfoCache类主要负责发送cluster slots命令来获取Redis集群节点的槽和Redis集群节点信信息,并将相应信息保存到Map缓存中。我们使用redis-cli客户端工具连接上任意一个Redis中的集群节点,向Redis发送该命令之后,获得的结果如下:
127.0.0.1:6379> cluster slots1) 1) (integer) 12288 2) (integer) 16383 3) 1) "127.0.0.1" 2) (integer) 6382 3) "65aea5fc4485bc7c0c3c4425fb3f500c562ee243" 4) 1) "127.0.0.1" 2) (integer) 6386 3) "4061e306b094e707b6f4a7c8cd8e82bd61155060"2) 1) (integer) 4096 2) (integer) 8191 3) 1) "127.0.0.1" 2) (integer) 6380 3) "c6e1b3691b968b009357dcac3349afbcd557fd8c" 4) 1) "127.0.0.1" 2) (integer) 6384 3) "f915c7e6812a7d8fbe637c782ad261cd453022b2"3) 1) (integer) 0 2) (integer) 4095 3) 1) "127.0.0.1" 2) (integer) 6379 3) "91bb43a956a04a9812e4d6950efebbb2e0f646fd" 4) 1) "127.0.0.1" 2) (integer) 6383 3) "c1d9d907f6905dd826dad774d127b75484ef8ea8"4) 1) (integer) 8192 2) (integer) 12287 3) 1) "127.0.0.1" 2) (integer) 6381 3) "745936c1192bc1b136fd1f5df842bc1dd517ef36" 4) 1) "127.0.0.1" 2) (integer) 6385 3) "1c07bd8406156122eb4855d2e8b36e785e7901c7"
我现在本地的Redis集群有八个节点,四个主节点,四个从节点,通过cluster slots命令的结果都可以清楚地看到这些节点信息。这个命令的每一组结果由四个部分组成:起始槽节点、终止槽节点、主节点IP和端口加节点ID、从节点IP和端口加节点ID。
在JedisClusterInfoCache类中,相关的源码如下:
public class JedisClusterInfoCache { // 保存Redis集群节点和节点连接池信息:key为节点地址、value为连接池 private final Mapnodes = new HashMap (); // 保存Redis集群节点槽和槽所在的主节点连接池信息:key为节点槽、value为连接池 private final Map slots = new HashMap (); // 使用读写锁保证nodes和slots两个map的写安全 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); // 重建缓存的标识变量,false为未进行,true为正在进行 private volatile boolean rediscovering; private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private String password; // 主节点索引位置标识,遍历cluster slots结果时使用 private static final int MASTER_NODE_INDEX = 2; public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) { this(poolConfig, timeout, timeout, null); } public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final String password) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; this.password = password; } /** * 在jedis封装的redis集群节点信息上发送cluster slots命令,获取所有集群节点信息和槽信息 * * @param jedis */ public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock();// 由当前线程获得写锁,在当前线程操作未结束之前,其他线程只能等待 try { reset();// 重置nodes、slots两个Map,释放JedisPool连接池资源 List
总结
Jedis初始化Redis集群信息时,先使用JedisConnectionFactory获取JedisCluster对象,再根据JedisCluster去逐步引出JedisClusterInfoCache对象完成Redis集群信息的获取。在这个类中,主要有以下几点:
- 整个类的核心是discoverClusterNodesAndSlots方法,它在jedis封装的redis集群节点上发送cluster slots命令,来获取所有集群节点信息和槽信息,然后分别缓存在nodes和slots两个HashMap中
- 读写锁一般在读多写少的场景下使用。进行Redis集群信息保存和获取操作时,使用了读写锁ReentrantReadWriteLock,保证写和写之间互斥,避免一个写操作影响另外一个写操作,引发线程安全问题
- 在定义重建缓存标识变量rediscovering时,使用了volatile关键字,保证重建缓存的操作对于其他线程的内存可见性,使JVM主内存与方法线程工作内存状态同步
- 客户端内部维护slots缓存表,并且针对每个节点维护连接池,当集群规模非常大时,客户端会维护非常多的连接并消耗更多的内存
下一篇文章剖析。