Data handling in concurrent programs

04/02/2020 slice map counter sync atomic channels mutex lock concurrency race



In Go we have goroutines out of the box, so we can run code in parallel. But parallel code may work with shared variables, and it is not obvious how exactly Go handles such situations.

Let's start with the "counter task" — we'll try to increment a counter variable 200 times in multiple goroutines.

c := 0
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c++
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(c)

// 194

The resulting counter value differs from run to run and in most cases is not equal to 200. So this code is not thread-safe and doesn't work as planned, even though there are no compiler or runtime errors.

Next case: we'll try to append 200 values to a slice in parallel and check whether it ends up with exactly 200 values.

c := []int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c = append(c, 1)
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(len(c))

// 129

The number of values in the slice is even further from 200 than the counter was. This code is not thread-safe either.

Let's try to insert 200 values into a map in parallel:

c := map[int]int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		c[i] = i
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(len(c))

// panic: concurrent map writes

We can't even check the result because the program panics.

In all three tasks the code doesn't work correctly, but only with the map do we get an explicit error message about concurrent map writes, which the Go developers implemented as a runtime check.

Race detection

Go has a tool to detect such situations, called the race detector.

One can run any of the test cases above with the race flag: go test -race ./test.go. As a result, Go reports the goroutines involved in the data race:

go test -race ./test.go

==================
WARNING: DATA RACE
Read at 0x00c0000a6070 by goroutine 9:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x38

Previous write at 0x00c0000a6070 by goroutine 8:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x4e

Goroutine 9 (running) created at:
  command-line-arguments.Test()
      /go/src/github.com/antelman107/go_blog/test.go:15 +0xe8
  testing.tRunner()
      /usr/local/Cellar/go/1.14/libexec/src/testing/testing.go:992 +0x1eb
--- FAIL: Test (0.01s)
    testing.go:906: race detected during execution of test
FAIL
FAIL    command-line-arguments  0.025s
FAIL

Race detection is not limited to go test; one can even build a whole program in race detection mode:

$ go test -race mypkg    // to test the package
$ go run -race .  // to run the source file
$ go build -race .   // to build the command
$ go install -race mypkg // to install the package

It is nice that one can detect a data race directly in a running program this way.

Even the popular "closure over loop variable" issue can be detected:

wg := sync.WaitGroup{}
n := 10
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		fmt.Println(i)
		wg.Done()
	}()
}
wg.Wait()

The issue here is that the code won't print each of the numbers 0 through 9 exactly once: all the goroutines capture the same loop variable i, so the output is an unpredictable mix of values, with duplicates and omissions.
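
The usual fix (a sketch of the standard approach, not code from the original test) is to pass the loop variable into the goroutine as an argument, so that every goroutine gets its own copy:

wg := sync.WaitGroup{}
n := 10
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) { // the parameter shadows the loop variable, each goroutine gets its own copy
		fmt.Println(i)
		wg.Done()
	}(i)
}
wg.Wait()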

The solution

Let's describe a solution for the counter task. The same approach can then be applied to the slice and map tasks.

So we have a counter value that is less than expected.
Despite the brevity of the increment statement (c++), the program actually performs the following sequence of actions (spelled out in the sketch after the list):

  • read current counter value from memory,
  • increment it,
  • save the result back to memory.
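
Written out explicitly, the increment is equivalent to something like this (an illustrative rewrite, not code from the examples above):

tmp := c // read the current counter value from memory
tmp++    // increment the local copy
c = tmp  // save the result back to memory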

The issue happens because some goroutines read the same initial value of the counter and, having read the same value, write back the same result. This behaviour is illustrated on the diagram:

The more often this "same initial value" situation happens, the further the final counter value ends up from 200.

The solution is to make the counter update atomic. Once a goroutine has read the current counter value, the very next action on the counter must be the update from that same goroutine; no other goroutine should read or change the counter in the middle of that operation.

If we add synchronization logic as described above, the diagram looks like this:

sync.Mutex/sync.RWMutex solution

We can use the Lock and Unlock methods to guarantee that only one goroutine works with the counter at a time.

We can also use sync.RWMutex to allow parallel reads.
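
For illustration only, a read-heavy variant could look like the sketch below; the counter task itself has no concurrent readers, so this is just to show the RLock/RUnlock pattern:

var (
	c  int
	mu sync.RWMutex
)

// writer: takes the exclusive lock
mu.Lock()
c++
mu.Unlock()

// readers: take the shared lock, several goroutines may hold it at once
mu.RLock()
v := c
mu.RUnlock()
fmt.Println(v)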

But for our task a plain Mutex is enough:

c := 0
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		m.Lock()
		c++
		m.Unlock()
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 == OK
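
The same mutex approach fixes the slice and map tasks from the beginning of the article; here is a sketch for the slice case (the map can be protected in exactly the same way):

c := []int{}
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		m.Lock()
		c = append(c, 1)
		m.Unlock()
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(len(c))

// 200 == OK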

Channels solution

Channel operations are atomic out of the box.

We can send data into a channel that has a single reader to get sequential processing.

But to do that we need some additional code:

c := 0
n := 200

ch := make(chan struct{}, n)
chanWg := sync.WaitGroup{}
chanWg.Add(1)
go func() {
	for range ch {
		c++
	}
	chanWg.Done()
}()

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		ch <- struct{}{}
		wg.Done()
	}(i)
}
wg.Wait()
close(ch)
chanWg.Wait()

fmt.Println(c)

// 200 = OK

We used an empty struct here because it is the smallest data type in Go: it occupies zero bytes.
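
This is easy to check with unsafe.Sizeof (assuming the unsafe package is imported just for the demonstration):

fmt.Println(unsafe.Sizeof(struct{}{})) // 0: the empty struct occupies no memory
fmt.Println(unsafe.Sizeof(int64(0)))   // 8: a regular 64-bit integer takes 8 bytes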

Atomic package solution

The standard Go package sync/atomic provides a set of atomic operations.

Internally, parts of the package rely on the runtime_procPin / runtime_procUnpin functions (see the Go sources).

procPin guarantees that the Go scheduler won't preempt the current goroutine until procUnpin is called.

The atomic package has several counter functions that help implement our atomic counter:

c := int32(0)
n := 200

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		atomic.AddInt32(&c, 1)
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 = OK
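
Besides AddInt32, the atomic package also provides atomic reads and compare-and-swap operations, which are useful when the counter is read while other goroutines keep writing; a short sketch:

var v int32

atomic.AddInt32(&v, 1)                          // atomic increment
fmt.Println(atomic.LoadInt32(&v))               // atomic read, safe alongside concurrent writers
swapped := atomic.CompareAndSwapInt32(&v, 1, 0) // reset to 0 only if the value is still 1
fmt.Println(swapped)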

One can run into this atomic data change issue in many other development situations.
For example, the same problem occurs with SELECT + UPDATE queries in SQL databases when multiple processes run them concurrently.
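
As an illustration (the counters table and the open db connection here are hypothetical, and the ? placeholder assumes a MySQL-style driver), a single UPDATE that increments in place is atomic on the database side, unlike a read-modify-write done in application code:

// Non-atomic: two processes can read the same value and both write value+1.
var v int
_ = db.QueryRow("SELECT value FROM counters WHERE id = 1").Scan(&v)
_, _ = db.Exec("UPDATE counters SET value = ? WHERE id = 1", v+1)

// Atomic: the increment happens inside the database in a single statement.
_, _ = db.Exec("UPDATE counters SET value = value + 1 WHERE id = 1")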
