go语言:并发编程
引言
在C/C++中,高并发场景一般使用多线程支持;而go语言天然支持高并发。go语言采用goroutine来支持高并发场景,goroutine有官方实现的用户态的超级“线程池”,每个协程4-5KB栈内存占用并且实现机制大幅减少创建和销毁开销 是go语言高并发的根本原因。
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine
首先观察这段程序,我们使用了go关键字去启动一个协程跑Say函数,但是执行go run main.go后发现输出只有main say : hello world,而没有Say函数的输出。原因很简单,因为主函数mian首先执行完并return了,而go Say("func say : hello world!!!")需要为协程初始化消耗一些时间,因此晚于main的return,因此就没有输出打印。
package main
import (
"fmt"
)
func Say(msg string) {
fmt.Println(msg)
}
func main() {
go Say("func say : hello world!!!")
fmt.Println("main say : hello world")
}
但是我们在main函数结束前休眠3秒,Say函数的打印就顺利输出了。
package main
import (
"fmt"
"time"
)
func Say(msg string) {
fmt.Println(msg)
}
func main() {
go Say("func say : hello world!!!")
fmt.Println("main say : hello world")
time.Sleep(time.Second * 3)
}
main say : hello world
func say : hello world!!!
在Go中使用goroutine进行并发编程是比较简单的,但是跟多线程编程一样,并发编程难点在于线程同步和线程安全,因此下文重点探究goroutine如何实现并发控制和并发安全。
并发控制
多线程会有线程安全问题,如何保证线程间通信和数据共享是多线程编程中的大难题。协程作为用户级线程同样也会面临一样的问题,Go的并发控制是通过这几种方式进行的:
并发控制方法主要有:
全局变量
- channel
- WaitGroup
- context
- runtime
Sync.WaitGroup
当我们启动多个goroutine时,就涉及到并发控制。并发控制是个很大的主题,一句话概括就是我们想控制goroutine的生命周期,让goroutine按照我们的设定在某个时机执行某个动作。比如我们希望等所有协程完成自己的逻辑后,main才结束,这是并发控制的一个场景。c/c++中使用pthread_join完成线程的同步,而在go中,对应的功能可以由sync.WaitGroup来实现。
Sync.WaitGroup是一种实现并发控制方式,WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。
- Add(n) 把计数器设置为n 。
- Done() 每次把计数器-1 。
- wait() 会阻塞代码的运行,直到计数器地值减为0。
package main
import (
"fmt"
//"time"
"sync"
)
var wg sync.WaitGroup
func Say(msg string) {
defer wg.Done()
fmt.Println(msg)
}
func main() {
for i:=0; i<10; i++ {
wg.Add(1)
go Say(fmt.Sprintf("func %d say : hello world!!!", i))
}
wg.Wait()
fmt.Println("main say : hello world")
}
输出
func 0 say : hello world!!!
func 3 say : hello world!!!
func 1 say : hello world!!!
func 2 say : hello world!!!
func 6 say : hello world!!!
func 9 say : hello world!!!
func 4 say : hello world!!!
func 5 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
main say : hello world
runtime
当协程启动起来后,我们怎么控制这些协程的生命周期呢?比如我希望某些协程在特定条件下进行退出,或者让出调度时间片。此时我们可以使用runtime包来管理这些协程,这里我们主要掌握3个函数。
1、runtime.Gosched() : 让出当前协程的时间片,让出CPU时间片给其他协程,等其他协程执行完再执行协程后面的逻辑。
2、runtime.Goexit() : 使当前协程退出,而不影响协程的进行。
3、runtime.GOMAXPROCS() : Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数。goroutine和OS线程是多对多的关系,即m:n。
runtime.Gosched()的使用例子:
package main
import (
"fmt"
"time"
"runtime"
)
func Say(msg string) {
fmt.Println(msg)
time.Sleep(time.Second * 3)
}
func main() {
for i:=0; i<10; i++ {
go Say(fmt.Sprintf("func %d say : hello world!!!", i))
}
runtime.Gosched()
fmt.Println("main say : hello world")
}
从上面的例子可以知道,如果没有 runtime.Gosched()这一句,main函数会最快执行完并结束进程,因此打印只会有一条;但加上runtime.Gosched()后,主协程因为让出了时间片,因此需要等到进程内其他协程执行完才轮到自己执行,因此会打印出所有日志,并且main say是最后一条日志,说明该协程最后执行。
func 9 say : hello world!!!
func 0 say : hello world!!!
func 1 say : hello world!!!
func 2 say : hello world!!!
func 3 say : hello world!!!
func 4 say : hello world!!!
func 5 say : hello world!!!
func 6 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
main say : hello world
runtime.Goexit()的使用例子:
在子协程中调用runtime.Goexit()
package main
import (
"fmt"
"time"
"runtime"
)
func Say(msg string) {
runtime.Goexit()
fmt.Println(msg)
time.Sleep(time.Second * 3)
}
func main() {
for i:=0; i<10; i++ {
go Say(fmt.Sprintf("func %d say : hello world!!!", i))
}
fmt.Println("main say : hello world")
for {
}
}
打印输出
main say : hello world
如果在主协程调用runtime.Goexit()会发送什么呢?实验证明,会报死锁错误
fatal error: no goroutines (main called runtime.Goexit) - deadlock!
runtime.GOMAXPROCS()的使用例子:
在计算密集型的场景,使用多核并行计算能最大化其效率,如果是IO密集型场景,因为CPU时间都花在了CPU切换上,反而不值得。我们可以根据场景的不同,进而设置合适的runtime.GOMAXPROCS(),
package main
import (
"fmt"
"runtime"
)
func Say(msg string) {
fmt.Println(msg)
}
func main() {
for i:=0; i<10; i++ {
go Say(fmt.Sprintf("func %d say : hello world!!!", i))
}
_ = runtime.GOMAXPROCS(4) //指定以4核运算
fmt.Println("main say : hello world")
for {
}
}
channel
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel跟Linux中的双向管道很像,也是用于进程/协程间通信。
channel的定义打开、发送、接收、关闭。
chan T // 可以接收和发送类型为 T 的数据
chan<- float64 // 只可以用来发送 float64 类型的数据
<-chan int // 只可以用来接收 int 类型的数据
ch1 := make(chan int, 10) // 定义channel,里面可以塞的数据结构是int,缓冲长度为10
ch2 := make(chan []int) // 定义channel,里面可以塞的数据结构是[]int,缓冲长度为0(也就是无缓冲channel),往里面发了数据,会阻塞直到数据被接收
ch1 <- 10 //发送一个数据
x := <- ch1 // 接收一个数据
close(ch1) // 关闭channel
一个channel的案例,定义了多个数据结构的chan,实现了发送,接收以及协程同步和channel关闭操作。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func reciever(ch1 chan int, ch2 chan bool, ch3 chan []int, ch4 chan map[string]string) {
defer wg.Done()
x := <- ch1
y := <- ch2
m := <- ch3
n := <- ch4
fmt.Println("recieve msg:", x,y,m,n)
}
func main() {
//发送,将一个数据发送到有缓冲通道中,进程内,协程间通信
ch1 := make(chan int, 10) // 缓冲长度为10
ch2 := make(chan bool, 10)
ch3 := make(chan []int, 10)
ch4 := make(chan map[string]string, 10)
// 关闭channel
defer func() {
close(ch1)
close(ch2)
close(ch3)
close(ch4)
} ()
ch1 <- 10
ch2 <- true
dataSlice := []int {1,23,45}
ch3 <- dataSlice
dataMap := map[string]string {
"name": "James",
"school": "SYSU",
}
ch4 <- dataMap
wg.Add(1)
go reciever(ch1, ch2, ch3, ch4)
fmt.Println("send done")
wg.Wait()
}
打印输出:
junshideMacBook-Pro:gogo junshili$ go run main.go
send done
recieve msg: 10 true [1 23 45] map[name:James school:SYSU]
当channel很多时,使用上面的结构来处理读channel会显得流程混乱,因此我们需要使用select来优化流程。同样是定义了4个channel,select的写法如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
channel使用不当会导致panic,注意以下特殊情况的处理
channel | nil | 非空 | 空 | 满 | 非满 |
---|---|---|---|---|---|
接收 | 返回0值 | 接收值 | 阻塞 | 接收值 | 接收值 |
发送 | panic | 发送值 | 发送值 | 阻塞 | 发送值 |
关闭 | panic | 关闭成功 | 关闭成功 | 关闭成功 | 关闭成功 |
上图的nil一列是指channel关闭了。
注意到channel的读写操作会有阻塞的情形发生,因此如果不想我们的程序“卡住”不动的话,请为你的channel设置阻塞超时值,可以使用selct + <-time.After(time.Second * 1)定制超时时的处理。
select有很重要的一个应用就是超时处理。 如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string, 10)
go func() {
time.Sleep(time.Second * 2)
ch1 <- "result test"
} ()
select {
case res := <-ch1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("chan timeout 1s")
}
}
输出
chan timeout 1s
正因为channel有协程间通信的机制,因此可以做并发控制,比如我们使用channel完成上面waitgroup的并发控制的工作。
package main
import (
"fmt"
)
func Say(msg string, ch1 chan bool) {
fmt.Println(msg)
ch1 <- true
}
func main() {
ch1 := make(chan bool)
for i:=0; i<10; i++ {
go Say(fmt.Sprintf("func %d say : hello world!!!", i), ch1)
}
done := 0
for _ = range ch1 {
done += 1
if done == 10 {
break
}
}
close(ch1)
fmt.Println("main say : hello world")
}
打印输出
func 9 say : hello world!!!
func 6 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
func 3 say : hello world!!!
func 2 say : hello world!!!
func 4 say : hello world!!!
func 0 say : hello world!!!
func 1 say : hello world!!!
func 5 say : hello world!!!
main say : hello world
资料:
https://zhuanlan.zhihu.com/p/493208600