Skip to content

Sample concurrency solutions #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions projects/concurrency/atomics/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"fmt"
"sync"
"sync/atomic"
)

var x atomic.Int32

func increment(wg *sync.WaitGroup) {
x.Add(1)
wg.Done()
}

func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("final value of x", x.Load())
}
88 changes: 88 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache

import (
"sync"
"sync/atomic"
"time"
)

func NewCache[K comparable, V any](targetSize uint64, garbageCollectionInterval time.Duration) *Cache[K, V] {
cache := &Cache[K, V]{
targetSize: targetSize,
values: make(map[K]*valueAndGeneration[V], targetSize),
}

go func() {
ticker := time.Tick(garbageCollectionInterval)
for range ticker {
currentGeneration := cache.currentGeneration.Load()
cache.currentGeneration.Add(1)

// Accumulate a keysToDelete slice so that we can collect the keys to delete under a read lock rather than holding a write lock for the entire GC cycle.
// This will use extra memory, and has a disadvantage that we may bump a generation from a Get but then still evict that value because we already decided to GC it.
var keysToDelete []K
cache.mu.RLock()
// If we have free space, don't garbage collect at all. This will probably lead to very spiky evictions.
if uint64(len(cache.values)) <= targetSize {
cache.mu.RUnlock()
continue
}
for k, v := range cache.values {
// This is a _very_ coarse-grained eviction policy. As soon as our cache becomes full, we may evict lots of entries.
// It may be more useful to treat different values of generation differently, e.g. always evict if v.generation < currentGeneration - 5, and only evict more recent entries if that didn't free up any space.
if v.generation.Load() != currentGeneration {
keysToDelete = append(keysToDelete, k)
}
}
cache.mu.RUnlock()
if len(keysToDelete) > 0 {
cache.mu.Lock()
for _, keyToDelete := range keysToDelete {
delete(cache.values, keyToDelete)
}
cache.mu.Unlock()
}
}
}()

return cache
}

// type Cache implements a roughly-LRU cache. It attempts to keep to a maximum of targetSize, but may contain more entries at points in time.
// When under size pressure, it garbage collects entries which haven't been read or written, with no strict eviction ordering guarantees.
type Cache[K comparable, V any] struct {
targetSize uint64

mu sync.RWMutex
// Every time we Get/Put a value, we store which generation it was last accessed.
// We have a garbage collection goroutine which will delete entries that haven't been recently accessed, if the cache is full.
currentGeneration atomic.Uint64
values map[K]*valueAndGeneration[V]
}

type valueAndGeneration[V any] struct {
value V
generation atomic.Uint64
}

func (c *Cache[K, V]) Put(key K, value V) bool {
c.mu.Lock()
defer c.mu.Unlock()
valueWrapper := &valueAndGeneration[V]{
value: value,
}
valueWrapper.generation.Store(c.currentGeneration.Load())
c.values[key] = valueWrapper
return false
}

