2018/05/31

Recent entries from same category

  1. Go 言語プログラミングエッセンスという本を書きました。
  2. errors.Join が入った。
  3. unsafe.StringData、unsafe.String、unsafe.SliceData が入った。
  4. Re: Go言語で画像ファイルか確認してみる
  5. net/url に JoinPath が入った。

Go は goroutine という非同期の仕組みを提供していますが、使い方次第では色々なパターンが実装できる為、初めて goroutine を見た人はどの様な物が正解なのか分からない事があります。以前、このブログでも紹介した事がありますが Go の非同期の仕組みは一見単純な様に見えて実はとても奥深いのです。

Big Sky :: golang の channel を使ったテクニックあれこれ

golang の channel は他の言語に見ない独特のパラダイムを開発者に提供します。 単純にスレッド間でメッセージングをするだけでもC言語で書けばそこそこの量になったり、慣れていない人であればど...

https://mattn.kaoriya.net/software/lang/go/20160706165757.htm
2012 年に Rob Pike 氏が Google I/O で「Go Concurrency Patterns」というトークを行いました。Gopher (Go言語使い) には結構有名なトークでしたが、改めてこのトークで紹介された Go の非同期パターンをおさらいする事で Go の魅力を感じとって頂けたらと思います。

以下はスライド。

Go Concurrency Patterns
https://talks.golang.org/2012/concurrency.slide#1

以下のリポジトリにトークで紹介されたコードが集められています。

GitHub - kevchn/go-concurrency-patterns: Golang concurrency patterns from Rob Pike's 2012 Google I/O talk

Golang concurrency patterns from Rob Pike's 2012 Google I/O talk

https://github.com/kevchn/go-concurrency-patterns

それぞれどの様に動くのか解説します。

基本 - 1-1-basics.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Exposition of Golang's concurrency primitives

package main

import (
    "fmt"
    "time"
)

func main() {
    go regular_print("Hello")
    fmt.Println("Second print statement!")
    time.Sleep(3 * time.Second)
    fmt.Println("Third print statement!"// when main returns, the goroutines also end
}

func regular_print(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}

The Go Playground

数字の出力は数秒すると停止します。main が終了すると goroutine も終了する事を理解しましょう。

channel - 1-2-channel.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang channels

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go channel_print("Hello", ch)
    for i := 0; i < 3; i++ {
        fmt.Println(<-ch) // ends of channel block until both are ready
                          // NOTE: golang supports buffered channels, like mailboxes (no sync)
    }
    fmt.Println("Done!")
}

func channel_print(msg string, ch chan string) {
    for i := 0; ; i++ {
        ch <- fmt.Sprintf("%s %d", msg, i)
        time.Sleep(time.Second)
    }
}

The Go Playground

channel はバッファが1の場合(デフォルト)、送信側と受信側の両方が準備できるまでブロックします。3回読み取るまで goroutine が先に終了してしまう事はないという事です。

ジェネレータ - 1-3-generator.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang generator pattern: functions that return channels

package main

import (
    "fmt"
    "time"
)

// goroutine is launched inside the called function (more idiomatic)
// multiple instances of the generator may be called

