go语言:并发编程

小海哥哥de / 2023-08-16 / 原文

引言

在C/C++中,高并发场景一般使用多线程支持;而go语言天然支持高并发。go语言采用goroutine来支持高并发场景,goroutine有官方实现的用户态的超级“线程池”,每个协程4-5KB栈内存占用并且实现机制大幅减少创建和销毁开销 是go语言高并发的根本原因。
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine

首先观察这段程序,我们使用了go关键字去启动一个协程跑Say函数,但是执行go run main.go后发现输出只有main say : hello world,而没有Say函数的输出。原因很简单,因为主函数mian首先执行完并return了,而go Say("func say : hello world!!!")需要为协程初始化消耗一些时间,因此晚于main的return,因此就没有输出打印。

package main

import (
        "fmt"
)

func Say(msg string) {

        fmt.Println(msg)
}

func main() {
        go Say("func say : hello world!!!")
        fmt.Println("main say : hello world")

}

但是我们在main函数结束前休眠3秒,Say函数的打印就顺利输出了。

package main

import (
        "fmt"
        "time"
)

func Say(msg string) {

        fmt.Println(msg)
}


func main() {
        go Say("func say : hello world!!!")
        fmt.Println("main say : hello world")
        time.Sleep(time.Second * 3)

}
main say : hello world
func say : hello world!!!

在Go中使用goroutine进行并发编程是比较简单的,但是跟多线程编程一样,并发编程难点在于线程同步和线程安全,因此下文重点探究goroutine如何实现并发控制和并发安全。

并发控制

多线程会有线程安全问题,如何保证线程间通信和数据共享是多线程编程中的大难题。协程作为用户级线程同样也会面临一样的问题,Go的并发控制是通过这几种方式进行的:
并发控制方法主要有:
全局变量

  • channel
  • WaitGroup
  • context
  • runtime

Sync.WaitGroup

当我们启动多个goroutine时,就涉及到并发控制。并发控制是个很大的主题,一句话概括就是我们想控制goroutine的生命周期,让goroutine按照我们的设定在某个时机执行某个动作。比如我们希望等所有协程完成自己的逻辑后,main才结束,这是并发控制的一个场景。c/c++中使用pthread_join完成线程的同步,而在go中,对应的功能可以由sync.WaitGroup来实现。

Sync.WaitGroup是一种实现并发控制方式,WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。

  • Add(n) 把计数器设置为n 。
  • Done() 每次把计数器-1 。
  • wait() 会阻塞代码的运行,直到计数器地值减为0。
package main

import (
        "fmt"
        //"time"
        "sync"
)


var wg sync.WaitGroup

func Say(msg string) {
        defer wg.Done()
        fmt.Println(msg)
}


func main() {
        for i:=0; i<10; i++ {
            wg.Add(1)
            go Say(fmt.Sprintf("func %d say : hello world!!!", i))
        }

        wg.Wait()

        fmt.Println("main say : hello world")

}

输出

func 0 say : hello world!!!
func 3 say : hello world!!!
func 1 say : hello world!!!
func 2 say : hello world!!!
func 6 say : hello world!!!
func 9 say : hello world!!!
func 4 say : hello world!!!
func 5 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
main say : hello world

runtime

当协程启动起来后,我们怎么控制这些协程的生命周期呢?比如我希望某些协程在特定条件下进行退出,或者让出调度时间片。此时我们可以使用runtime包来管理这些协程,这里我们主要掌握3个函数。
1、runtime.Gosched() : 让出当前协程的时间片,让出CPU时间片给其他协程,等其他协程执行完再执行协程后面的逻辑。
2、runtime.Goexit() : 使当前协程退出,而不影响协程的进行。
3、runtime.GOMAXPROCS() : Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数。goroutine和OS线程是多对多的关系,即m:n。
runtime.Gosched()的使用例子:

package main

import (
        "fmt"
        "time"
        "runtime"
)

func Say(msg string) {
        fmt.Println(msg)
        time.Sleep(time.Second * 3)
}

func main() {
        for i:=0; i<10; i++ {
            go Say(fmt.Sprintf("func %d say : hello world!!!", i))

        }

        runtime.Gosched()
        fmt.Println("main say : hello world")
}

从上面的例子可以知道,如果没有 runtime.Gosched()这一句,main函数会最快执行完并结束进程,因此打印只会有一条;但加上runtime.Gosched()后,主协程因为让出了时间片,因此需要等到进程内其他协程执行完才轮到自己执行,因此会打印出所有日志,并且main say是最后一条日志,说明该协程最后执行。

func 9 say : hello world!!!
func 0 say : hello world!!!
func 1 say : hello world!!!
func 2 say : hello world!!!
func 3 say : hello world!!!
func 4 say : hello world!!!
func 5 say : hello world!!!
func 6 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
main say : hello world

runtime.Goexit()的使用例子:
在子协程中调用runtime.Goexit()

package main

import (
        "fmt"
        "time"
        "runtime"
)

func Say(msg string) {

        runtime.Goexit()
        fmt.Println(msg)
        time.Sleep(time.Second * 3)
}

func main() {
        for i:=0; i<10; i++ {
            go Say(fmt.Sprintf("func %d say : hello world!!!", i))

        }

        fmt.Println("main say : hello world")
        for {
        }
}

打印输出

