2009/03/04


Perlでジョブキューサーバと言えばTheSchwartzかGearmanといった所ですが、TheSchwartzみたいに不揮発なデータを扱わないのであればGearmanも有用かと思います。
Gearmanのクライアントライブラリ実装には
Gearman - Language Support (Client APIs)
http://www.danga.com/gearman/

Gearman - Client/Worker APIs

The C client/worker API can be found in the same package as the C server (BSD license):

The Perl API can be found as the Gearman::Client and Gearman::Worker modules in CPAN.

The PHP API can be found as Net_Gearman on PEAR.

The Python API can be found on PyPI as “gearman”, and it can be installed with "easy_install gearman".

These are a set of MySQL UDFs that depend on the Gearman server and library in C.

http://www.gearman.org/doku.php?id=download#client_worker_apis
と数種類ありますが、サーバ実装としては
Gearman Job Server (gearmand)
  • Gearman - Gearman server and library (0.3)
  • Gearwoman - Another server written in C
  • Gearman::Server - Gearman::Server - function call "router" and load balancer
http://www.gearman.org/doku.php?id=download#job_server_gearmand
の様に、CによるものとPerlによるものしかありません。Perl版でも良いのですがC版の方が少しでも速いのではないか...という事でgearmandのC版をWindowsにポーティングする事にしました。
ただ、オリジナルのソースは結構UNIX臭く作ってあり差分も大きくなりそうだったので、別の方が実装したGearwomanをベースにWindowsポーティングしてみました。
以下パッチ diff --git a/client.c b/client.c
index ba21f1d..e4154b9 100644
--- a/client.c
+++ b/client.c
@@ -10,7 +10,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
diff --git a/common.h b/common.h
index 0a56ad5..204a9d2 100644
--- a/common.h
+++ b/common.h
@@ -12,6 +12,10 @@ See LICENSE and COPYING for license details.
 #define MSG_NOSIGNAL 0
 #endif
 
+#if defined(_WIN32)
+#define MSG_NOSIGNAL 0
+#endif
+
 #ifndef max
 #define max(a,b) (a<b?a:b)
 #define min(a,b) (a<b?a:b)
diff --git a/gearmand.c b/gearmand.c
index 16e21ac..edfe2b0 100644
--- a/gearmand.c
+++ b/gearmand.c
@@ -13,6 +13,7 @@ See LICENSE and COPYING for license details.
 #include <unistd.h>
 #include <fcntl.h>
 #include <time.h>
+#ifndef _WIN32
 #include <netinet/in.h>
 #include <getopt.h>
 #include <arpa/inet.h>
@@ -21,6 +22,12 @@ See LICENSE and COPYING for license details.
 #include <sys/signal.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#else
+#include <getopt.h>
+#include <signal.h>
+#include <ws2tcpip.h>
+#define in_addr_t unsigned long
+#endif
 
 #include <event.h>
 #include <glib.h>
@@ -48,7 +55,8 @@ GHashTable *g_workers   = NULL; /* maps functions -> list of worker clients (GPt
 int g_foreground = 1;
 char *g_logfilename = "gearmand.log";
 char *g_bind = "0.0.0.0";
-int g_port = 4730;
+/* int g_port = 4730; */
+int g_port = 7003;
 char g_handle_base[MAX_HANDLE_LEN];
 
 void work_fail(Job *job);
@@ -72,7 +80,14 @@ int listen_on(in_addr_t addr, int port)
     int i = 1;
     setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&i, sizeof(i));
     // setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&i, sizeof(i));
+#ifndef _WIN32
     fcntl(sock, F_SETFL, O_NONBLOCK);
+#else
+    {
+        unsigned long flags = 1;
+        ioctlsocket(sock, FIONBIO, &flags);
+    }
+#endif
 
     sin.sin_addr.s_addr = addr;
     sin.sin_port        = htons(port);
@@ -822,7 +837,14 @@ void listener_cb(int fd, short events, void *arg)
 
     int s = accept(fd, (struct sockaddr *)&sin, &addrlen);
    
