Go从入门到精通——同步——保证并发环境下数据访问的准确性(竞态检测、互斥锁、读写互斥锁)(Go from entry to mastery – synchronization – ensure the accuracy of data access in concurrent environment (race detection, mutex, read-write mutex))

同步——保证并发环境下数据访问的准确性(竞态检测、互斥锁、读写互斥锁)

  Go 程序可以使用通道进行多个 goroutine 间的数据交换,但这仅仅是数据同步中的一种方法。通道内部的实现依然使用了各种锁,因此优雅代码的代价是性能。在某些轻量级的场合,原子访问(atomic包)、互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求。

一、竞态检测——检测代码在并发环境下可能出现的问题

  当多线程并发运行的程序竞争访问和修改同一块资源时,会发生竞态问题。

  下面的代码中有一个 ID生成器,每次调用生成器将会生成一个不会重复的顺序序号,使用 10个并发生成序号,观察 10个并发后的结果。

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	//序号
	seq int64
)

//序号生成器
func GenID() int64 {

	//使用原子操作函数 atomic.AddInt64() 对 seq加 1操作。
	//这里没有使用 atomic.AddInt64()的返回值作为 GenID()函数的返回值,
	//用来引出一个竞态问题。
	atomic.AddInt64(&seq, 1)
	return seq
}

func main() {

	//循环10次,生成10个 goroutine 调用 GenID() 函数
	for i := 0; i <= 10; i++ {
		go GenID()
	}

	//单独调用一次 GEnID() 函数
	fmt.Println(GenID())
}

  在运行程序时,为运行参数加入 “-race” 参数,开启运行时(runtime)对竞态问题的分析,命令如下:

PS D:\go-testfiles> go run -race .\racedetect.go

  代码运行发生宕机,输出信息如下:

PS D:\go-testfiles> go run -race .\racedetect.go
==================
WARNING: DATA RACE
Write at 0x0000011fd4f0 by goroutine 8:
  sync/atomic.AddInt64()
      C:/Program Files/Go/src/runtime/race_amd64.s:287 +0xb
  sync/atomic.AddInt64()
      <autogenerated>:1 +0x1b
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Previous read at 0x0000011fd4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()

PS D:\go-testfiles> go run -race .\racedetect.gocls
go: go.mod file not found in current directory or any parent directory; see 'go help modules'
PS D:\go-testfiles> go run -race .\racedetect.go   
==================
WARNING: DATA RACE
Write at 0x00000023d4f0 by goroutine 8:
  sync/atomic.AddInt64()
      C:/Program Files/Go/src/runtime/race_amd64.s:287 +0xb
  sync/atomic.AddInt64()
      <autogenerated>:1 +0x1b
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Previous read at 0x00000023d4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Goroutine 8 (running) created at:
  main.main()
      D:/go-testfiles/racedetect.go:27 +0x39

Goroutine 7 (finished) created at:
  main.main()
      D:/go-testfiles/racedetect.go:27 +0x39
==================
11
Found 1 data race(s)
exit status 66
PS D:\go-testfiles> 

  根据报错信息,在 20行 发现有竞态问题,

Previous read at 0x00000023d4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

  我们修改下:

//序号生成器
func GenID() int64 {
	return atomic.AddInt64(&seq, 1)
}

  再次运行:

PS D:\go-testfiles> go run -race .\racedetect.go
11
PS D:\go-testfiles> 

  没有发生竞态问题了,程序运行正常。

  这个例子也可以用下面的互斥锁(sync.Mutex)解决,但是对性能消耗较大,在这种情况下,推荐使用原子操作(atomic)进行变量操作。

二、互斥锁(sync.Mutex)——保证同时只有一个 goroutine 可以访问共享资源

   互斥锁是一种常用的控制共享资源访问的方法。在 Go 程序中的使用非常简单。

package main

import (
	"fmt"
	"sync"
)

var (
	//逻辑中使用的某个变量
	count int

	//与变量对应的使用互斥锁,这里将互斥锁的变量命名为 变量名+Guard,以表示保护这个变量
	countGuard sync.Mutex
)

func GetCount() int {
	//锁定
	countGuard.Lock()

	//在函数退出时接触锁定
	defer countGuard.Unlock()

	return count
}

func SetCount(c int) {
	countGuard.Lock()
	count = c
	countGuard.Unlock()
}

func main() {

	//可以进行并发安全的设置
	SetCount(1)

	//可以进行并发安全的获取
	fmt.Println(GetCount())
}

 三、读写互斥锁(sync.RWMutex)——在读比写多的环境下比互斥锁更高效

   在读多写少的环境中,可以优先使用读写互斥锁,sync 包中的 RWMutex 提供了读写互斥锁的封装。

package main

import (
	"fmt"
	"sync"
)

var (
	//逻辑中使用的某个变量
	count int

	//与变量对应的使用互斥锁,这里将互斥锁的变量命名为 变量名+Guard,以表示保护这个变量
	countGuard sync.RWMutex
)

