这篇记录 go 中的并发编程:协程机制、共享内存并发机制(lock、wait)、CSP、多路选择、超时机制、任务取消、并发任务执行once。
协程机制
- 协程(Groutine,默认stack 2K)是更轻量级的线程(Thread,默认stack 1M)
- 和 KSE(Kernel Space Entity,系统线程)的对应关系:
Thread 是 1:1;线程切换会涉及内核对象切换,消耗大
Groutine 是 M:N;利用系统线程高效完成并发任务
- Go 中 channel 是有容量限制并且独立于处理 Groutine
- 向关闭的 channel 发送数据,会导致 panic(panic:send on closed channel)
V, ok <-ch; ok 为 bool 值,true/表示正常接受,false 表示通道关闭
- 所有的 channel 接收者都会在 channel 关闭时,立刻从阻塞等待中返回且(ok=false)。这个广播机制常被利用,进行向多个订阅者同时发送信号。(如:退出信号。)
- Context
• 根 Context:通过 context.Background()创建
• 子 Context:context.WithCancel(parentContext)创建
ctx, cancel := context.WithCancel(context.Background())
• 当前 context 被取消时,基于他的子 context 都会被取消
• 接收取消通知 <-ctx.Done
- buffered channel 防止协程泄漏,适用于任何任务完成;
代码示例
func 前面加上 go,表示启用协程机制。
协程调用的顺序并不是方法的顺序(和协程调用机制有关)。
| func TestGroutine(t *testing.T) { for i := 0; i < 10; i++ { go func(i int) { fmt.Println(i) }(i)
} time.Sleep(time.Millisecond * 50) }
|
共享内存保护
| func TestCounterThreadSafe(t *testing.T) { var mut sync.Mutex counter := 0 for i := 0; i < 5000; i++ { go func() { defer func() { mut.Unlock() }() mut.Lock() counter++ }() } time.Sleep(1 * time.Second) t.Logf("counter = %d", counter) }
|
等待机制
只有 wait 的所有内容都完成之后(每个协程的都执行完),才能继续往下执行。优化1的更好写法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func TestCounterWaitGroup(t *testing.T) { var mut sync.Mutex var wg sync.WaitGroup counter := 0 for i := 0; i < 5000; i++ { wg.Add(1) go func() { defer func() { mut.Unlock() }() mut.Lock() counter++ wg.Done() }() } wg.Wait() t.Logf("counter = %d", counter) }
|
CSP
CSP(Communicating Sequential Process)利用一个通道进行两个通信实体之间的协调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func service() string { time.Sleep(time.Millisecond * 50) return "Done" } func otherTask() { fmt.Println("working on something else") time.Sleep(time.Millisecond * 100) fmt.Println("Task is done.") } func AsyncService() chan string {
retCh := make(chan string, 1) go func() { ret := service() fmt.Println("returned result.") retCh <- ret fmt.Println("service exited.") }() return retCh }
func TestAsynService(t *testing.T) { retCh := AsyncService() otherTask() fmt.Println(<-retCh) time.Sleep(time.Second * 1) }
|
猜想使用场景:如果不确定程序的执行时间,但是需要使用其返回值,可以通过channel作为中转,存放在channel中,需要时取出。
多路选择 & 超时机制
| func TestAsynService(t *testing.T) { select { case retCh := AsyncService(): t.Log(retCh) case <= time.After(time.Millisecond * 100) t.Error("time out") } }
|
chanel 取消
close机制
close 是广播机制的,所有 chanel 都会被取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func isCancelled(cancelChan chan struct{}) bool { select { case <- cancelChan: return true default: return false } }
func cancel_1(cancelChan chan struct{}) { cancelChan <- struct{}{} }
func cancel_2(cancelChan chan struct{}) { close(cancelChan) }
func TestCancel(t *testing.T) { cancelChan := make(chan struct{}, 0) for i := 0; i < 5; i++ { go func(i int, cancelCh chan struct{}) { for { if isCancelled(cancelCh) { break } time.Sleep(time.Millisecond * 5) } fmt.Println(i, "Cancelled") }(i, cancelChan) } cancel_2(cancelChan) time.Sleep(time.Second * 1) }
|
关联任务的取消-context
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func isCancelled(ctx context.Context) bool { select { case <-ctx.Done(): return true default: return false } }
func TestCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) for i := 0; i < 5; i++ { go func(i int, ctx context.Context) { for { if isCancelled(ctx) { break } time.Sleep(time.Millisecond * 5) } fmt.Println(i, "Cancelled") }(i, ctx) } cancel() time.Sleep(time.Second * 1) }
|
once
单例模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| type Singleton struct { data string } var singleInstance *Singleton var once sync.Once
func GetSingletonObj() *Singleton { once.Do(func() { fmt.Println("Create Obj") singleInstance = new(Singleton) }) return singleInstance }
func TestGetSingletonObj(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { obj := GetSingletonObj() fmt.Printf("%X\n", unsafe.Pointer(obj)) wg.Done() }() } wg.Wait() }
|