+#ifndef _WIN32
     fcntl(s, F_SETFL, O_NONBLOCK);
+#else
+    {
+        unsigned long flags = 1;
+        ioctlsocket(s, FIONBIO, &flags);
+    }
+#endif
 
     Client *cli = client_new();
     cli->state = CLIENT_STATE_CONNECTED;
@@ -871,8 +893,12 @@ void logger(const gchar *domain, GLogLevelFlags level, const gchar *message, gpo
 {
     struct tm dt;
     time_t tme = time(NULL);
-    localtime_r(&tme, &dt);
     char str[64], *lvl = "OTHER";
+#ifndef _WIN32
+    localtime_r(&tme, &dt);
+#else
+    memcpy(&dt, localtime(&tme), sizeof(dt));
+#endif
 
     strftime(str, 64, "%F %T", &dt);
     switch(level) {
@@ -954,6 +980,7 @@ void signal_cb(int fd, short event, void *arg)
 
 void detach()
 {
+#ifndef _WIN32
     if (fork() != 0)
         exit(0);
 
@@ -977,6 +1004,10 @@ void detach()
     open("/dev/null", O_RDWR, 0);   /* 0 stdin */
     dup2(0, 1);  /* 1 stdout */
     dup2(0, 2);  /* 2 stderr */
+#else
+    perror("daemon mode is disabled on win32 ...");
+    exit(0);
+#endif
 }
 
 /****************************************************************************
@@ -986,6 +1017,11 @@ int main(int argc, char *argv[])
     int nsockets = 0;
     struct event listeners[10];
 
+#ifdef _WIN32
+    WSADATA wsaData;
+    WSAStartup(MAKEWORD(2, 0), &wsaData);
+#endif
+
     parseargs(argc, argv);
 
     if (g_foreground == 0) {
@@ -1009,8 +1045,11 @@ int main(int argc, char *argv[])
     event_init();
     //printf("%s %s\n", event_get_version(), event_get_method());
 
+#ifndef _WIN32
     signal(SIGPIPE, SIG_IGN);
+#endif
 
+#ifndef _WIN32
     struct event sig_int, sig_hup, sig_term;/*, sig_pipe;*/
     if (g_foreground) {
         event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
@@ -1025,6 +1064,17 @@ int main(int argc, char *argv[])
     event_add(&sig_term, NULL);
     /*event_set(&sig_pipe, SIGPIPE, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_pipe);
     event_add(&sig_pipe, NULL);*/
+#else
+    struct event sig_int, sig_term;/*, sig_pipe;*/
+    if (g_foreground) {
+        event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
+        event_add(&sig_int, NULL);
+    } else {
+        signal(SIGINT, SIG_IGN);
+    }
+    event_set(&sig_term, SIGTERM, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_term);
+    event_add(&sig_term, NULL);
+#endif
 
     int s = listen_on(inet_addr(g_bind), g_port);
     if (s == -1) {
diff --git a/job.c b/job.c
index 812226b..9a28043 100644
--- a/job.c
+++ b/job.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
diff --git a/util.c b/util.c
index a482011..6135c9a 100644
--- a/util.c
+++ b/util.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
ここでひとつ問題が起こりました。libeventです。libeventのWindowsポーティングとしてはmemcached for Win32が有名で、その配布物にlibeventのソースとバイナリがあります。これをmingw32からリンクしようかと思ったのですが、どうやらWindowsのバッファオーバーフローチェックライブラリbufferoverflow.libをリンクしているらしく__security_cookieが無いよとエラーが出ました。
しかしながら諦めず、libeventのオフィシャルからlibevent-1.4.9-stableをダウンロードし、以下の様にlibeventをWindowsに移植しました。
% cat > config.h
#define VERSION "1.4.9"
#define HAVE_STRTOLL 1
^D

% gcc -c -DHAVE_CONFIG_H -Icompat -Iwin32-code -I. event.c log.c signal.c evutil.c WIN32-Code/win32.c
% ar cr libevent.a event.o log.o signal.o evutil.o win32.o
% cp libevent.a c:/mingw/lib/.
% mkdir c:/mingw/include/libevent
% cp ev*.h c:/mingw/include/libevent/.
あとはgearwomanのMakefileです。
--- Makefile    2009-03-04 09:42:20.000000000 +0900
+++ Makefile.w32    2009-03-04 15:52:29.765625000 +0900
@@ -3,7 +3,7 @@
 CC = gcc
 LD = gcc -o
 AR = ar
-LDFLAGS = `pkg-config --libs gthread-2.0` -levent
+LDFLAGS = `pkg-config --libs gthread-2.0` -levent -lws2_32
 CFLAGS = -Wall
 
 # # Debug
@@ -12,10 +12,10 @@
 # OPTIMIZATIONS =
 
 # Production (NDEBUG = NO DEBUG / remove asserts)
-CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG
+CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG -Ic:/mingw/include/libevent
 OPTIMIZATIONS = -O2 -funroll-loops -finline-functions
 
-BIN =   gearmand
+BIN =   gearmand.exe
 
 OBJ =   gearmand.o      \
         client.o        \
拡張子と、libeventへのパス指定だけです。
この状態でmingw32-makeすると、Windowsネイティブ(cygwin未使用)なgearmand.exeが出来上がります。

さて実際にはどれくらい速いのか...
Perl版をポート番号7003で、Native版をポート番号7004で動作させ use strict;
use Gearman::Worker;

my $gw = Gearman::Worker->new;
$gw->job_servers('127.0.0.1:7004');
#$gw->job_servers('127.0.0.1:7003');
$gw->register_function(
    'sum' => sub {
        my ( $lhs, $rhs ) = split /,/, shift->arg;
        $lhs + $rhs;

    }
);
$gw->work while 1;
ワーカーとしては数値を足すだけの物をそれぞれのポートで起動しました。
クライアントは接続、ジョブ登録500回、処理実行という流れを30回行うベンチマークで検証してみました。
use strict;
use Benchmark;
use Gearman::Client;



timethese(
    30,
    {
        'perl gearmand'   => '&test("127.0.0.1:7003");',
        'native gearmand' => '&test("127.0.0.1:7004");',
    }
);

sub test {
    my $server = shift;

    my $gc = Gearman::Client->new;
    $gc->job_servers($server);
    my $ts = $gc->new_task_set;
    for my $i ( 1 .. 500 ) {
        $ts->add_task(
            "sum" => "3,4",
            {
                on_complete => sub {
                    #print "$i:" . ${ $_[0] } . "\n";
                  }
            }
        );
    }
    $ts->wait;
}
実行結果は Benchmark: timing 30 iterations of native gearmand, perl gearmand...
native gearmand: 19 wallclock secs (11.88 usr +  1.08 sys = 12.95 CPU) @  2.32/s (n=30)
perl gearmand: 30 wallclock secs (13.25 usr +  1.17 sys = 14.42 CPU) @  2.08/s (n=30)
の様になり、若干ですがネイティブ版の方が速い様です。

クライアント・ワーカーライブラリにもC言語の物を使うのであれば、クライアント・ワーカー・サーバ全てでPerlを必要とする事なくGearmanが使える様になりますね。

まぁWindowsでネイティブなGearman使いたいなんて変態は私しかいないかもしれませんが、ご参考になれば...
Posted at by



2009/02/26


IO::Lambdaを見てて、おーいいねー、と思ってサンプル動かしたらエラー出た。
追ってくと、IOなんて根底のモジュールに原因がある事が分かった。原因っていうかエラーが出るように仕込んであった。
以下パッチ作ってオフィシャルにメールした。
--- IO.xs.orig  2006-03-26 11:27:13.000000000 +0900
+++ IO.xs   2009-02-24 20:16:34.921875000 +0900
@@ -121,7 +121,12 @@
     }
     return RETVAL;
 #else
+#  ifdef WIN32
+   unsigned long flags = block;
+   return ioctl(PerlIO_fileno(f), FIONBIO, &flags);
+#  else
     return -1;
+#  endif
 #endif
 }
 
もしかしたらFAQなpatchで蹴られるだろうけど...

追記 2009/02/26
patchがマージされました。
Posted at by



2009/02/24


memcachedサーバへのアクセスモジュールは数ありますが、一般的にはlibmemcachedが使われる事が多いと思います。
Perlにおいても
  • Cache::Memcached
  • Cache::Memcached::Fast
  • Cache::Memcached::libmemcached
  • Memcached::libmemcached
と数種類存在し、一般的に使用されるlibmemcachedのラッパインタフェースであるCache::Memcached::libmemcachedが使われる事が多い様に思います。
libmemcachedに関しては、以前Windowsへのポーティングを行い、オフィシャルへのパッチ送付も行いました。
成果物としてはcodereposに置いてあります。オフィシャルからもリンクを張って頂けるようになりました。
さらにCache::Memcached::Fastについても、以前Windowsへのポーティングを行い、Cache::Memcached::Fast version 0.13に取り込まれました。
つまり特殊な事をせずにWindowsから利用出来る高速なPerlのmemcachedクライアントライブラリとしてはCache::Memcached::Fastになります。
ちなみこのCache::Memcached::Fast、実はlibmemcachedと比較しても格段に速く、tokuhiromさんが取ったベンチマークでも素晴らしい結果を叩き出してくれています。

さて今日は、このCache::Memcached::Fastが内部で使用しているXSクライアントモジュールを使用して、高速にmemcachedにアクセスする物を作ってみたいと思います。
このCache::Memcached::FastのXSコードは、Perlに依存した部分とPerlに依存していない部分で分けられており、その後者は一般的なC言語のソースから呼び出しが可能になっています。
なぜこのCache::Memcached::Fastが速いかと言うと、libmemcachedの様に逐次送信を行っているのではなくwritev(2)を使った一括送信を行っているからです。またCache::Memcached::Fastはselect(2)ではなくpoll(2)を使っている為、FD_SETの設定を毎回行わなくて良いというのも微量ではありますが影響しているのではないかと思っています。
このクライアントモジュールは、libmemcachedの様に単一取得(memcached_get)や複数取得(memcached_mget)のAPI呼び出し時に逐次送受信されるのではなく、client_prepare_get/client_prepare_setを使った前準備方式を使っています。これにより複数の問い合わせに対しても内部では一括送信してくれ、一括で受信してくれます。libmemcachedは複数の問い合わせに対してそれぞれ結果待ちをし、全ての問い合わせが完了した時点で制御が戻ります。これについてはMEMCACHED_BEHAVIOR_NO_BLOCKを使用する事でよく似た動作をする事が出来ます。
このクライアントモジュールを使用した実際のサンプルコードは以下の様になります。
/* fast memcached client using client module of Cache::Memcached::Client.
 *   compile   : gcc -o a.exe foo.c libclient.a
 *   for win32 : gcc -o a.exe foo.c libclient.a -lws2_32
 */

#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include "client.h"

static void* alloc_value(value_size_type value_size, void **opaque) {
    *opaque = (char *) malloc(value_size + 1);
    memset(*opaque, 0, value_size + 1);
    return (void *) *opaque;
}

static void free_value(void *opaque) {
    free(opaque);
}

static void result_store(void *arg, void *opaque, int key_index, void *meta) {
    char* res = (char*) opaque;
    if (res) *(char**)arg = strdup(res);
    else     *(char**)arg = strdup("");
}

int main(int argc, char* argv[]) {
    struct client* c = NULL;
    const char* host = "127.0.0.1";
    const char* port = "11211";
    const char* key = "foo";
    const char* value = "bar";
    struct result_object object;
    char* result = NULL;
    int ret;

    /* initialize client module of Cache::Memcached::Fast */
    c = client_init();

    /* add memcached server */
    client_add_server(c, host, strlen(host), port, strlen(port), 1.0, 1);

    /*--- set function --------------------------------*/
    /* initialize staff context for set */
    object.alloc = NULL;
    object.store = result_store;
    object.free = NULL;
    object.arg = NULL;
    client_reset(c, &object, 1);

    /* prepare for set function */
    printf("setting value of '%s' as '%s'\n", key, value);
    client_prepare_set(c, CMD_SET, 0, key, strlen(key), 0, 0, value, strlen(value));

    /* execute set function */
    client_execute(c);
    /*-------------------------------------------------*/

    /*--- get function --------------------------------*/
    /* initialize staff context for get */
    object.alloc = alloc_value;
    object.store = result_store;
    object.free = free_value;
    object.arg = &result;
    client_reset(c, &object, 0);

    /* prepare for get function */
    printf("getting value of '%s'\n", key);
    client_prepare_get(c, CMD_GET, 0, key, strlen(key));

    /* execute set function */
    client_execute(c);
    /*-------------------------------------------------*/

    printf("result value of '%s' is '%s'\n", key, result);

    free(result);

    return 0;
}
少し変わったコードになりますが、メモリの確保から開放まで自分でハンドリングでき、自前の構造を使った処理も行えるかと思います。
なお、libmemcachedとCache::Memcached::Fastのクライアントモジュールでget/setを繰り返すベンチマークを取ってみました。
まずはlibmemcachedのコード
#include <winsock2.h>
#include <memcached.h>
#include <stdio.h>

#define SERVER_NAME "127.0.0.1"
#define SERVER_PORT 11211
#define KEY "foo"
#define VALUE "bar"

int main(void) {
    memcached_return rc;
    memcached_st *memc;
    char* value;
    int value_length = 0;
    int flags = 0;
    int n;

    memc = memcached_create(NULL);
    memcached_server_add(memc, SERVER_NAME, SERVER_PORT);
    for (n = 0; n < 30000; n++) {
        memcached_set(memc, KEY, strlen(KEY), VALUE, strlen(VALUE), 0, 0);
        value = memcached_get(memc, KEY, strlen(KEY), &value_length, &flags, &rc);
    }
    memcached_free(memc);
    return 0;
}
次にCache::Memcached::Fast(CMF)のクライアントモジュールのコード
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include "client.h"

#define SERVER_NAME "127.0.0.1"
#define SERVER_PORT "11211"
#define KEY "foo"
#define VALUE "bar"

static void* alloc_value(value_size_type value_size, void **opaque) {
    *opaque = (char *) malloc(value_size + 1);
    memset(*opaque, 0, value_size + 1);
    return (void *) *opaque;
}

static void free_value(void *opaque) {
    free(opaque);
}

static void result_store(void *arg, void *opaque, int key_index, void *meta) {
    char* res = (char*) opaque;
    if (res) *(char**)arg = strdup(res);
    else     *(char**)arg = strdup("");
}

int main(int argc, char* argv[]) {
    struct client* c = NULL;
    struct result_object object_set, object_get;
    char* result = NULL;
    int n;

    c = client_init();
    client_add_server(c, SERVER_NAME, strlen(SERVER_NAME), SERVER_PORT, strlen(SERVER_PORT), 1.0, 1);

    object_set.alloc = NULL;
    object_set.store = result_store;
    object_set.free = NULL;
    object_set.arg = NULL;
    object_get.alloc = alloc_value;
    object_get.store = result_store;
    object_get.free = free_value;
    object_get.arg = &result;

    for (n = 0; n < 30000; n++) {
        client_reset(c, &object_set, 1);
        client_prepare_set(c, CMD_SET, 0, KEY, strlen(KEY), 0, 0, VALUE, strlen(VALUE));
        client_execute(c);
        client_reset(c, &object_get, 0);
        client_prepare_get(c, CMD_GET, 0, KEY, strlen(KEY));
        client_execute(c);
    }

    free(result);

    return 0;
}
計測結果は

libmemcached: 6.953125

CMF: 5.984375

となりました。get/setのループだけなのにCMFは速いですね。
Perlに依存していないので、通常アプリケーションでも問題なく使えるかと思います。
memcachedと通信するC言語で作ったプログラムのパフォーマンスが悪いと思われたならば、一度試して見られてはどうでしょうか?
Posted at by