0%

Golang并发控制总结

Golang并发控制总结

写在开头:最近用go做MIT6.824课程作业时,涉及到大量基于golang的并发控制,但是由于不熟悉golang语言以及相关并发控制手段,导致出现了大量的bug,影响了实现进程,因此产生了总结学习golang并发控制的想法

目前大厂的后端开发大量的从java转向go,很大一部分原因是由于go所具备的高并发、高性能、容易开发等性质,可以说go并发控制是学习go内容中最为重要的一部分(java并发我都没学会,直接学go,看出我的诚意了吧),下面的总结学习主要基于golang官网的Effective Go的cocurrency章节

何为并发控制?

In information technology and computer science, especially in the fields of computer programming, operating systems, multiprocessors, and databases, concurrency control ensures that correct results for concurrent operations are generated, while getting those results as quickly as possible. -wikipedia

并发控制本质上是通过一定的手段保证多个并发进行操作最终产生正确的结果,并且尽可能的保证性能,所以学习并发控制主要从两个角度入手:

  • 如何实现并发?

  • 有哪些控制并发的手段?

编程语言层面的并发控制可以理解为通过一定手段保证多个操作共享变量的线程正确执行,并且保证性能,这里的两个角度为

  • 多线程实现并发:go中的Goroutines
  • 多种控制并发的手段:mutex,channel,waitGroup等

golang并发控制

golang并发控制基于:”Do not communicate by sharing memory; instead, share memory by communicating.” 的思想,即并发线程之间不通过共享内存进行通信,而是通过通信实现共享内存,对此我的理解是:

  • 传统的并发控制手段是共享资源+锁的形式,实现互斥访问
  • golang舍弃了上述思想,采用通信的方式,将共享资源的访问变为序列化处理的通信传递

引用 演讲:Concurrency is not parallelism ppt中的例子(两个取书烧书线程,分别从书籍堆中取书运输到火堆烧毁),第一种思路如下图:

  • 属于互斥资源,上锁两个线程互斥取书、

  • “火堆”属于互斥资源,双锁两个线程互斥烧书

  • 共享资源(“书堆“)+锁实现并发控制

第二种并发控制思路如下图:

  • 三个线程:取书线程、运书线程、烧书线程
  • 三个线程之间通过通信,实现并发控制

上述两种思路的区别可以总结为:

  1. 传统思路属于并行的思路,一个活多个人干,人多效率就高,但问题是互斥问题会导致性能下降(纵向)
  2. golang思路属于将一个任务拆解,分成不同阶段,多个人各司其职,避免了互斥资源访问的性能下降,问题是等待通信的延迟(横向)

整合上述两种思想的细粒度并发+并行(双cpu):

Goroutines

Go中类似于”线程“的概念并发调度单位定义为Goroutine:与其它 Go 协程并发运行在同一地址空间的函数。Goroutine于线程的区别点在于:

  1. Goroutine相当于”轻量级“线程,启动时只占用少量的栈空间(java线程在启动时会分配固定大小的占空间)
  2. Goroutine与内核线程的数量对应关系是一对一或者多对一,Goroutine的调度由Go管理,避免了内核线程调度上下文切换的成本
  3. Goroutine更像一个独立运行的函数、操作等,线程更像是一个单独运行的轻量级”进程“

在调用前添加关键词 go即可启动一个Goroutine:

1
2
3
4
5
6
go list.Sort() 
//匿名函数
go func() {
time.Sleep(delay)\
fmt.Println(message)
}()

使用goroutine经常出现的一个错误如下:

  1. 由于i为共享变量,Goroutine执行之前主程序可能已经进入下一轮循环,导致输出错误
  2. 解决方案为:传参 或者 赋值局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//错误代码
for i:=1;i < 10;i++ {
go func() {
fmt.Println(i)
}()
}
//正确代码
for i:=1;i < 10;i++ {
go func(i) {
fmt.Println(number)
}(number int)
}
for i:=1;i < 10;i++ {
number := i
go func() {
fmt.Println(number)
}()
}

