2014/07/29


reflect.Select を使います。

package main

import (
    "fmt"
    "math/rand"
    "reflect"
    "sync"
    "time"
)

func multpleSelect(chans []chan bool) (intboolbool) {
    // chans の数分 select-case 文を作る
    cases := make([]reflect.SelectCase, len(chans))
    for i, ch := range chans {
        cases[i] = reflect.SelectCase{
            Dir: reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        }
    }

    // 一括で select
    i, v, ok := reflect.Select(cases)
    return i, v.Interface().(bool), ok
}

func main() {
    rand.Seed(time.Now().UnixNano())

    // 100個の chan を作るよ
    chans := make([]chan bool100)
    for i := 0; i < 100; i++ {
        chans[i] = make(chan bool)
    }


    // 非同期で100個の chan を同時に待つ
    var wg sync.WaitGroup
    go func() {
        wg.Add(1)

        if ch, v, ok := multpleSelect(chans); ok {
            fmt.Printf("I am chan-%v, value is %v\n", ch, v)
        }
        wg.Done()
    }()

    // ランダムな chan に true を送る
    chans[rand.Int() % len(chans)] <- true

    wg.Wait()
}

何度も繰り返して待つ場合は、その都度 SelectCase を作るのがコストになり得るので、予め作ったまま保持しておくのが良いかと思います。

Posted at by



2014/07/06


断続的にデータを受けながら並行で時間差リアクションを行う - すぎゃーんメモ

断続的にデータを受けながら並行で時間差リアクションを行う Go はじめて、の次のGo - すぎゃーんメモ にて作った go-genki-bot 、UserStreamから Tweet 取得して返信する...

http://d.hatena.ne.jp/sugyan/20140705/1404493007
ちょっと動きは違うんだけど、こういうパターンもあるよねー。という事で。 package main

import (
    "bufio"
    "fmt"
    "os"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    queue := make(chan rune5)

    t := time.AfterFunc(-1func() {
        // ある分だけ読み込む
        leave: for {
            select {
            case r := <-queue:
                fmt.Println("recv"string(r))
            case <-time.After(100 * time.Millisecond):
                // 100ms 読めなかったら諦める
                break leave
            }
        }
    })

    // タイマーは止めておく
    t.Stop()

    in := bufio.NewReader(os.Stdin)
    for {
        // 何かキー入力
        r, _, err := in.ReadRune()
        if err != nil {
            break
        }

        // LFはいらない
        if r == 0x0A {
            continue
        }

        // タイマーを活性化(3秒後)
        t.Reset(3 * time.Second)

        // タイマーに読んで貰う為に入力したキーをchanで送信しておく
        queue<-r
    }
}
入力を得たらタイマーを活性化する。タイマーは一つしか使わなくて、データが来るたびに更新するのでデータが最後の入力から3秒間何も更新がなければ発動する。タイマーは発動すると100ms内に見つかった分だけ抜き取って表示。 $ go run jisa.go 

recv あ
あいうえ
recv あ
recv い
recv う
recv え
この手法は gof の画面表示部分で使われていて、連続するキー入力に対して毎回描画を行っていると画面がチラ付くという問題を回避出来る。

詳しくは以下を参照して下さい。
Golang でコマンドライン Fuzzy Finder 「gof」作った。 - Qiita

この記事は Go Advent Calendar 2013 の 10 日目の投稿です。 はじめに 業務のツールや連携させる一部の機能として golang を使い出している方もチラホラ現れ始めました。 ...

http://qiita.com/mattn/items/edea1be5a6d84663ab8b
Go Advent Day 19 - Eject the Web - The Gopher Academy Blog

Other Articles Next article Go Advent Day 20 - Go in Academia: Emulating Wireless Networks Previous ...

http://blog.gopheracademy.com/day-19-eject-the-web
Posted at by



2014/06/25


golang と言えば非同期に特化した言語ですが、慣れない内は簡単な非同期しか使えません。しかし sync パッケージを知る事でもっとカジュアルに、かつ確実な非同期処理を行う事が出来る様になります。 今日はそんな sync パッケージについて説明してみたいと思います。

sync.Mutex

ご存じ sync.Mutex です。皆さんが一番使う排他制御だと思います。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func parallel(wg *sync.WaitGroup) {
    fmt.Println("博")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("多")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("の")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("塩")
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    wg := new(sync.WaitGroup)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go parallel(wg)
    }
    wg.Wait()
}

このコードを実行すると結果はおおよそ以下の様になります。













時には順番が入り乱れる事もあるでしょう。これに sync.Mutex を追加して以下の様にします。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func parallel(wg *sync.WaitGroup, m *sync.Mutex) {
    m.Lock()
    defer m.Unlock()

    fmt.Println("博")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("多")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("の")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("塩")
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    wg := new(sync.WaitGroup)
    m := new(sync.Mutex)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go parallel(wg, m)
    }
    wg.Wait()
}

すると













