04/02/2020 slice map counter sync atomic channels mutex lock concurrency race
Go ships with goroutines out of the box, so we can run code in parallel. But parallel code often works with shared variables, and it is not obvious how exactly Go handles such situations.
Let's start with the "counter task": we'll try to increment a counter variable 200 times from multiple goroutines.
c := 0
wg := sync.WaitGroup{}
n := 200
wg.Add(n)

for i := 0; i < n; i++ {
    go func() {
        c++
        wg.Done()
    }()
}

wg.Wait()
fmt.Println(c) // 194
The resulting counter value differs from run to run and in most cases is not equal to 200. So this code is not thread-safe and doesn't work as planned, even though there are no compiler or runtime errors.
Next case: we'll try to append 200 values to a slice in parallel and check whether there are exactly 200 values.
c := []int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)

for i := 0; i < n; i++ {
    go func() {
        c = append(c, 1)
        wg.Done()
    }()
}

wg.Wait()
fmt.Println(len(c)) // 129
The number of values in the slice is even further from 200 than in the counter task. This code is not thread-safe either.
Let's try to insert 200 values into a map in parallel:
c := map[int]int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)

for i := 0; i < n; i++ {
    go func(i int) {
        c[i] = i
        wg.Done()
    }(i)
}

wg.Wait()
fmt.Println(len(c)) // panic: concurrent map writes
We can't even check the result, because the program panics.
In all three tasks the code doesn't work correctly, but only the map case reports an explicit error about concurrent map writes, a check implemented by the Go runtime developers.
Go has a tool to detect such situations, called the race detector.
One can run any test case above with the race flag: go test -race ./test.go
As a result, Go reports the goroutines that participate in the data race:
go test -race ./test.go
==================
WARNING: DATA RACE
Read at 0x00c0000a6070 by goroutine 9:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x38

Previous write at 0x00c0000a6070 by goroutine 8:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x4e

Goroutine 9 (running) created at:
  command-line-arguments.Test()
      /go/src/github.com/antelman107/go_blog/test.go:15 +0xe8
  testing.tRunner()
      /usr/local/Cellar/go/1.14/libexec/src/testing/testing.go:992 +0x1eb
--- FAIL: Test (0.01s)
    testing.go:906: race detected during execution of test
FAIL
FAIL    command-line-arguments  0.025s
FAIL
Race detection is not just go test functionality. One can also run, build, or install a program in race detection mode:
$ go test -race mypkg    // to test the package
$ go run -race .         // to run the source file
$ go build -race .       // to build the command
$ go install -race mypkg // to install the package
It is nice that one can detect a data race directly in a running program.
Even popular "cycle closure" issue can be detected:
wg := sync.WaitGroup{}
n := 10
wg.Add(n)

for i := 0; i < n; i++ {
    go func() {
        fmt.Println(i)
        wg.Done()
    }()
}

wg.Wait()
The issue here is that the code won't print exactly the numbers 0, 1, 2, ..., 9. Each goroutine reads the shared loop variable i at some later moment, so some values may be printed several times, others skipped, and even 10 may appear.
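The usual fix, and the pattern the solutions below already use, is to pass the loop variable into the goroutine as an argument, so every goroutine gets its own copy:

wg := sync.WaitGroup{}
n := 10
wg.Add(n)

for i := 0; i < n; i++ {
    go func(i int) { // i is passed by value, each goroutine receives its own copy
        fmt.Println(i)
        wg.Done()
    }(i)
}

wg.Wait()

The order of the output is still not deterministic, but every number from 0 to 9 is printed exactly once.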
Let's describe a solution for the counter task. The same approach can be used for the slice and map tasks.
So we have a counter value that is less than what we expect. Despite the brevity of the increment statement (c++), the program actually performs several separate actions:
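Roughly (this is a simplified sketch, not the actual compiled instructions), c++ behaves like three separate steps, and the scheduler may switch to another goroutine between any of them:

tmp := c      // 1. read the current value of c from memory
tmp = tmp + 1 // 2. increment the local copy
c = tmp       // 3. write the new value back to memory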
The issue happens because some goroutines read the same initial value of the counter, and after reading it they write back the same incremented result, so increments get lost. This behaviour is shown in the diagram:
The more often such reads of the same initial value happen, the more the final counter differs from 200.
The solution is to make the counter change atomic. Once a goroutine has read the current counter value, the only next action on the counter should be the update from that same goroutine; no other goroutine should read or change the counter in the middle of that operation.
If we add synchronization logic as described above, the diagram looks like this:
We can use the Lock and Unlock methods of sync.Mutex to guarantee that only one goroutine works with the counter at a time. We could also use sync.RWMutex to allow parallel reads, but for our task a plain Mutex is completely enough:
c := 0
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)

for i := 0; i < n; i++ {
    go func(i int) {
        m.Lock()
        c++
        m.Unlock()
        wg.Done()
    }(i)
}

wg.Wait()
fmt.Println(c) // 200 == OK
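For reference, here is a minimal sketch of the sync.RWMutex variant mentioned above, in the same snippet style as the other examples in this post: writers take Lock / Unlock, readers take RLock / RUnlock and don't block each other.

c := 0
n := 200
m := sync.RWMutex{}
wg := sync.WaitGroup{}
wg.Add(2 * n)

for i := 0; i < n; i++ {
    // writer: exclusive lock
    go func() {
        m.Lock()
        c++
        m.Unlock()
        wg.Done()
    }()

    // reader: shared lock, many readers may hold it at the same time
    go func() {
        m.RLock()
        _ = c
        m.RUnlock()
        wg.Done()
    }()
}

wg.Wait()
fmt.Println(c) // 200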
Channel operations are atomic out of the box. We can send any data into a channel that has a single reader, so the values are processed sequentially. But to do that we need some additional code:
c := 0
n := 200

ch := make(chan struct{}, n)
chanWg := sync.WaitGroup{}
chanWg.Add(1)
go func() {
    for range ch {
        c++
    }
    chanWg.Done()
}()

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
    go func(i int) {
        ch <- struct{}{}
        wg.Done()
    }(i)
}

wg.Wait()
close(ch)
chanWg.Wait()
fmt.Println(c) // 200 = OK
We also used an empty struct here because it is the smallest possible data type in Go.
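One can verify the zero size with unsafe.Sizeof (the unsafe import is needed only for this check):

fmt.Println(unsafe.Sizeof(struct{}{})) // prints 0: an empty struct occupies no bytes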
The standard Go package sync/atomic provides a set of atomic operations.
They work thanks to the runtime_procPin / runtime_procUnpin functions (see the Go sources). The pin function disables preemption, so the Go scheduler won't interrupt the current goroutine until unpin is called.
The atomic package has several add functions that help implement our atomic counter:
c := int32(0)
n := 200
wg := sync.WaitGroup{}
wg.Add(n)

for i := 0; i < n; i++ {
    go func(i int) {
        atomic.AddInt32(&c, 1)
        wg.Done()
    }(i)
}

wg.Wait()
fmt.Println(c) // 200 = OK
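Note that the functions in the atomic package operate on plain integer and pointer values, so they don't help with the map task from the beginning of the post; there the mutex approach is the fitting one. A minimal sketch:

c := map[int]int{}
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)

for i := 0; i < n; i++ {
    go func(i int) {
        m.Lock() // only one goroutine writes to the map at a time
        c[i] = i
        m.Unlock()
        wg.Done()
    }(i)
}

wg.Wait()
fmt.Println(len(c)) // 200 = OK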
One can meet this atomic data change issue in many development situations. For example, the same issue happens with SELECT + UPDATE queries when multiple processes work with an SQL database.
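As a hedged illustration in Go (the counters table, the value column, and the id used here are made up for the example, and the query placeholder syntax depends on the driver), the lost-update pattern appears when a value is read with SELECT and written back with UPDATE from several processes; letting the database do the read-modify-write in a single statement plays the same role as the mutex above:

import "database/sql"

// incrementRacy reads the value and writes it back in two separate queries.
// Another process may update the row between the SELECT and the UPDATE,
// and that update will be lost.
func incrementRacy(db *sql.DB) error {
    var v int
    if err := db.QueryRow("SELECT value FROM counters WHERE id = 1").Scan(&v); err != nil {
        return err
    }
    _, err := db.Exec("UPDATE counters SET value = ? WHERE id = 1", v+1)
    return err
}

// incrementAtomic performs the read-modify-write as a single statement,
// so the database keeps it consistent under concurrency.
func incrementAtomic(db *sql.DB) error {
    _, err := db.Exec("UPDATE counters SET value = value + 1 WHERE id = 1")
    return err
}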