使用Golang編寫Cache

學習如何使用golang從0編寫cache系統


前言

為什麼需要緩存?

  • 請求網頁時,網頁會將第一次請求的內容存入緩存中(通常為key-value),以後再次訪問時,就可以直接從緩存中提取,從而降低載入時間

本地緩存 vs 分布式緩存

  • 本地緩存
    • 這邊的本地緩存通常指的為應用中的緩存組件,最大的優點就是應用和cache在同一個進程裡面,不會發生過多的網路開銷.缺點就是沒辦法共享緩存,個應用或集群都要維護自己的緩存,從而造成大量的開銷
  • 分布式緩存
    • 指的是應用和組件分離的緩存服務,優點就是緩存為一個獨立的應用,和本地分離,因此可以直接共享緩存

LRU緩存淘汰政策

  • 核心思想就是維護一個隊列,如果該條紀錄已經被訪問過了,就會到隊尾,那麼隊首變為最少訪問的紀錄,可以直接淘汰
  • 使用go語言中的container/list當作map中的值,該list為一個雙向隊列
  • entry是雙向隊列的數據,這麼做的目的是當要淘汰隊列的首節點時,使用key從字典中映射即可
type Cache struct {
 maxBytes  int64
 nbytes    int64
 ll        *list.List
 cache     map[string]*list.Element
 onEvicted func(key string, value Value)
}
 
type entry struct {
 key   string
 value Value
}
 
type Value interface {
 Len() int
}
 
func (c *Cache) Len() int {
 return c.ll.Len()
}
 
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
 return &Cache{
  maxBytes:  maxBytes,
  ll:        list.New(),
  cache:     make(map[string]*list.Element),
  onEvicted: onEvicted,
 }
}

查找功能

  • 如果鍵對應的值存在,將該節點移動到隊尾,並返回值
  • c.ll.MoveToFront(ele) 即為將值移動到隊尾(在這裡的front代表隊尾)
func (c *Cache) Get(key string) (value Value, ok bool) {
 if ele, ok := c.cache[key]; ok {
  c.ll.MoveToFront(ele)
  kv := ele.Value.(*entry)
  return kv.value, true
 }
 return
}

刪除功能

  • 實際上為淘汰最少訪問的首位節點,從linklist中刪除
  • 使用delete從字典中刪除該key
  • nbytes更新為當前正確的數值
  • 如果onEvicted不為nil的話,調用該函數
func (c *Cache) RemoveOldest() {
 ele := c.ll.Back()
 if ele != nil {
  c.ll.Remove(ele)
  kv := ele.Value.(*entry)
  delete(c.cache, kv.key)
  c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
  if c.onEvicted != nil {
   c.onEvicted(kv.key, kv.value)
  }
 }
}

新增

  • 如果key存在的話,更新對應的值,並將該節點移動到隊尾
  • 不存在的話首先在隊尾新增該節點,並且新增和map的映射關係
  • 更新nbytes後如果超過最大值的話,調用Remove函數
func (c *Cache) Add(key string, value Value) {
 if ele, ok := c.cache[key]; ok {
  c.ll.MoveToFront(ele)
  kv := ele.Value.(*entry)
  c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
  kv.value = value
 } else {
  ele := c.ll.PushFront(&entry{key, value})
  c.cache[key] = ele
  c.nbytes += int64(len(key)) + int64(value.Len())
 }
 for c.maxBytes != 0 && c.maxBytes < c.nbytes {
  c.RemoveOldest()
 }
}

分布式緩存算法

一至性哈希

  • 將key映射到2^32的空間中,將這個數字首尾相連,行程一個環
    • 計算節點/機器的哈希值,放置在環上
    • 計算key的哈希值,放置在環上,順時針尋找到的第一個節點,就是應選取的節點/機器

傾斜問題

  • 如果服務器節點過少,很容易都對應到同個節點,造成環存節點間負載不均衡
  • 使用虛擬節點的概念解決
    • 計算虛擬節點上的Hash值,放置在環上
    • 計算key的Hash值,在環上順時針尋找對應選取得虛擬節點例如peer2-1對應的就是真實節點peer2

實現一至性哈希

初始化

  • 定義函數類型的Hash,採取依賴注入的方式,允許用來替換成自定義的Hash函數,默認為ChecksumIEEE算法
  • Map是一至性哈希算法的主數據結構,包含
    1. Hash函數 hash
    2. 虛擬節點倍數 replicas
    3. 虛擬節點和真實節點的映射表 hashMap,key是虛擬節點的哈希值,值是真實節點的名稱
  • 構造函數New允許自定義虛擬節點倍數和Hash函數
type Hash func(data []byte) uint32
 
// Map constains all hashed keys
type Map struct {
 hash     Hash
 replicas int
 keys     []int // Sorted
 hashMap  map[int]string
}
 
// New creates a Map instance
func New(replicas int, fn Hash) *Map {
 m := &Map{
  replicas: replicas,
  hash:     fn,
  hashMap:  make(map[int]string),
 }
 if m.hash == nil {
  m.hash = crc32.ChecksumIEEE
 }
 return m
}

新增真實節點/機器

  • Add函數允許傳入0或多個真實節點的名稱
  • 對每一個真實節點key,對應創建m.replicas個虛擬節點
  • 使用m.hash()計算虛擬節點的哈希值,再使用append添加到環上
  • 在hashMap中增加虛擬節點和真實節點的映射關西
  • 依照環上哈希排序