func (c *Cache[K, V]) Get(key K) (*V, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
valueWrapper, ok := c.values[key]
if !ok {
return nil, false
}
valueWrapper.generation.Store(c.currentGeneration.Load())
return &valueWrapper.value, true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cache

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPutThenGet(t *testing.T) {
cache := NewCache[string, string](10, 1*time.Millisecond)
previouslyExisted := cache.Put("greeting", "hello")
require.False(t, previouslyExisted)

time.Sleep(3 * time.Millisecond)

value, present := cache.Get("greeting")
require.True(t, present)
require.Equal(t, "hello", *value)
}

func TestGetMissing(t *testing.T) {
cache := NewCache[string, string](1, 1*time.Millisecond)
value, present := cache.Get("greeting")
require.False(t, present)
require.Nil(t, value)
}

func TestEviction_JustWrites(t *testing.T) {
cache := NewCache[string, string](10, 1*time.Millisecond)

for i := 0; i < 10; i++ {
cache.Put(fmt.Sprintf("entry-%d", i), "hello")
}

time.Sleep(3 * time.Millisecond)

_, present0 := cache.Get("entry-0")
require.True(t, present0)

_, present10 := cache.Get("entry-9")
require.True(t, present10)

cache.Put("entry-10", "hello")

time.Sleep(3 * time.Millisecond)

_, present1 := cache.Get("entry-1")
require.False(t, present1)
}
11 changes: 11 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_coarse_grained_generations

go 1.21.5

require github.com/stretchr/testify v1.8.4

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
131 changes: 131 additions & 0 deletions projects/concurrency/lru_cache_computing/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cache

import (
"container/list"
"sync"
)

// entryLimit and concurrentComputeLimit must both be non-zero.
// computer must never panic.
func NewCache[K comparable, V any](entryLimit uint64, concurrentComputeLimit uint64, computer func(K) V) *Cache[K, V] {
computeChannel := make(chan K, concurrentComputeLimit)

resultChannel := make(chan keyValuePair[K, V], concurrentComputeLimit)

for i := 0; i < int(concurrentComputeLimit); i++ {
go func() {
for key := range computeChannel {
value := computer(key)
resultChannel <- keyValuePair[K, V]{
key: key,
value: &value,
}
}
}()
}

cache := &Cache[K, V]{
entryLimit: entryLimit,
computeChannel: computeChannel,

computedEntries: make(map[K]cacheEntry[K, V], entryLimit),
pendingEntries: make(map[K]*channelList[K, V]),
evictionList: list.New(),
}

go func() {
for result := range resultChannel {
cache.mu.Lock()
pendingEntry := cache.pendingEntries[result.key]
delete(cache.pendingEntries, result.key)

if len(cache.computedEntries) == int(cache.entryLimit) {
keyToEvict := cache.evictionList.Remove(cache.evictionList.Back()).(K)
delete(cache.computedEntries, keyToEvict)
}

evictionListPointer := cache.evictionList.PushFront(result.key)

cache.computedEntries[result.key] = cacheEntry[K, V]{
evictionListPointer: evictionListPointer,
value: *result.value,
}
pendingEntry.mu.Lock()
pendingEntry.value = result.value
cache.mu.Unlock()
for _, ch := range pendingEntry.channels {
ch <- result
}
pendingEntry.mu.Unlock()
}
}()

return cache
}

type cacheEntry[K any, V any] struct {
evictionListPointer *list.Element
value V
}

type keyValuePair[K any, V any] struct {
key K
value *V
}

type channelList[K any, V any] struct {
mu sync.Mutex
channels []chan (keyValuePair[K, V])
value *V
}

type Cache[K comparable, V any] struct {
entryLimit uint64

computeChannel chan K

mu sync.Mutex
computedEntries map[K]cacheEntry[K, V]
pendingEntries map[K]*channelList[K, V]
// Front is most recently used, back is least recently used
evictionList *list.List
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
c.mu.Lock()
computedEntry, isComputed := c.computedEntries[key]
pendingEntry, isPending := c.pendingEntries[key]
if isComputed {
c.evictionList.MoveToFront(computedEntry.evictionListPointer)
c.mu.Unlock()
return computedEntry.value, true
}
if !isPending {
pendingEntry = &channelList[K, V]{}
c.pendingEntries[key] = pendingEntry
}
c.mu.Unlock()
if !isPending {
c.computeChannel <- key
}

pendingEntry.mu.Lock()
// Maybe the value was computed but hasn't been transfered from pending to computed yet
if pendingEntry.value != nil {
pendingEntry.mu.Unlock()
return *pendingEntry.value, isPending
}
channel := make(chan keyValuePair[K, V], 1)
pendingEntry.channels = append(pendingEntry.channels, channel)
pendingEntry.mu.Unlock()
value := <-channel
return *value.value, isPending
}

// Only exists for testing. Doesn't count as a usage for LRU purposes.
func (c *Cache[K, V]) has(key K) bool {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.computedEntries[key]
return ok
}
Loading
Loading