Go笔记(6)并发编程

这篇记录 go 中的并发编程:协程机制、共享内存并发机制(lock、wait)、CSP、多路选择、超时机制、任务取消、并发任务执行once。

协程机制

  1. 协程(Groutine,默认stack 2K)是更轻量级的线程(Thread,默认stack 1M)
  2. 和 KSE(Kernel Space Entity,系统线程)的对应关系:
    Thread 是 1:1;线程切换会涉及内核对象切换,消耗大
    Groutine 是 M:N;利用系统线程高效完成并发任务
  3. Go 中 channel 是有容量限制并且独立于处理 Groutine
  4. 向关闭的 channel 发送数据,会导致 panic(panic:send on closed channel)
    V, ok <-ch; ok 为 bool 值,true/表示正常接受,false 表示通道关闭
  5. 所有的 channel 接收者都会在 channel 关闭时,立刻从阻塞等待中返回且(ok=false)。这个广播机制常被利用,进行向多个订阅者同时发送信号。(如:退出信号。)
  6. Context
    • 根 Context:通过 context.Background()创建
    • 子 Context:context.WithCancel(parentContext)创建
    ctx, cancel := context.WithCancel(context.Background())
    • 当前 context 被取消时,基于他的子 context 都会被取消
    • 接收取消通知 <-ctx.Done
  7. buffered channel 防止协程泄漏,适用于任何任务完成;

代码示例

func 前面加上 go,表示启用协程机制。
协程调用的顺序并不是方法的顺序(和协程调用机制有关)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i) // 每次调用输出值不一样
}(i) // 方法调用为值传递,传递i的同时,值被复制了一份,每个协程拥有的i的地址是不一样的
/**
不能采用以下写法,会输出相同的值。因为内存共享
go func() {
fmt.Println(i)
}(i)
*/
}
time.Sleep(time.Millisecond * 50) // 等待
}

共享内存保护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock() // 锁释放
}()
mut.Lock()
counter++
}()
}
time.Sleep(1 * time.Second) // (优化1)防止结果错误:外面协程执行的速度超过了所有协程执行完的速度
t.Logf("counter = %d", counter)
}

等待机制
只有 wait 的所有内容都完成之后(每个协程的都执行完),才能继续往下执行。优化1的更好写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1) // 新增一个等待
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
wg.Done() // 等待完成了
}()
}
wg.Wait()
t.Logf("counter = %d", counter)
}

CSP

CSP(Communicating Sequential Process)利用一个通道进行两个通信实体之间的协调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("Task is done.")
}
func AsyncService() chan string { // buffer channel不会阻塞
// make(chan string, 1)声明buffer channel;make(chan string, 1)声明channel
retCh := make(chan string, 1) // make声明方式同slice,map
//retCh := make(chan string, 1)
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret // 往channel里面放数据
fmt.Println("service exited.")
}()
return retCh
}

func TestAsynService(t *testing.T) {
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh) // 从channel里面取数据
time.Sleep(time.Second * 1)
}
/**
执行结果如下:
working on something else
returned result.
service exited.
Task is done.
Done
*/

猜想使用场景:如果不确定程序的执行时间,但是需要使用其返回值,可以通过channel作为中转,存放在channel中,需要时取出。

多路选择 & 超时机制

1
2
3
4
5
6
7
8
func TestAsynService(t *testing.T) {
select {
case retCh := AsyncService():
t.Log(retCh)
case <= time.After(time.Millisecond * 100)
t.Error("time out")
}
}

chanel 取消

close机制

close 是广播机制的,所有 chanel 都会被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func isCancelled(cancelChan chan struct{}) bool {
select {
case <- cancelChan: // 从cancelChan收到消息,说明应该被取消
return true
default: // 被阻塞,没有被取消
return false
}
}
// 仅代表随意传入一个值,此时仅有一个channel被取消
func cancel_1(cancelChan chan struct{}) {
cancelChan <- struct{}{}
}
// 使用close机制
func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}

func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ { // 起5个channel
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelCh) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
cancel_2(cancelChan)
time.Sleep(time.Second * 1)
}

关联任务的取消-context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}

once

单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Singleton struct {
data string
}
var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%X\n", unsafe.Pointer(obj)) // 输出的内存地址一样
wg.Done()
}()
}
wg.Wait()
}

Go笔记(6)并发编程
https://guoningyan.com/2024/02/27/Go笔记(6)并发编程/
作者
Ningyan Guo
发布于
2024年2月27日
许可协议