main say : hello world

如果在主协程调用runtime.Goexit()会发送什么呢?实验证明,会报死锁错误

fatal error: no goroutines (main called runtime.Goexit) - deadlock!

runtime.GOMAXPROCS()的使用例子:
在计算密集型的场景,使用多核并行计算能最大化其效率,如果是IO密集型场景,因为CPU时间都花在了CPU切换上,反而不值得。我们可以根据场景的不同,进而设置合适的runtime.GOMAXPROCS(),

package main

import (
        "fmt"
        "runtime"
)

func Say(msg string) {
        fmt.Println(msg)
}

func main() {
        for i:=0; i<10; i++ {
            go Say(fmt.Sprintf("func %d say : hello world!!!", i))

        }
        _ = runtime.GOMAXPROCS(4) //指定以4核运算
        fmt.Println("main say : hello world")
        for {
        }
}

channel

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel跟Linux中的双向管道很像,也是用于进程/协程间通信。
channel的定义打开、发送、接收、关闭。

chan T          // 可以接收和发送类型为 T 的数据
chan<- float64  // 只可以用来发送 float64 类型的数据
<-chan int      // 只可以用来接收 int 类型的数据

ch1 := make(chan int, 10) // 定义channel,里面可以塞的数据结构是int,缓冲长度为10
ch2 := make(chan []int) // 定义channel,里面可以塞的数据结构是[]int,缓冲长度为0(也就是无缓冲channel),往里面发了数据,会阻塞直到数据被接收


ch1 <- 10  //发送一个数据
x := <- ch1  // 接收一个数据

close(ch1) // 关闭channel

一个channel的案例,定义了多个数据结构的chan,实现了发送,接收以及协程同步和channel关闭操作。

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func reciever(ch1 chan int, ch2 chan bool, ch3 chan []int, ch4 chan map[string]string) {
	defer wg.Done()
	x := <- ch1
	y := <- ch2
	m := <- ch3
	n := <- ch4
	fmt.Println("recieve msg:", x,y,m,n)
}


func main() {

	//发送,将一个数据发送到有缓冲通道中,进程内,协程间通信
	ch1 := make(chan int, 10) // 缓冲长度为10
	ch2 := make(chan bool, 10)
	ch3 := make(chan []int, 10)
	ch4 := make(chan map[string]string, 10)

	// 关闭channel
	defer func() {
		close(ch1)
		close(ch2)
		close(ch3)
		close(ch4)
	} ()

	ch1 <- 10
	ch2 <- true
	dataSlice := []int {1,23,45}
	ch3 <- dataSlice
	dataMap := map[string]string {
		"name": "James",
		"school": "SYSU",
	}
	ch4 <- dataMap
	wg.Add(1)
	go reciever(ch1, ch2, ch3, ch4)
	fmt.Println("send done")

	wg.Wait()
}

打印输出:

junshideMacBook-Pro:gogo junshili$ go run main.go 
send done
recieve msg: 10 true [1 23 45] map[name:James school:SYSU]

当channel很多时,使用上面的结构来处理读channel会显得流程混乱,因此我们需要使用select来优化流程。同样是定义了4个channel,select的写法如下:

select {
case <-chan1:
   // 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
   // 如果成功向chan2写入数据,则进行该case处理语句
default:
   // 如果上面都没有成功,则进入default处理流程
}

channel使用不当会导致panic,注意以下特殊情况的处理

channel nil 非空 非满
接收 返回0值 接收值 阻塞 接收值 接收值
发送 panic 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功 关闭成功 关闭成功 关闭成功

上图的nil一列是指channel关闭了。
注意到channel的读写操作会有阻塞的情形发生,因此如果不想我们的程序“卡住”不动的话,请为你的channel设置阻塞超时值,可以使用selct + <-time.After(time.Second * 1)定制超时时的处理。
select有很重要的一个应用就是超时处理。 如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

package main


import (
	"fmt"
	"time"
)


func main() {
	ch1 := make(chan string, 10)
	go func() {
		time.Sleep(time.Second * 2)
		ch1 <- "result test"
	} ()
	
	select {
	case res := <-ch1:
		fmt.Println(res)
	case <-time.After(time.Second * 1):
		fmt.Println("chan timeout 1s")
	}
}

输出

chan timeout 1s

正因为channel有协程间通信的机制,因此可以做并发控制,比如我们使用channel完成上面waitgroup的并发控制的工作。

package main

import (
        "fmt"
)

func Say(msg string, ch1 chan bool) {
        fmt.Println(msg)
		ch1 <- true
}

func main() {
		ch1 := make(chan bool)
        for i:=0; i<10; i++ {
            go Say(fmt.Sprintf("func %d say : hello world!!!", i), ch1)
        }
		done := 0
		for _ = range ch1 {
			done += 1
			if done == 10 {
				break
			}
		}

		close(ch1)
        fmt.Println("main say : hello world")

}

打印输出

func 9 say : hello world!!!
func 6 say : hello world!!!
func 7 say : hello world!!!
func 8 say : hello world!!!
func 3 say : hello world!!!
func 2 say : hello world!!!
func 4 say : hello world!!!
func 0 say : hello world!!!
func 1 say : hello world!!!
func 5 say : hello world!!!
main say : hello world

资料:
https://zhuanlan.zhihu.com/p/493208600