golang と言えば非同期に特化した言語ですが、慣れない内は簡単な非同期しか使えません。しかし sync パッケージを知る事でもっとカジュアルに、かつ確実な非同期処理を行う事が出来る様になります。
今日はそんな sync パッケージについて説明してみたいと思います。
sync.Mutex
ご存じ sync.Mutex です。皆さんが一番使う排他制御だと思います。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func parallel(wg *sync.WaitGroup) {
fmt.Println("博")
time.Sleep(100 * time.Millisecond)
fmt.Println("多")
time.Sleep(100 * time.Millisecond)
fmt.Println("の")
time.Sleep(100 * time.Millisecond)
fmt.Println("塩")
wg.Done()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
wg := new(sync.WaitGroup)
for i := 0; i < 3; i++ {
wg.Add(1)
go parallel(wg)
}
wg.Wait()
}
このコードを実行すると結果はおおよそ以下の様になります。
博
博
博
多
多
多
の
の
の
塩
塩
塩
時には順番が入り乱れる事もあるでしょう。これに sync.Mutex を追加して以下の様にします。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func parallel(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
defer m.Unlock()
fmt.Println("博")
time.Sleep(100 * time.Millisecond)
fmt.Println("多")
time.Sleep(100 * time.Millisecond)
fmt.Println("の")
time.Sleep(100 * time.Millisecond)
fmt.Println("塩")
wg.Done()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
wg := new(sync.WaitGroup)
m := new(sync.Mutex)
for i := 0; i < 3; i++ {
wg.Add(1)
go parallel(wg, m)
}
wg.Wait()
}
すると
博
多
の
塩
博
多
の
塩
博
多
の
塩
期待通りの順番に表示されます。上記のコードでは表示の排他のみを扱いましたが、更新処理と参照処理により排他制御を区別出来る RWMutex もあります。
また、上記のコードでしれーっと書いていますが sync.WaitGroup は Wait()
を呼び出すと Add()
を呼び出した回数から Done()
を呼び出した回数を引いて 0 になるまで待機する機能が簡単に実装出来ます。全ての goroutine の終了を待つ場合に使用します。
sync/atomic
sync/atomic パッケージは、名前の通りアトミックな演算を提供します。
package main
import (
"fmt"
"runtime"
"sync"
)
var v int32
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
wg := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
v++
v++
v++
v++
v++
v++
v++
v++
v++
v++
wg.Done()
}()
}
wg.Wait()
fmt.Println(v)
}
このコードを実行すると、10回の加算を10並行で行うため合計で100になる事が期待出来ますが実際には何回かに1回100でない事象が発生します。
演算がアトミックでないからです。こういう場合に sync/atomic を使います。
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var v int32
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
wg := new(sync.WaitGroup)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
atomic.AddInt32(&v, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(v)
}
このコードは何回実行しても合計が100になります。
sync.Once
ある関数ではある処理を1回のみ行いたい、そういった場合に使用します。
package main
import (
"fmt"
"runtime"
"sync"
)
var once = new(sync.Once)
func greeting(wg *sync.WaitGroup) {
once.Do(func() {
fmt.Println("こんにちわ")
})
fmt.Println("ごきげんいかがですか")
wg.Done()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
defer fmt.Println("さようなら")
wg := new(sync.WaitGroup)
for i := 0; i < 5; i++ {
wg.Add(1)
go greeting(wg)
}
wg.Wait()
}
このコードの実行結果は以下の様になります。
こんにちわ
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
さようなら
並行で走った場合でも1度しか実行されません。
sync.Cond
例えば goroutine を先行で10個用意しておき、ある号令に従い1つずつ並行処理を開始したいとします。こういうケースでは sync.Cond を作り Signal()
を1回ずつ呼び出します。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
l := new(sync.Mutex)
c := sync.NewCond(l)
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Printf("waiting %d\n", i)
l.Lock()
defer l.Unlock()
c.Wait()
fmt.Printf("go %d\n", i)
}(i)
}
for i := 0; i < 10; i++ {
time.Sleep(1 * time.Second)
c.Signal()
}
time.Sleep(1 * time.Second)
}
このコードを実行すると以下の様な出力になります。
waiting 0
waiting 9
waiting 5
waiting 1
waiting 2
waiting 3
waiting 4
waiting 8
waiting 6
waiting 7
go 0
go 9
go 5
go 1
go 2
go 3
go 4
go 8
go 6
go 7
また全 goroutine に対して一斉に号令を掛ける場合は Broadcast()
を使います。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
l := new(sync.Mutex)
c := sync.NewCond(l)
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Printf("waiting %d\n", i)
l.Lock()
defer l.Unlock()
c.Wait()
fmt.Printf("go %d\n", i)
}(i)
}
for i := 3; i >= 0; i-- {
time.Sleep(1 * time.Second)
fmt.Println(i)
}
c.Broadcast()
time.Sleep(3 * time.Second)
}
このコードを実行すると以下の様な出力になります。
waiting 0
waiting 1
waiting 4
waiting 7
waiting 8
waiting 9
waiting 5
waiting 2
waiting 3
waiting 6
3
2
1
0
go 0
go 9
go 1
go 4
go 7
go 8
go 3
go 5
go 2
go 6
カウントダウン後、「go」の部分が一気に出力されます。
簡略化の為に time.Sleep でカウントダウン待ちを入れていますが、実際は sync.Mutex により c.Broadcast
が c.Wait
を追い越さないようにしなければなりません。
sync.Pool
golang 1.3 から追加された機能です。
例えば、逐次行う作業と非同期に割込みで入ってくる作業を上手く処理したいとします。
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func main() {
defer debug.SetGCPercent(debug.SetGCPercent(-1))
p := sync.Pool{
New: func() interface{} {
return "定時作業"
},
}
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
for i := 0; i < 10; i++ {
p.Put("割込作業")
time.Sleep(100 * time.Millisecond)
}
wg.Done()
}()
wg.Add(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(p.Get())
time.Sleep(50 * time.Millisecond)
}
wg.Done()
}()
wg.Wait()
}
このコードでは、10個の「定時作業」(Get
を行う途中に「割込作業」(Put
)が割り込みます。作業を行うのは10個限りとします。
定時作業
定時作業
割込作業
定時作業
割込作業
割込作業
定時作業
定時作業
割込作業
定時作業
作業を割り込む側、作業を処理する側が並行で動いていても上手く処理出来ています。
この様に、golang の sync パッケージにはにとても便利な機能が揃っています。ぜひもっとカジュアルに非同期機能を使ってみて下さい。
きっと面白い物が出来上がると思います。