sync primitives better by building them yourself — but don’t use them in production! 🚀
DISCLAIMER ⚠️
This article is for educational purposes only.
I don’t recommend replacing the sync package structures in production.
The goal is to strengthen your understanding of the sync primitives and how they can be built using chan.
Synchronization Primitives
Synchronization primitives coordinate how concurrently running goroutines access shared state, so that a program still behaves correctly under concurrent execution.
In this article, we’ll cover three core structures from the sync package:
- Mutex
- RWMutex
- WaitGroup
For each, we will have:
- An example (buggy ➔ fixed)
- Method signature
- A full custom implementation using chan.
Mutex
Mutex controls concurrent modifications of shared variables.
Example
import (
"net/http"
"strconv"
)
func main() {
var counter int
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
counter++
_, _ = w.Write([]byte(strconv.Itoa(counter)))
})
_ = http.ListenAndServe(":8080", nil)
}Looks innocent?
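Actually, the counter can be corrupted under high request concurrency! net/http runs each request handler in its own goroutine, and counter++ is a non-atomic read-modify-write, so two concurrent requests can read the same value and one of the increments is lost.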
Solution
Use a Mutex:
import (
"net/http"
"strconv"
"sync"
)
func main() {
var (
mutex sync.Mutex
counter int
)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
counter++
_, _ = w.Write([]byte(strconv.Itoa(counter)))
})
_ = http.ListenAndServe(":8080", nil)
}Signature
// Mutex is a mutual-exclusion lock.
type Mutex struct {}
// NewMutex returns a new, unlocked Mutex.
func NewMutex() *Mutex {}
// Lock acquires the mutex, blocking while it is already held.
func (m *Mutex) Lock() {}
// Unlock releases the mutex. It panics if the mutex is not locked.
func (m *Mutex) Unlock() {}
Custom Implementation
// Mutex is a mutual-exclusion lock backed by a one-slot buffered channel:
// a token sitting in the channel means the lock is currently held.
type Mutex struct {
	channel chan struct{}
}

// NewMutex returns an unlocked Mutex (its channel slot is empty).
func NewMutex() *Mutex {
	slot := make(chan struct{}, 1)
	return &Mutex{channel: slot}
}

// Lock acquires the mutex by placing a token into the channel. The send
// blocks while the single slot is occupied, i.e. while the lock is held.
func (m *Mutex) Lock() {
	token := struct{}{}
	m.channel <- token
}
func (m *Mutex) Unlock() {
select {
case <-m.channel:
default:
panic("unlock of unlocked Mutex")
}
}RWMutex
RWMutex allows multiple readers but only one writer.
Example
// cache is a string-keyed in-memory store.
//
// NOTE: this version has no synchronization around the underlying map.
type cache struct {
	storage map[string]interface{}
}

// NewCache returns an empty cache with its map allocated.
func NewCache() *cache {
	c := new(cache)
	c.storage = make(map[string]interface{})
	return c
}

// Get returns the value stored under key, or nil when key is absent.
func (c *cache) Get(key string) interface{} {
	value := c.storage[key]
	return value
}
func (c *cache) Set(key string, value interface{}) {
c.storage[key] = value
}Concurrent writes will corrupt the storage map.
Solution
Use RWMutex:
import (
"sync"
)
// cache is a string-keyed in-memory store whose map is guarded by a
// sync.RWMutex.
type cache struct {
	mutex   sync.RWMutex
	storage map[string]interface{}
}

// NewCache returns an empty cache with its map allocated.
func NewCache() *cache {
	c := new(cache)
	c.storage = make(map[string]interface{})
	return c
}

// Get returns the value stored under key, or nil when key is absent.
// It takes only the read lock, so several Get calls may run at once.
func (c *cache) Get(key string) interface{} {
	c.mutex.RLock()
	value := c.storage[key]
	c.mutex.RUnlock()
	return value
}
func (c *cache) Set(key string, value interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.storage[key] = value
}Signature
// RWMutex is a reader/writer lock: many readers or a single writer.
type RWMutex struct {}
// New returns a new, unlocked RWMutex.
func New() *RWMutex {}
// RLock acquires the lock for reading.
func (rw *RWMutex) RLock() {}
// RUnlock releases one read lock. It panics if no read lock is held.
func (rw *RWMutex) RUnlock() {}
// Lock acquires the lock for writing.
func (rw *RWMutex) Lock() {}
// Unlock releases the write lock. It panics if the write lock is not held.
func (rw *RWMutex) Unlock() {}
Custom Implementation
// RWMutex is a reader/writer lock built from two one-slot channels.
type RWMutex struct {
	// counter is the number of readers currently holding the lock; it is
	// only touched while mutex is held.
	counter int
	// readChan holds a token while the lock is held by the readers as a
	// group or by a writer.
	readChan chan struct{}
	// writeChan holds a token while a writer owns or is acquiring the lock.
	writeChan chan struct{}
	// mutex serializes RLock and RUnlock.
	mutex Mutex
}

// New returns an unlocked RWMutex with both channel slots empty.
func New() *RWMutex {
	rw := &RWMutex{mutex: *NewMutex()}
	rw.readChan = make(chan struct{}, 1)
	rw.writeChan = make(chan struct{}, 1)
	return rw
}
// RLock acquires the lock for reading. The first reader claims the readChan
// slot on behalf of all readers (blocking while a writer holds it); later
// readers only bump the reader count.
func (rw *RWMutex) RLock() {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	if firstReader := rw.counter == 0; firstReader {
		rw.readChan <- struct{}{}
	}
	rw.counter += 1
}
// RUnlock releases one read lock. The last reader to leave gives the
// readChan slot back. It panics if no read lock is held.
func (rw *RWMutex) RUnlock() {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	switch rw.counter {
	case 0:
		panic("unlock of unlocked RWMutex")
	case 1:
		// Last reader out: drop the count to zero and free the slot.
		rw.counter = 0
		<-rw.readChan
	default:
		rw.counter--
	}
}
// Lock acquires the lock for writing. It first takes the writer slot, which
// excludes other writers, and then the readChan slot, which blocks until the
// readers (if any) have released it.
func (rw *RWMutex) Lock() {
	token := struct{}{}
	rw.writeChan <- token
	rw.readChan <- token
}
func (rw *RWMutex) Unlock() {
select {
case <-rw.writeChan:
default:
panic("unlock of unlocked RWMutex")
}
<-rw.readChan
}WaitGroup
WaitGroup waits for a collection of goroutines to finish.
Example
import (
"math/rand/v2"
"time"
)
func main() {
for range 100 {
go func() {
time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
}()
}
}No waiting ➔ Main program may exit prematurely!
Solution
import (
"math/rand/v2"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
for range 100 {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
}()
}
wg.Wait()
}Signature
// WaitGroup waits for a collection of goroutines to finish.
type WaitGroup struct {}
// New returns a WaitGroup whose counter is zero.
func New() *WaitGroup {}
// Add adds delta, which may be negative, to the counter. It panics if the
// counter would become negative.
func (wg *WaitGroup) Add(delta int) {}
// Done decrements the counter by one.
func (wg *WaitGroup) Done() {}
// Wait blocks until the counter is zero.
func (wg *WaitGroup) Wait() {}
Custom Implementation
// WaitGroup waits for a collection of goroutines to finish, using a one-slot
// channel as the "work in progress" signal.
type WaitGroup struct {
	// counter is the number of outstanding units of work; it is only
	// touched while mutex is held.
	counter int
	// channel holds a token while counter is above zero, which is what makes
	// Wait block.
	channel chan struct{}
	// mutex serializes calls to Add (and therefore Done).
	mutex Mutex
}

// New returns a WaitGroup with a zero counter and an empty channel slot.
func New() *WaitGroup {
	wg := &WaitGroup{mutex: *NewMutex()}
	wg.channel = make(chan struct{}, 1)
	return wg
}
// Add adds delta, which may be negative, to the WaitGroup counter.
//
// When the counter rises from zero a token is placed in the channel so that
// Wait blocks; when it falls back to zero the token is removed, releasing
// the waiters. Add panics if the counter would become negative; in that case
// the counter is left unchanged.
func (wg *WaitGroup) Add(delta int) {
	wg.mutex.Lock()
	defer wg.mutex.Unlock()

	prev := wg.counter
	next := prev + delta
	// Validate before mutating so a recovered panic cannot leave the group
	// with a negative counter.
	if next < 0 {
		panic("negative WaitGroup counter")
	}
	wg.counter = next

	// Touch the channel only on real transitions. In particular Add(0) on an
	// idle group must not receive from the empty channel: that would block
	// forever while holding the mutex, or steal the token of a concurrent
	// Wait.
	switch {
	case prev == 0 && next > 0:
		wg.channel <- struct{}{}
	case prev > 0 && next == 0:
		<-wg.channel
	}
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
wg.channel <- struct{}{}
<-wg.channel
}Conclusion
If you made it this far, I hope you learned something new 💫.
Stay tuned for more Go tips 🚀❤️🔥.