漫谈Guava之Cache(二)

之前的文章介绍了Guava Cache的使用,本文将剖析Cache的源码,对Guava Cache的不太了解的可以移步这里:

漫谈Guava之Cache(一)

创建

创建Cache需通过CacheBuilder,该处使用了建造者模式,我们来看CacheBuilder的build方法。


public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
  checkWeightWithWeigher();
  checkNonLoadingCache();
  return new LocalCache.LocalManualCache<>(this);
}
LocalManualCache(CacheBuilder<? super K, ? super V> builder) {
  this(new LocalCache<K, V>(builder, null));
}

LocalManualCache 相当于LocalCache的包装类,大部分方法直接使用LocalCache的实现,部分方法自己实现了一下。我们直接看LocalCache的逻辑,LocalCache的实现参考了ConcurrentHashMap,采用分段锁技术,当一个线程访问一个 Segment 时,不会影响其他的 Segment

Segment

Segment<K, V> segmentFor(int hash) {
  return segments[(hash >>> segmentShift) & segmentMask];
}

Segment中维护了5个队列

  • keyReferenceQueue key引用队列
  • valueReferenceQueue value引用队列
  • recencyQueue 临时队列,数据刷新清空该队列,队列中数据同步到accessQueue中
  • writeQueue 写队列
  • accessQueue 访问队列

存放元素

segmentFor(hash)根据hash值取到Segment,优化并发操作。
加锁,preWriteCleanup删除被GC的数据,判断加入的元素在缓存中是否存在,若存在替换enqueueNotification或者不做更改,不存在新增放入缓存中,若缓存元素超限删除不活跃元素evictEntries(LRU),最后释放锁并推送删除的元素给监听者postWriteCleanup


@Override
public V put(K key, V value) {
  checkNotNull(key);
  checkNotNull(value);
  int hash = hash(key);
  return segmentFor(hash).put(key, hash, value, false);
}


V put(K key, int hash, V value, boolean onlyIfAbsent) {
  lock();
  try {
    long now = map.ticker.read();
    // 清理过期数据和被GC回收的数据
    preWriteCleanup(now);
	
    int newCount = this.count + 1;
    // 扩容
    if (newCount > this.threshold) { // ensure capacity
      expand();
      newCount = this.count + 1;
    }

    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);

    // Look for an existing entry.
    for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash
          && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        // We found an existing entry.

        ValueReference<K, V> valueReference = e.getValueReference();
        V entryValue = valueReference.get();

        // 表示触发GC,值被回收了
        if (entryValue == null) {
          ++modCount;
          // 排除CacheLoad加载的数据
          if (valueReference.isActive()) {
            // 移入删除队列中,由删除队列触发监听
            enqueueNotification(
                key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
            setValue(e, key, value, now);
            newCount = this.count; // count remains unchanged
          } else {
            setValue(e, key, value, now);
            newCount = this.count + 1;
          }
          this.count = newCount; // write-volatile
          // 移除原来元素
          evictEntries(e);
          return null;
        } else if (onlyIfAbsent) { 
          // Mimic
          // "if (!map.containsKey(key)) ...
          // else return map.get(key);
          // 只更新最后访问时间
          recordLockedRead(e, now);
          return entryValue;
        } else {
          // clobber existing entry, count remains unchanged
          ++modCount;
          // 与第一个条件分支逻辑类似
          enqueueNotification(
              key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
          setValue(e, key, value, now);
          evictEntries(e);
          return entryValue;
        }
      }
    }

    // Create a new entry.
    // 缓存中没有,创建一个
    ++modCount;
    ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
    // 包装key,value放到newEntry中,并调用recordWrite()方法,更新权重,将entry存入accessQueue,writeQueue队列中
    setValue(newEntry, key, value, now);
    table.set(index, newEntry);
    newCount = this.count + 1;
    // count用volatile修饰,保证内存可见性
    this.count = newCount; // write-volatile
    // 新增元素后有可能导致缓存数据过多,需进行清理,优先删除较少访问的
    evictEntries(newEntry);
    return null;
  } finally {
    unlock();
    // 移除的元素会放到单独队列中,此时触发监听
    postWriteCleanup();
  }
}

添加或替换元素时会触发recordWrite,更新写队列和读队列


void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
  ValueReference<K, V> previous = entry.getValueReference();
  int weight = map.weigher.weigh(key, value);
  checkState(weight >= 0, "Weights must be non-negative");

  ValueReference<K, V> valueReference =
      map.valueStrength.referenceValue(this, entry, value, weight);
  entry.setValueReference(valueReference);
  recordWrite(entry, weight, now);
  previous.notifyNewValue(value);
}

void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
  // we are already under lock, so drain the recency queue immediately
  drainRecencyQueue();
  totalWeight += weight;

  if (map.recordsAccess()) {
    entry.setAccessTime(now);
  }
  if (map.recordsWrite()) {
    entry.setWriteTime(now);
  }
  accessQueue.add(entry);
  writeQueue.add(entry);
}

缓存元素有变动时会调用drainRecencyQueue方法,将recencyQueue队列数据清空,并按照队列顺序放入访问队列中以备LRU策略删除

void drainRecencyQueue() {
    ReferenceEntry<K, V> e;
    while ((e = recencyQueue.poll()) != null) {
      // An entry may be in the recency queue despite it being removed from
      // the map . This can occur when the entry was concurrently read while a
      // writer is removing it from the segment or after a clear has removed
      // all of the segment's entries.
      if (accessQueue.contains(e)) {
        accessQueue.add(e);
      }
    }
  }

取元素

和存放逻辑一样,也要先取对应的Segment。从Segment中取出value,value不为空继续判断是否超过缓存过期时间,若元素可用,更新读队列,最后清除GC和过期数据。

public @Nullable V getIfPresent(Object key) {
  int hash = hash(checkNotNull(key));
  V value = segmentFor(hash).get(key, hash);
  if (value == null) {
    globalStatsCounter.recordMisses(1);
  } else {
    globalStatsCounter.recordHits(1);
  }
  return value;
}

// `getLiveEntry`方法传入当前时间,过期返回空

V get(Object key, int hash) {
  try {
    if (count != 0) { // read-volatile
      long now = map.ticker.read();
      ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
      if (e == null) {
        return null;
      }

      V value = e.getValueReference().get();
      if (value != null) { // 未被GC回收
        // 放入recencyQueue队列中
        recordRead(e, now);
        // 当前时间和写入的时间超过刷新间隔,重新加载value
        return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
      }
      // 垃圾回收
      tryDrainReferenceQueues();
    }
    return null;
  } finally {
    // 清超时和GC数据
    postReadCleanup();
  }
}

ReferenceEntry<K, V> getLiveEntry(Object key, int hash, long now) {
  ReferenceEntry<K, V> e = getEntry(key, hash);
  if (e == null) {
    return null;
  } else if (map.isExpired(e, now)) { // 缓存已过期
    tryExpireEntries(now);
    return null;
  }
  return e;
}

参考文档

Guava cacha 机制及源码分析

guava Cache源码分析(二)


漫谈Guava之Cache(二)
https://blog.yjll.blog/post/a6e28e21.html
作者
简斋
发布于
2020年3月28日
许可协议