本文首发于公众号:Hunter后端
原文链接:Golang基础笔记十五之sync
这一篇笔记介绍 Golang 中的 sync 模块。
sync 包主要提供了基础的同步原语,比如互斥锁,读写锁,等待组等,用于解决并发编程中的线程安全问题,以下是本篇笔记目录:
- WaitGroup-等待组
- sync.Mutex-互斥锁
- sync.RWMutex-读写锁
- sync.Once-一次性执行
- sync.Pool-对象池
- sync.Cond-条件变量
- sync.Map
1、WaitGroup-等待组
前面在第十篇我们介绍 goroutine 和 channel 的时候,在使用 goroutine 的时候介绍有一段代码如下:
package mainimport ("fmt""time"
)func PrintGoroutineInfo() {fmt.Println("msg from goroutine")
}func main() {go PrintGoroutineInfo()time.Sleep(1 * time.Millisecond)fmt.Println("msg from main")
}
在这里,我们开启了一个协程调用 PrintGoroutineInfo()
函数,然后使用 time.Sleep()
来等待它调用结束。
然而在开发中,我们不能确定这个函数多久才能调用完毕,也无法使用准确的 sleep 时间来等待,那么这里就可以使用到 sync
模块的 WaitGroup
函数来等待一个或多个 goroutine
执行完毕。
下面是使用示例:
package mainimport ("fmt""math/rand""sync""time"
)func SleepRandSeconds(wg *sync.WaitGroup) {defer wg.Done()sleepSeconds := rand.Intn(3)fmt.Printf("sleep %d seconds\n", sleepSeconds)time.Sleep(time.Duration(sleepSeconds) * time.Second)
}func main() {var wg sync.WaitGroupwg.Add(2)go SleepRandSeconds(&wg)go SleepRandSeconds(&wg)wg.Wait()fmt.Println("函数执行完毕")
}
在这里,我们通过 var wg sync.WaitGroup
定义了一个等待组,并通过 wg.Add(2)
表示添加了需要等待的并发数,在并发中我们将 &wg
传入并通过 wg.Done()
减少需要等待的并发数。
在 wg.Done()
函数内部,使用 wg.Add(-1)
减少需要等待的并发数,在 main 函数中,使用 wg.Wait()
进入阻塞状态,当等待的并发都完成后,此函数就会返回,完成等待并接着往后执行。
2、sync.Mutex-互斥锁
1. 数据竞态与互斥锁
当多个 goroutine 并发访问同一个共享资源,且至少有一个访问是写操作时,就会发生数据竞态
,造成的结果就是程序每次运行的结果表现可能会不一致。
比如下面的示例:
var balance intfunc AddFunc() {balance += 1
}
func main() {for range 100 {go AddFunc()}time.Sleep(5 * time.Second)fmt.Println("balance is: ", balance)
}
多次执行上面的代码,最终输出的 balance
的值可能都不一致。
如果一个变量在多个 goroutine 同时访问时,不会出现比如数据不一致或程序崩溃的情况,那么我们就称其是并发安全
的。
我们可以使用 go run -race main.go
的方式来检测数据竞态,执行检测后,会输出数据竞态的一些信息,比如发生在代码的多少行,一共发生了多少次数据竞态:
==================
WARNING: DATA RACE
Read at 0x000003910df8 by goroutine 7:main.AddFunc()/../main.go:13 +0x24Previous write at 0x000003910df8 by goroutine 6:main.AddFunc()/../main.go:13 +0x3cGoroutine 7 (running) created at:main.main()/../main.go:18 +0x32Goroutine 6 (finished) created at:main.main()/../main.go:18 +0x32==================
balance is: 98
Found 3 data race(s)
exit status 66
而要避免这种数据竞态的发生,我们可以限制在同一时间只能有一个 goroutine 访问同一个变量,这种方法称为互斥机制。
我们可以通过缓冲通道和 sync.Mutex
来实现这种互斥锁的操作。
2. 缓冲通道实现互斥锁
我们可以通过容量为 1 的缓冲通道来实现互斥锁的操作,保证同一时间只有一个 goroutine 访问同一个变量,下面是修改后的代码:
var sema = make(chan struct{}, 1)
var balance intfunc AddFunc() {sema <- struct{}{}balance += 1<-sema
}
func main() {for range 100 {go AddFunc()}time.Sleep(3 * time.Second)fmt.Println("balance is: ", balance)
}
在上面这段代码里,我们定义了 sema
这个容量为 1 的通道,在每个 AddFunc()
并发中,对变量 balance
执行读写操作前,我们先往通道里写入了一条数据,这样其他并发在执行该函数时,由于也会先往通道里写入数据,而这个时候通道已经满了,所以会处于堵塞状态,这就相当于获取锁。
直到 balance
写操作完成,从通道里读取数据,通道为空,相当于释放锁,这个时候其他并发才可以往通道里写入数据重新拿到锁。
这样我们通过往通道里写入和读取数据保证了同一时间只有一个 goroutine 在对 balance 进行写操作,从而实现互斥锁的操作。
3. sync.Mutex
在 sync 包中,sync.Mutex
直接为我们实现了互斥锁的操作,它的操作如下:
var mutex sync.Mutex // 互斥锁的定义
mutex.Lock() // 获取锁
mutex.Unlock() // 释放锁
那么使用 sync.Mutex
实现上面的逻辑,代码如下:
var mutex sync.Mutex
var balance intfunc AddFunc() {mutex.Lock()defer mutex.Unlock()balance += 1
}
func main() {for range 100 {go AddFunc()}time.Sleep(3 * time.Second)fmt.Println("balance is: ", balance)
}
3、sync.RWMutex-读写锁
在上面介绍的 sync.Mutex
互斥锁中,限制了同一时间只能有一个 goroutine 访问某个变量,包括读和写,但这种情况并非是最理想的,比如在读多写少的场景下。
那么 Golang 里的 sync.RWMutex
为我们提供了读写锁的操作,它会允许多个读操作的并发,而写操作会阻塞所有读和写。
读写锁的基本规则如下:
- 当一个 goroutine 获取了读锁后,其他 goroutine 仍然可以获取读锁,但不能获取写锁。
- 当一个 goroutine 获取了写锁后,其他 goroutine 无论是读锁还是写锁都不能获取,必须等待该写锁释放。
- 当有写操作在等待时,避免写操作长期饥饿,会优先处理写锁请求。
下面是读写锁的用法:
var rwMu sync.RWMutexrwMu.RLock() // 获取读锁
rwMu.RUnlock() // 释放读锁rwMu.Lock() // 获取写锁
rwMu.Unlock() // 释放写锁
下面是读写锁在函数中的用法示例:
func ReadBalance() {rwMu.RLock()fmt.Println("get read lock")defer rwMu.RUnlock()fmt.Println("balance: ", balance)
}func WriteBalance() {rwMu.Lock()fmt.Println("get write lock")defer rwMu.Unlock()balance += 1
}
4、sync.Once-一次性执行
sync.Once
可用于确保函数只被执行一次,常用于初始化操作,且可以用于延迟加载。
提供的方法是 Do(f func()),参数内容是一个需要被执行的函数 f,这个方法实现的功能是只有在第一次被调用的时候会执行 f 函数进行初始化。
下面是该方法的使用示例:
import ("fmt""sync"
)type Config struct {// 配置信息
}var (instance *Configonce sync.Once
)func LoadConfig() {fmt.Println("初始化配置...")instance = &Config{}// 加载配置的逻辑
}func GetConfig() *Config {once.Do(LoadConfig)return instance
}func main() {c1 := GetConfig()c2 := GetConfig()fmt.Println(c1 == c2) // 输出: true(同一个实例)
}
在这里,虽然 GetConfig() 函数执行了两遍,但是其内部的调用的 LoadConfig
函数却只执行了一次,因为 sync.Once
会在内部记录该函数是否已经初始化。
sync.Once
是个结构体,其结构如下:
type Once struct {done atomic.Uint32m Mutex
}
其中,done
字段用于记录需要执行的函数 f 是否已经被执行,其对应的 Do()
方法内部会先根据 done 字段判断,如果已经被执行过则直接返回,而如果没有则会先执行一次。
而 m
字段表示的互斥锁则用于在 Do()
方法内部调用的 doSlow()
中使用,用于确保并发情况下目标函数只被执行一次,在 f 函数执行结束后,done
参数会被置为 1,表示该函数已经被执行,这样再次调用 Do()
方法时,判断 done
字段的值为 1 则不会再执行此函数。
5、sync.Pool-对象池
sync.Pool,对象池,我们可以将一些生命周期短且创建成本高的对象存在其中,从而避免频繁的创建和销毁对象,以减少内存分配和垃圾回收压力。
简单地说就是复用对象。
1. 基础用法
下面以复用一个字节缓冲区为例介绍一下对象池的基础用法。
1) 创建对象池
创建对象池的操作如下:
var bufferPool = sync.Pool{New: func() interface{} {return &bytes.Buffer{}},
}
可以看到,这里对 sync.Pool
的 New
字段赋值了一个函数,返回的是一个字节缓冲区。
2) 从池中获取对象
从对象池中获取该对象的操作使用 Get() 操作:
buf := bufferPool.Get().(*bytes.Buffer)
3) 将对象放回池中
对该字节缓冲区使用完毕后可以将该对象再放回池中:
bufferPool.Put(buf)
2. 使用示例
import ("bytes""fmt""sync"
)var bufferPool = sync.Pool{New: func() interface{} {fmt.Println("create bytes buffer")return &bytes.Buffer{}},
}func LogMessage(msg string) {buf := bufferPool.Get().(*bytes.Buffer)defer bufferPool.Put(buf)buf.Reset()buf.WriteString(msg)fmt.Println(buf.String())
}func main() {LogMessage("hello world")LogMessage("hello world")LogMessage("hello world")
}
在上面的代码中,我们先定义了 bufferPool
,然后在 LogMessage()
函数中,先使用 Get() 获取该字节缓冲对象,因为这里返回的数据是接口类型,所以这里将其转为了对应的类型,然后使用 buf.Reset()
重置了之前的记录后写入新的数据,最后使用的 defer
操作将此对象又放回了对象池。
6、sync.Cond-条件变量
sync.Cond
用于等待特定条件发生后再继续执行,可用于生产者-消费者的模式。
创建一个条件变量,参数只有一个,那就是锁,下面代码里用的是互斥锁:
cond = sync.NewCond(&sync.Mutex{})
返回的 cond
对外暴露的字段 L
就是我们输入的锁。
下面用一个生产者的代码示例来介绍 cond 的几个相关函数。
在这里定义了 queue 作为队列,其中拥有需要处理的数据,queueMaxSize 字段为限制的最大队列长度。
var (queue []intqueueMaxSize int = 5cond = sync.NewCond(&sync.Mutex{})
)func Producer() {for {cond.L.Lock()for len(queue) == queueMaxSize {fmt.Println("produce queue max size, wait")cond.Wait()}queue = append(queue, 1)fmt.Println("produce queue")cond.Signal() // 通知消费者cond.L.Unlock()time.Sleep(100 * time.Millisecond)}
}
在定义的 Producer()
函数中,有一个死循环,内部先使用 cond.L.Lock()
获取锁,然后判断生产的数据是否有消费者消费,如果队列满了的话,则进入等待。
1. cond.Wait()
在上面的代码中,我们使用 cond.Wait()
进入了等待状态。
在 Wait()
函数内部,先对前面的锁进行释放操作,然后进入阻塞状态,直到其他 gouroutine 通过 Signal()
函数唤醒后重新获取锁。
2. cond.Signal()
前面往队列里添加数据后,通过 cond.Signal()
函数通知消费者,消费者在另一个函数中就可以被唤醒,然后进行处理,同时这个函数后面将锁释放 cond.L.Unlock()
。
3. cond.Broadcast()
前面的 Signal() 函数是唤醒一个等待的 goroutine,cond.Broadcast() 函数则可以唤醒所有等待的 goroutine。
下面提供一下生产者-消费者的全部处理代码:
package mainimport ("fmt""sync""time"
)var (queue []intqueueMaxSize int = 5cond = sync.NewCond(&sync.Mutex{})
)func Producer() {for {cond.L.Lock()for len(queue) == queueMaxSize {fmt.Println("produce queue max size, wait")cond.Wait()}queue = append(queue, 1)fmt.Println("produce queue")cond.Signal() // 通知消费者cond.L.Unlock()time.Sleep(100 * time.Millisecond)}
}func Consumer() {for {cond.L.Lock()for len(queue) == 0 {fmt.Println("wait for produce")cond.Wait() // 等待并释放锁}fmt.Println("consume queue")item := queue[0]queue = queue[1:]cond.Signal() // 通知生产者cond.L.Unlock()ProcessItem(item)}
}func ProcessItem(i int) {fmt.Println("process i: ", i)
}func main() {go Producer()go Consumer()time.Sleep(1 * time.Second)
}
7、sync.Map
sync 模块提供了 sync.Map 用来存储键值对,但是和之前介绍的 map 不一样的是,sync.Map 是并发安全的,而且无需初始化,并且在操作方法上与原来的 map 不一样。
1. 并发安全
原生的 map 是非并发安全的,如果多个 goroutine 对其进行同时读写会触发错误,比如下面的操作:
import ("fmt""time"
)var originMap = make(map[string]int)func UpdateMapKey() {originMap["a"] += 1
}func GetMapKey() {a := originMap["a"]fmt.Println(a)
}func main() {originMap["a"] = 0for range 100 {go UpdateMapKey()go GetMapKey()}time.Sleep(1 * time.Second)fmt.Println("originMap: ", originMap)
}
但是 sync.Map 是并发安全的,内部会通过互斥锁的操作允许多个 goroutine 安全地读写,下面是使用 sync.Map
对上面逻辑的改写,后面我们会具体介绍其操作方法:
import ("fmt""sync""time"
)var originMap sync.Mapfunc UpdateMapKey() {for {oldValue, loaded := originMap.Load("a")if !loaded {if _, ok := originMap.LoadOrStore("a", 1); ok {return}} else {newValue := oldValue.(int) + 1if originMap.CompareAndSwap("a", oldValue, newValue) {return}}}
}func GetMapKey() {a, _ := originMap.Load("a")fmt.Println(a)
}func main() {originMap.Store("a", 0)for range 100 {go UpdateMapKey()go GetMapKey()}time.Sleep(1 * time.Second)a, _ := originMap.Load("a")fmt.Println("originMap: ", a)
}
2. 初始化
原生的 map 进行初始化,有下面两种操作方法:
var originMap = make(map[string]int)
var originMap = map[string]int{}
sync.Map 可以直接声明使用:
var m sync.Map
m.Store("a", 0)
3. 操作方法
这里先介绍 sync.Map 增删改查的基础操作:
1) 增
增加一个 key 的操作如下:
originMap.Store("a", 1)
2) 删
删除一个 key 的操作如下:
originMap.Delete("a", 1)
3) 改
修改操作还是可以用 Store() 方法,而且可以修改为不同数据类型:
m.Store("a", "123")
4) 查
查询操作可以使用 Load() 方法,返回对应的 value 值以及是否存在:
m.Store("a", 1)
v, ok := m.Load("a")
if ok {fmt.Printf("exist value:%v\n", v.(int))
} else {fmt.Printf("key not exists")
}
5) 遍历
遍历操作如下:
m.Store("a", 1)
m.Range(func(key, value any) bool {fmt.Println(key, value)return true
})
4. 原子性条件操作
上面的这些方法可以实现基础的增删改查操作,但是如果我们有一个需求,比如前面的获取一个 key 的 value,然后在原值的基础上 +1 再存入,大概逻辑如下:
v, ok := m.Load(key)
v = v.(int)
v +=1
m.Store(key, v)
但是在这个操作中,如果有其他 goroutine 已经修改了 v 的值,那么我们这里的操作就相当于污染了源数据,而为了避免这个可能,我们可以使用一些原子性条件操作,以实现并发操作。
1) CompareAndSwap()
CompareAndSwap() 是一个更新操作,传入 3 个参数,key,oldValue 和 newValue,仅当 key 的结果为 oldValue 的时候,将结果更新为 newValue,使用示例如下:
key := "a"
m.Store(key, 1)swapped := m.CompareAndSwap(key, 1, 2)
fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 2, 是否更新结果 %v\n", swapped)
swapped = m.CompareAndSwap(key, 1, 3)
fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 3, 是否更新结果 %v\n", swapped)
所以在上面我们要对结果进行 +1 的代码操作为:
newValue := oldValue.(int) + 1
if originMap.CompareAndSwap("a", oldValue, newValue) {return
}
2) CompareAndDelete()
CompareAndDelete 是一个原子性的删除操作,接受两个参数,key 和 oldValue,仅当 key 的值为 oldValue 时删除该 key,返回结果为是否删除:
key := "a"
m.Store(key, 1)
deleted := m.CompareAndDelete(key, 1)
if deleted {fmt.Printf("当 key 的 value 为 %v 时,删除\n", 1)
} else {fmt.Printf(" key 的 value 不为 %v 时,不执行删除\n", 1)
}
3) LoadAndDelete()
LoadAndDelete 表示是否加载某个 key 的值并删除该 key,无论该 key 是否存在,参数为 key,返回值为 value 和是否存在该 key:
key := "a"
m.Store(key, 1)value, loaded := m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value)
value, loaded = m.LoadAndDelete(key)
fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value)
4) LoadOrStore()
LoadOrStore 方法为不存在 key 则存入,存在的话则返回该值:
key := "a"value, loaded := m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
value, loaded = m.LoadOrStore(key, 1)
fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)