func (m *Map) Add(keys ...string) {
 for _, key := range keys {
  for i := 0; i < m.replicas; i++ {
   hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
   m.keys = append(m.keys, hash)
   m.hashMap[hash] = key
  }
 }
 sort.Ints(m.keys)
}

選擇節點

  • 計算key的哈希值
  • 順時針找到第一個匹配虛擬節點的下標idx,從m.keys中或去對應的哈希值,如果len相同,說明選擇第一個,因為keys是一個環的結構
  • 透過hashMap映射得到真實節點
func (m *Map) Get(key string) string {
 if len(m.keys) == 0 {
  return ""
 }
 
 hash := int(m.hash([]byte(key)))
 // Binary search for appropriate replica.
 idx := sort.Search(len(m.keys), func(i int) bool {
  return m.keys[i] >= hash
 })
 
 return m.hashMap[m.keys[idx%len(m.keys)]]
}

緩存問題和解決方案

  • 緩存雪崩
    • 緩存在同一時刻全部失效,遭成瞬間DB請求量大,壓力驟增引起雪崩.通常因為緩存服務器當機,緩存的key設置相同過期時間導致
  • 緩存擊穿
    • 一個存在的key在緩存過期的那一刻,同時有大量的請求,這些請求都會擊穿DB,造成順時DB請求量大,壓力驟增
  • 緩存穿透
    • 查詢一個不存在的數據,因為不存在則不會寫入緩存,所以美以都會請求DB,造成瞬間流量過大,穿透DB造成當機

實現解決方法

  • 創建call和group類型
  • call代表正在進行或已經結束的請求
  • Group管理不同key的請求
type call struct {
 wg  sync.WaitGroup
 val interface{}
 err error
}
 
type Group struct {
 mu sync.Mutex       // protects m
 m  map[string]*call
}
  • Do方法的作用為針對相同key,無論Do被調用多少次,函數fn都只會被調用一次,等待fn調用結束,return返回值或error
  • 使用延遲初始化g.m以提高內存使用效率
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok {
  g.mu.Unlock()
  c.wg.Wait()
  return c.val, c.err
 }
 c := new(call)
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()
 
 c.val, c.err = fn()
 c.wg.Done()
 
 g.mu.Lock()
 delete(g.m, key)
 g.mu.Unlock()
 
 return c.val, c.err
}

重點地方

  • g.mu是保護Group的成員變量m不被併發讀寫加上鎖
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
 if c, ok := g.m[key]; ok {
  c.wg.Wait()   //如果請求正在進行中則等待
  return c.val, c.err //請求結束返回結果
 }
 c := new(call)
 c.wg.Add(1)       //發起請求前保護鎖
 g.m[key] = c      //添加g.m,表明key已經有對應的請求在處理
 
 c.val, c.err = fn() //調用fn發起請求
 c.wg.Done()         //請求結束
 
    delete(g.m, key)    //更新 g.m
 
 return c.val, c.err //返回结果
}

使用

  • 修改geecache中的Group,他加成員變量loader並更新構建函數NewGroup
  • 修改load函數,將原來的load邏輯使用g.loader.Do包裹,確保併發場景下針對同個key,load只會調用一次
type Group struct {
 name      string
 getter    Getter
 mainCache cache
 peers     PeerPicker
 // use singleflight.Group to make sure that
 // each key is only fetched once
 loader *singleflight.Group
}
 
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
    // ...
 g := &Group{
        // ...
  loader:    &singleflight.Group{},
 }
 return g
}
 
func (g *Group) load(key string) (value ByteView, err error) {
 // each key is only fetched once (either locally or remotely)
 // regardless of the number of concurrent callers.
 viewi, err := g.loader.Do(key, func() (interface{}, error) {
  if g.peers != nil {
   if peer, ok := g.peers.PickPeer(key); ok {
    if value, err = g.getFromPeer(peer, key); err == nil {
     return value, nil
    }
    log.Println("[GeeCache] Failed to get from peer", err)
   }
  }
 
  return g.getLocally(key)
 })
 
 if err == nil {
  return viewi.(ByteView), nil
 }
 return
}

使用Protocol通信

syntax = "proto3";
 
package geecachepb;
 
option go_package = "./";
 
message Request {
  string group = 1;
  string key = 2;
}
 
message Response {
  bytes value = 1;
}
 
service GroupCache {
  rpc Get(Request) returns (Response);
}
 
//protoc --go_out=. *.proto

使用

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // ...
 // Write the value to the response body as a proto message.
 body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()})
 if err != nil {
  http.Error(w, err.Error(), http.StatusInternalServerError)
  return
 }
 w.Header().Set("Content-Type", "application/octet-stream")
 w.Write(body)
}
 
func (h *httpGetter) Get(in *pb.Request, out *pb.Response) error {
 u := fmt.Sprintf(
  "%v%v/%v",
  h.baseURL,
  url.QueryEscape(in.GetGroup()),
  url.QueryEscape(in.GetKey()),
 )
    res, err := http.Get(u)
 // ...
 if err = proto.Unmarshal(bytes, out); err != nil {
  return fmt.Errorf("decoding response body: %v", err)
 }
 
 return nil
}