Spring Redis 注解式Cache那些事

前言:

spring-data-redis使得Spring项目可以快速简单的通过RedisTemplate来操作Redis。而spring-boot-starter-data-redis更是让redis集成更加的方便。

SpringBoot如何与Redis集成,作为cache

application.yml里如下配置:

1
2
3
4
5
6
7
8
9
10
11
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 1000
pool:
max-idle: 200
min-idle: 0
max-active: 200
max-wait: 1000

spring boot可以自动组装相关配置,注意其中使用到了jedis pool,用于提升性能,非必须。
通过以下的annotation加入方法名上,可以无侵入的使用cache。

  • @Cacheable 缓存
  • @CachePut 设置缓存
  • @CacheEvict 失效或更新缓存
  • @Caching 组合操作

以上annotation不做详细展开。

做到上面似乎已经可以了,但有一些问题需要我们来解决。

  • a.redis连接报错或者超时怎么办?此时应该是可降级的。
  • b.使用连接池,连接不可用如何破?

下面贴一个比较成熟的做法,继承CachingConfigurerSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Configuration
@EnableCaching //启用
public class RedisConfig extends CachingConfigurerSupport {

// 过期时间
private static final long expire = 600;

// application.yml配置参数有限,注入并扩展用。
@Autowired
private RedisProperties redisProperties;

//此处自定义jedis pool配置,设置TestOnBrrow等等
@Bean
public JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
RedisProperties.Pool pool = redisProperties.getPool();
jedisPoolConfig.setMaxIdle(pool.getMaxIdle());
jedisPoolConfig.setMaxTotal(pool.getMaxActive());
jedisPoolConfig.setMinIdle(pool.getMinIdle());
jedisPoolConfig.setMaxWaitMillis(pool.getMaxWait());
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestWhileIdle(true);
return jedisPoolConfig;
}

//生成redisConnectionFactory,使用自定义的jedis pool
@Bean
public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig jedisPoolConfig) {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setHostName(redisProperties.getHost());
jedisConnectionFactory.setPort(redisProperties.getPort());
jedisConnectionFactory.setDatabase(redisProperties.getDatabase());
jedisConnectionFactory.setTimeout(redisProperties.getTimeout());
if (null != redisProperties.getPassword()) {
jedisConnectionFactory.setPassword(redisProperties.getPassword());
}
jedisConnectionFactory.setPoolConfig(jedisPoolConfig);
return jedisConnectionFactory;
}

// 设置cacheManager相关,主要涉及默认过期时间。
@Bean
public CacheManager cacheManager(RedisTemplate redisTemplate) {
RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
//设置缓存过期时间,可单独对某个cache制定过期时间
cacheManager.setDefaultExpiration(expire);
//设置redis key是否使用前缀,默认前缀是cacheName
cacheManager.setUsePrefix(true);
return cacheManager;
}

//定义redisTemplate,主要是定义key\value的序列化器
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory);
template.setValueSerializer(getValueSerializer());
template.afterPropertiesSet();
return template;
}

private RedisSerializer getValueSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
return jackson2JsonRedisSerializer;
}

// 设置redis key生成策略
@Bean
@Override
public KeyGenerator keyGenerator() {
return new RequestKeyGenerator();
}

// 重点:设置和redis交互报错时的错误处理器。
@Bean
@Override
public CacheErrorHandler errorHandler() {
return new CallbackCacheErrorHandler();
}

下面看一下CallbackCacheErrorHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CallbackCacheErrorHandler implements CacheErrorHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(CallbackCacheErrorHandler.class);

