使用Golang編寫Cache
學習如何使用golang從0編寫cache系統
前言
- 本篇主要參考7天用Go从零实现分布式缓存GeeCache,主要用來當作筆記以供學習
為什麼需要緩存?
- 請求網頁時,網頁會將第一次請求的內容存入緩存中(通常為key-value),以後再次訪問時,就可以直接從緩存中提取,從而降低載入時間
本地緩存 vs 分布式緩存
- 本地緩存
- 這邊的本地緩存通常指的為應用中的緩存組件,最大的優點就是應用和cache在同一個進程裡面,不會發生過多的網路開銷.缺點就是沒辦法共享緩存,個應用或集群都要維護自己的緩存,從而造成大量的開銷
- 分布式緩存
- 指的是應用和組件分離的緩存服務,優點就是緩存為一個獨立的應用,和本地分離,因此可以直接共享緩存
LRU緩存淘汰政策
- 核心思想就是維護一個隊列,如果該條紀錄已經被訪問過了,就會到隊尾,那麼隊首變為最少訪問的紀錄,可以直接淘汰
- 使用go語言中的
container/list
當作map中的值,該list為一個雙向隊列 - entry是雙向隊列的數據,這麼做的目的是當要淘汰隊列的首節點時,使用key從字典中映射即可
type Cache struct {
maxBytes int64
nbytes int64
ll *list.List
cache map[string]*list.Element
onEvicted func(key string, value Value)
}
type entry struct {
key string
value Value
}
type Value interface {
Len() int
}
func (c *Cache) Len() int {
return c.ll.Len()
}
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
onEvicted: onEvicted,
}
}
查找功能
- 如果鍵對應的值存在,將該節點移動到隊尾,並返回值
c.ll.MoveToFront(ele)
即為將值移動到隊尾(在這裡的front代表隊尾)
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}
刪除功能
- 實際上為淘汰最少訪問的首位節點,從linklist中刪除
- 使用delete從字典中刪除該key
- nbytes更新為當前正確的數值
- 如果onEvicted不為nil的話,調用該函數
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry)
delete(c.cache, kv.key)
c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.onEvicted != nil {
c.onEvicted(kv.key, kv.value)
}
}
}
新增
- 如果key存在的話,更新對應的值,並將該節點移動到隊尾
- 不存在的話首先在隊尾新增該節點,並且新增和map的映射關係
- 更新nbytes後如果超過最大值的話,調用Remove函數
func (c *Cache) Add(key string, value Value) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
kv.value = value
} else {
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nbytes += int64(len(key)) + int64(value.Len())
}
for c.maxBytes != 0 && c.maxBytes < c.nbytes {
c.RemoveOldest()
}
}
分布式緩存算法
一至性哈希
- 將key映射到
2^32
的空間中,將這個數字首尾相連,行程一個環- 計算節點/機器的哈希值,放置在環上
- 計算key的哈希值,放置在環上,順時針尋找到的第一個節點,就是應選取的節點/機器
傾斜問題
- 如果服務器節點過少,很容易都對應到同個節點,造成環存節點間負載不均衡
- 使用虛擬節點的概念解決
- 計算虛擬節點上的Hash值,放置在環上
- 計算key的Hash值,在環上順時針尋找對應選取得虛擬節點例如
peer2-1
對應的就是真實節點peer2
實現一至性哈希
初始化
- 定義函數類型的Hash,採取依賴注入的方式,允許用來替換成自定義的Hash函數,默認為ChecksumIEEE算法
- Map是一至性哈希算法的主數據結構,包含
- Hash函數 hash
- 虛擬節點倍數 replicas
- 虛擬節點和真實節點的映射表 hashMap,key是虛擬節點的哈希值,值是真實節點的名稱
- 構造函數New允許自定義虛擬節點倍數和Hash函數
type Hash func(data []byte) uint32
// Map constains all hashed keys
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
// New creates a Map instance
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
新增真實節點/機器
- Add函數允許傳入0或多個真實節點的名稱
- 對每一個真實節點key,對應創建
m.replicas
個虛擬節點 - 使用
m.hash()
計算虛擬節點的哈希值,再使用append添加到環上 - 在hashMap中增加虛擬節點和真實節點的映射關西
- 依照環上哈希排序
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
選擇節點
- 計算key的哈希值
- 順時針找到第一個匹配虛擬節點的下標idx,從
m.keys
中或去對應的哈希值,如果len相同,說明選擇第一個,因為keys是一個環的結構 - 透過hashMap映射得到真實節點
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
return m.hashMap[m.keys[idx%len(m.keys)]]
}
緩存問題和解決方案
- 緩存雪崩
- 緩存在同一時刻全部失效,遭成瞬間DB請求量大,壓力驟增引起雪崩.通常因為緩存服務器當機,緩存的key設置相同過期時間導致
- 緩存擊穿
- 一個存在的key在緩存過期的那一刻,同時有大量的請求,這些請求都會擊穿DB,造成順時DB請求量大,壓力驟增
- 緩存穿透
- 查詢一個不存在的數據,因為不存在則不會寫入緩存,所以美以都會請求DB,造成瞬間流量過大,穿透DB造成當機
實現解決方法
- 創建call和group類型
- call代表正在進行或已經結束的請求
- Group管理不同key的請求
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu sync.Mutex // protects m
m map[string]*call
}
- Do方法的作用為針對相同key,無論Do被調用多少次,函數fn都只會被調用一次,等待fn調用結束,return返回值或error
- 使用延遲初始化
g.m
以提高內存使用效率
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
重點地方
g.mu
是保護Group的成員變量m不被併發讀寫加上鎖
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
if c, ok := g.m[key]; ok {
c.wg.Wait() //如果請求正在進行中則等待
return c.val, c.err //請求結束返回結果
}
c := new(call)
c.wg.Add(1) //發起請求前保護鎖
g.m[key] = c //添加g.m,表明key已經有對應的請求在處理
c.val, c.err = fn() //調用fn發起請求
c.wg.Done() //請求結束
delete(g.m, key) //更新 g.m
return c.val, c.err //返回结果
}
使用
- 修改geecache中的Group,他加成員變量loader並更新構建函數NewGroup
- 修改load函數,將原來的load邏輯使用
g.loader.Do
包裹,確保併發場景下針對同個key,load只會調用一次
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
// use singleflight.Group to make sure that
// each key is only fetched once
loader *singleflight.Group
}
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
// ...
g := &Group{
// ...
loader: &singleflight.Group{},
}
return g
}
func (g *Group) load(key string) (value ByteView, err error) {
// each key is only fetched once (either locally or remotely)
// regardless of the number of concurrent callers.
viewi, err := g.loader.Do(key, func() (interface{}, error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
})
if err == nil {
return viewi.(ByteView), nil
}
return
}
使用Protocol通信
syntax = "proto3";
package geecachepb;
option go_package = "./";
message Request {
string group = 1;
string key = 2;
}
message Response {
bytes value = 1;
}
service GroupCache {
rpc Get(Request) returns (Response);
}
//protoc --go_out=. *.proto
使用
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ...
// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(body)
}
func (h *httpGetter) Get(in *pb.Request, out *pb.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
res, err := http.Get(u)
// ...
if err = proto.Unmarshal(bytes, out); err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}