2018/05/31


Go は goroutine という非同期の仕組みを提供していますが、使い方次第では色々なパターンが実装できる為、初めて goroutine を見た人はどの様な物が正解なのか分からない事があります。以前、このブログでも紹介した事がありますが Go の非同期の仕組みは一見単純な様に見えて実はとても奥深いのです。

Big Sky :: golang の channel を使ったテクニックあれこれ

golang の channel は他の言語に見ない独特のパラダイムを開発者に提供します。 単純にスレッド間でメッセージングをするだけでもC言語で書けばそこそこの量になったり、慣れていない人であればど...

https://mattn.kaoriya.net/software/lang/go/20160706165757.htm
2012 年に Rob Pike 氏が Google I/O で「Go Concurrency Patterns」というトークを行いました。Gopher (Go言語使い) には結構有名なトークでしたが、改めてこのトークで紹介された Go の非同期パターンをおさらいする事で Go の魅力を感じとって頂けたらと思います。

以下はスライド。

Go Concurrency Patterns
https://talks.golang.org/2012/concurrency.slide#1

以下のリポジトリにトークで紹介されたコードが集められています。

GitHub - kevchn/go-concurrency-patterns: Golang concurrency patterns from Rob Pike's 2012 Google I/O talk

Golang concurrency patterns from Rob Pike's 2012 Google I/O talk

https://github.com/kevchn/go-concurrency-patterns

それぞれどの様に動くのか解説します。

基本 - 1-1-basics.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Exposition of Golang's concurrency primitives

package main

import (
    "fmt"
    "time"
)

func main() {
    go regular_print("Hello")
    fmt.Println("Second print statement!")
    time.Sleep(3 * time.Second)
    fmt.Println("Third print statement!"// when main returns, the goroutines also end
}

func regular_print(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}

The Go Playground

数字の出力は数秒すると停止します。main が終了すると goroutine も終了する事を理解しましょう。

channel - 1-2-channel.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang channels

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go channel_print("Hello", ch)
    for i := 0; i < 3; i++ {
        fmt.Println(<-ch) // ends of channel block until both are ready
                          // NOTE: golang supports buffered channels, like mailboxes (no sync)
    }
    fmt.Println("Done!")
}

func channel_print(msg string, ch chan string) {
    for i := 0; ; i++ {
        ch <- fmt.Sprintf("%s %d", msg, i)
        time.Sleep(time.Second)
    }
}

The Go Playground

channel はバッファが1の場合(デフォルト)、送信側と受信側の両方が準備できるまでブロックします。3回読み取るまで goroutine が先に終了してしまう事はないという事です。

ジェネレータ - 1-3-generator.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang generator pattern: functions that return channels

package main

import (
    "fmt"
    "time"
)

// goroutine is launched inside the called function (more idiomatic)
// multiple instances of the generator may be called