@Override
public void handleCacheGetError(RuntimeException exception, Cache cache, Object key) {
LOGGER.error("cache get error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);
}

@Override
public void handleCachePutError(RuntimeException exception, Cache cache, Object key, Object value) {
LOGGER.error("cache put error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);

}

@Override
public void handleCacheEvictError(RuntimeException exception, Cache cache, Object key) {
LOGGER.error("cache evict error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);

}

@Override
public void handleCacheClearError(RuntimeException exception, Cache cache) {
LOGGER.error("cache clear error, cacheName:{}, msg:", cache.getName(), exception);
}
}

此处当报错的时候只进行了日志记录,当然如果有其他需求,都可以在这里扩展。自此,spring boot与redis集成大功告成,一切都是那么的完美。

关于RedisCacheManager是否setUsePrefix的坑

首先,我们要知道是否使用prefix的区别是什么?
区别如下:

  • 使用prefix的时候,redis cache的key都会默认添加上cacheName,用于区分不同的cache。
  • 使用prefix的时候,当清除或者失效所有的key的时候,使用的是key prefix*获取所有的key,然后依次清楚。而不使用prefix的时候,需要清除或者失效所有key的时候,则是从一个维护了所有key的zset中获取的,这个zset通常叫做${cacheName}~keys

下面通过源代码来证实一下:

RedisCache.java内RedisWriteThroughCallback负责往redis设置缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
	static class RedisWriteThroughCallback extends AbstractRedisCacheCallback<byte[]> {

public RedisWriteThroughCallback(BinaryRedisCacheElement element, RedisCacheMetadata metadata) {
super(element, metadata);
}

@Override
public byte[] doInRedis(BinaryRedisCacheElement element, RedisConnection connection) throws DataAccessException {

try {
//加锁
lock(connection);

try {

byte[] value = connection.get(element.getKeyBytes());

if (value != null) {
return value;
}

if (!isClusterConnection(connection)) {

connection.watch(element.getKeyBytes());
// 开始事务
connection.multi();
}

value = element.get();

if (value.length == 0) {
connection.del(element.getKeyBytes());
} else {
// 设置缓存key-value
connection.set(element.getKeyBytes(), value);
// 设置失效日期
processKeyExpiration(element, connection);
// 维护key到已知zset内
maintainKnownKeys(element, connection);
}

if (!isClusterConnection(connection)) {
connection.exec();
}

return value;
} catch (RuntimeException e) {
if (!isClusterConnection(connection)) {
connection.discard();
}
throw e;
}
} finally {
// 释放锁
unlock(connection);
}
}
};

protected void maintainKnownKeys(RedisCacheElement element, RedisConnection connection) {

if (!element.hasKeyPrefix()) { //不使用prefix
// 则zadd到已知的key集合内
connection.zAdd(cacheMetadata.getSetOfKnownKeysKey(), 0, element.getKeyBytes());
if (!element.isEternal()) {
connection.expire(cacheMetadata.getSetOfKnownKeysKey(), element.getTimeToLive());
}
}
}

从上面分析得知,设置缓存的时候有以下几步:

  • 设置key-value
  • 设置key的过期时间
  • 维护key到已知key的zset列表

清理所有key的时候,是怎么操作的呢?

1
2
3
4
public void clear() {
redisOperations.execute(cacheMetadata.usesKeyPrefix() ? new RedisCacheCleanByPrefixCallback(cacheMetadata)
: new RedisCacheCleanByKeysCallback(cacheMetadata));
}

可以看出依据是否使用前缀,使用不同的回调方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @author Christoph Strobl
* @since 1.5
*/
static class RedisCacheCleanByKeysCallback extends LockingRedisCacheCallback<Void> {

private static final int PAGE_SIZE = 128;
private final RedisCacheMetadata metadata;

RedisCacheCleanByKeysCallback(RedisCacheMetadata metadata) {
super(metadata);
this.metadata = metadata;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.cache.RedisCache.LockingRedisCacheCallback#doInLock(org.springframework.data.redis.connection.RedisConnection)
*/
@Override
public Void doInLock(RedisConnection connection) {

int offset = 0;
boolean finished = false;

do {
// need to paginate the keys
Set<byte[]> keys = connection.zRange(metadata.getSetOfKnownKeysKey(), (offset) * PAGE_SIZE,
(offset + 1) * PAGE_SIZE - 1); //使用zrange遍历,删除
finished = keys.size() < PAGE_SIZE;
offset++;
if (!keys.isEmpty()) {
connection.del(keys.toArray(new byte[keys.size()][]));
}
} while (!finished);

connection.del(metadata.getSetOfKnownKeysKey());
return null;
}
}

/**
* @author Christoph Strobl
* @since 1.5
*/
static class RedisCacheCleanByPrefixCallback extends LockingRedisCacheCallback<Void> {

private static final byte[] REMOVE_KEYS_BY_PATTERN_LUA = new StringRedisSerializer().serialize(
"local keys = redis.call('KEYS', ARGV[1]); local keysCount = table.getn(keys); if(keysCount > 0) then for _, key in ipairs(keys) do redis.call('del', key); end; end; return keysCount;");
private static final byte[] WILD_CARD = new StringRedisSerializer().serialize("*");
private final RedisCacheMetadata metadata;

public RedisCacheCleanByPrefixCallback(RedisCacheMetadata metadata) {
super(metadata);
this.metadata = metadata;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.cache.RedisCache.LockingRedisCacheCallback#doInLock(org.springframework.data.redis.connection.RedisConnection)
*/
@Override
public Void doInLock(RedisConnection connection) throws DataAccessException {

byte[] prefixToUse = Arrays.copyOf(metadata.getKeyPrefix(), metadata.getKeyPrefix().length + WILD_CARD.length);
System.arraycopy(WILD_CARD, 0, prefixToUse, metadata.getKeyPrefix().length, WILD_CARD.length);

if (isClusterConnection(connection)) {

// load keys to the client because currently Redis Cluster connections do not allow eval of lua scripts.
Set<byte[]> keys = connection.keys(prefixToUse); //集群模式下,使用keys获取所有的key
if (!keys.isEmpty()) {
connection.del(keys.toArray(new byte[keys.size()][]));
}
} else {
// 非集群模式下,使用LUA脚本,keys删除。
connection.eval(REMOVE_KEYS_BY_PATTERN_LUA, ReturnType.INTEGER, 0, prefixToUse);
}

return null;
}
}

从以上源码可以看出使用prefix的区别。总结下,坑在哪儿,应该如何根据业务来选择。

  • 坑1:不使用prefix,需要额外的zset来保存已知key集合,风险点是zset有可能很大,占用空间,如果被置换出去,功能则不一致
  • 坑2:使用prefix, 没有额外的zset。但是失效或者清理所有key的时候,使用keys *可能导致redis被拖死,清理时间内无响应。
  • 坑3:设置缓存,使用了multi,对redis压力不小,高并发下尤其明显,需要注意。

关于Redis Cache默认使用lock的问题

在高并发下,发现spring redis cache的put效率并不高,经过排查发现put操作有lock机制,且lock时间无法更改。

如上RedisWriteThroughCallback所示,有lock和unlock操作,其实就是往redis写一个key作为lock, 删除这个key作为unlock。这个操作在分布式系统中,可以保证其一致性,但是也损失了性能。尤其在仅作为缓存使用的场景,key对应的value具备幂等性,完全可以忽略。

源码重点在这个waitForLock方法里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected boolean waitForLock(RedisConnection connection) {

boolean retry;
boolean foundLock = false;
do {
retry = false;
if (connection.exists(cacheMetadata.getCacheLockKey())) {
foundLock = true;
try {
Thread.sleep(WAIT_FOR_LOCK_TIMEOUT); //此处WAIT_FOR_LOCK_TIMEOUT=300ms
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
retry = true;
}
} while (retry);

return foundLock;
}

// 加锁
protected void lock(RedisConnection connection) {
waitForLock(connection);
connection.set(cacheMetadata.getCacheLockKey(), "locked".getBytes());
}
// 解锁
protected void unlock(RedisConnection connection) {
connection.del(cacheMetadata.getCacheLockKey());
}

可以看出每次加锁,如果lock已经存在的情况下,会额外sleep 300ms, 这在高并发、高性能的缓存场景是极其低效的。并且在极端情况下,unlock删除key没成功,将会导致所有key都无法设置或更新, 并陷入死循环。spring内部也没有提供相关的行为覆盖机制,这是一个较大的坑。

Spring-Data-Redis 2.0 RC1的优化

官方DATAREDIS-481注意到了Lock的优化,并对cache manager做了颠覆性的升级。

下面跟着我来看看,spring-data-redis 2.0之后如何使用注解式cache.
由于底层依赖的Jedis,自从发布2.9.0版本之后,升级缓慢,目前也仅支持到2.8.x和3.x.x版本,所以Spring推荐使用lettuce.

先看application.yml里如何写:

1
2
3
4
5
6
7
8
9
10
11
12
spring:
redis:
host: 127.0.0.1
database: 0
port: 6379
timeout: 1000
lettuce:
pool:
max-active: 500
min-idle: 0
max-idle: 500
max-wait: 1000

开始使用lettuce了, jedis提示deprecated了。
pool提供的参数有限,如果想自己定制,参见如下设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
  	//继承CachingConfigurerSupport
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

//注入默认参数
@Autowired
private RedisProperties redisProperties;
//默认超时
private long expire = 600L;

@Bean
public RedisConnectionFactory redisConnectionFactory() {
//commons-pool2包
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(500);
poolConfig.setMinIdle(0);
poolConfig.setMaxTotal(500);
poolConfig.setMaxWaitMillis(1000);
poolConfig.setTestOnBorrow(true); //额外设置

// 基本连接信息:host port database password
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(redisProperties.getHost());
redisStandaloneConfiguration.setPort(redisProperties.getPort());
redisStandaloneConfiguration.setDatabase(redisProperties.getDatabase());
if (null != redisProperties.getPassword()){
redisStandaloneConfiguration.setPassword(RedisPassword.of(redisProperties.getPassword()));
}

//这里单独配置超时时间,连接池管理
LettuceClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder()
.commandTimeout(Duration.ofMillis(200)).shutdownTimeout(Duration.ofMillis(200)).poolConfig
(poolConfig)
.build();
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory
(redisStandaloneConfiguration, lettuceClientConfiguration);
lettuceConnectionFactory.setValidateConnection(true);
return lettuceConnectionFactory;
}

@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
// 默认配置使用prefix、单独设置valueSerializer、过期时间
RedisCacheConfiguration redisCacheConfiguration =
RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(getValueSerializer()))
.entryTtl(Duration.ofSeconds
(expire)).disableCachingNullValues();
// 使用redisConnectionFactory直接创建无锁的cm
RedisCacheManager cm = RedisCacheManager.builder(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory)).cacheDefaults(redisCacheConfiguration).transactionAware().build();
return cm;
}

private RedisSerializer getValueSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
return jackson2JsonRedisSerializer;
}

@Bean
@Override
public CacheErrorHandler errorHandler() {
return new RedisCacheErrorHandler();
}

@Override
public KeyGenerator keyGenerator() {
return new MyKeyGenerator()
}

从上面可以看出,基本操作是一致的,但是RedisCacheManager创建更加优雅,不在直接依赖redisTemplate。
关于是否使用prefix问题,RedisCacheConfiguration.defaultCacheConfig()中代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private RedisCacheConfiguration(Duration ttl, Boolean cacheNullValues, Boolean usePrefix,
CacheKeyPrefix keyPrefix, SerializationPair<String> keySerializationPair,
SerializationPair<?> valueSerializationPair, ConversionService conversionService) {

this.ttl = ttl;
this.cacheNullValues = cacheNullValues;
this.usePrefix = usePrefix;
this.keyPrefix = keyPrefix;
this.keySerializationPair = keySerializationPair;
this.valueSerializationPair = (SerializationPair<Object>) valueSerializationPair;
this.conversionService = conversionService;
}


public static RedisCacheConfiguration defaultCacheConfig() {

DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();

registerDefaultConverters(conversionService);
// 默认usePrefix为true,是推荐的
return new RedisCacheConfiguration(Duration.ZERO, true, true, CacheKeyPrefix.simple(),
SerializationPair.fromSerializer(new StringRedisSerializer()),
SerializationPair.fromSerializer(new JdkSerializationRedisSerializer()), conversionService);
}

当然也是可以覆盖禁用的,使用disableKeyPrefix, 但明确提出,你需要特别注意,不建议使用。

关于是否使用lock的问题,新版本也提供了可选方案。通过RedisCacheWriter来实现:

1
2
3
4
5
6
7
8
9
10
11
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");
return new DefaultRedisCacheWriter(connectionFactory);
}