期待通りの順番に表示されます。上記のコードでは表示の排他のみを扱いましたが、更新処理と参照処理により排他制御を区別出来る RWMutex もあります。

また、上記のコードでしれーっと書いていますが sync.WaitGroup は Wait() を呼び出すと Add() を呼び出した回数から Done() を呼び出した回数を引いて 0 になるまで待機する機能が簡単に実装出来ます。全ての goroutine の終了を待つ場合に使用します。

sync/atomic

sync/atomic パッケージは、名前の通りアトミックな演算を提供します。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var v int32

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    wg := new(sync.WaitGroup)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            v++
            v++
            v++
            v++
            v++
            v++
            v++
            v++
            v++
            v++
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println(v)
}

このコードを実行すると、10回の加算を10並行で行うため合計で100になる事が期待出来ますが実際には何回かに1回100でない事象が発生します。

演算がアトミックでないからです。こういう場合に sync/atomic を使います。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

var v int32

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    wg := new(sync.WaitGroup)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            atomic.AddInt32(&v, 1)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println(v)
}

このコードは何回実行しても合計が100になります。

sync.Once

ある関数ではある処理を1回のみ行いたい、そういった場合に使用します。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var once = new(sync.Once)

func greeting(wg *sync.WaitGroup) {
    once.Do(func() {
        fmt.Println("こんにちわ")
    })

    fmt.Println("ごきげんいかがですか")
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    defer fmt.Println("さようなら")

    wg := new(sync.WaitGroup)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go greeting(wg)
    }
    wg.Wait()
}

このコードの実行結果は以下の様になります。

こんにちわ
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
ごきげんいかがですか
さようなら

並行で走った場合でも1度しか実行されません。

sync.Cond

例えば goroutine を先行で10個用意しておき、ある号令に従い1つずつ並行処理を開始したいとします。こういうケースでは sync.Cond を作り Signal() を1回ずつ呼び出します。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    l := new(sync.Mutex)
    c := sync.NewCond(l)
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Printf("waiting %d\n", i)
            l.Lock()
            defer l.Unlock()
            c.Wait()
            fmt.Printf("go %d\n", i)
        }(i)
    }

    for i := 0; i < 10; i++ {
        time.Sleep(1 * time.Second)
        c.Signal()
    }
    time.Sleep(1 * time.Second)
}

このコードを実行すると以下の様な出力になります。

waiting 0
waiting 9
waiting 5
waiting 1
waiting 2
waiting 3
waiting 4
waiting 8
waiting 6
waiting 7
go 0
go 9
go 5
go 1
go 2
go 3
go 4
go 8
go 6
go 7

また全 goroutine に対して一斉に号令を掛ける場合は Broadcast() を使います。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    l := new(sync.Mutex)
    c := sync.NewCond(l)
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Printf("waiting %d\n", i)
            l.Lock()
            defer l.Unlock()
            c.Wait()
            fmt.Printf("go %d\n", i)
        }(i)
    }

    for i := 3; i >= 0; i-- {
        time.Sleep(1 * time.Second)
        fmt.Println(i)
    }
    c.Broadcast()
    time.Sleep(3 * time.Second)
}

このコードを実行すると以下の様な出力になります。

waiting 0
waiting 1
waiting 4
waiting 7
waiting 8
waiting 9
waiting 5
waiting 2
waiting 3
waiting 6
3
2
1
0
go 0
go 9
go 1
go 4
go 7
go 8
go 3
go 5
go 2
go 6

カウントダウン後、「go」の部分が一気に出力されます。

簡略化の為に time.Sleep でカウントダウン待ちを入れていますが、実際は sync.Mutex により c.Broadcastc.Wait を追い越さないようにしなければなりません。

sync.Pool

golang 1.3 から追加された機能です。

例えば、逐次行う作業と非同期に割込みで入ってくる作業を上手く処理したいとします。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    // disable GC so we can control when it happens.
    defer debug.SetGCPercent(debug.SetGCPercent(-1))

    p := sync.Pool{
        New: func() interface{} {
            return "定時作業"
        },
    }

    wg := new(sync.WaitGroup)

    wg.Add(1)
    go func() {
        for i := 0; i < 10; i++ {
            p.Put("割込作業")
            time.Sleep(100 * time.Millisecond)
        }
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(p.Get())
            time.Sleep(50 * time.Millisecond)
        }
        wg.Done()
    }()

    wg.Wait()
}

このコードでは、10個の「定時作業」(Getを行う途中に「割込作業」(Put)が割り込みます。作業を行うのは10個限りとします。

定時作業
定時作業
割込作業
定時作業
割込作業
割込作業
定時作業
定時作業
割込作業
定時作業

作業を割り込む側、作業を処理する側が並行で動いていても上手く処理出来ています。

この様に、golang の sync パッケージにはにとても便利な機能が揃っています。ぜひもっとカジュアルに非同期機能を使ってみて下さい。

きっと面白い物が出来上がると思います。

Posted at by