2009/03/04

Recent entries from same category

  1. PerlでWindowsと親和性の高いreadlineが欲しい → あった「Caroline」
  2. Perl をゆるふわと語ろう
  3. cpanfile とは何か、なぜそれを使いたいのか
  4. plackup の --path 引数
  5. Github Notification API が出たので通知を Growl するの書いた。

Perlでジョブキューサーバと言えばTheSchwartzかGearmanといった所ですが、TheSchwartzみたいに不揮発なデータを扱わないのであればGearmanも有用かと思います。
Gearmanのクライアントライブラリ実装には
Gearman - Language Support (Client APIs)
http://www.danga.com/gearman/

Gearman - Client/Worker APIs

The C client/worker API can be found in the same package as the C server (BSD license):

The Perl API can be found as the Gearman::Client and Gearman::Worker modules in CPAN.

The PHP API can be found as Net_Gearman on PEAR.

The Python API can be found on PyPI as “gearman”, and it can be installed with "easy_install gearman".

These are a set of MySQL UDFs that depend on the Gearman server and library in C.

http://www.gearman.org/doku.php?id=download#client_worker_apis
と数種類ありますが、サーバ実装としては
Gearman Job Server (gearmand)
  • Gearman - Gearman server and library (0.3)
  • Gearwoman - Another server written in C
  • Gearman::Server - Gearman::Server - function call "router" and load balancer
http://www.gearman.org/doku.php?id=download#job_server_gearmand
の様に、CによるものとPerlによるものしかありません。Perl版でも良いのですがC版の方が少しでも速いのではないか...という事でgearmandのC版をWindowsにポーティングする事にしました。
ただ、オリジナルのソースは結構UNIX臭く作ってあり差分も大きくなりそうだったので、別の方が実装したGearwomanをベースにWindowsポーティングしてみました。
以下パッチ diff --git a/client.c b/client.c
index ba21f1d..e4154b9 100644
--- a/client.c
+++ b/client.c
@@ -10,7 +10,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
diff --git a/common.h b/common.h
index 0a56ad5..204a9d2 100644
--- a/common.h
+++ b/common.h
@@ -12,6 +12,10 @@ See LICENSE and COPYING for license details.
 #define MSG_NOSIGNAL 0
 #endif
 
+#if defined(_WIN32)
+#define MSG_NOSIGNAL 0
+#endif
+
 #ifndef max
 #define max(a,b) (a<b?a:b)
 #define min(a,b) (a<b?a:b)
diff --git a/gearmand.c b/gearmand.c
index 16e21ac..edfe2b0 100644
--- a/gearmand.c
+++ b/gearmand.c
@@ -13,6 +13,7 @@ See LICENSE and COPYING for license details.
 #include <unistd.h>
 #include <fcntl.h>
 #include <time.h>
+#ifndef _WIN32
 #include <netinet/in.h>
 #include <getopt.h>
 #include <arpa/inet.h>
@@ -21,6 +22,12 @@ See LICENSE and COPYING for license details.
 #include <sys/signal.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#else
+#include <getopt.h>
+#include <signal.h>
+#include <ws2tcpip.h>
+#define in_addr_t unsigned long
+#endif
 
 #include <event.h>
 #include <glib.h>
@@ -48,7 +55,8 @@ GHashTable *g_workers   = NULL; /* maps functions -> list of worker clients (GPt
 int g_foreground = 1;
 char *g_logfilename = "gearmand.log";
 char *g_bind = "0.0.0.0";
-int g_port = 4730;
+/* int g_port = 4730; */
+int g_port = 7003;
 char g_handle_base[MAX_HANDLE_LEN];
 
 void work_fail(Job *job);
@@ -72,7 +80,14 @@ int listen_on(in_addr_t addr, int port)
     int i = 1;
     setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&i, sizeof(i));
     // setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&i, sizeof(i));
+#ifndef _WIN32
     fcntl(sock, F_SETFL, O_NONBLOCK);
