漫谈Guava之Cache(二)
之前的文章介绍了Guava Cache的使用,本文将剖析Cache的源码,对Guava Cache的不太了解的可以移步这里:
创建
创建Cache需通过CacheBuilder,该处使用了建造者模式,我们来看CacheBuilder的build方法。
public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
checkWeightWithWeigher();
checkNonLoadingCache();
return new LocalCache.LocalManualCache<>(this);
}
LocalManualCache(CacheBuilder<? super K, ? super V> builder) {
this(new LocalCache<K, V>(builder, null));
}
LocalManualCache 相当于LocalCache的包装类,大部分方法直接使用LocalCache的实现,部分方法自己实现了一下。我们直接看LocalCache的逻辑,LocalCache的实现参考了ConcurrentHashMap
,采用分段锁技术,当一个线程访问一个 Segment
时,不会影响其他的 Segment
。
Segment<K, V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
Segment中维护了5个队列
- keyReferenceQueue key引用队列
- valueReferenceQueue value引用队列
- recencyQueue 临时队列,数据刷新清空该队列,队列中数据同步到accessQueue中
- writeQueue 写队列
- accessQueue 访问队列
存放元素
segmentFor(hash)根据hash值取到Segment,优化并发操作。
加锁,preWriteCleanup
删除被GC的数据,判断加入的元素在缓存中是否存在,若存在替换enqueueNotification
或者不做更改,不存在新增放入缓存中,若缓存元素超限删除不活跃元素evictEntries
(LRU),最后释放锁并推送删除的元素给监听者postWriteCleanup
。
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
long now = map.ticker.read();
// 清理过期数据和被GC回收的数据
preWriteCleanup(now);
int newCount = this.count + 1;
// 扩容
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
// 表示触发GC,值被回收了
if (entryValue == null) {
++modCount;
// 排除CacheLoad加载的数据
if (valueReference.isActive()) {
// 移入删除队列中,由删除队列触发监听
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
// 移除原来元素
evictEntries(e);
return null;
} else if (onlyIfAbsent) {
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
// 只更新最后访问时间
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
++modCount;
// 与第一个条件分支逻辑类似
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
// Create a new entry.
// 缓存中没有,创建一个
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
// 包装key,value放到newEntry中,并调用recordWrite()方法,更新权重,将entry存入accessQueue,writeQueue队列中
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
// count用volatile修饰,保证内存可见性
this.count = newCount; // write-volatile
// 新增元素后有可能导致缓存数据过多,需进行清理,优先删除较少访问的
evictEntries(newEntry);
return null;
} finally {
unlock();
// 移除的元素会放到单独队列中,此时触发监听
postWriteCleanup();
}
}
添加或替换元素时会触发recordWrite
,更新写队列和读队列
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
ValueReference<K, V> previous = entry.getValueReference();
int weight = map.weigher.weigh(key, value);
checkState(weight >= 0, "Weights must be non-negative");
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);
recordWrite(entry, weight, now);
previous.notifyNewValue(value);
}
void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
// we are already under lock, so drain the recency queue immediately
drainRecencyQueue();
totalWeight += weight;
if (map.recordsAccess()) {
entry.setAccessTime(now);
}
if (map.recordsWrite()) {
entry.setWriteTime(now);
}
accessQueue.add(entry);
writeQueue.add(entry);
}
缓存元素有变动时会调用drainRecencyQueue
方法,将recencyQueue
队列数据清空,并按照队列顺序放入访问队列中以备LRU策略删除
void drainRecencyQueue() {
ReferenceEntry<K, V> e;
while ((e = recencyQueue.poll()) != null) {
// An entry may be in the recency queue despite it being removed from
// the map . This can occur when the entry was concurrently read while a
// writer is removing it from the segment or after a clear has removed
// all of the segment's entries.
if (accessQueue.contains(e)) {
accessQueue.add(e);
}
}
}
取元素
和存放逻辑一样,也要先取对应的Segment。从Segment中取出value,value不为空继续判断是否超过缓存过期时间,若元素可用,更新读队列,最后清除GC和过期数据。
public @Nullable V getIfPresent(Object key) {
int hash = hash(checkNotNull(key));
V value = segmentFor(hash).get(key, hash);
if (value == null) {
globalStatsCounter.recordMisses(1);
} else {
globalStatsCounter.recordHits(1);
}
return value;
}
// `getLiveEntry`方法传入当前时间,过期返回空
V get(Object key, int hash) {
try {
if (count != 0) { // read-volatile
long now = map.ticker.read();
ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
if (e == null) {
return null;
}
V value = e.getValueReference().get();
if (value != null) { // 未被GC回收
// 放入recencyQueue队列中
recordRead(e, now);
// 当前时间和写入的时间超过刷新间隔,重新加载value
return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
}
// 垃圾回收
tryDrainReferenceQueues();
}
return null;
} finally {
// 清超时和GC数据
postReadCleanup();
}
}
ReferenceEntry<K, V> getLiveEntry(Object key, int hash, long now) {
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e == null) {
return null;
} else if (map.isExpired(e, now)) { // 缓存已过期
tryExpireEntries(now);
return null;
}
return e;
}
参考文档
漫谈Guava之Cache(二)
https://blog.yjll.blog/post/a6e28e21.html