万字解析Golang基于桶思想的map实现原理
0、引言相信大家对Map这个数据结构都不陌生,像C++的map、Java的HashMap。各个语言的底层实现各有不同,在本篇博客中,我将分享个人对Go的map实现的理解,以及深入源码进行分析,相信耐心看完一定会收获不少。
1、宏观结构
相信大家对于map的基本使用都不陌生,Golang中的map是不允许并发写操作的,这里的写指代的是宏观意义上的“写”,包括了更新、插入、删除操作。当发生了并发写时,程序会抛出fatal,这是一个非常严重的错误,会直接中断程序的进行,因此对于同一个map,它不应该被共享到多个协程中。
我们可以通过以下代码来验证:
func main() {
hm := mapint{1: 1, 2: 2, 3: 3}
for i := 4; i <= 9999; i++ {
go func(num int) {
hm = i
}(i)
}
}math.MulUintptr实现如下:
type hmap struct {
count int // # live cells == size of map.Must be first (used by len() builtin)
flags uint8
B uint8// log_2 of # of buckets (can hold up to loadFactor * 2^B items)
noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
hash0 uint32 // hash seed
buckets unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
nevacuateuintptr // progress counter for evacuation (buckets less than this have been evacuated)
extra *mapextra // optional fields
}
在这里,bucketCnt为8。若count= 4 { nbuckets += bucketShift(b - 4) //计算分配的总内存大小 sz := t.Bucket.Size_ * nbuckets //将内存大小向上对齐到合适的大小,是内存分配的一个优化。 up := roundupsize(sz, t.Bucket.PtrBytes == 0) if up != sz { //调整桶数量,使得内存被充分利用 nbuckets = up / t.Bucket.Size_ } } if dirtyalloc == nil { //分配nbuckets个桶 buckets = newarray(t.Bucket, int(nbuckets)) } else { //复用旧的内存 buckets = dirtyalloc size := t.Bucket.Size_ * nbuckets if t.Bucket.PtrBytes != 0 { memclrHasPointers(buckets, size) } else { memclrNoHeapPointers(buckets, size) } } if base != nbuckets { //如果base和nbuckets的数量不同,说明预分配了溢出桶,需要设置溢出桶链表 //指向第一个可用的预分配溢出桶,计算出溢出桶的起始位置 nextOverflow = (*bmap)(add(buckets, base*uintptr(t.BucketSize))) //最后一个预分配的溢出桶的位置 last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.BucketSize))) //将最后一个溢出桶的指针设置为buckets,形成一个环形链表,用于后面的分配判断 last.setoverflow(t, (*bmap)(buckets)) } return buckets, nextOverflow}makeBucketArray方法会根据初始的对数B来判断是否需要分配溢出桶。若B>=4,则需要预分配的溢出桶数量为2^(B-4)。确定好桶的总数后,会根据dirtyalloc是否为nil来判断是否需要新开辟空间。最后会返回指向桶数组的指针以及指向首个溢出桶位置的指针。
当最后返回到上层的makemap方法中,最终创造出的map结构如图:
2.2、map的读流程
2.2.1、读流程步骤总览
大致流程如下:
1、检查表是否为nil,或者表是否没有元素,若是则直接返回零值。
2、若处在并发写状态,则会导致程序崩溃(fatal)。
3、计算key对应的hash值,并且定位到对应的桶上。
4、若数据在旧桶,且数据没有迁移到新桶中,就在旧桶查找,否则在新桶中查找。
5、外层遍历桶数组的每个桶,内层遍历桶的每个kv对,找到了就返回value,否则返回零值
2.2.2、源码跟进mapaccess1
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
nextOverflow *bmap
}(1)若哈希表为空,或不存在键值对,则会返回零值。在此之前,会检查key是否合法,非法会触发panic。
type bmap struct {
tophash uint8
}(2)若存在并发写map,会立刻报错,使得程序停止运行。flags的第3个bit位标识map是否处于并发写状态。
type bmap struct {
tophash uint8
keys T
values T
overflow unsafe.Pointer
}(3)计算key的hash值,并且对桶数量取模,定位到具体的桶。取模运算为x & (mod-1),只有mod为2的幂时可以加速。
//makemap为make(mapv,hint)实现Go映射创建
func makemap(t *maptype, hint int, h *hmap) *hmap {
mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)
if overflow || mem > maxAlloc {
hint = 0
}
// initialize Hmap
if h == nil {
h = new(hmap)
}
h.hash0 = uint32(rand())
// Find the size parameter B which will hold the requested # of elements.
// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
//若B==0,那么buckets将会采取懒创建的策略,会在未来要写map的方法mapassign中创建。
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}(4)检查是否存在旧桶,存在旧桶且数据未搬迁完成则去旧桶中找key,否则在新桶找。
mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)
if overflow || mem > maxAlloc {
hint = 0
}在取旧桶的时候,会根据evacuated判断数据是否已经迁移到新的桶:判断方法是取桶首个元素的tophash值,若值为2,3,4中的一个,代表数据已经迁移完成。
func MulUintptr(a, b uintptr) (uintptr, bool) {
if a|b < 1<<(4*goarch.PtrSize) || a == 0 {
return a * b, false
}
overflow := b > MaxUintptr/a
return a * b, overflow
}
(5)取key的hash值的高8位值top,若值> (goarch.PtrSize*8 - 8)) if top < minTopHash { top += minTopHash } return top}(6)外层b遍历每一个桶,内层遍历b中的每一个kv对,对比每一个kv对的tophash值是否和要查询的key的top值是否相同进行查找。
// initialize Hmap
if h == nil {
h = new(hmap)
}若两个hash值不同,并且桶中的当前键值对的tophash为0,表示后续没有元素,直接退出循环返回零值。否则检查下一个kv。
h.hash0 = uint32(rand())若找到了,就根据内存偏移找到对应的value并且返回。注意:会调用key.Equal方法具体检查要读的key和当前key是否一样,避免因为哈希冲突导致读取了错误的value。
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B否则最终返回0值。
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}2.3、map的写流程
2.3.1、写流程步骤总览
大致流程如下:
[*]1、若表为nil,则panic,若处在并发写,则fatal
[*]2、获取key的hash值,用于校验是否已经存在,需要覆盖
[*]3、设置处于写状态
[*]4、懒创建buckets,若B==0,则buckets会在第一次写的时候创建
[*]5、根据hash定位到具体的bucket中,若表处在扩容阶段则调用growWork辅助扩容;创建三个拟插入位置指针,分别存储要插入的tophash、key、value的位置。(作用是若遇见空位置,就存储,然后要继续看是否存在相同的key要覆盖。)
[*]6、遍历该桶的每一个kv,会遇到两种情况:
[*]若当前槽位的tophash和要插入的键值对的tophash不相同,那么检查是否是空槽,是则更新拟存储指针;若当前槽位是空槽,会继续检查对后面是否存在kv的标识,若后面全是空槽了,就可以直接退出了不必继续遍历。
[*]若相同,那就直接进行覆盖操作,更新完成直接到第10步进行收尾。
[*]7、如果我们没有找到要插入的位置,或者要插入的位置是当前桶的最后一个槽位,检查以下条件决定是否进行扩容:
[*]Count+1 > loadfactor * 2^h.B,即总键值对 > 负载因子*总桶数
[*]h.noverflow > threshold:如果 溢出桶过多,说明冲突严重,也要扩容。
发生扩容后,刚刚的记录就无效了,重新到第5步。
[*]8、若不扩容,且没有插入的位置(没有空槽,也没有覆盖),就新创建一个新桶,连接到当前桶的后面作为溢出桶,插入到新桶的第一个位置上。这个新桶可以是新分配的,也可以是一开始创建表就预分配的(优先)。
[*]9、对拟插入的位置进行实际的插入
[*]10、收尾,再次检查是否处在并发写状态,是则fatal,否则重置写状态标识,然后退出。
2.3.2、源码跟进mapassign
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}(1)错误处理:map为空则panic,并发写则出发fatal。
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
//初始桶数量
base := bucketShift(b)
//最终桶数量,初始和base相同
nbuckets := base
//溢出桶预分配
if b >= 4 {
nbuckets += bucketShift(b - 4)
//计算分配的总内存大小
sz := t.Bucket.Size_ * nbuckets
//将内存大小向上对齐到合适的大小,是内存分配的一个优化。
up := roundupsize(sz, t.Bucket.PtrBytes == 0)
if up != sz {
//调整桶数量,使得内存被充分利用
nbuckets = up / t.Bucket.Size_
}
}
if dirtyalloc == nil {
//分配nbuckets个桶
buckets = newarray(t.Bucket, int(nbuckets))
} else {
//复用旧的内存
buckets = dirtyalloc
size := t.Bucket.Size_ * nbuckets
if t.Bucket.PtrBytes != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
//如果base和nbuckets的数量不同,说明预分配了溢出桶,需要设置溢出桶链表
//指向第一个可用的预分配溢出桶,计算出溢出桶的起始位置
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.BucketSize)))
//最后一个预分配的溢出桶的位置
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.BucketSize)))
//将最后一个溢出桶的指针设置为buckets,形成一个环形链表,用于后面的分配判断
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}(2)标识map处在写的状态,并且懒创建桶。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
//...
if h == nil || h.count == 0 {
if err := mapKeyError(t, key); err != nil {
panic(err) // see issue 23734
}
return unsafe.Pointer(&zeroVal)
}
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}
hash := t.Hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))
if !evacuated(oldb) {
b = oldb
}
}
if c := h.oldbuckets; c != nil { //c!=nil,说明旧桶未完成迁移(rehash)
if !h.sameSizeGrow() { //是否是等量扩容
//如果不是等量扩容,调整 hash 掩码(mask)
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))//计算旧桶地址
if !evacuated(oldb) { //检查旧桶是否已搬迁
b = oldb //未搬迁则数据有效
}
}
bucketloop:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash != top {
if b.tophash == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
if t.IndirectKey() {
k = *((*unsafe.Pointer)(k))
}
if t.Key.Equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
if t.IndirectElem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal)
}(3)获取当前key对应的桶的桶索引
if h == nil || h.count == 0 {
if err := mapKeyError(t, key); err != nil {
panic(err) // see issue 23734
}
return unsafe.Pointer(&zeroVal)
}(4)若发现当前map处在扩容状态,则帮助其渐进扩容。具体在下文中提及。
hashWriting= 4 // a goroutine is writing to the map 4->100
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}(5)进行地址偏移,定位到具体的桶b
hash := t.Hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))(6)计算tophash
if c := h.oldbuckets; c != nil { //c!=nil,说明旧桶未完成迁移(rehash)
if !h.sameSizeGrow() { //是否是等量扩容
//如果不是等量扩容,调整 hash 掩码(mask)
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))//计算旧桶地址
if !evacuated(oldb) { //检查旧桶是否已搬迁
b = oldb //未搬迁则数据有效
}
}(7)提前声明三个指针,用于指向存放kv对槽位
const emptyOne = 1
const evacuatedX = 2
const evacuatedY = 3
const evacuatedEmpty = 4
const minTopHash = 5
func evacuated(b *bmap) bool {
h := b.tophash
return h > emptyOne && h < minTopHash
}(8)开启循环,和读流程类似,外层遍历桶,内层遍历桶的每个位置。
top := tophash(hash)(9)若key的tophash和当前槽位的tophash不相同,则进行以下的检查:
[*]若该槽位是空的,那么就将kv拟插入在这个位置(先记录),因为可能存在相同的key在后面的桶中。
[*]若槽位当前位置tophash标识为emptyRest(0),则标识从当前槽位开始往后的槽位都是空的,就不用继续遍历了,直接退出bucketloop。(说明可以插入在此位置了)
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}(10)否则说明找到了相同的key,需要进行覆盖操作。更新完成后跳到done,执行收尾流程。注意:会调用key.Equal方法具体检查要写的key和当前key是否一样,避免因为哈希冲突导致原来不同的kv对被错误的覆盖。
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
//...
}
}(11)倘若没有相同的key,也没有剩余的空间了,则会考虑执行扩容模式,完成后再回到agian的位置重新桶定位以及遍历流程。
if b.tophash != top {
if b.tophash == emptyRest {
break bucketloop
}
continue
}触发扩容的条件:
[*]h.count+1 > loadFactor * 2^h.B:如果当前 map 达到负载因子上限,需要扩容。
[*]h.noverflow > threshold:如果 溢出桶过多,说明冲突严重,也要扩容。
[*]h.growing():检查是否 已经在扩容,如果已经在扩容,就不会触发新的扩容。
(12)若不执行扩容操作,也没有找到插入的位置,则新创建一个溢出桶,将kv拟插入在溢出桶的第一个位置。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
if t.IndirectKey() {
k = *((*unsafe.Pointer)(k))
}
if t.Key.Equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
if t.IndirectElem() {
e = *((*unsafe.Pointer)(e))
}
return e
}创建新桶操作如下:
return unsafe.Pointer(&zeroVal)这里存在一个十分容易混淆的点:请注意,在最开始的makeBucketArray方法中,我们提及到了只有最后一个溢出桶它才设置了overflow指针,对于前面的溢出桶,overflow指针是nil,所以可以根据这个特性来判断当前的溢出桶是不是最后一个溢出桶。
用图来表示,每个桶经过了多次溢出桶扩展后的表状态,如下:
(13)在拟插入位置实际插入kv
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
//...
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.Hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write.
h.flags ^= hashWriting
if h.buckets == nil {
h.buckets = newobject(t.Bucket) // newarray(t.Bucket, 1)
}
again:
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
top := tophash(hash)
var inserti *uint8
var insertk unsafe.Pointer
var elem unsafe.Pointer
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash != top {
if isEmpty(b.tophash) && inserti == nil {
inserti = &b.tophash
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
}
if b.tophash == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
if t.IndirectKey() {
k = *((*unsafe.Pointer)(k))
}
if !t.Key.Equal(key, k) {
continue
}
// already have a mapping for key. Update it.
if t.NeedKeyUpdate() {
typedmemmove(t.Key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
goto done
}
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// Did not find mapping for key. Allocate new cell & add entry.
// If we hit the max load factor or we have too many overflow buckets,
// and we're not already in the middle of growing, start growing.
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
if inserti == nil {
// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.KeySize))
}
// store new key/elem at insert position
if t.IndirectKey() {
kmem := newobject(t.Key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.IndirectElem() {
vmem := newobject(t.Elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.Key, insertk, key)
*inserti = top
h.count++
done:
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
if t.IndirectElem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem
}(14)收尾流程,再次校验是否处在并发写,有则抛出fatal,否则将标记重置,然后退出。
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
//...
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.Hasher(key, uintptr(h.hash0))2.4、map的删流程
2.4.1、删流程步骤总览
删流程步骤大致如下:
[*]1、若表为nil或者不存在元素,则直接返回;若处在并发写则fatal
[*]2、获取key的哈希因子,根据哈希值找到对应的桶
[*]3、若表处在扩容阶段,则利用growWork辅助扩容
[*]4、开始遍历查找要删除的元素,若没找到则直接退出查找流程,找到了则将值清为0值
[*]5、若表的结构如:「值1——空——空——空——删除值——后全空——后全空」的结构,则需要向前回溯,将值1后的所有slot都置为emptyRest状态。
[*]6、若删除后,表的count为0,则更新hash因子,避免哈希碰撞攻击。
[*]7、再次校验是否处在并发写,处在将fatal,否则重置写标识
2.4.2、源码跟进mapdelete
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { //... bucket := hash & bucketMask(h.B) if h.growing() {
growWork(t, h, bucket)
} if h == nil || h.count == 0 {
if err := mapKeyError(t, key); err != nil {
panic(err) // see issue 23734
}
return unsafe.Pointer(&zeroVal)
} hashWriting= 4 // a goroutine is writing to the map 4->100
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
} hash := t.Hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize))) bOrig := b if c := h.oldbuckets; c != nil { //c!=nil,说明旧桶未完成迁移(rehash)
if !h.sameSizeGrow() { //是否是等量扩容
//如果不是等量扩容,调整 hash 掩码(mask)
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))//计算旧桶地址
if !evacuated(oldb) { //检查旧桶是否已搬迁
b = oldb //未搬迁则数据有效
}
}search: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { var inserti *uint8 //tophash拟插入位置
var insertk unsafe.Pointer //key拟插入位置
var elem unsafe.Pointer //value拟插入位置 for {
for i := uintptr(0); i < bucketCnt; i++ {
//...
}
b = ovf
} // If the bucket now ends in a bunch of emptyOne states, // change those to emptyRest states. // It would be nice to make this a separate function, but // for loops are not currently inlineable. if i == bucketCnt-1 { if b.overflow(t) != nil && b.overflow(t).tophash != emptyRest { goto notLast } } else { if b.tophash != emptyRest { goto notLast } } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
if t.IndirectKey() {
k = *((*unsafe.Pointer)(k))
}
if !t.Key.Equal(key, k) {
continue
}
// already have a mapping for key. Update it.
if t.NeedKeyUpdate() {
typedmemmove(t.Key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
goto done // 如果达到负载因子上限,或者溢出桶过多,则扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
} } if inserti == nil {
// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.KeySize))
}}(1)错误处理:当表为nil或者不存在元素,则直接返回;若处在并发写状态则fatal
bucket := hash & bucketMask(h.B)(2)获取key的hash,并且标识表为写状态
if h.growing() {
growWork(t, h, bucket)
}(3)若表正在扩容,则调用growWork辅助扩容。通过hash值映射到对应的桶b。
if h == nil || h.count == 0 {
if err := mapKeyError(t, key); err != nil {
panic(err) // see issue 23734
}
return unsafe.Pointer(&zeroVal)
} hashWriting= 4 // a goroutine is writing to the map 4->100
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
} hash := t.Hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize))) bOrig := b if c := h.oldbuckets; c != nil { //c!=nil,说明旧桶未完成迁移(rehash)
if !h.sameSizeGrow() { //是否是等量扩容
//如果不是等量扩容,调整 hash 掩码(mask)
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))//计算旧桶地址
if !evacuated(oldb) { //检查旧桶是否已搬迁
b = oldb //未搬迁则数据有效
}
}(4)进入桶的遍历,外层遍历桶,内层遍历每个kv对
// initialize Hmap
if h == nil {
h = new(hmap)
}(5)若当前槽位的tophash和需要查找的不相同,则检查后面是否还有元素;有元素就继续进行查找,没有就直接退出,表示想删除的元素不存在。
var inserti *uint8 //tophash拟插入位置
var insertk unsafe.Pointer //key拟插入位置
var elem unsafe.Pointer //value拟插入位置(6)否则,说明找到了对应的key,进行删除操作,具体包括了:
[*]查找key并找到存储位置
[*]清除 key 和 value(包括直接清零或 nil 指针)。
[*]更新 tophash,标记该槽位为空(EmptyOne)。
注意:会调用key.Equal方法具体检查要删除的key和当前key是否一样,避免因为哈希冲突导致原来不同的kv对被错误的删除。
for {
for i := uintptr(0); i < bucketCnt; i++ {
//...
}
b = ovf
}(7)检查当前删除的桶的元素是否是桶的最后一个元素:
[*]若是,若该桶的后面还存在溢出桶,并且溢出桶非空则跳过清理环节,进入收尾阶段
[*]若不是,检查后面是否还有元素,有元素也跳过清理环节。(通过检查下一个slot的标识是否为emptyRest)
if b.tophash != top {
if isEmpty(b.tophash) && inserti == nil {
inserti = &b.tophash
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
}
if b.tophash == emptyRest {
break bucketloop
}
continue
}否则,说明后面没有更多的元素了,需要向前回溯,将最后一个元素的槽位后面的所有槽位都设置为emptyRest状态,优化未来的流程。
回溯流程:
[*]先将当前桶从当前的删除元素一个个往前走,遇到是emptyOne的就修改为emptyRest
[*]当到达第一个元素并且也为空时,回溯到上一个桶的末尾,重复流程
[*]遇到非空元素,完成所有回溯并且退出。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
if t.IndirectKey() {
k = *((*unsafe.Pointer)(k))
}
if !t.Key.Equal(key, k) {
continue
}
// already have a mapping for key. Update it.
if t.NeedKeyUpdate() {
typedmemmove(t.Key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
goto done(8)收尾流程,将map的元素计数器count-1,若count为0,则更新哈希因子。
// 如果达到负载因子上限,或者溢出桶过多,则扩容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}为什么要更新哈希因子?
让攻击者无法利用相同的哈希因子 h.hash0 构造出一组导致严重哈希碰撞的 key,从而保护 map 免受拒绝服务(DoS)攻击。
(9)最后的校验是否处在并发写状态,是则fatal,然后再更新状态标识
if inserti == nil {
// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
newb := h.newoverflow(t, b)
inserti = &newb.tophash
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.KeySize))
}3、扩容机制
3.1、扩容类型
Gomap的扩容方式分为两种:
[*]等量扩容(Same-Size-Grow):如果map的溢出桶过多,导致查找性能下降,说明KV分布不均匀,此时就会触发等量扩容,哈希表的桶数不会改变,但会重新分配K-V对的位置,目的是减少溢出桶的数量,增加KV的密度,让数据能平均分布。
[*]增量扩容(Double-Size-Grow):如果负载因子超标「count/2^B > loadFactor」,即KV对的数目超过了一定上限,就会触发增量扩容,使得Buckets数量翻倍,让所有的KV对重新分配在新的桶数组中,目的是减少K-V对的密度,降低每个桶的KV数量,优化查询时间。
为什么说等量扩容是增加密度呢?
我们想,既然count是合理的,但是当前map导致了溢出桶过多,那么只可能是经过了多次删除操作,导致出现了很多空位,例如「A——空——空——空——B」,这样子每次查找就很耗时了,于是等量扩容需要重新分配KV对的位置变为「A——B」,让数据更加紧凑。
3.2、扩容触发
在之前的写流程中,提及到以下代码会触发map的扩容:
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
//若表存在预分配溢出桶,则直接使用预分配的溢出桶。
if h.extra != nil && h.extra.nextOverflow != nil {
if ovf.overflow(t) == nil {
// 不是最后一个预分配的溢出桶,直接移动 `nextOverflow` 指针
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.BucketSize)))
} else {
// 这是最后一个预分配的溢出桶,重置 overflow 指针
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
//创建一个新的溢出桶
ovf = (*bmap)(newobject(t.Bucket))
}
//更新 h.noverflow 计数,跟踪 map 目前有多少个溢出桶。
h.incrnoverflow()
if t.Bucket.PtrBytes == 0 { //如果map只存储基本数据类型
h.createOverflow() //创建overflow记录表
*h.extra.overflow = append(*h.extra.overflow, ovf) //记录新的溢出桶
}
b.setoverflow(t, ovf) //把ovf连接到b这个桶的overflow指针
return ovf
}只有当map不处在扩容中,并且满足以下两个条件之一,触发扩容:
<ul>负载因子超标:当KV对的数目超过桶的数目,并且「KV对数目/桶数 > 负载因子」时,发生扩容
// store new key/elem at insert position
if t.IndirectKey() {
kmem := newobject(t.Key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.IndirectElem() {
vmem := newobject(t.Elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.Key, insertk, key)
*inserti = top
h.count++溢出桶过多:当溢出桶数量>桶数组的数量(B最大取15),则发生扩容
done:
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
if t.IndirectElem() {
elem = *((*unsafe.Pointer)(elem))
}
return elem(8)通过读流程的mapaccessK方法来读取这个K-V对,通过迭代器hiter的key、value指针进行接收,用于对用户的遍历操作进行响应。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
//...
if h == nil || h.count == 0 {
if err := mapKeyError(t, key); err != nil {
panic(err) // see issue 23734
}
return
}
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
hash := t.Hasher(key, uintptr(h.hash0))
// Set hashWriting after calling t.hasher, since t.hasher may panic,
// in which case we have not actually done a write (delete).
h.flags ^= hashWriting
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
bOrig := b
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash != top {
if b.tophash == emptyRest {
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
k2 := k
if t.IndirectKey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.Key.Equal(key, k2) {
continue
}
// Only clear key if there are pointers in it.
if t.IndirectKey() {
*(*unsafe.Pointer)(k) = nil
} else if t.Key.PtrBytes != 0 {
memclrHasPointers(k, t.Key.Size_)
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
if t.IndirectElem() {
*(*unsafe.Pointer)(e) = nil
} else if t.Elem.PtrBytes != 0 {
memclrHasPointers(e, t.Elem.Size_)
} else {
memclrNoHeapPointers(e, t.Elem.Size_)
}
b.tophash = emptyOne
// If the bucket now ends in a bunch of emptyOne states,
// change those to emptyRest states.
// It would be nice to make this a separate function, but
// for loops are not currently inlineable.
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash != emptyRest {
goto notLast
}
} else {
if b.tophash != emptyRest {
goto notLast
}
}
for {
b.tophash = emptyRest
if i == 0 {
if b == bOrig {
break // beginning of initial bucket, we're done.
}
// Find previous bucket, continue at its last entry.
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
i--
}
if b.tophash != emptyOne {
break
}
}
notLast:
h.count--
// Reset the hash seed to make it more difficult for attackers to
// repeatedly trigger hash collisions. See issue 25237.
if h.count == 0 {
h.hash0 = uint32(rand())
}
break search
}
}
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
}5、结语
感谢观看阅读,如果有什么疑问可以在评论区一起讨论。本篇博客参考了小徐先生的文章,也是读者跟随其思路学习的笔记记录,推荐大家去看原文,一起深入源码学习,链接在下方:
Golang map 实现原理
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]