func main() {
    ch := generator("Hello")
    for i := 0; i < 5; i++ {
        fmt.Println(<- ch)
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

関数呼び出しの中で goroutine を起動するとジェネレータを複数インスタンス生成できます。

ロックステップ - 1-4-lockstep.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang channels as handles on a service (working in lockstep)

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := generator("Hello")
    ch2 := generator("Bye")
    for i := 0; i < 5; i++ {
        fmt.Println(<- ch1)
        fmt.Println(<- ch2)
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

Hello/Bye のセットが1秒間隔で出力されます。順番に読み取っているので必ず順番に表示されます。

多重送信 - 1-5-fanin.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang multiplexing (fan-in) function to allow multiple channels go through one channel

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        fmt.Println(<- ch)
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string<-chan string { // receives two read-only channels
    new_ch := make(chan string)
    go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
    go func() { for { new_ch <- <-ch2 } }()
    return new_ch
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

それぞれの channel から new_ch にリダイレクトする事で多重送信が作れます。処理順に new_ch に書きこまれるので順番はタイミング次第になります。

同調 - 1-6-restoring-sequencing.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang restoring sequencing after multiplexing

package main

import (
    "fmt"
    "time"
)

type Message struct {
    str string
    block chan int
}

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        msg1 := <-ch
        fmt.Println(msg1.str)

        msg2 := <-ch
        fmt.Println(msg2.str)

        <- msg1.block // reset channel, stop blocking
        <- msg2.block
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan Message) <-chan Message { // receives two read-only channels
    new_ch := make(chan Message)
    go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
    go func() { for { new_ch <- <-ch2 } }()
    return new_ch
}

func generator(msg string<-chan Message { // returns receive-only channel
    ch := make(chan Message)
    blockingStep := make(chan int// channel within channel to control exec, set false default
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- Message{fmt.Sprintf("%s %d", msg, i), blockingStep}
            time.Sleep(time.Second)
            blockingStep <- 1 // block by waiting for input
        }
    }()
    return ch
}

The Go Playground

1-5-fanin.go は同じ1秒ずつの処理ですが、ずれが生じてくると同調しなくなります。この様にループの末尾で msg1.blockmsg2.block 待ち、次の channel 読み取りを行わせない事で同調を作る事が出来ます。

先着処理 - 1-7-select-fanin.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Select is a control structure for concurrency (why channels/goroutines are built in; not library)
//  Based off of Dijkstra's guarded commands... providing an idiomatic way for concurrent processes to
//  pass in data without programmer having to worry about 'steps'

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        fmt.Println(<- ch)
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string<-chan string { // receives two read-only channels
    new_ch := make(chan string)
    go func() {
        for {
            select {
                case s := <-ch1: new_ch <- s
                case s := <-ch2: new_ch <- s
            }
        }
    }()
    return new_ch
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

select を使うと channel の受信を同時に待つことが出来ます。この場合、どちらか先に受信したほうのメッセージが new_ch に流れてきます。

タイムアウト - 1-8-timeout-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  In non deterministic select control block, 1 second timer (created each iteration) may
//  time out if channel does not return a string in a second

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := generator("Hi!")
    for i := 0; i < 10; i++ {
        select {
        case s := <-ch:
            fmt.Println(s)
        case <-time.After(1 * time.Second): // time.After returns a channel that waits N time to send a message
            fmt.Println("Waited too long!")
            return
        }
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

time.After は引数で貰った所要時間だけ有効な channel を返します。このコードでは毎回 time.After を呼び出しているので goroutine が 1 秒以内にメッセージを送信してくる場合に限ってメッセージが表示されます。タイミング次第なのでメッセージが幾つ表示されるかは確定しません。

全体的なタイムアウト - 1-9-timeout-direct-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Global timer returns after 5 seconds, stopping execution in select control block

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := generator("Hi!")
    timeout := time.After(5 * time.Second)
    for i := 0; i < 10; i++ {
        select {
        case s := <-ch:
            fmt.Println(s)
        case <-timeout: // time.After returns a channel that waits N time to send a message
            fmt.Println("5s Timeout!")
            return
        }
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

最初に5秒だけ有効な channel を作っているので5秒間は ch からの読み取りが有効になります。ほぼ5回表示されます。

select の終了 - 2-1-quit-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Deterministically quit goroutine with quit channel option in select

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    quit := make(chan bool)
    ch := generator("Hi!", quit)
    for i := rand.Intn(50); i >= 0; i-- {
        fmt.Println(<-ch, i)
    }
    quit <- true
}

func generator(msg string, quit chan bool<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for {
            select {
            case ch <- fmt.Sprintf("%s", msg):
                // nothing
            case <-quit:
                fmt.Println("Goroutine done")
                return
            }
        }
    }()
    return ch
}

The Go Playground

終了用の channel quit を作り main 側は終了指示、goroutine 側はそれを受信してループを終了します。

終了の受信 - 2-2-receive-quit.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Deterministically quit goroutine with quit channel option in select

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    quit := make(chan string)
    ch := generator("Hi!", quit)
    for i := rand.Intn(10); i >= 0; i-- {
        fmt.Println(<-ch, i)
    }
    quit <- "Bye!"
    fmt.Printf("Generator says %s"<-quit)
}

func generator(msg string, quit chan string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for {
            select {
            case ch <- fmt.Sprintf("%s", msg):
                // nothing
            case <-quit:
                quit <- "See you!"
                return
            }
        }
    }()
    return ch
}

The Go Playground

2-1-quit-select.go は main 側から終了を指示しましたが、こちらは goroutine 側から完了通知を送る事で main は goroutine の終了を待ちます。

チェイン - 2-3-daisy-chain.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Daisy chaining goroutines... all routines at once, so if one is fulfilled
//  everything after should also have their blocking commitment (input) fulfilled

package main

import (
    "fmt"
)

// takes two int channels, stores right val (+1) into left
func f(left, right chan int) {
    left <- 1 + <-right // bafter 1st right read, locks until left read
}

func main() {
    const n = 10000

    // construct an array of n+1 int channels
    var channels [n + 1]chan int
    for i := range channels {
        channels[i] = make(chan int)
    }

    // wire n goroutines in a chain
    for i := 0; i < n; i++ {
        go f(channels[i], channels[i+1])
    }

    // insert a value into right-hand end
    go func(c chan<- int) { c <- 1 }(channels[n])

    // get value from the left-hand end
    fmt.Println(<-channels[0])
}

The Go Playground

2つの channel を貰って右から左へ受け流す(♪)関数 f を 10000 個 groutine として起動しておきます。この状態では全て入力待ちになりますが、一番末尾の channel に 1 を流し込むと、関数 f が右の channel から得た値を +1 して左の channel に渡し、その channel を読み取って各 goroutine が動き出します。実際には main の一番最後で channel を読み取るまで末尾の channel 送信は行われませんので、channel がバッファを持たないのであればこのコードの様に goroutine で送信する必要があります。

Google 検索 - 2-4-google-search.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Using go patterns with experiment 'Google Search' i.e concurrent goroutines getting results

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    Web    = fakeSearch("web")
    Web2   = fakeSearch("web")
    Image  = fakeSearch("image")
    Image2 = fakeSearch("image")
    Video  = fakeSearch("video")
    Video2 = fakeSearch("video")
)

type Result string
type Search func(query string) Result

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang"// collate results
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- First(query, Web, Web2) }()
    go func() { c <- First(query, Image, Image2) }()
    go func() { c <- First(query, Video, Video2) }()

    timeout := time.After(80 * time.Millisecond)

    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}

The Go Playground

Google で検索するとウェブ検索結果と画像検索結果と動画検索結果が同時に表示されます。これを実現する方法を紹介しています。関数 Google を呼び出すと各 groutine が処理を開始し、結果の channel に書き込みます。結果は3つに決まっているのでループを3回 channel を読み取っていますが、一定以上処理に時間が掛かる処理結果は無視する様になっています。

この様に、Go 言語(golang) が提供する channel と goroutine の2つだけを使うだけでも色んなパターンを作る事が出来ます。おそらくこれだけではありません。面白いパターンを発明してみて下さい。

Posted at by