Use of synchronization techniques in Golang

It is Zion
9 min read · Dec 18, 2020


Go synchronizes goroutines through channels and the sync package, which prevents multiple goroutines from racing on the same data.

The data race problem

Goroutines that run concurrently need to be synchronized because of data races, so we first need to understand what a data race is.

To understand data races, we have to start with atomic operations.

A task executed by the computer that cannot be split into smaller subtasks is an atomic task.

Atomic operations are operations implemented directly by a single CPU instruction, which cannot be interrupted during execution.

num = num + 1 looks like a single operation, but it actually reads num, adds 1, and then writes the result back to num. It compiles to more than one instruction, so it is not an atomic operation.

Sometimes a single line of code compiles to multiple instructions, but only one of them writes to the data. Such a line can also be treated as atomic, because data races usually involve a write.

Python, for example, documents some operations, such as update for dictionaries, as atomic in CPython:

import dis

info = {}

def target():
    global info
    new = {"Lebum": "Champion"}
    info.update(new)  # check whether this statement is an atomic operation

dis.dis(target)  # dis.dis prints the bytecode itself and returns None

After running this script, you can see the bytecode displayed in the console:

...
LOAD_ATTR 1(update)
LOAD_FAST 0(new)
POP_TOP
LOAD_CONST 0(None)
...

The statement info.update(new) is also split into several bytecode instructions, but the dictionary is modified inside a single call; the POP_TOP that follows only discards the return value. The update of the dictionary can therefore be treated as an atomic operation in CPython.

Once you understand atomic operations, data competition is easy to understand.

Data races arise precisely because of non-atomic operations: goroutines run at different speeds and can interleave in the middle of an operation, so running the same code may give a different result each time.

For example:

package main

import "sync"

func add(w *sync.WaitGroup, num *int) {
	defer w.Done()
	*num = *num + 1 // read, add, write: not atomic
}

func main() {
	var n int = 0
	var wg *sync.WaitGroup = new(sync.WaitGroup)
	wg.Add(1000)
	for i := 0; i < 1000; i = i + 1 {
		go add(wg, &n)
	} // spawn 1000 new goroutines
	wg.Wait()
	println(n)
}

If *num = *num + 1 were an atomic operation, 1000 goroutines incrementing n concurrently would always produce 1000. In practice, the result may differ from run to run, and it may well be less than 1000. This is because *num = *num + 1 is not atomic: the goroutines do not all run at the same speed and can be preempted between the read and the write, so some increments overwrite others. Something needs to be done to control this situation.

Concurrency that does not produce data races is called thread-safe. No one wants code whose result is unpredictable on every run, so let’s look at the synchronization techniques that Go provides.

sync/atomic

The sync/atomic package provides atomic operations for synchronizing reads and writes of integers and pointers.

There are five kinds of operations: add (which also covers subtraction), compare-and-swap, load, store, and swap.

The types supported by atomic operations include int32, int64, uint32, uint64, uintptr, and unsafe.Pointer.

For example, in the code above, replace *num = *num + 1 with the atomic operation provided by the sync/atomic package:

package main

import (
	"sync"
	"sync/atomic"
)

func add(w *sync.WaitGroup, num *int32) {
	defer w.Done()
	atomic.AddInt32(num, 1) // atomic increment
}

func main() {
	var n int32 = 0
	var wg *sync.WaitGroup = new(sync.WaitGroup)
	wg.Add(1000)
	for i := 0; i < 1000; i = i + 1 {
		go add(wg, &n)
	} // create 1000 new goroutines
	wg.Wait()
	println(n)
}

This guarantees that n ends up as 1000. (Note: an atomic operation cannot be interrupted while it executes, so no data race can occur.)
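As a rough sketch of the load and compare-and-swap operations listed earlier, the following keeps a running maximum without a lock. The helper name storeMax is made up for this illustration and is not part of sync/atomic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax (a hypothetical helper) raises *addr to v if v is larger.
// It retries until its compare-and-swap succeeds or v is no longer larger.
func storeMax(addr *int64, v int64) {
	for {
		old := atomic.LoadInt64(addr)
		if v <= old {
			return // another goroutine already stored a larger or equal value
		}
		if atomic.CompareAndSwapInt64(addr, old, v) {
			return // *addr was still old, so it is now v
		}
		// *addr changed between the load and the swap: try again
	}
}

func main() {
	var max int64
	var wg sync.WaitGroup
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			storeMax(&max, v)
		}(int64(i))
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&max)) // 100
}
```

The retry loop is the usual shape of compare-and-swap code: read the current value, compute the new one, and only write if nobody else changed the value in between.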

Other functions can be found in the official Go documentation, so I won’t explain them in detail here.

However, atomic operations cover only a small set of operations, so most synchronization relies on the rest of the sync package and on channels.

sync.WaitGroup

The sync.WaitGroup struct in the sync package is used to wait for a group of goroutines to finish: the waiting goroutine is blocked until every goroutine in the group has finished executing.

Each sync.WaitGroup value maintains an internal count, which initially defaults to zero.

For an addressable sync.WaitGroup value wg:

  • wg.Add(delta) changes the count maintained by wg by delta; wg.Done() and wg.Add(-1) are exactly equivalent.
  • If a wg.Add(delta) or wg.Done() call changes the count maintained by wg to a negative number, it panics.
  • When a goroutine calls wg.Wait() and the count maintained by wg is zero, the call is a no-op. Otherwise (the count is a positive integer), the goroutine blocks; when other goroutines later bring the count to zero (typically by calling wg.Done()), the goroutine re-enters the running state (i.e. wg.Wait() returns).

For an example, see the sync/atomic example above. The main goroutine blocks in wg.Wait() until all 1000 goroutines have completed, and then continues.
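A minimal sketch of the same pattern where the count is raised one goroutine at a time instead of up front. The function name squares is invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// squares (a hypothetical helper) computes i*i for every i in [0, n),
// using one goroutine per element, and waits for all of them.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // raise the count before the goroutine starts
		go func(i int) {
			defer wg.Done() // lower the count when this goroutine finishes
			out[i] = i * i  // each goroutine writes only its own element
		}(i)
	}
	wg.Wait() // blocks until the count is back to zero
	return out
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```

Calling wg.Add before the go statement matters: if Add ran inside the new goroutine, wg.Wait() could see a zero count and return before the work had started.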

sync.Once

A sync.Once value ensures that a piece of code is executed only once, no matter how many goroutines try to run it.

Each *sync.Once value has a Do(f func()) method.

For example:

var once *sync.Once = new(sync.Once)
for i := 0; i < 10; i = i + 1 {
	go func(i int) { // pass i in so each goroutine prints its own value
		println(i)
		once.Do(doSomething) // pass the function itself; do not call it
	}(i)
}
// There are 10 goroutines, but doSomething is executed only once. Within each goroutine, println(i) runs before once.Do(doSomething).
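The fragment above leaves doSomething undefined. A self-contained sketch, with an invented helper initCalls that counts how often the function passed to Do actually runs, could look like this:

```go
package main

import (
	"fmt"
	"sync"
)

// initCalls (a hypothetical helper) starts n goroutines that all call
// once.Do and reports how many times the initialization function ran.
func initCalls(n int) int {
	var once sync.Once
	var wg sync.WaitGroup
	calls := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() { calls++ }) // only the first caller runs this
		}()
	}
	wg.Wait()
	return calls
}

func main() {
	fmt.Println(initCalls(10)) // 1
}
```

Goroutines that arrive while the first call is still running wait inside Do until it has finished, so every caller can rely on the initialization being complete once Do returns.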

sync.Mutex

A Mutex value is often referred to as a mutex lock. The zero value of a Mutex is a mutex that has not yet been locked, so var mutex sync.Mutex is ready to use without further initialization. (A nil *sync.Mutex pointer, by contrast, cannot be used.)

For an addressable Mutex value m:

  • If m is unlocked, calling m.Lock() changes its state to locked. Calling m.Unlock() on an unlocked mutex causes a fatal runtime error.
  • If m is locked, m.Lock() blocks until another goroutine calls m.Unlock() to release the lock; m.Unlock() changes the state back to unlocked.

For example:

package main

import "sync"

var num int = 0

func add(lc *sync.Mutex, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 100000; i = i + 1 {
		lc.Lock()
		num = num + 1
		lc.Unlock()
	}
}

func minus(lc *sync.Mutex, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 100000; i = i + 1 {
		lc.Lock()
		num = num - 1
		lc.Unlock()
	}
}

func main() {
	var mutex *sync.Mutex = new(sync.Mutex)
	var wg *sync.WaitGroup = new(sync.WaitGroup)
	wg.Add(2)
	go add(mutex, wg)
	go minus(mutex, wg)
	wg.Wait()

	println(num) // 0
}

In this code, calling lc.Lock() in add means that the current goroutine wants to acquire the lock. If minus has already called lc.Lock(), the goroutine running add is blocked until minus calls lc.Unlock(), and vice versa.
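Another common arrangement, sketched here with invented names (Counter, Inc, Value, count), keeps the mutex next to the data it protects and releases it with defer. It relies on the zero value of sync.Mutex being an unlocked mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter (a hypothetical type) bundles a value with the mutex guarding it.
type Counter struct {
	mu sync.Mutex // zero value: unlocked, no initialization needed
	n  int
}

// Inc adds one to the counter while holding the lock.
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock() // released even if the function returns early
	c.n++
}

// Value reads the counter while holding the lock.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// count runs `workers` goroutines that each increment the counter
// `perWorker` times and returns the final value.
func count(workers, perWorker int) int {
	var c Counter
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(count(10, 1000)) // 10000
}
```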

sync.RWMutex

With sync.RWMutex, any number of goroutines can read at the same time, but while one goroutine is writing, the other goroutines can neither read nor write.

The *sync.Mutex and *sync.RWMutex types both implement the sync.Locker interface:

type Locker interface {
	Lock()
	Unlock()
}

The sync.RWMutex type has two more methods: RLock() and RUnlock().

So in total:

  • func (rw *RWMutex) Lock(): acquire the write lock
  • func (rw *RWMutex) Unlock(): release the write lock
  • func (rw *RWMutex) RLock(): acquire a read lock
  • func (rw *RWMutex) RUnlock(): release a read lock

sync.RWMutex is similar to sync.Mutex, so I won’t list it here. For sync.RWMutex, just remember:

If a goroutine wants to acquire RLock and the other goroutines hold only RLock, it is not blocked. If another goroutine holds Lock, the goroutine that wants RLock is blocked until that Lock has been released.

If a goroutine wants to acquire Lock, it is blocked as long as any other goroutine holds RLock or Lock, until all of those locks have been released.
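As an illustration of these rules, here is a small sketch of a map guarded by a sync.RWMutex. The Store type and its methods are names chosen for this example only.

```go
package main

import (
	"fmt"
	"sync"
)

// Store (a hypothetical type) is a map protected by a read-write mutex.
type Store struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewStore returns an empty Store.
func NewStore() *Store {
	return &Store{data: make(map[string]int)}
}

// Get takes a read lock, so several readers can run at the same time.
func (s *Store) Get(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set takes the write lock, which excludes all readers and other writers.
func (s *Store) Set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func main() {
	s := NewStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Set("key", i) // writers are serialized
			s.Get("key")    // readers may overlap with each other
		}(i)
	}
	wg.Wait()
	_, ok := s.Get("key")
	fmt.Println(ok) // true
}
```

This layout tends to pay off when reads are much more frequent than writes; with mostly writes, a plain sync.Mutex is usually just as good.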

sync.Cond

Unlike a mutex, a condition variable does not guarantee that only one goroutine accesses the shared data at a time. Instead, it notifies blocked goroutines when the state of the shared data changes.

Condition variables are always used together with a mutex: the mutex provides mutual exclusion for access to the shared data, and the condition variable notifies the relevant goroutines when the state of that data changes. (That is why a condition variable is created with a lock: var cond *sync.Cond = sync.NewCond(new(sync.Mutex)).)

After acquiring the lock, a goroutine checks whether the condition it needs is satisfied. If it is not, the goroutine releases the lock for other goroutines and blocks; when another goroutine notifies it, it unblocks and acquires the lock again.

For an addressable variable cond of type *sync.Cond, the following methods are commonly used:

  • cond.L.Lock() and cond.L.Unlock(): lock and unlock the mutex passed to sync.NewCond. If that mutex is also held in a variable lock, lock.Lock() and lock.Unlock() are exactly the same.
  • cond.Wait(): the flow of a call is: Unlock() -> block waiting for a notification (from Signal() or Broadcast()) -> receive the notification -> Lock().
  • cond.Signal(): wakes one goroutine blocked in Wait(). If no goroutine is waiting, no error is reported. In the current implementation, waiters are woken in the order in which they called Wait() (first in, first out).
  • cond.Broadcast(): wakes all goroutines blocked in Wait(). If no goroutine is waiting, no error is reported.

For example:

package main

import (
	"sync"
	"time"
)

func main() {
	var cond *sync.Cond = sync.NewCond(new(sync.Mutex))
	var condition int = 0

	// Consumer
	go func() {
		for {
			cond.L.Lock()
			for condition == 0 { // nothing to consume: wait
				cond.Wait()
			}
			condition = condition - 1
			println(condition)
			cond.Signal()
			cond.L.Unlock()
		}
	}()
	// Producer
	for {
		time.Sleep(time.Second)
		cond.L.Lock()
		for condition == 3 { // full: wait
			cond.Wait()
		}
		condition = condition + 1
		println(condition)
		cond.Signal()
		cond.L.Unlock()
	}
}
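The producer and consumer above only use Signal(). As a sketch of Broadcast(), the following releases several waiting goroutines at once; releaseAll and the ready flag are names made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll (a hypothetical helper) starts n goroutines that wait for a
// shared flag, then sets the flag and wakes all of them with Broadcast.
// It returns how many goroutines got past the wait.
func releaseAll(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false
	released := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready { // re-check the condition after every wake-up
				cond.Wait()
			}
			released++
			mu.Unlock()
		}()
	}

	mu.Lock()
	ready = true // change the shared state while holding the lock
	mu.Unlock()
	cond.Broadcast() // wake every goroutine currently blocked in Wait

	wg.Wait()
	return released
}

func main() {
	fmt.Println(releaseAll(5)) // 5
}
```

A goroutine that only reaches the loop after the broadcast sees ready == true and never waits, which is why the condition is checked in a loop under the lock rather than assumed.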

sync.Map

Built-in maps in Go are not safe for concurrent use: reading and writing the same map from multiple goroutines without synchronization results in a fatal error such as fatal error: concurrent map writes.

If you want to read and write maps concurrently, you can use sync.Map.

For example:

package main

import (
	"fmt"
	"sync"
)

func run(wg *sync.WaitGroup, sc *sync.Map) {
	defer wg.Done()
	var sceneIterate func(interface{}, interface{}) bool = func(k interface{}, v interface{}) bool {
		// fmt.Println prints the stored values; the builtin println
		// would print interface values as raw addresses
		fmt.Println("iterate:", k, v)
		return true // keep iterating
	}
	sc.Range(sceneIterate)
}

func main() {
	var scene *sync.Map = new(sync.Map)

	scene.Store("greece", 97)
	scene.Store("london", 100)
	scene.Store("egypt", 200)
	var v interface{} = nil
	var ok bool = false
	v, ok = scene.Load("london")
	fmt.Println(v, ok)
	scene.Delete("london")

	var wg *sync.WaitGroup = new(sync.WaitGroup)

	wg.Add(1)
	go run(wg, scene)
	wg.Wait()
}

Of course, other synchronization techniques, such as a mutex, can also be used to operate on a map concurrently.
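The example above touches the map from only one goroutine at a time. A minimal sketch of actual concurrent writes, using an invented helper fill, might look like this:

```go
package main

import (
	"fmt"
	"sync"
)

// fill (a hypothetical helper) stores n entries into a sync.Map from n
// goroutines at once and returns how many entries the map holds afterwards.
func fill(n int) int {
	var m sync.Map // the zero value is an empty map, ready to use
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(i, i*i) // safe without any extra locking
		}(i)
	}
	wg.Wait()

	count := 0
	m.Range(func(k, v interface{}) bool {
		count++
		return true // keep iterating
	})
	return count
}

func main() {
	fmt.Println(fill(100)) // 100
}
```

Doing the same with a built-in map and no lock would risk the concurrent map writes error mentioned earlier.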

sync.Pool

sync.Pool can be used to cache objects for reuse, since frequent heap allocation creates too much work for the GC.

Objects stored in a sync.Pool may be reclaimed at any time without notice; the pool exists to relieve GC pressure, not to hold objects permanently.

To initialize a pool, the only thing you need to do is set the New function. When the Get method is called, it returns a cached object directly if the pool has one; otherwise it calls New to create a new object.

For example:

package main

import "sync"

var intPool *sync.Pool = &sync.Pool{
	New: func() interface{} {
		var b []int = make([]int, 8)
		return &b
	},
}

func main() {
	// get the obj and do not put it back
	for i := 1; i < 4; i = i + 1 {
		var obj *[]int = intPool.Get().(*[]int)
		(*obj)[i] = i
		println(obj)
	}
	println("=======")
	// get the obj and put it back
	for i := 1; i < 4; i = i + 1 {
		var obj *[]int = intPool.Get().(*[]int)
		(*obj)[i] = i
		println(obj)
		intPool.Put(obj)
	}
}
console:
0xc00000c060
0xc00000c080
0xc00000c0a0
=======
0xc00000c0c0
0xc00000c0c0
0xc00000c0c0

As you can see from the console, when an object is put back, the next Get returns that same object.

Pools suit scenarios with frequent allocation and high concurrency; when allocations are few and concurrency is low, using a pool is counterproductive.

The best practice for using sync.Pool is to reset an object before calling Put and to reset it again after calling Get.
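One reading of this advice is that an object taken from the pool may still carry data from its previous user, so it should be cleared before it is reused. A sketch under that assumption, pooling bytes.Buffer values (bufPool and render are names invented here):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool (a hypothetical pool) caches *bytes.Buffer values for reuse.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render builds a greeting using a pooled buffer.
func render(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // after Get: drop anything a previous user left behind
	defer func() {
		buf.Reset() // before Put: do not hand stale data to the next user
		bufPool.Put(buf)
	}()
	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String() // String copies the bytes, so reuse is safe
}

func main() {
	fmt.Println(render("gopher")) // hello, gopher
	fmt.Println(render("zion"))   // hello, zion
}
```

Without the resets, the second call could return leftover text from the first whenever the pool hands back the same buffer.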

Channel

Finally, channels are used for much more than synchronizing goroutines: they also pass data between goroutines, limit the number of running goroutines, and so on. The previous techniques can only synchronize, which is where channels differ from them.

Except in extreme cases, channels are the preferred synchronization technique.

The sync.Cond example above can be implemented with a channel instead:

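package main

import "time"

func main() {
	var ch chan int = make(chan int, 3) // buffer of 3, like condition == 3 above
	var v int = 0
	// Consumer
	go func() {
		for {
			println(<-ch) // blocks while the channel is empty
		}
	}()
	// Producer
	for {
		v = v + 1
		println(v)
		ch <- v // blocks while the buffer is full
		time.Sleep(time.Second)
	}
}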
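Channels can also control the number of goroutines, as mentioned above. One common way, sketched here with an invented helper runLimited, is to use a buffered channel as a counting semaphore:

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited (a hypothetical helper) runs `tasks` small jobs with at most
// `limit` goroutines alive at once and returns the sum of their results.
func runLimited(tasks, limit int) int {
	sem := make(chan struct{}, limit) // one slot per allowed goroutine
	results := make(chan int, tasks)  // large enough that sends never block
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` goroutines are running
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when done
			results <- i * 2         // the "work": double the task number
		}(i)
	}

	wg.Wait()
	close(results) // no more sends, so the range below terminates

	sum := 0
	for v := range results {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runLimited(10, 3)) // 90
}
```

Here the channel carries no data of interest in sem; only its capacity matters, while results shows the other role of channels, passing values back to the caller.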

There is a lot of information about the channel mechanism on the web, so I won’t describe it here.
