Go は goroutine という非同期の仕組みを提供していますが、使い方次第では色々なパターンが実装できる為、初めて goroutine を見た人はどの様な物が正解なのか分からない事があります。以前、このブログでも紹介した事がありますが Go の非同期の仕組みは一見単純な様に見えて実はとても奥深いのです。
Big Sky :: golang の channel を使ったテクニックあれこれ2012 年に Rob Pike 氏が Google I/O で「Go Concurrency Patterns」というトークを行いました。Gopher (Go言語使い) には結構有名なトークでしたが、改めてこのトークで紹介された Go の非同期パターンをおさらいする事で Go の魅力を感じとって頂けたらと思います。
golang の channel は他の言語に見ない独特のパラダイムを開発者に提供します。 単純にスレッド間でメッセージングをするだけでもC言語で書けばそこそこの量になったり、慣れていない人であればど...
https://mattn.kaoriya.net/software/lang/go/20160706165757.htm
以下はスライド。
Go Concurrency Patterns
https://talks.golang.org/2012/concurrency.slide#1
以下のリポジトリにトークで紹介されたコードが集められています。
GitHub - kevchn/go-concurrency-patterns: Golang concurrency patterns from Rob Pike's 2012 Google I/O talk
Golang concurrency patterns from Rob Pike's 2012 Google I/O talk
https://github.com/kevchn/go-concurrency-patterns
それぞれどの様に動くのか解説します。
基本 - 1-1-basics.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Exposition of Golang's concurrency primitives
package main
import (
"fmt"
"time"
)
func main() {
go regular_print("Hello")
fmt.Println("Second print statement!")
time.Sleep(3 * time.Second)
fmt.Println("Third print statement!") // when main returns, the goroutines also end
}
func regular_print(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Second)
}
}
数字の出力は数秒すると停止します。main が終了すると goroutine も終了する事を理解しましょう。
channel - 1-2-channel.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Golang channels
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go channel_print("Hello", ch)
for i := 0; i < 3; i++ {
fmt.Println(<-ch) // ends of channel block until both are ready
// NOTE: golang supports buffered channels, like mailboxes (no sync)
}
fmt.Println("Done!")
}
func channel_print(msg string, ch chan string) {
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}
channel はバッファが1の場合(デフォルト)、送信側と受信側の両方が準備できるまでブロックします。3回読み取るまで goroutine が先に終了してしまう事はないという事です。
ジェネレータ - 1-3-generator.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Golang generator pattern: functions that return channels
package main
import (
"fmt"
"time"
)
// goroutine is launched inside the called function (more idiomatic)
// multiple instances of the generator may be called
func main() {
ch := generator("Hello")
for i := 0; i < 5; i++ {
fmt.Println(<- ch)
}
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
関数呼び出しの中で goroutine を起動するとジェネレータを複数インスタンス生成できます。
ロックステップ - 1-4-lockstep.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Golang channels as handles on a service (working in lockstep)
package main
import (
"fmt"
"time"
)
func main() {
ch1 := generator("Hello")
ch2 := generator("Bye")
for i := 0; i < 5; i++ {
fmt.Println(<- ch1)
fmt.Println(<- ch2)
}
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
Hello/Bye のセットが1秒間隔で出力されます。順番に読み取っているので必ず順番に表示されます。
多重送信 - 1-5-fanin.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Golang multiplexing (fan-in) function to allow multiple channels go through one channel
package main
import (
"fmt"
"time"
)
func main() {
ch := fanIn(generator("Hello"), generator("Bye"))
for i := 0; i < 10; i++ {
fmt.Println(<- ch)
}
}
// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string) <-chan string { // receives two read-only channels
new_ch := make(chan string)
go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
go func() { for { new_ch <- <-ch2 } }()
return new_ch
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
それぞれの channel から new_ch
にリダイレクトする事で多重送信が作れます。処理順に new_ch
に書きこまれるので順番はタイミング次第になります。
同調 - 1-6-restoring-sequencing.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Golang restoring sequencing after multiplexing
package main
import (
"fmt"
"time"
)
type Message struct {
str string
block chan int
}
func main() {
ch := fanIn(generator("Hello"), generator("Bye"))
for i := 0; i < 10; i++ {
msg1 := <-ch
fmt.Println(msg1.str)
msg2 := <-ch
fmt.Println(msg2.str)
<- msg1.block // reset channel, stop blocking
<- msg2.block
}
}
// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan Message) <-chan Message { // receives two read-only channels
new_ch := make(chan Message)
go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
go func() { for { new_ch <- <-ch2 } }()
return new_ch
}
func generator(msg string) <-chan Message { // returns receive-only channel
ch := make(chan Message)
blockingStep := make(chan int) // channel within channel to control exec, set false default
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- Message{fmt.Sprintf("%s %d", msg, i), blockingStep}
time.Sleep(time.Second)
blockingStep <- 1 // block by waiting for input
}
}()
return ch
}
1-5-fanin.go は同じ1秒ずつの処理ですが、ずれが生じてくると同調しなくなります。この様にループの末尾で msg1.block
と msg2.block
待ち、次の channel 読み取りを行わせない事で同調を作る事が出来ます。
先着処理 - 1-7-select-fanin.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Select is a control structure for concurrency (why channels/goroutines are built in; not library)
// Based off of Dijkstra's guarded commands... providing an idiomatic way for concurrent processes to
// pass in data without programmer having to worry about 'steps'
package main
import (
"fmt"
"time"
)
func main() {
ch := fanIn(generator("Hello"), generator("Bye"))
for i := 0; i < 10; i++ {
fmt.Println(<- ch)
}
}
// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string) <-chan string { // receives two read-only channels
new_ch := make(chan string)
go func() {
for {
select {
case s := <-ch1: new_ch <- s
case s := <-ch2: new_ch <- s
}
}
}()
return new_ch
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
select を使うと channel の受信を同時に待つことが出来ます。この場合、どちらか先に受信したほうのメッセージが new_ch
に流れてきます。
タイムアウト - 1-8-timeout-select.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// In non deterministic select control block, 1 second timer (created each iteration) may
// time out if channel does not return a string in a second
package main
import (
"fmt"
"time"
)
func main() {
ch := generator("Hi!")
for i := 0; i < 10; i++ {
select {
case s := <-ch:
fmt.Println(s)
case <-time.After(1 * time.Second): // time.After returns a channel that waits N time to send a message
fmt.Println("Waited too long!")
return
}
}
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
time.After
は引数で貰った所要時間だけ有効な channel を返します。このコードでは毎回 time.After
を呼び出しているので goroutine が 1 秒以内にメッセージを送信してくる場合に限ってメッセージが表示されます。タイミング次第なのでメッセージが幾つ表示されるかは確定しません。
全体的なタイムアウト - 1-9-timeout-direct-select.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Global timer returns after 5 seconds, stopping execution in select control block
package main
import (
"fmt"
"time"
)
func main() {
ch := generator("Hi!")
timeout := time.After(5 * time.Second)
for i := 0; i < 10; i++ {
select {
case s := <-ch:
fmt.Println(s)
case <-timeout: // time.After returns a channel that waits N time to send a message
fmt.Println("5s Timeout!")
return
}
}
}
func generator(msg string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Second)
}
}()
return ch
}
最初に5秒だけ有効な channel を作っているので5秒間は ch からの読み取りが有効になります。ほぼ5回表示されます。
select の終了 - 2-1-quit-select.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Deterministically quit goroutine with quit channel option in select
package main
import (
"fmt"
"math/rand"
)
func main() {
quit := make(chan bool)
ch := generator("Hi!", quit)
for i := rand.Intn(50); i >= 0; i-- {
fmt.Println(<-ch, i)
}
quit <- true
}
func generator(msg string, quit chan bool) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for {
select {
case ch <- fmt.Sprintf("%s", msg):
// nothing
case <-quit:
fmt.Println("Goroutine done")
return
}
}
}()
return ch
}
終了用の channel quit
を作り main 側は終了指示、goroutine 側はそれを受信してループを終了します。
終了の受信 - 2-2-receive-quit.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Deterministically quit goroutine with quit channel option in select
package main
import (
"fmt"
"math/rand"
)
func main() {
quit := make(chan string)
ch := generator("Hi!", quit)
for i := rand.Intn(10); i >= 0; i-- {
fmt.Println(<-ch, i)
}
quit <- "Bye!"
fmt.Printf("Generator says %s", <-quit)
}
func generator(msg string, quit chan string) <-chan string { // returns receive-only channel
ch := make(chan string)
go func() { // anonymous goroutine
for {
select {
case ch <- fmt.Sprintf("%s", msg):
// nothing
case <-quit:
quit <- "See you!"
return
}
}
}()
return ch
}
2-1-quit-select.go は main 側から終了を指示しましたが、こちらは goroutine 側から完了通知を送る事で main は goroutine の終了を待ちます。
チェイン - 2-3-daisy-chain.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Daisy chaining goroutines... all routines at once, so if one is fulfilled
// everything after should also have their blocking commitment (input) fulfilled
package main
import (
"fmt"
)
// takes two int channels, stores right val (+1) into left
func f(left, right chan int) {
left <- 1 + <-right // bafter 1st right read, locks until left read
}
func main() {
const n = 10000
// construct an array of n+1 int channels
var channels [n + 1]chan int
for i := range channels {
channels[i] = make(chan int)
}
// wire n goroutines in a chain
for i := 0; i < n; i++ {
go f(channels[i], channels[i+1])
}
// insert a value into right-hand end
go func(c chan<- int) { c <- 1 }(channels[n])
// get value from the left-hand end
fmt.Println(<-channels[0])
}
2つの channel を貰って右から左へ受け流す(♪)関数 f を 10000 個 groutine として起動しておきます。この状態では全て入力待ちになりますが、一番末尾の channel に 1 を流し込むと、関数 f が右の channel から得た値を +1 して左の channel に渡し、その channel を読み取って各 goroutine が動き出します。実際には main の一番最後で channel を読み取るまで末尾の channel 送信は行われませんので、channel がバッファを持たないのであればこのコードの様に goroutine で送信する必要があります。
Google 検索 - 2-4-google-search.go
// Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"
// Using go patterns with experiment 'Google Search' i.e concurrent goroutines getting results
package main
import (
"fmt"
"math/rand"
"time"
)
var (
Web = fakeSearch("web")
Web2 = fakeSearch("web")
Image = fakeSearch("image")
Image2 = fakeSearch("image")
Video = fakeSearch("video")
Video2 = fakeSearch("video")
)
type Result string
type Search func(query string) Result
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang") // collate results
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, Web, Web2) }()
go func() { c <- First(query, Image, Image2) }()
go func() { c <- First(query, Video, Video2) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
Google で検索するとウェブ検索結果と画像検索結果と動画検索結果が同時に表示されます。これを実現する方法を紹介しています。関数 Google を呼び出すと各 groutine が処理を開始し、結果の channel に書き込みます。結果は3つに決まっているのでループを3回 channel を読み取っていますが、一定以上処理に時間が掛かる処理結果は無視する様になっています。
この様に、Go 言語(golang) が提供する channel と goroutine の2つだけを使うだけでも色んなパターンを作る事が出来ます。おそらくこれだけではありません。面白いパターンを発明してみて下さい。