Channel

Channel类似于Unix中的管道概念,提供不同Goroutine之间的通信,通过make函数初始化,其中Channel在初始化时执行

  • 在访问无缓冲管道和缓冲写入已满的管道时,访问Goroutine会阻塞
1
2
3
4
5
6
7
ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0) // unbuffered channel of integers
cs := make(chan *os.File, 100) // buffered channel of pointers to Files
//写入管道
ci <- 1
//读取管道
i <- ci

通过Channel实现的最简单的同步例子如下:

  • 主线程调用排序Goroutine后等待读取管道
  • 排序Goroutine排序完成后,写入管道
1
2
3
4
5
6
7
8
c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
list.Sort()
c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.

结合select可实现异步阻塞通信同步功能:

  • 监听case管道,读取到内容,则执行相关操作
  • 无default时,阻塞直到读到内容;有default,执行default,继续向下执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

发送者可以主动调用close()关闭管道,接收端for循环读取会终止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}

sync

sync包提供了包含锁在内的一系列并发控制手段,包括sync.Mutex、sync.RWMutex、sync.Cond、sync.WaitGroup等

sync.Mutex

golang中的锁,提供上锁、解锁等操作方法

1
2
3
func (m *Mutex) Lock() 			//上锁
func (m *Mutex) TryLock() bool //尝试上锁
func (m *Mutex) Unlock() //解锁
sync.RWMutex

读写互斥锁,在普通锁接口上提供了“读”锁上锁解锁操作

1
2
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
sync.Once

确保函数只执行一次的接口(例如初始化),调用对应Do方法,传入调用的函数

1
2
var once Once					//初始化
func (o *Once) Do(f func()) //执行f函数(保证只执行一次)
sync.WaitGroup

该方法实现等待一系列的Goroutines的退出,基本接口如下:

  • 通过add()方法增加WaitGroup counter数量,done()方法表明Goroutine之一完成,减少WaitGroup counter数量
  • wait()方法等待 WaitGroup counter为0
1
2
3
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

具体的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.example.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
sync.Cond

条件变量实现(golang建议能够通过Channel实现同步,尽量避免使用条件变量),需要与Locker配合使用:

1
2
3
4
func NewCond(l Locker) *Cond	//返回一个使用锁l的条件变量
func (c *Cond) Broadcast() //唤醒等待条件变量的所有Goroutine
func (c *Cond) Signal() //唤醒一个等待条件变量的Goroutine
func (c *Cond) Wait() //等待条件变量

基本的使用方法为:

  • 在修改条件时需要上锁,调用Broadcast()或Signal()不必须上锁
  • 调用wait()时首先上锁,wait()方法会将当前Goroutine添加到唤醒列表,释放锁阻塞当前Goroutine等待条件变量(Broadcast或Signal),上锁退出wait()方法
1
2
3
4
5
6
7
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock() //上面上锁的原因是并发添加等待Goroutine需要互斥执行
runtime_notifyListWait(&c.notify, t) //释放锁后,等待条件变量
c.L.Lock() //重新上锁,目的是与修改条件操作互斥
}

第一次学的时候没搞明白为什么要有锁,深入思考下发现,本质是为了保证条件变更和根据条件进行操作的Goroutine的互斥,避免操作时条件发生变更,不满足操作条件,此方法带来两个问题:

  1. 等待线程被唤醒后,条件可能发生该改变(因为等待时释放了锁),所以需要循环判断
  2. 同时只有一个等待Goroutine能在Wait()唤醒后进行执行

所以wait()方法的使用方式为:

1
2
3
4
5
6
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()

总结

golang并发控制学起来并不复杂,主要是理解其并发控制思路+学习常用的并发控制接口,结合官方文档以及官网提供的一些学习资料学下来并不难(别看csdn