static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {

Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");

return new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50));
}

可以看出lockingRedisCacheWriter将会有sleep 50ms来处理锁, nonlocking则没有加锁等待,给用户提供了更好的处理方案。

关于全部失效或者清理key的问题,2.0版本处理方案如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void clean(String name, byte[] pattern) {

Assert.notNull(name, "Name must not be null!");
Assert.notNull(pattern, "Pattern must not be null!");

execute(name, connection -> {

boolean wasLocked = false;

try {

if (isLockingCacheWriter()) {
doLock(name, connection);
wasLocked = true;
}
// 这里仍旧是使用的keys操作
byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
.toArray(new byte[0][]);

if (keys.length > 0) {
connection.del(keys);
}
} finally {

if (wasLocked && isLockingCacheWriter()) {
doUnlock(name, connection);
}
}

return "OK";
});
}

这里仍旧使用的是keys命令,坑仍在。后续使用scan操作也许是更好的选择,但最终还是要依据自己的业务需求来定制。

总结:

开源项目的坑无处不在,即使是spring
无论是什么版本,使用prefix是更好的选择,也是趋势所在。
keys操作对性能的影响始终未能彻底消除,建议使用key expire机制来规避。(生产环境keys操作也是尽可能要避免的)。
redis缓存key的大小,无论是性能还是存储的影响都很大,强烈建议在业务允许范围内尽可能减小key的大小(比如使用MD5,有一定碰撞率)。

申艳超 wechat
欢迎关注微信订阅号~
您的支持是我原创的动力!