+#else
+    {
+        unsigned long flags = 1;
+        ioctlsocket(sock, FIONBIO, &flags);
+    }
+#endif
 
     sin.sin_addr.s_addr = addr;
     sin.sin_port        = htons(port);
@@ -822,7 +837,14 @@ void listener_cb(int fd, short events, void *arg)
 
     int s = accept(fd, (struct sockaddr *)&sin, &addrlen);
    
+#ifndef _WIN32
     fcntl(s, F_SETFL, O_NONBLOCK);
+#else
+    {
+        unsigned long flags = 1;
+        ioctlsocket(s, FIONBIO, &flags);
+    }
+#endif
 
     Client *cli = client_new();
     cli->state = CLIENT_STATE_CONNECTED;
@@ -871,8 +893,12 @@ void logger(const gchar *domain, GLogLevelFlags level, const gchar *message, gpo
 {
     struct tm dt;
     time_t tme = time(NULL);
-    localtime_r(&tme, &dt);
     char str[64], *lvl = "OTHER";
+#ifndef _WIN32
+    localtime_r(&tme, &dt);
+#else
+    memcpy(&dt, localtime(&tme), sizeof(dt));
+#endif
 
     strftime(str, 64, "%F %T", &dt);
     switch(level) {
@@ -954,6 +980,7 @@ void signal_cb(int fd, short event, void *arg)
 
 void detach()
 {
+#ifndef _WIN32
     if (fork() != 0)
         exit(0);
 
@@ -977,6 +1004,10 @@ void detach()
     open("/dev/null", O_RDWR, 0);   /* 0 stdin */
     dup2(0, 1);  /* 1 stdout */
     dup2(0, 2);  /* 2 stderr */
+#else
+    perror("daemon mode is disabled on win32 ...");
+    exit(0);
+#endif
 }
 
 /****************************************************************************
@@ -986,6 +1017,11 @@ int main(int argc, char *argv[])
     int nsockets = 0;
     struct event listeners[10];
 
+#ifdef _WIN32
+    WSADATA wsaData;
+    WSAStartup(MAKEWORD(2, 0), &wsaData);
+#endif
+
     parseargs(argc, argv);
 
     if (g_foreground == 0) {
@@ -1009,8 +1045,11 @@ int main(int argc, char *argv[])
     event_init();
     //printf("%s %s\n", event_get_version(), event_get_method());
 
+#ifndef _WIN32
     signal(SIGPIPE, SIG_IGN);
+#endif
 
+#ifndef _WIN32
     struct event sig_int, sig_hup, sig_term;/*, sig_pipe;*/
     if (g_foreground) {
         event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
@@ -1025,6 +1064,17 @@ int main(int argc, char *argv[])
     event_add(&sig_term, NULL);
     /*event_set(&sig_pipe, SIGPIPE, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_pipe);
     event_add(&sig_pipe, NULL);*/
+#else
+    struct event sig_int, sig_term;/*, sig_pipe;*/
+    if (g_foreground) {
+        event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
+        event_add(&sig_int, NULL);
+    } else {
+        signal(SIGINT, SIG_IGN);
+    }
+    event_set(&sig_term, SIGTERM, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_term);
+    event_add(&sig_term, NULL);
+#endif
 
     int s = listen_on(inet_addr(g_bind), g_port);
     if (s == -1) {
diff --git a/job.c b/job.c
index 812226b..9a28043 100644
--- a/job.c
+++ b/job.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
diff --git a/util.c b/util.c
index a482011..6135c9a 100644
--- a/util.c
+++ b/util.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#ifndef _WIN32
 #include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
 
 #include <assert.h>
 
ここでひとつ問題が起こりました。libeventです。libeventのWindowsポーティングとしてはmemcached for Win32が有名で、その配布物にlibeventのソースとバイナリがあります。これをmingw32からリンクしようかと思ったのですが、どうやらWindowsのバッファオーバーフローチェックライブラリbufferoverflow.libをリンクしているらしく__security_cookieが無いよとエラーが出ました。
しかしながら諦めず、libeventのオフィシャルからlibevent-1.4.9-stableをダウンロードし、以下の様にlibeventをWindowsに移植しました。
% cat > config.h
#define VERSION "1.4.9"
#define HAVE_STRTOLL 1
^D

% gcc -c -DHAVE_CONFIG_H -Icompat -Iwin32-code -I. event.c log.c signal.c evutil.c WIN32-Code/win32.c
% ar cr libevent.a event.o log.o signal.o evutil.o win32.o
% cp libevent.a c:/mingw/lib/.
% mkdir c:/mingw/include/libevent
% cp ev*.h c:/mingw/include/libevent/.
あとはgearwomanのMakefileです。
--- Makefile    2009-03-04 09:42:20.000000000 +0900
+++ Makefile.w32    2009-03-04 15:52:29.765625000 +0900
@@ -3,7 +3,7 @@
 CC = gcc
 LD = gcc -o
 AR = ar
-LDFLAGS = `pkg-config --libs gthread-2.0` -levent
+LDFLAGS = `pkg-config --libs gthread-2.0` -levent -lws2_32
 CFLAGS = -Wall
 
 # # Debug
@@ -12,10 +12,10 @@
 # OPTIMIZATIONS =
 
 # Production (NDEBUG = NO DEBUG / remove asserts)
-CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG
+CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG -Ic:/mingw/include/libevent
 OPTIMIZATIONS = -O2 -funroll-loops -finline-functions
 
-BIN =   gearmand
+BIN =   gearmand.exe
 
 OBJ =   gearmand.o      \
         client.o        \
拡張子と、libeventへのパス指定だけです。
この状態でmingw32-makeすると、Windowsネイティブ(cygwin未使用)なgearmand.exeが出来上がります。

さて実際にはどれくらい速いのか...
Perl版をポート番号7003で、Native版をポート番号7004で動作させ use strict;
use Gearman::Worker;

my $gw = Gearman::Worker->new;
$gw->job_servers('127.0.0.1:7004');
#$gw->job_servers('127.0.0.1:7003');
$gw->register_function(
    'sum' => sub {
        my ( $lhs, $rhs ) = split /,/, shift->arg;
        $lhs + $rhs;

    }
);
$gw->work while 1;
ワーカーとしては数値を足すだけの物をそれぞれのポートで起動しました。
クライアントは接続、ジョブ登録500回、処理実行という流れを30回行うベンチマークで検証してみました。
use strict;
use Benchmark;
use Gearman::Client;



timethese(
    30,
    {
        'perl gearmand'   => '&test("127.0.0.1:7003");',
        'native gearmand' => '&test("127.0.0.1:7004");',
    }
);

sub test {
    my $server = shift;

    my $gc = Gearman::Client->new;
    $gc->job_servers($server);
    my $ts = $gc->new_task_set;
    for my $i ( 1 .. 500 ) {
        $ts->add_task(
            "sum" => "3,4",
            {
                on_complete => sub {
                    #print "$i:" . ${ $_[0] } . "\n";
                  }
            }
        );
    }
    $ts->wait;
}
実行結果は Benchmark: timing 30 iterations of native gearmand, perl gearmand...
native gearmand: 19 wallclock secs (11.88 usr +  1.08 sys = 12.95 CPU) @  2.32/s (n=30)
perl gearmand: 30 wallclock secs (13.25 usr +  1.17 sys = 14.42 CPU) @  2.08/s (n=30)
の様になり、若干ですがネイティブ版の方が速い様です。

クライアント・ワーカーライブラリにもC言語の物を使うのであれば、クライアント・ワーカー・サーバ全てでPerlを必要とする事なくGearmanが使える様になりますね。

まぁWindowsでネイティブなGearman使いたいなんて変態は私しかいないかもしれませんが、ご参考になれば...
Posted at by | Edit