TheSchwartzで仕事をあとにまわす - bits and bytes
なければ勝手に作ってくれるのかなと思ってSYNOPSISのコードを実行してみたけどやっぱり自動でできたりはしなそうだったので調べたらCatalyst and TheSchwartz: Reliable JobQueue in a great framework - Voxにschema.sqlを使うといいと書いてありました。このスキーマをmysqlで実行すればTheSchwartzのキューを管理するためのテーブルが出来上がります。
http://labs.gmo.jp/blog/ku/2008/06/theschwartz.html
Twitter / ku: TheSchwartzおもしろかった sqliteで動...
TheSchwartzおもしろかった sqliteで動くようにしたいけどsqliteようのschemeかきかたわかんない
http://twitter.com/ku/statuses/843135311
って事でSQLiteでも動く様に出来ないか調べてみました。
schema.sqlは、MySQL用のスキーマで
CREATE TABLE funcmap (
funcid INT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT,
funcname VARCHAR(255) NOT NULL,
UNIQUE(funcname)
);
CREATE TABLE job (
jobid BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT,
funcid INT UNSIGNED NOT NULL,
arg MEDIUMBLOB,
uniqkey VARCHAR(255) NULL,
insert_time INTEGER UNSIGNED,
run_after INTEGER UNSIGNED NOT NULL,
grabbed_until INTEGER UNSIGNED NOT NULL,
priority SMALLINT UNSIGNED,
coalesce VARCHAR(255),
INDEX (funcid, run_after),
UNIQUE(funcid, uniqkey),
INDEX (funcid, coalesce)
);
CREATE TABLE note (
jobid BIGINT UNSIGNED NOT NULL,
notekey VARCHAR(255),
PRIMARY KEY (jobid, notekey),
value MEDIUMBLOB
);
CREATE TABLE error (
error_time INTEGER UNSIGNED NOT NULL,
jobid BIGINT UNSIGNED NOT NULL,
message VARCHAR(255) NOT NULL,
funcid INT UNSIGNED NOT NULL DEFAULT 0,
INDEX (funcid, error_time),
INDEX (error_time),
INDEX (jobid)
);
CREATE TABLE exitstatus (
jobid BIGINT UNSIGNED PRIMARY KEY NOT NULL,
funcid INT UNSIGNED NOT NULL DEFAULT 0,
status SMALLINT UNSIGNED,
completion_time INTEGER UNSIGNED,
delete_after INTEGER UNSIGNED,
INDEX (funcid),
INDEX (delete_after)
);
というSQLなので、これをSQLite用に書き換えます。cl.pocari.org - SQLite で auto-increment なフィールドを作成する方法こちらの記事でも書かれている通り、SQLiteではINTEGER PRIMARY KEYを指定すればok。
つまり,SQLite で auto-increment なフィールドを作りたければ,INTEGER PRIMARY KEY を指定してあげればいいらしい.
http://cl.pocari.org/2006-02-12-1.html
※ちなみにUNSIGNEDとかNOT NULLとか付けると動かないです。
で書き換えたSQLは以下の通り
CREATE TABLE funcmap (
funcid INTEGER PRIMARY KEY,
funcname VARCHAR(255) NOT NULL,
UNIQUE(funcname)
);
CREATE TABLE job (
jobid INTEGER PRIMARY KEY,
funcid INTEGER NOT NULL,
arg MEDIUMBLOB,
uniqkey VARCHAR(255) NULL,
insert_time INTEGER UNSIGNED,
run_after INTEGER UNSIGNED NOT NULL,
grabbed_until INTEGER UNSIGNED NOT NULL,
priority SMALLINT UNSIGNED,
coalesce VARCHAR(255),
UNIQUE(funcid, uniqkey)
);
CREATE INDEX job_idx_1 ON job (funcid, run_after);
CREATE INDEX job_idx_2 ON job (funcid, coalesce);
CREATE TABLE note (
jobid INTEGER NOT NULL,
notekey VARCHAR(255),
value BLOB
);
CREATE TABLE error (
error_time INTEGER UNSIGNED NOT NULL,
jobid INTEGER NOT NULL,
message VARCHAR(255) NOT NULL,
funcid INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX error_idx_1 ON error (funcid, error_time);
CREATE INDEX error_idx_2 ON error (error_time);
CREATE INDEX error_idx_3 ON error (jobid);
CREATE TABLE exitstatus (
jobid INTEGER PRIMARY KEY,
funcid INTEGER NOT NULL DEFAULT 0,
status SMALLINT UNSIGNED,
completion_time INTEGER UNSIGNED,
delete_after INTEGER UNSIGNED
);
CREATE INDEX exitstatus_idx_1 ON exitstatus (funcid);
CREATE INDEX exitstatus_idx_2 ON exitstatus (delete_after);
あとはコレを
# sqlite3 the_schwartz < theschwartz_schema.sql
としてDBファイルを作る。さてサーバのコードはDSNを"dbi:SQLite:the_schwartz"にするだけ。
#!/usr/bin/env perl
package MyWorker;
use strict;
use utf8;
use warnings;
use base qw( TheSchwartz::Worker );
use Data::Dumper;
binmode STDERR, ":encoding(cp932)" if $^O eq "MSWin32";
sub work {
my ($class, $job) = @_;
warn "お仕事ですよ! @{[ Dumper($job->arg) ]}\n";
$job->completed();
}
package main;
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [ +{ dsn => 'dbi:SQLite:the_schwartz' } ]
);
$client->can_do('MyWorker');
$client->work();
さらにクライアントのコード
#!/usr/bin/env perl
use strict;
use warnings;
use TheSchwartz;
my $client = TheSchwartz->new(
databases => [ +{ dsn => 'dbi:SQLite:the_schwartz' } ]
);
$client->insert('MyWorker' => +{ hoge => "fuga" });
実行すると
お仕事ですよ! $VAR1 = {
'hoge' => 'fuga'
};
という風にサーバ側で表示されます。
ちなみに、DBD::SQLiteのバグで
closing dbh with active statement handles at .../Data/ObjectDriver/Driver/DBI.pm line 566.
という表示が出る様ならばココにある"issue-17603.tar.gz"のパッチを当てるといいです。どうやらこのバグ、デグレっぽいですね。ま、結局SQLiteでやっちゃったら同一マシンだし負荷分散にはならない(UIロックを避けるという意味では有用)んですけどね。