func main() {
    ch := generator("Hello")
    for i := 0; i < 5; i++ {
        fmt.Println(<- ch)
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

関数呼び出しの中で goroutine を起動するとジェネレータを複数インスタンス生成できます。

ロックステップ - 1-4-lockstep.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang channels as handles on a service (working in lockstep)

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := generator("Hello")
    ch2 := generator("Bye")
    for i := 0; i < 5; i++ {
        fmt.Println(<- ch1)
        fmt.Println(<- ch2)
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

Hello/Bye のセットが1秒間隔で出力されます。順番に読み取っているので必ず順番に表示されます。

多重送信 - 1-5-fanin.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang multiplexing (fan-in) function to allow multiple channels go through one channel

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        fmt.Println(<- ch)
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string<-chan string { // receives two read-only channels
    new_ch := make(chan string)
    go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
    go func() { for { new_ch <- <-ch2 } }()
    return new_ch
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

それぞれの channel から new_ch にリダイレクトする事で多重送信が作れます。処理順に new_ch に書きこまれるので順番はタイミング次第になります。

同調 - 1-6-restoring-sequencing.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Golang restoring sequencing after multiplexing

package main

import (
    "fmt"
    "time"
)

type Message struct {
    str string
    block chan int
}

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        msg1 := <-ch
        fmt.Println(msg1.str)

        msg2 := <-ch
        fmt.Println(msg2.str)

        <- msg1.block // reset channel, stop blocking
        <- msg2.block
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan Message) <-chan Message { // receives two read-only channels
    new_ch := make(chan Message)
    go func() { for { new_ch <- <-ch1 } }() // launch two goroutine while loops to continuously pipe to new channel
    go func() { for { new_ch <- <-ch2 } }()
    return new_ch
}

func generator(msg string<-chan Message { // returns receive-only channel
    ch := make(chan Message)
    blockingStep := make(chan int// channel within channel to control exec, set false default
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- Message{fmt.Sprintf("%s %d", msg, i), blockingStep}
            time.Sleep(time.Second)
            blockingStep <- 1 // block by waiting for input
        }
    }()
    return ch
}

The Go Playground

1-5-fanin.go は同じ1秒ずつの処理ですが、ずれが生じてくると同調しなくなります。この様にループの末尾で msg1.blockmsg2.block 待ち、次の channel 読み取りを行わせない事で同調を作る事が出来ます。

先着処理 - 1-7-select-fanin.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Select is a control structure for concurrency (why channels/goroutines are built in; not library)
//  Based off of Dijkstra's guarded commands... providing an idiomatic way for concurrent processes to
//  pass in data without programmer having to worry about 'steps'

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        fmt.Println(<- ch)
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string<-chan string { // receives two read-only channels
    new_ch := make(chan string)
    go func() {
        for {
            select {
                case s := <-ch1: new_ch <- s
                case s := <-ch2: new_ch <- s
            }
        }
    }()
    return new_ch
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

select を使うと channel の受信を同時に待つことが出来ます。この場合、どちらか先に受信したほうのメッセージが new_ch に流れてきます。

タイムアウト - 1-8-timeout-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  In non deterministic select control block, 1 second timer (created each iteration) may
//  time out if channel does not return a string in a second

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := generator("Hi!")
    for i := 0; i < 10; i++ {
        select {
        case s := <-ch:
            fmt.Println(s)
        case <-time.After(1 * time.Second): // time.After returns a channel that waits N time to send a message
            fmt.Println("Waited too long!")
            return
        }
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

time.After は引数で貰った所要時間だけ有効な channel を返します。このコードでは毎回 time.After を呼び出しているので goroutine が 1 秒以内にメッセージを送信してくる場合に限ってメッセージが表示されます。タイミング次第なのでメッセージが幾つ表示されるかは確定しません。

全体的なタイムアウト - 1-9-timeout-direct-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Global timer returns after 5 seconds, stopping execution in select control block

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := generator("Hi!")
    timeout := time.After(5 * time.Second)
    for i := 0; i < 10; i++ {
        select {
        case s := <-ch:
            fmt.Println(s)
        case <-timeout: // time.After returns a channel that waits N time to send a message
            fmt.Println("5s Timeout!")
            return
        }
    }
}

func generator(msg string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}

The Go Playground

最初に5秒だけ有効な channel を作っているので5秒間は ch からの読み取りが有効になります。ほぼ5回表示されます。

select の終了 - 2-1-quit-select.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Deterministically quit goroutine with quit channel option in select

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    quit := make(chan bool)
    ch := generator("Hi!", quit)
    for i := rand.Intn(50); i >= 0; i-- {
        fmt.Println(<-ch, i)
    }
    quit <- true
}

func generator(msg string, quit chan bool<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for {
            select {
            case ch <- fmt.Sprintf("%s", msg):
                // nothing
            case <-quit:
                fmt.Println("Goroutine done")
                return
            }
        }
    }()
    return ch
}

The Go Playground

終了用の channel quit を作り main 側は終了指示、goroutine 側はそれを受信してループを終了します。

終了の受信 - 2-2-receive-quit.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Deterministically quit goroutine with quit channel option in select

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    quit := make(chan string)
    ch := generator("Hi!", quit)
    for i := rand.Intn(10); i >= 0; i-- {
        fmt.Println(<-ch, i)
    }
    quit <- "Bye!"
    fmt.Printf("Generator says %s"<-quit)
}

func generator(msg string, quit chan string<-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for {
            select {
            case ch <- fmt.Sprintf("%s", msg):
                // nothing
            case <-quit:
                quit <- "See you!"
                return
            }
        }
    }()
    return ch
}

The Go Playground

2-1-quit-select.go は main 側から終了を指示しましたが、こちらは goroutine 側から完了通知を送る事で main は goroutine の終了を待ちます。

チェイン - 2-3-daisy-chain.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Daisy chaining goroutines... all routines at once, so if one is fulfilled
//  everything after should also have their blocking commitment (input) fulfilled

package main

import (
    "fmt"
)

// takes two int channels, stores right val (+1) into left
func f(left, right chan int) {
    left <- 1 + <-right // bafter 1st right read, locks until left read
}

func main() {
    const n = 10000

    // construct an array of n+1 int channels
    var channels [n + 1]chan int
    for i := range channels {
        channels[i] = make(chan int)
    }

    // wire n goroutines in a chain
    for i := 0; i < n; i++ {
        go f(channels[i], channels[i+1])
    }

    // insert a value into right-hand end
    go func(c chan<- int) { c <- 1 }(channels[n])

    // get value from the left-hand end
    fmt.Println(<-channels[0])
}