func GetCount() int {
	//锁定
	countGuard.RLock()

	//在函数退出时接触锁定
	defer countGuard.RUnlock()

	return count
}

func SetCount(c int) {
	countGuard.RLock()
	count = c
	countGuard.RUnlock()
}

func main() {

	//可以进行并发安全的设置
	SetCount(1)

	//可以进行并发安全的获取
	fmt.Println(GetCount())
}

四、等待组(sync.WaitGroup)——保证在并发环境中完成指定数量的任务

  除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步。

  等待组有下面几个方法可用,如下表:

等待组的方法
 方法名  功能
 (wg *WaitGroup)Add(delta int)  等待组的计数器+1
(wg *WaitGroup)Done()  等待组的计数器-1
(wg *WaitGroup)Wait()  当等待组计数器不等于0时阻塞直到变0

  等待组内部拥有一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。

  当我们添加了 N 个并发任务进行工作时,就将等待组的计数器值增加 N。每个任务完成时,这个值减 1。同时,在另外一个 goroutine 中等待这个等待组的计数器值为 0 时,表示所有任务已经完成。

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {

	//声明一个等待组
	var wg sync.WaitGroup

	//准备一系列的网站地址
	var urls = []string{
		"https://www.baidu.com",
		"https://www.jd.com",
		"https://www.taobao.com",
		"https://www.google.com",
	}

	//遍历这些地址
	for _, url := range urls {

		//每一个任务开始时,请等待组增加 1
		wg.Add(1)

		//开启一个并发
		go func(url string) {

			//使用 defer,表示函数完成时将等待组减少 1
			defer wg.Done()

			//使用 HTTP 访问提供的地址
			_, err := http.Get(url)

			//访问完成后,打印地址和可能发生的错误
			fmt.Println(url, err)

			//通过参数传递 url 地址
		}(url)
	}

	//等待所有的任务完成
	wg.Wait()

	fmt.Println("over")

}

  程序运行后:

Starting: D:\go-testfiles\bin\dlv.exe dap --check-go-version=false --listen=127.0.0.1:52039 from d:\go-testfiles
DAP server listening at: 127.0.0.1:52039
Type 'dlv help' for list of commands.
https://www.baidu.com <nil>
https://www.jd.com <nil>
https://www.taobao.com <nil>
https://www.google.com Get "https://www.google.com": dial tcp 162.125.32.6:443: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
over
Process 20104 has exited with status 0
Detaching
dlv dap (5628) exited with code: 0
————————

Synchronization – ensure the accuracy of data access in concurrent environment (race detection, mutex, read-write mutex)

Go programs can use channels to exchange data between multiple goroutines, but this is only one method of data synchronization. The implementation inside the channel still uses various locks, so the price of elegant code is performance. In some lightweight situations, atomic access (atomic package), mutex (sync. Mutex) and wait group (sync. Waitgroup) can meet the requirements to the greatest extent.

1、 Race detection — detect possible problems of code in concurrent environment

< strong > race occurs when programs running concurrently with multiple threads compete to access and modify the same resource

There is an ID generator in the following code. Each time the generator is called, it will generate a sequence number that will not be repeated. Use 10 concurrent sequence numbers to generate sequence numbers, and observe the results after 10 concurrent sequences.

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	//序号
	seq int64
)

//序号生成器
func GenID() int64 {

	//使用原子操作函数 atomic.AddInt64() 对 seq加 1操作。
	//这里没有使用 atomic.AddInt64()的返回值作为 GenID()函数的返回值,
	//用来引出一个竞态问题。
	atomic.AddInt64(&seq, 1)
	return seq
}

func main() {

	//循环10次,生成10个 goroutine 调用 GenID() 函数
	for i := 0; i <= 10; i++ {
		go GenID()
	}

	//单独调用一次 GEnID() 函数
	fmt.Println(GenID())
}

When running the program, add “- race” parameter to the running parameters to start the analysis of race problems at runtime. The command is as follows:

PS D:\go-testfiles> go run -race .\racedetect.go

When the code goes down, the output information is as follows:

PS D:\go-testfiles> go run -race .\racedetect.go
==================
WARNING: DATA RACE
Write at 0x0000011fd4f0 by goroutine 8:
  sync/atomic.AddInt64()
      C:/Program Files/Go/src/runtime/race_amd64.s:287 +0xb
  sync/atomic.AddInt64()
      <autogenerated>:1 +0x1b
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Previous read at 0x0000011fd4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()

PS D:\go-testfiles> go run -race .\racedetect.gocls
go: go.mod file not found in current directory or any parent directory; see 'go help modules'
PS D:\go-testfiles> go run -race .\racedetect.go   
==================
WARNING: DATA RACE
Write at 0x00000023d4f0 by goroutine 8:
  sync/atomic.AddInt64()
      C:/Program Files/Go/src/runtime/race_amd64.s:287 +0xb
  sync/atomic.AddInt64()
      <autogenerated>:1 +0x1b
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Previous read at 0x00000023d4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

