Gearmanのクライアントライブラリ実装には
Gearman - Language Support (Client APIs)
http://www.danga.com/gearman/
- C client and server -- reimplementation in C of both client and daemon.
- Perl client -- What you need for both Perl workers to callers.
- Perl async client -- Async caller client for running in event loops. Uses Danga::Socket, but could be adapted to use POE somewhat easily.
- Ruby [svn]
- Python [svn]
Gearman - Client/Worker APIs
The C client/worker API can be found in the same package as the C server (BSD license):
The Perl API can be found as the Gearman::Client and Gearman::Worker modules in CPAN.
The PHP API can be found as Net_Gearman on PEAR.
The Python API can be found on PyPI as “gearman”, and it can be installed with "easy_install gearman".
- Python Gearman on PyPI
Python Gearman SVNThese are a set of MySQL UDFs that depend on the Gearman server and library in C.
http://www.gearman.org/doku.php?id=download#client_worker_apis と数種類ありますが、サーバ実装としては
Gearman Job Server (gearmand)の様に、CによるものとPerlによるものしかありません。Perl版でも良いのですがC版の方が少しでも速いのではないか...という事でgearmandのC版をWindowsにポーティングする事にしました。
http://www.gearman.org/doku.php?id=download#job_server_gearmand
- Gearman - Gearman server and library (0.3)
- Gearwoman - Another server written in C
- Gearman::Server - Gearman::Server - function call "router" and load balancer
ただ、オリジナルのソースは結構UNIX臭く作ってあり差分も大きくなりそうだったので、別の方が実装したGearwomanをベースにWindowsポーティングしてみました。
以下パッチdiff --git a/client.c b/client.c
ここでひとつ問題が起こりました。libeventです。libeventのWindowsポーティングとしてはmemcached for Win32が有名で、その配布物にlibeventのソースとバイナリがあります。これをmingw32からリンクしようかと思ったのですが、どうやらWindowsのバッファオーバーフローチェックライブラリbufferoverflow.libをリンクしているらしく__security_cookieが無いよとエラーが出ました。
index ba21f1d..e4154b9 100644
--- a/client.c
+++ b/client.c
@@ -10,7 +10,11 @@ See LICENSE and COPYING for license details.
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#ifndef _WIN32
#include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
#include <assert.h>
diff --git a/common.h b/common.h
index 0a56ad5..204a9d2 100644
--- a/common.h
+++ b/common.h
@@ -12,6 +12,10 @@ See LICENSE and COPYING for license details.
#define MSG_NOSIGNAL 0
#endif
+#if defined(_WIN32)
+#define MSG_NOSIGNAL 0
+#endif
+
#ifndef max
#define max(a,b) (a<b?a:b)
#define min(a,b) (a<b?a:b)
diff --git a/gearmand.c b/gearmand.c
index 16e21ac..edfe2b0 100644
--- a/gearmand.c
+++ b/gearmand.c
@@ -13,6 +13,7 @@ See LICENSE and COPYING for license details.
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
+#ifndef _WIN32
#include <netinet/in.h>
#include <getopt.h>
#include <arpa/inet.h>
@@ -21,6 +22,12 @@ See LICENSE and COPYING for license details.
#include <sys/signal.h>
#include <sys/types.h>
#include <sys/stat.h>
+#else
+#include <getopt.h>
+#include <signal.h>
+#include <ws2tcpip.h>
+#define in_addr_t unsigned long
+#endif
#include <event.h>
#include <glib.h>
@@ -48,7 +55,8 @@ GHashTable *g_workers = NULL; /* maps functions -> list of worker clients (GPt
int g_foreground = 1;
char *g_logfilename = "gearmand.log";
char *g_bind = "0.0.0.0";
-int g_port = 4730;
+/* int g_port = 4730; */
+int g_port = 7003;
char g_handle_base[MAX_HANDLE_LEN];
void work_fail(Job *job);
@@ -72,7 +80,14 @@ int listen_on(in_addr_t addr, int port)
int i = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&i, sizeof(i));
// setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&i, sizeof(i));
+#ifndef _WIN32
fcntl(sock, F_SETFL, O_NONBLOCK);
+#else
+ {
+ unsigned long flags = 1;
+ ioctlsocket(sock, FIONBIO, &flags);
+ }
+#endif
sin.sin_addr.s_addr = addr;
sin.sin_port = htons(port);
@@ -822,7 +837,14 @@ void listener_cb(int fd, short events, void *arg)
int s = accept(fd, (struct sockaddr *)&sin, &addrlen);
+#ifndef _WIN32
fcntl(s, F_SETFL, O_NONBLOCK);
+#else
+ {
+ unsigned long flags = 1;
+ ioctlsocket(s, FIONBIO, &flags);
+ }
+#endif
Client *cli = client_new();
cli->state = CLIENT_STATE_CONNECTED;
@@ -871,8 +893,12 @@ void logger(const gchar *domain, GLogLevelFlags level, const gchar *message, gpo
{
struct tm dt;
time_t tme = time(NULL);
- localtime_r(&tme, &dt);
char str[64], *lvl = "OTHER";
+#ifndef _WIN32
+ localtime_r(&tme, &dt);
+#else
+ memcpy(&dt, localtime(&tme), sizeof(dt));
+#endif
strftime(str, 64, "%F %T", &dt);
switch(level) {
@@ -954,6 +980,7 @@ void signal_cb(int fd, short event, void *arg)
void detach()
{
+#ifndef _WIN32
if (fork() != 0)
exit(0);
@@ -977,6 +1004,10 @@ void detach()
open("/dev/null", O_RDWR, 0); /* 0 stdin */
dup2(0, 1); /* 1 stdout */
dup2(0, 2); /* 2 stderr */
+#else
+ perror("daemon mode is disabled on win32 ...");
+ exit(0);
+#endif
}
/****************************************************************************
@@ -986,6 +1017,11 @@ int main(int argc, char *argv[])
int nsockets = 0;
struct event listeners[10];
+#ifdef _WIN32
+ WSADATA wsaData;
+ WSAStartup(MAKEWORD(2, 0), &wsaData);
+#endif
+
parseargs(argc, argv);
if (g_foreground == 0) {
@@ -1009,8 +1045,11 @@ int main(int argc, char *argv[])
event_init();
//printf("%s %s\n", event_get_version(), event_get_method());
+#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
+#endif
+#ifndef _WIN32
struct event sig_int, sig_hup, sig_term;/*, sig_pipe;*/
if (g_foreground) {
event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
@@ -1025,6 +1064,17 @@ int main(int argc, char *argv[])
event_add(&sig_term, NULL);
/*event_set(&sig_pipe, SIGPIPE, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_pipe);
event_add(&sig_pipe, NULL);*/
+#else
+ struct event sig_int, sig_term;/*, sig_pipe;*/
+ if (g_foreground) {
+ event_set(&sig_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_int);
+ event_add(&sig_int, NULL);
+ } else {
+ signal(SIGINT, SIG_IGN);
+ }
+ event_set(&sig_term, SIGTERM, EV_SIGNAL|EV_PERSIST, signal_cb, &sig_term);
+ event_add(&sig_term, NULL);
+#endif
int s = listen_on(inet_addr(g_bind), g_port);
if (s == -1) {
diff --git a/job.c b/job.c
index 812226b..9a28043 100644
--- a/job.c
+++ b/job.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#ifndef _WIN32
#include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
#include <assert.h>
diff --git a/util.c b/util.c
index a482011..6135c9a 100644
--- a/util.c
+++ b/util.c
@@ -9,7 +9,11 @@ See LICENSE and COPYING for license details.
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#ifndef _WIN32
#include <sys/socket.h>
+#else
+#include <winsock2.h>
+#endif
#include <assert.h>
しかしながら諦めず、libeventのオフィシャルからlibevent-1.4.9-stableをダウンロードし、以下の様にlibeventをWindowsに移植しました。
% cat > config.h
あとはgearwomanのMakefileです。
#define VERSION "1.4.9"
#define HAVE_STRTOLL 1
^D
% gcc -c -DHAVE_CONFIG_H -Icompat -Iwin32-code -I. event.c log.c signal.c evutil.c WIN32-Code/win32.c
% ar cr libevent.a event.o log.o signal.o evutil.o win32.o
% cp libevent.a c:/mingw/lib/.
% mkdir c:/mingw/include/libevent
% cp ev*.h c:/mingw/include/libevent/.
--- Makefile 2009-03-04 09:42:20.000000000 +0900
拡張子と、libeventへのパス指定だけです。
+++ Makefile.w32 2009-03-04 15:52:29.765625000 +0900
@@ -3,7 +3,7 @@
CC = gcc
LD = gcc -o
AR = ar
-LDFLAGS = `pkg-config --libs gthread-2.0` -levent
+LDFLAGS = `pkg-config --libs gthread-2.0` -levent -lws2_32
CFLAGS = -Wall
# # Debug
@@ -12,10 +12,10 @@
# OPTIMIZATIONS =
# Production (NDEBUG = NO DEBUG / remove asserts)
-CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG
+CPPFLAGS += -Wall `pkg-config --cflags glib-2.0` -DNDEBUG -Ic:/mingw/include/libevent
OPTIMIZATIONS = -O2 -funroll-loops -finline-functions
-BIN = gearmand
+BIN = gearmand.exe
OBJ = gearmand.o \
client.o \
この状態でmingw32-makeすると、Windowsネイティブ(cygwin未使用)なgearmand.exeが出来上がります。
さて実際にはどれくらい速いのか...
Perl版をポート番号7003で、Native版をポート番号7004で動作させuse strict;
ワーカーとしては数値を足すだけの物をそれぞれのポートで起動しました。
use Gearman::Worker;
my $gw = Gearman::Worker->new;
$gw->job_servers('127.0.0.1:7004');
#$gw->job_servers('127.0.0.1:7003');
$gw->register_function(
'sum' => sub {
my ( $lhs, $rhs ) = split /,/, shift->arg;
$lhs + $rhs;
}
);
$gw->work while 1;
クライアントは接続、ジョブ登録500回、処理実行という流れを30回行うベンチマークで検証してみました。
use strict;
実行結果は
use Benchmark;
use Gearman::Client;
timethese(
30,
{
'perl gearmand' => '&test("127.0.0.1:7003");',
'native gearmand' => '&test("127.0.0.1:7004");',
}
);
sub test {
my $server = shift;
my $gc = Gearman::Client->new;
$gc->job_servers($server);
my $ts = $gc->new_task_set;
for my $i ( 1 .. 500 ) {
$ts->add_task(
"sum" => "3,4",
{
on_complete => sub {
#print "$i:" . ${ $_[0] } . "\n";
}
}
);
}
$ts->wait;
}
Benchmark: timing 30 iterations of native gearmand, perl gearmand...
の様になり、若干ですがネイティブ版の方が速い様です。
native gearmand: 19 wallclock secs (11.88 usr + 1.08 sys = 12.95 CPU) @ 2.32/s (n=30)
perl gearmand: 30 wallclock secs (13.25 usr + 1.17 sys = 14.42 CPU) @ 2.08/s (n=30)
クライアント・ワーカーライブラリにもC言語の物を使うのであれば、クライアント・ワーカー・サーバ全てでPerlを必要とする事なくGearmanが使える様になりますね。
まぁWindowsでネイティブなGearman使いたいなんて変態は私しかいないかもしれませんが、ご参考になれば...