The Go Playground

2つの channel を貰って右から左へ受け流す(♪)関数 f を 10000 個 groutine として起動しておきます。この状態では全て入力待ちになりますが、一番末尾の channel に 1 を流し込むと、関数 f が右の channel から得た値を +1 して左の channel に渡し、その channel を読み取って各 goroutine が動き出します。実際には main の一番最後で channel を読み取るまで末尾の channel 送信は行われませんので、channel がバッファを持たないのであればこのコードの様に goroutine で送信する必要があります。

Google 検索 - 2-4-google-search.go

//  Kevin Chen (2017)
//  Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  Using go patterns with experiment 'Google Search' i.e concurrent goroutines getting results

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    Web    = fakeSearch("web")
    Web2   = fakeSearch("web")
    Image  = fakeSearch("image")
    Image2 = fakeSearch("image")
    Video  = fakeSearch("video")
    Video2 = fakeSearch("video")
)

type Result string
type Search func(query string) Result

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang"// collate results
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- First(query, Web, Web2) }()
    go func() { c <- First(query, Image, Image2) }()
    go func() { c <- First(query, Video, Video2) }()

    timeout := time.After(80 * time.Millisecond)

    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}

The Go Playground

Google で検索するとウェブ検索結果と画像検索結果と動画検索結果が同時に表示されます。これを実現する方法を紹介しています。関数 Google を呼び出すと各 groutine が処理を開始し、結果の channel に書き込みます。結果は3つに決まっているのでループを3回 channel を読み取っていますが、一定以上処理に時間が掛かる処理結果は無視する様になっています。

この様に、Go 言語(golang) が提供する channel と goroutine の2つだけを使うだけでも色んなパターンを作る事が出来ます。おそらくこれだけではありません。面白いパターンを発明してみて下さい。

みんなのGo言語[現場で使える実践テクニック] みんなのGo言語[現場で使える実践テクニック]
松木雅幸, mattn, 藤原俊一郎, 中島大一, 牧大輔, 鈴木健太
技術評論社 / (2016-09-09)
 
発送可能時間:


2018/05/13


AF_UNIX comes to Windows – Windows Command Line Tools For Developers

Introduction:   Beginning in Insider Build 17063 , you’ll be able to use the unix socket ( AF_UNIX )...

https://blogs.msdn.microsoft.com/commandline/2017/12/19/af_unix-comes-to-windows/

昨日、Windows 10 April 2018 Update が来た。WSL (Windows Subsystem for Linux) の常駐もちゃんと動く様になってた。仕組みはどうやら WSL 上のプロセスが一つでも生きていればバックグラウンドで Ubuntu.exe が生き続けてくれるという物らしい。WSL でも tmux が問題なく使える様になって開発しやすくなった。

ところでこの Windows 10 April 2018 Update には、開発者が待ちに待った Windows での AF_UNIX 対応が入っている。つまりは UNIX Domain Socket が Windows で動く様になったという事だ。物は試しと簡単なプログラムを作ってみた。
#ifdef _WIN32
# include <ws2tcpip.h>
# include <io.h>
#else
# include <sys/fcntl.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <netinet/in.h>
# include <netdb.h>
# define closesocket(fd) close(fd)
#endif
#include <stdio.h>

#define UNIX_PATH_MAX 108

typedef struct sockaddr_un {
  ADDRESS_FAMILY sun_family;
  char sun_path[UNIX_PATH_MAX];
} SOCKADDR_UN, *PSOCKADDR_UN;