Goroutine 8 (running) created at:
  main.main()
      D:/go-testfiles/racedetect.go:27 +0x39

Goroutine 7 (finished) created at:
  main.main()
      D:/go-testfiles/racedetect.go:27 +0x39
==================
11
Found 1 data race(s)
exit status 66
PS D:\go-testfiles> 

According to the error information, a race problem is found in line 20,

Previous read at 0x00000023d4f0 by goroutine 7:
  main.GenID()
      D:/go-testfiles/racedetect.go:20 +0x3a
  main.main.func1()
      D:/go-testfiles/racedetect.go:27 +0x2b

We modify the following:

//序号生成器
func GenID() int64 {
	return atomic.AddInt64(&seq, 1)
}

Run again:

PS D:\go-testfiles> go run -race .\racedetect.go
11
PS D:\go-testfiles> 

There is no race problem, and the program runs normally.

This example can also be solved with the following mutex (sync. Mutex), but it consumes a lot of performance. In this case, atomic operation (atomic) is recommended for variable operation.

2、 Mutual exclusive lock (sync. Mutex) — ensure that only one goroutine can access shared resources at the same time

Mutex is a common method to control access to shared resources. The use in the Go program is very simple.

package main

import (
	"fmt"
	"sync"
)

var (
	//逻辑中使用的某个变量
	count int

	//与变量对应的使用互斥锁,这里将互斥锁的变量命名为 变量名+Guard,以表示保护这个变量
	countGuard sync.Mutex
)

func GetCount() int {
	//锁定
	countGuard.Lock()

	//在函数退出时接触锁定
	defer countGuard.Unlock()

	return count
}

func SetCount(c int) {
	countGuard.Lock()
	count = c
	countGuard.Unlock()
}

func main() {

	//可以进行并发安全的设置
	SetCount(1)

	//可以进行并发安全的获取
	fmt.Println(GetCount())
}

3. Read / write mutex (sync. Rwmutex) – it is more efficient than mutex in an environment where there are more reads than writes

In the environment of more reading and less writing, the read-write mutex can be used preferentially. Rwmutex in sync package provides the encapsulation of read-write mutex.

package main

import (
	"fmt"
	"sync"
)

var (
	//逻辑中使用的某个变量
	count int

	//与变量对应的使用互斥锁,这里将互斥锁的变量命名为 变量名+Guard,以表示保护这个变量
	countGuard sync.RWMutex
)

func GetCount() int {
	//锁定
	countGuard.RLock()

	//在函数退出时接触锁定
	defer countGuard.RUnlock()

	return count
}

func SetCount(c int) {
	countGuard.RLock()
	count = c
	countGuard.RUnlock()
}

func main() {

	//可以进行并发安全的设置
	SetCount(1)

	//可以进行并发安全的获取
	fmt.Println(GetCount())
}

4、 Sync. Waitgroup – ensures that a specified number of tasks are completed in a concurrent environment

In addition to using channels and mutexes to synchronize two concurrent programs, you can also use wait groups to synchronize multiple tasks.

The following methods are available for the waiting group, as shown in the table below:

等待组的方法
 方法名  功能
 (wg *WaitGroup)Add(delta int)  等待组的计数器+1
(wg *WaitGroup)Done()  等待组的计数器-1
(wg *WaitGroup)Wait()  当等待组计数器不等于0时阻塞直到变0

There is a counter inside the waiting group. The value of the counter can be increased and decreased through method calls.

When we add n concurrent tasks to work, we increase the counter value of the waiting group by n. When each task is completed, this value is reduced by 1. Meanwhile, when the counter value of the waiting group in another goroutine is 0, it indicates that all tasks have been completed.

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {

	//声明一个等待组
	var wg sync.WaitGroup

	//准备一系列的网站地址
	var urls = []string{
		"https://www.baidu.com",
		"https://www.jd.com",
		"https://www.taobao.com",
		"https://www.google.com",
	}

	//遍历这些地址
	for _, url := range urls {

		//每一个任务开始时,请等待组增加 1
		wg.Add(1)

		//开启一个并发
		go func(url string) {

			//使用 defer,表示函数完成时将等待组减少 1
			defer wg.Done()

			//使用 HTTP 访问提供的地址
			_, err := http.Get(url)

			//访问完成后,打印地址和可能发生的错误
			fmt.Println(url, err)

			//通过参数传递 url 地址
		}(url)
	}

	//等待所有的任务完成
	wg.Wait()

	fmt.Println("over")

}

After the program runs:

Starting: D:\go-testfiles\bin\dlv.exe dap --check-go-version=false --listen=127.0.0.1:52039 from d:\go-testfiles
DAP server listening at: 127.0.0.1:52039
Type 'dlv help' for list of commands.
https://www.baidu.com <nil>
https://www.jd.com <nil>
https://www.taobao.com <nil>
https://www.google.com Get "https://www.google.com": dial tcp 162.125.32.6:443: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
over
Process 20104 has exited with status 0
Detaching
dlv dap (5628) exited with code: 0