Работа с данными в конкурентных программах на GO

02.04.2020 срез карта счетчик sync atomic каналы mutex lock конкурентность race


Работа с данными в конкурентных программах на GO

В языке GO базовой возможностью является использование горутин, это является фичей "из коробки". Код запускаемый в горутинах, может работать паралеллельно. Параллельно работающие горутины могут использовать общие переменные и возникают вопросы безопасна ли такая работа и как поведет себя программа.

Рассмотрим "задачу счетчика" — попробуем инкрементировать переменную 200 раз через несколько горутин:

c := 0
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c++
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(c)

// 194

При неоднократном вызове кода получаем разное значение счетчика, не равное 200. Данный код не потокобезопасен, не смотря на отсутствие ошибок на этапе компиляции или в рантайме. Очевидно также, что задача счетчика не решена.

Рассмотрим другой случай — конкурентная вставка в slice 200 записей.

c := []int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		c = append(c, 1)
		wg.Done()
	}()
}
wg.Wait()

fmt.Println(len(c))

// 129

В данном случае видим снова разную итоговую длину среза и эта длина еще более далека от 200, чем был счетчик. Данный код также не потокобезопасен.

Рассмотрим теперь ту же ситуацию но при вставке в map:

c := map[int]int{}
wg := sync.WaitGroup{}
n := 200
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		c[i] = i
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(len(c))

// fatal error: concurrent map writes

Получить результат работы данной программы невозможно, она завершается с ошибкой. Все 3 варианта кода нельзя использовать, но только в случае с map разработчики языка позаботились о явном информировании о проблеме.

Race detection

В GO есть встроенный механизм определения подобных ситуаций — race detection.

Запускаем любой из приведенных примеров go test -race ./test.go и видим список всех горутин, которые осуществляют конкурентный доступ:

go test -race ./test.go

==================
WARNING: DATA RACE
Read at 0x00c0000a6070 by goroutine 9:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x38

Previous write at 0x00c0000a6070 by goroutine 8:
  command-line-arguments.Test.func1()
      /go/src/github.com/antelman107/go_blog/test.go:16 +0x4e

Goroutine 9 (running) created at:
  command-line-arguments.Test()
      /go/src/github.com/antelman107/go_blog/test.go:15 +0xe8
  testing.tRunner()
      /usr/local/Cellar/go/1.14/libexec/src/testing/testing.go:992 +0x1eb
--- FAIL: Test (0.01s)
    testing.go:906: race detected during execution of test
FAIL
FAIL    command-line-arguments  0.025s
FAIL

Запускать таким образом можно не только go test, но и даже скомпилировать вашу программу в данном режиме:

$ go test -race mypkg    // to test the package
$ go run -race .  // to run the source file
$ go build -race .   // to build the command
$ go install -race mypkg // to install the package

Таким образом можно не гадать о наличии конкуретного доступа (race), но явным видом проверить его.

Приятно, что даже стандартная проблема "цикла с замыканием" обнаруживается race детектором:

wg := sync.WaitGroup{}
n := 10
wg.Add(n)
for i := 0; i < n; i++ {
	go func() {
		fmt.Println(i)
		wg.Done()
	}()
}
wg.Wait()

Ошибка данного кода в том, что цикл переберет не значение от 1 до 9, а вообще достаточно рандомные значения в этом интервале.

Решение проблемы синхронизации данных

Далее все примеры и логику будем решать на основе проблемы счетчика, хотя работа со срезом и мапой может быть решена так же.

По результатам работы кода мы видим, что счетчик имеет ошибочно меньшее значение, чем мы расчитываем.
Несмотря на краткость вызова инкремента (c++), программа фактически выполняет несколько действий:

  • считать текущее значение счетчика из памяти,
  • увеличить его значение,
  • сохранить результат.

Проблема возникает, когда горутины при одновременной работе считывают одно и то же исходное значение счетчика и далее вычисляют изменение исходя из него. По "правильной" же логике, каждое изменение счетчика должно уникальным образом изменить его значение.

Чем больше раз такая ситуация встречается, тем на большее число итоговое значение счетчика отстает от 200. На диаграммах приведен простейший случай — из 2 горутин, при этом данную проблему можно распространить на любое число горутин, работающих параллельно.

Решением проблемы будет атомарная работа с данными, т.е. каждая пара действий чтение+изменение должна быть выполнена строго последовательно. Если одна из горутин прочла значение счетчика, то никакие другие горутины не должны читать или менять его, пока первая горутина не запишет в счетчик измененное значение.

После добавления механизмов синхронизации диаграмма должна выглядит так:

Решение с использованием sync.Mutex/sync.RWMutex

В данном случае атомарность обеспечена вызовами Lock и Unlock, которые позволяют только 1 горутине получить лок и обеспечивают лишь один выполняемый инкремент в одну единицу времени, как показано на диаграмме выше.

sync.RWMutex предоставляет локи на чтение и на запись и разрешит одновременный вызов нескольких RLock, но не допустит одновременной работы RLock и Lock.

Для нашей задачи будет достаточно простого Mutex:

c := 0
n := 200
m := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		m.Lock()
		c++
		m.Unlock()
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 == OK

Синхронизация с помощью каналов

Каналы имеют встроенный механизм синхронизации — операции вставки и извлечения выполняются последовательно.

Обеспечив отправку "задач" через канал с единственным получателем, мы "естественным" образом выполним увеличение счетчика последовательно.

Однако работа с каналом требует дополнительного кода:

c := 0
n := 200

ch := make(chan struct{}, n)
chanWg := sync.WaitGroup{}
chanWg.Add(1)
go func() {
	for range ch {
		c++
	}
	chanWg.Done()
}()

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		ch <- struct{}{}
		wg.Done()
	}(i)
}
wg.Wait()
close(ch)
chanWg.Wait()

fmt.Println(c)

// 200 = OK

Для передачи данных по каналу также использована пустая структура, как наиболее экономичная по занимаемой памяти.

Синхронизация с помощью atomic

Пакет atomic, судя по своему названию, позволяет выполнять атомарные операции с данными.

Атомарность обеспечивается функциями runtime_procPin / runtime_procUnpin (исходник).

Данные функции обеспечивают то, что между ними планировщик GO не будет выполнять никакую другую горутину. Благодаря этому код между pin и unpin выполняется атомарно.

В пакете atomic имеются готовые функции для работы со счетчиком различных численных типов и код сильно упрощается:

c := int32(0)
n := 200

wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		atomic.AddInt32(&c, 1)
		wg.Done()
	}(i)
}
wg.Wait()

fmt.Println(c)

// 200 = OK

Проблема атомарности изменений распространяется на любые ситуации чтения и изменения данных в условиях нескольких параллельно работающих процессов.
Например, часто ошибкой бывает неатомарное выполнение SELECT и UPDATE запросов в SQL-подобных базах данных.

Другие статьи