One of the reasons people come to Go is the performance benefit of running a concurrent program in parallel (i.e., on multiple cores). However, concurrency can also cause problems such as:
- Unexpected behavior due to race conditions caused by concurrent access.
- Decreased performance caused by resource contention (e.g., too much disk access at a time).
- The program being killed when it runs the system out of memory.
While Go schedules CPU access by limiting the number of concurrently running goroutines (with GOMAXPROCS), it does not manage any of the other resources those goroutines use, such as memory, disk, network, or os/exec'ed processes. Since Go does not manage them, your program needs to.
I'll show you several ways to limit concurrent resource use. You can get the code by running:

```
go get -d pocketgophers.com/limit-concurrent-use
```
An example to work from

One of the difficulties in understanding concurrent code is figuring out which things happen at the same time. So, we'll start without concurrency:
```go
package main

import (
	"log"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	for i := 0; i < 9; i++ {
		// Resource intensive work goes here
		time.Sleep(time.Second)
		log.Println(i)
	}
}
```
The resource each task uses is a second of time. Each task uses one second, and prints out which one it used. Since this program is serial, each task uses a different second.
```
$ go run serial.go
09:59:36 0
09:59:37 1
09:59:38 2
09:59:39 3
09:59:40 4
09:59:41 5
09:59:42 6
09:59:43 7
09:59:44 8
```
Unlimited Concurrency
That serial implementation is slow because the tasks aren't sharing the resource they need. Surely all the tasks could use the same second. For that, we need a concurrent implementation.
```go
package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	var wg sync.WaitGroup
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			// Resource intensive work goes here
			time.Sleep(time.Second)
			log.Println(i)
		}(i)
	}
	wg.Wait()
}
```
Each task is run in a separate goroutine, which allows up to `GOMAXPROCS` of them to execute at the same time. The `sync.WaitGroup` keeps the program running until all the tasks complete. Notice that `i` is passed into each task so the goroutines don't share the same `i`. Forgetting to do this is a common error because each loop iteration uses the same `i`.
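To make the pitfall concrete, here's a minimal sketch (the `observed` helper is my own illustration, not from the article's code) in which `i` is passed into each goroutine so every task records its own copy. Note that Go 1.22 changed loop-variable scoping so each iteration gets a fresh `i`; on earlier versions, closing over the shared `i` directly could make several goroutines see the same value.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// observed collects the value of i that each goroutine sees.
func observed() []int {
	var (
		mu  sync.Mutex
		got []int
		wg  sync.WaitGroup
	)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { // i is copied, so each goroutine gets its own value
			defer wg.Done()
			mu.Lock()
			got = append(got, i)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(observed()) // [0 1 2]
}
```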
If the computer is fast enough, or there are enough cores to run on, all of the tasks could use the same second.
```
$ go run unlimited.go
09:59:45 7
09:59:45 1
09:59:45 0
09:59:45 3
09:59:45 2
09:59:45 4
09:59:45 5
09:59:45 8
09:59:45 6
```
It turns out my computer is fast enough. Each task printed the same time.
So maybe this is a bad example of resource contention because all the tasks were able to use the same second. Increasing the number of tasks from 9 to 9000 creates enough contention on my computer that two seconds were needed. If your resource contention is also low, it might not be worth the effort to figure out what the limits are and enforce them.
In this case, the fact that all the tasks can use the same second means that whenever tasks end up using different seconds, it is because of the method limiting concurrent access.
Single Access

You can limit concurrent resource use to a single accessor with a `sync.Mutex`. A mutex is usually used to avoid race conditions.
```go
package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			mu.Lock()
			defer mu.Unlock()

			// Resource intensive work goes here
			time.Sleep(time.Second)
			log.Println(i)
		}(i)
	}
	wg.Wait()
}
```
The idea is that each task tries to `Lock` access to the resource away from other tasks while it performs the task. When the task is complete, it `Unlock`s access, allowing some other task access to the resource.
```
$ go run mutex.go
09:59:47 1
09:59:48 0
09:59:49 3
09:59:50 2
09:59:51 4
09:59:52 5
09:59:53 8
09:59:54 6
09:59:55 7
```
Each task, running in its own goroutine, used a different second. This means we successfully limited concurrent resource use to a single user at a time.
A mutex is most useful when only a part of your code needs to access the shared resource.
Critical Sections and Performance

The part of the code that needs access to the shared resource is known as the critical section. Here are two ways to control what is in the critical section:
```go
go func() {
	// not critical
	mu.Lock()
	defer mu.Unlock()
	// critical section
}()
```

```go
go func() {
	// not critical
	mu.Lock()
	// critical section
	mu.Unlock()
	// not critical
}()
```
I prefer `defer` because it keeps the `Lock`/`Unlock` pair together and makes early `return`s in the critical section easier (you don't have to remember to `Unlock` at each one).
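As a sketch of why `defer` helps with early returns, consider a lookup with two return paths (the `cache` and `lookup` names are hypothetical, not from the article's code); both paths release the mutex because the `Unlock` is deferred:

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu    sync.Mutex
	cache = map[string]string{"hit": "cached value"}
)

// lookup has two return paths; defer runs Unlock on both of them.
func lookup(key string) string {
	mu.Lock()
	defer mu.Unlock()

	if v, ok := cache[key]; ok {
		return v // early return: the deferred Unlock still runs
	}
	v := "computed value"
	cache[key] = v
	return v
}

func main() {
	fmt.Println(lookup("hit"))  // cached value
	fmt.Println(lookup("miss")) // computed value
}
```

If the `Unlock` were written out by hand instead, each return path would need its own call, and forgetting one would leave the mutex locked forever.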
The main performance concern is that when a task is in its critical section, other tasks are waiting to get their turn.
In this example, the mutex version takes as long as the serial version. It is also less efficient, because more code had to be written, and that code does more work:
- launching goroutines
- scheduling goroutines
- tasks contending to lock the mutex
However, if the critical section of the tasks is only a small part, the performance gains can be worth the extra work.
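A minimal sketch of that idea (the `squares` helper is my own illustration, not from the article's code): the expensive computation runs concurrently in every goroutine, and only the append to the shared slice is inside the critical section.

```go
package main

import (
	"fmt"
	"sync"
)

// squares computes i*i in nine goroutines, locking only the shared append.
func squares() []int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results []int
	)
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			v := i * i // the "expensive" part runs concurrently, outside the lock

			mu.Lock() // critical section: just the shared append
			defer mu.Unlock()
			results = append(results, v)
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(len(squares())) // 9
}
```

Because each goroutine holds the lock only for the append, the tasks overlap almost entirely instead of serializing on the mutex.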
Single Writer, Multiple Readers

Sometimes not all of your tasks need exclusive access. For example, most of your tasks may only read from a variable while a few need to change it. Performance can be greatly improved by allowing an arbitrary number of readers concurrent access, but giving exclusive access to a writer. This is accomplished with a readers-writer lock, available in Go as `sync.RWMutex`.
```go
package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	var wg sync.WaitGroup
	var mu sync.RWMutex
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			if i == 4 {
				mu.Lock()
				defer mu.Unlock()
			} else {
				mu.RLock()
				defer mu.RUnlock()
			}

			// Resource intensive work goes here
			time.Sleep(time.Second)
			log.Println(i)
		}(i)
	}
	wg.Wait()
}
```
I have task 4 act as a writer, while the other tasks act as readers.
```
$ go run rwmutex.go
09:59:56 3
09:59:56 1
09:59:56 8
09:59:56 2
09:59:57 4
09:59:58 7
09:59:58 0
09:59:58 6
09:59:58 5
```
Notice the second used by task 4 is only used by task 4. The other tasks use the second before or after the one used by task 4.
Limited Concurrency
Sometimes concurrent access is fine, but too much concurrent use will hinder performance. For example, multiple network transfers can happen at the same time. However, your bandwidth limits the number of transfers that can perform well at the same time. Semaphores allow for concurrent access, but limit the number of concurrent accessors. Buffered channels can be used to implement a semaphore.
```go
package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, 3)
	for i := 0; i < 9; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			semaphore <- struct{}{} // Lock
			defer func() {
				<-semaphore // Unlock
			}()

			// Resource intensive work goes here
			time.Sleep(time.Second)
			log.Println(i)
		}(i)
	}
	wg.Wait()
}
```
The capacity of the channel, in this case 3, is the maximum number of concurrent accessors. To lock the resource, a task will send a value on the channel. If the channel is full, the send will block until a value is read from the channel, which is how a task releases its lock.
```
$ go run semaphore.go
10:00:00 3
10:00:00 1
10:00:00 0
10:00:01 2
10:00:01 4
10:00:01 5
10:00:02 7
10:00:02 8
10:00:02 6
```
Notice that 3 tasks use each second.
Limited Goroutines

So far all the concurrent examples used one goroutine for each task. While goroutines are cheap to run, they aren't free: they use memory and need to be scheduled. You can limit the number of goroutines used to execute your tasks with a worker pool.
```go
package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	log.SetFlags(log.Ltime)

	tasks := make(chan int)

	var wg sync.WaitGroup
	for worker := 0; worker < 3; worker++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for i := range tasks {
				// Resource intensive work goes here
				time.Sleep(time.Second)
				log.Println(i)
			}
		}()
	}

	for i := 0; i < 9; i++ {
		tasks <- i
	}
	close(tasks)
	wg.Wait()
}
```
A goroutine is launched for each worker, in this case 3 of them. Each worker waits for a task to come in on the `tasks` channel; when one arrives, the worker performs it. Tasks are sent from the main goroutine. After all the tasks are sent, the `tasks` channel is closed. When a channel is closed, any values remaining in it can still be received. Once the channel is both closed and empty, the `for … range` loops finish, ending the workers.
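The close-then-drain behavior can be sketched in isolation (the `drain` helper is my own illustration): values buffered before `close` are still delivered, and the `range` loop ends only when the channel is both closed and empty.

```go
package main

import "fmt"

// drain shows that closing a channel does not discard buffered values.
func drain() int {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // no more sends allowed, but buffered values remain receivable

	sum := 0
	for v := range ch { // loop ends once ch is closed and empty
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(drain()) // 6
}
```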
```
$ go run pool.go
10:00:03 1
10:00:03 0
10:00:03 2
10:00:04 4
10:00:04 5
10:00:04 3
10:00:05 7
10:00:05 6
10:00:05 8
```
As you can see, each second was used by 3 tasks. However, only 4 goroutines (3 workers + the task sender) were used to execute the tasks whereas the semaphore example used 9 (one for each task).