ソースは昨日と同様に CoreUtils の src/sort.c です。
ソートのメイン処理は main の一番下の方にあって
if (mergeonly)
{
struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles);
size_t i;
for (i = 0; i < nfiles; ++i)
sortfiles[i].name = files[i];
merge (sortfiles, 0, nfiles, outfile);
IF_LINT (free (sortfiles));
}
else
{
if (!nthreads)
{
unsigned long int np = num_processors (NPROC_CURRENT_OVERRIDABLE);
nthreads = MIN (np, DEFAULT_MAX_THREADS);
}
/* Avoid integer overflow later. */
size_t nthreads_max = SIZE_MAX / (2 * sizeof (struct merge_node));
nthreads = MIN (nthreads, nthreads_max);
sort (files, nfiles, outfile, nthreads);
}
この様に呼び出しています。mergeonly は -m を指定しない限り、false なのでデフォルトで下側に入ります。num_processors は pthread_getaffinity_np 等を使い、コアの数を取得しています。ソート時に使う同時スレッド数は DEFAULT_MAX_THREADS、8になっています。オーバーフローを避ける為に、マージソート時がデータ量の半分以上スレッドを起こしても意味がない事への配慮もあります。
この値を使って
size_t nthreads_max = SIZE_MAX / (2 * sizeof (struct merge_node));
さて sort 関数に入ります。
while (fillbuf (&buf, fp, file))
{
struct line *line;
if (buf.eof && nfiles
&& (bytes_per_line + 1
< (buf.alloc - buf.used - bytes_per_line * buf.nlines)))
fillbuf により逐次読みを行います。fillbuf は最大 256 * 1024 バイト貯まってバッファフルになるか改行文字(リミット文字)を見つけると行数をカウントアップします。
if (1 < buf.nlines)
{
struct merge_node_queue queue;
queue_init (&queue, nthreads);
struct merge_node *merge_tree =
merge_tree_init (nthreads, buf.nlines, line);
struct merge_node *root = merge_tree + 1;
sortlines (line, nthreads, buf.nlines, root,
&queue, tfp, temp_output);
queue_destroy (&queue);
pthread_mutex_destroy (&root->lock);
merge_tree_destroy (merge_tree);
}
else
write_unique (line - 1, tfp, temp_output);
貯めたバッファが1行以上であれば sortlines を呼びます。
if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
&& pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
{
sortlines (lines - node->nlo, hi_threads, total_lines,
node->hi_child, queue, tfp, temp_output);
pthread_join (thread, NULL);
}
else
{
/* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
Sort with 1 thread. */
size_t nlo = node->nlo;
size_t nhi = node->nhi;
struct line *temp = lines - total_lines;
if (1 < nhi)
sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
if (1 < nlo)
sequential_sort (lines, nlo, temp, false);
/* Update merge NODE. No need to lock yet. */
node->lo = lines;
node->hi = lines - nlo;
node->end_lo = lines - nlo;
node->end_hi = lines - nlo - nhi;
queue_insert (queue, node);
merge_loop (queue, total_lines, tfp, temp_output);
}
pthread_mutex_destroy (&node->lock);
nthreads が 2 以上で行数が 128 * 1024 未満であればスレッドを起動しています。これによりマージソートの最小比較単位に分割しているんですね。そうでない場合は、sequential_sort により、上部側と下部側に分け再帰を行いながら分割しつつ最小単位で比較し、その結果をマージしています。
static void
sequential_sort (struct line *restrict lines, size_t nlines,
struct line *restrict temp, bool to_temp)
{
if (nlines == 2)
{
/* Declare 'swap' as int, not bool, to work around a bug
<http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
in the IBM xlc 6.0.0.0 compiler in 64-bit mode. */
int swap = (0 < compare (&lines[-1], &lines[-2]));
if (to_temp)
{
temp[-1] = lines[-1 - swap];
temp[-2] = lines[-2 + swap];
}
else if (swap)
{
temp[-1] = lines[-1];
lines[-1] = lines[-2];
lines[-2] = temp[-1];
}
}
else
{
size_t nlo = nlines / 2;
size_t nhi = nlines - nlo;
struct line *lo = lines;
struct line *hi = lines - nlo;
sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
if (1 < nlo)
sequential_sort (lo, nlo, temp, !to_temp);
else if (!to_temp)
temp[-1] = lo[-1];
struct line *dest;
struct line const *sorted_lo;
if (to_temp)
{
dest = temp;
sorted_lo = lines;
}
else
{
dest = lines;
sorted_lo = temp;
}
mergelines (dest, nlines, sorted_lo);
}
}
mergelines は以下の通り、ポインタによるスワッピング。
static void
mergelines (struct line *restrict t, size_t nlines,
struct line const *restrict lo)
{
size_t nlo = nlines / 2;
size_t nhi = nlines - nlo;
struct line *hi = t - nlo;
while (true)
if (compare (lo - 1, hi - 1) <= 0)
{
*--t = *--lo;
if (! --nlo)
{
/* HI must equal T now, and there is no need to copy from
HI to T. */
return;
}
}
else
{
*--t = *--hi;
if (! --nhi)
{
do
*--t = *--lo;
while (--nlo);
return;
}
}
}
さて、ここで気になるのがスレッド間の競合ですが sortlines の if 文の下側
{
/* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
Sort with 1 thread. */
size_t nlo = node->nlo;
size_t nhi = node->nhi;
struct line *temp = lines - total_lines;
if (1 < nhi)
sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
if (1 < nlo)
sequential_sort (lines, nlo, temp, false);
/* Update merge NODE. No need to lock yet. */
node->lo = lines;
node->hi = lines - nlo;
node->end_lo = lines - nlo;
node->end_hi = lines - nlo - nhi;
queue_insert (queue, node);
merge_loop (queue, total_lines, tfp, temp_output);
}
で、2線共にシーケンシャルソートした後、queue_insert を呼び出しています。
static void
queue_insert (struct merge_node_queue *queue, struct merge_node *node)
{
pthread_mutex_lock (&queue->mutex);
heap_insert (queue->priority_queue, node);
node->queued = true;
pthread_mutex_unlock (&queue->mutex);
pthread_cond_signal (&queue->cond);
}
ミューテックスでロックし、ヒープ領域に差し込んでいます。ねじ込んだアイテムを compare を使いながら移動位置を探します。2分探索ですね。
static void
heapify_up (void **array, size_t count,
int (*compare) (void const *, void const *))
{
size_t k = count;
void *new_element = array[k];
while (k != 1 && compare (array[k/2], new_element) <= 0)
{
array[k] = array[k/2];
k /= 2;
}
array[k] = new_element;
}
merge_loop は以下の通り。
static void
merge_loop (struct merge_node_queue *queue,
size_t total_lines, FILE *tfp, char const *temp_output)
{
while (1)
{
struct merge_node *node = queue_pop (queue);
if (node->level == MERGE_END)
{
unlock_node (node);
/* Reinsert so other threads can pop it. */
queue_insert (queue, node);
break;
}
mergelines_node (node, total_lines, tfp, temp_output);
queue_check_insert (queue, node);
queue_check_insert_parent (queue, node);
unlock_node (node);
}
}
merge_lines は queue_insert でインサートされているマージ指示に対して実際にマージ処理を行い、親ノードを指定してマージ指示を依頼しています。
queue_pop で queue からマージ指示を取り出し
static void
mergelines_node (struct merge_node *restrict node, size_t total_lines,
FILE *tfp, char const *temp_output)
{
struct line *lo_orig = node->lo;
struct line *hi_orig = node->hi;
size_t to_merge = MAX_MERGE (total_lines, node->level);
size_t merged_lo;
size_t merged_hi;
if (node->level > MERGE_ROOT)
{
/* Merge to destination buffer. */
struct line *dest = *node->dest;
while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
if (compare (node->lo - 1, node->hi - 1) <= 0)
*--dest = *--node->lo;
else
*--dest = *--node->hi;
merged_lo = lo_orig - node->lo;
merged_hi = hi_orig - node->hi;
if (node->nhi == merged_hi)
while (node->lo != node->end_lo && to_merge--)
*--dest = *--node->lo;
else if (node->nlo == merged_lo)
while (node->hi != node->end_hi && to_merge--)
*--dest = *--node->hi;
*node->dest = dest;
}
else
{
/* Merge directly to output. */
while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
{
if (compare (node->lo - 1, node->hi - 1) <= 0)
write_unique (--node->lo, tfp, temp_output);
else
write_unique (--node->hi, tfp, temp_output);
}
merged_lo = lo_orig - node->lo;
merged_hi = hi_orig - node->hi;
if (node->nhi == merged_hi)
{
while (node->lo != node->end_lo && to_merge--)
write_unique (--node->lo, tfp, temp_output);
}
else if (node->nlo == merged_lo)
{
while (node->hi != node->end_hi && to_merge--)
write_unique (--node->hi, tfp, temp_output);
}
}
/* Update NODE. */
merged_lo = lo_orig - node->lo;
merged_hi = hi_orig - node->hi;
node->nlo -= merged_lo;
node->nhi -= merged_hi;
}
大小比較しながら確保した領域内で並べ替えます。マージ対象がルートの線なら、マージ中に全ての並べ替えは完了しているはずなので、逐次アウトプットしていきます。
static void
write_unique (struct line const *line, FILE *tfp, char const *temp_output)
{
static struct line saved;
if (unique)
{
if (saved.text && ! compare (line, &saved))
return;
saved = *line;
}
write_line (line, tfp, temp_output);
}
static 変数を使って前回値を保持し、unique 出力できる様にもしてあります。以上がソートの処理です。スレッドの使い方ですが、Wikipedia のマージソート にある基本手順で言うと
- データ列を分割する(通常、等分する)
- 各々をソートする
- 二つのソートされたデータ列をマージする
3は、1と2が行った処理結果 queue を抜き取りつつマージして行きながら、別スレッドが作ったキューもついでに処理する。fifo キューなので、最初に作ったキューが最後のキュー MERGE_ROOT となり、MERGE_ROOT ノードの場合に結果出力する仕組みです。
以上のソースを見た感想としては、非常にまどろっこしくて、データ量が少なけりゃ、とても無駄な事をやっている様にも見えます。sort という、一見単純そうな機能の割にはとてもしっかりしたジョブキューイングシステムが組まれているなーと思いました。1行あたりに保持するデータ量はIA32で16から20バイトなので、ちょっと大きい感じもしますが、Wikipedia のマージソート にあるgifアニメの処理を並列で行い、しかも行のテキストを別領域に確保している訳ではないので、トータルで言うとトントンくらいなのかもなーと思いました。
もしかすると、行数の少ないソートを数百回行う様な場合はスレッドを作らない -m の方が安定した速度を得られるのかもしれません。調べてませんが。