int
main(int argc, char* argv[]) {
  int server_fd;
  int client_fd;
  struct sockaddr_un server_addr; 
  size_t addr_len;

#ifdef _WIN32
  WSADATA wsa;
  WSAStartup(MAKEWORD(22), &wsa);
#endif

  unlink("./server.sock");

  if ((server_fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
    perror("server: socket");
    exit(1);
  }

  memset((char *) &server_addr, 0sizeof(server_addr));
  server_addr.sun_family = AF_UNIX;
  strcpy(server_addr.sun_path, "./server.sock");

  if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
    perror("server: bind");
    exit(1);
  }

  if (listen(server_fd, 5) < 0) {
    perror("server: listen");
    closesocket(server_fd);
    exit(1);
  }

  while (1) {
    if ((client_fd = accept(server_fd, NULLNULL)) < 0) {
      perror("server: accept");
      break;
    }
    while (1) {
      char buf[256];
      int n = recv(client_fd, buf, sizeof(buf), 0);
      if (n <= 0break;
      buf[n] = 0;
      puts(buf);
    }
    closesocket(client_fd);
  }

  closesocket(server_fd);

#ifdef _WIN32
  WSACleanup();
#endif
}

普通のソケットプログラムだ。mingw のヘッダには sockaddr_un の定義が無いので冒頭で宣言している。Visual Studio だと afunix.h というヘッダが入るはず。Windows はソケットディスクリプタに対して read/write/close が動作しないのはこれまで通りだった。次にクライアントのコード。

#ifdef _WIN32
# include <ws2tcpip.h>
# include <io.h>
#else
# include <sys/fcntl.h>
# include <sys/types.h>
# include <sys/socket.h>
# include <netinet/in.h>
# include <netdb.h>
# define closesocket(fd) close(fd)
#endif
#include <stdio.h>

#define UNIX_PATH_MAX 108

typedef struct sockaddr_un {
  ADDRESS_FAMILY sun_family;
  char sun_path[UNIX_PATH_MAX];
} SOCKADDR_UN, *PSOCKADDR_UN;

int
main(int argc, char* argv[]) {
  int client_fd;
  struct sockaddr_un client_addr; 

#ifdef _WIN32
  WSADATA wsa;
  WSAStartup(MAKEWORD(22), &wsa);
#endif

  if ((client_fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
    perror("server: socket");
    exit(1);
  }

  memset((char *) &client_addr, 0sizeof(client_addr));
  client_addr.sun_family = AF_UNIX;
  strcpy(client_addr.sun_path, "./server.sock");

  if (connect(client_fd, (struct sockaddr *)&client_addr, sizeof(client_addr)) < 0) {
    perror("client: connect");
    exit(1);
  }

  while (1) {
    char buf[256];
    if (!fgets(buf, sizeof(buf), stdin))
      break;
    char *p = strpbrk(buf, "\r\n");
    if (p) *p = 0;
    if (send(client_fd, buf, strlen(buf), 0) < 0)
      break;
  }

  closesocket(client_fd);

#ifdef _WIN32
  WSACleanup();
#endif
}

こちらも至って普通のコード。一応 UNIX でもコンパイル出来る様にしたつもりだけど、試してはない。

AF_UNIX

mingw でも問題なくコンパイルして動作する様になったし、実行は Cygwin も msys2 も WSL も必要なく動作した。Windows 10 でしか動作しないので、しばらくは OSS で使われる事はないだろうけど、UAC の画面も開かないので個人的には便利だと思う。今後は Windows 版の nginx がリバースプロキシ対応したりするんじゃないかと思う。


2018/04/18


golang の html/template について書かれたブログ等を色々見ていると、みんなレイアウトとコンテンツの分離に苦労している感があったのでどうやるか書いておきます。

t.ExecuteTemplate(w, "content", data)

Go の html/template はテンプレートの名称を指定して ExecuteTemplate を実行します。しかし html/template には、その content を囲う layout テンプレートを指定する方法がありません。特に以下の様に ParseGlob を使った場合、各 html で同じ content という名前で define する事は出来ません。

template.ParseGlob("public/views/*.html")

やりたいのは layout というテンプレートの中から content という名称で読み込まれ、その content に今回実行したいテンプレート、例えば search を適用したいはずです。

{{define "layout"}}
<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <title>{{.Title}}</title>
</head>
<body>
{{template "content" .}}
</body>
</html>
{{end}}

これを実現するには AddParseTree を使います。

template.Must(template.Must(t.templates.Lookup("layout").Clone()).AddParseTree("content", t.templates.Lookup("search").Tree)).ExecuteTemplate(w, "layout", data)

layout という名称のテンプレート(上記)をクローンし、そのパース済みツリーに content という名前で実行したい別のテンプレート search のパース済みツリーを追加します。これで、layout という名前の一時テンプレートには、content という名前で search の内容が出力される事になります。もちろん ExecuteTemplate に渡すデータには、layout テンプレートで使うタイトル文字列も渡す事になります。

template.Must(template.Must(t.templates.Lookup("layout").Clone()).AddParseTree("content", t.templates.Lookup("search").Tree)).ExecuteTemplate(w, "layout"struct {
    Title  string
    Message string
}{
    Title:   "検索",
    Message: "検索結果はありません",
})

ただしこの方法は、ソースコードに layout という文言が埋め込まれる事になり、content テンプレートの方からレイアウトテンプレートを選ぶ方法がありません。この問題を回避するには、Jekyll のテンプレートの様に、HTML ファイルのヘッダ部に YAML の様なメタ情報を書かせてテンプレート名とレイアウトのペアをプログラムで管理しておき、名称からそのペアを取り出せる仕組みを作っておけば良いですね。

みんなのGo言語[現場で使える実践テクニック] みんなのGo言語[現場で使える実践テクニック]
松木雅幸, mattn, 藤原俊一郎, 中島大一, 牧大輔, 鈴木健太
技術評論社 / (2016-09-09)
 
発送可能時間: