2012/06/27


昨日は rm を読みました。今日は、ずいぶん前に「GNU textutilsに入ってるsort(1)にはコア数によって動的にスレッドを生成してソートする処理が入ってるのでそういうの興味ある人はコード読むといいと思います。」と言ってた部分を読もうと思います。
ソースは昨日と同様に CoreUtils の src/sort.c です。
ソートのメイン処理は main の一番下の方にあって   if (mergeonly)
    {
      struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles);
      size_t i;

      for (i = 0; i < nfiles; ++i)
        sortfiles[i].name = files[i];

      merge (sortfiles, 0, nfiles, outfile);
      IF_LINT (free (sortfiles));
    }
  else
    {
      if (!nthreads)
        {
          unsigned long int np = num_processors (NPROC_CURRENT_OVERRIDABLE);
          nthreads = MIN (np, DEFAULT_MAX_THREADS);
        }

      /* Avoid integer overflow later.  */
      size_t nthreads_max = SIZE_MAX / (2 * sizeof (struct merge_node));
      nthreads = MIN (nthreads, nthreads_max);

      sort (files, nfiles, outfile, nthreads);
    }
この様に呼び出しています。mergeonly は -m を指定しない限り、false なのでデフォルトで下側に入ります。
num_processors は pthread_getaffinity_np 等を使い、コアの数を取得しています。ソート時に使う同時スレッド数は DEFAULT_MAX_THREADS、8になっています。オーバーフローを避ける為に、マージソート時がデータ量の半分以上スレッドを起こしても意味がない事への配慮もあります。
この値を使って       size_t nthreads_max = SIZE_MAX / (2 * sizeof (struct merge_node));
さて sort 関数に入ります。       while (fillbuf (&buf, fp, file))
        {
          struct line *line;

          if (buf.eof && nfiles
              && (bytes_per_line + 1
                  < (buf.alloc - buf.used - bytes_per_line * buf.nlines)))
fillbuf により逐次読みを行います。fillbuf は最大 256 * 1024 バイト貯まってバッファフルになるか改行文字(リミット文字)を見つけると行数をカウントアップします。           if (1 < buf.nlines)
            {
              struct merge_node_queue queue;
              queue_init (&queue, nthreads);
              struct merge_node *merge_tree =
                merge_tree_init (nthreads, buf.nlines, line);
              struct merge_node *root = merge_tree + 1;

              sortlines (line, nthreads, buf.nlines, root,
                         &queue, tfp, temp_output);
              queue_destroy (&queue);
              pthread_mutex_destroy (&root->lock);
              merge_tree_destroy (merge_tree);
            }
          else
            write_unique (line - 1, tfp, temp_output);
貯めたバッファが1行以上であれば sortlines を呼びます。   if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
      && pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
    {
      sortlines (lines - node->nlo, hi_threads, total_lines,
                 node->hi_child, queue, tfp, temp_output);
      pthread_join (thread, NULL);
    }
  else
    {
      /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
         Sort with 1 thread. */
      size_t nlo = node->nlo;
      size_t nhi = node->nhi;
      struct line *temp = lines - total_lines;
      if (1 < nhi)
        sequential_sort (lines - nlo, nhi, temp - nlo / 2false);
      if (1 < nlo)
        sequential_sort (lines, nlo, temp, false);

      /* Update merge NODE. No need to lock yet. */
      node->lo = lines;
      node->hi = lines - nlo;
      node->end_lo = lines - nlo;
      node->end_hi = lines - nlo - nhi;

      queue_insert (queue, node);
      merge_loop (queue, total_lines, tfp, temp_output);
    }

  pthread_mutex_destroy (&node->lock);
nthreads が 2 以上で行数が 128 * 1024 未満であればスレッドを起動しています。これによりマージソートの最小比較単位に分割しているんですね。そうでない場合は、sequential_sort により、上部側と下部側に分け再帰を行いながら分割しつつ最小単位で比較し、その結果をマージしています。 static void
sequential_sort (struct line *restrict lines, size_t nlines,
                 struct line *restrict temp, bool to_temp)
{
  if (nlines == 2)
    {
      /* Declare 'swap' as int, not bool, to work around a bug
         <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
         in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
      int swap = (0 < compare (&lines[-1], &lines[-2]));
      if (to_temp)
        {
          temp[-1] = lines[-1 - swap];
          temp[-2] = lines[-2 + swap];
        }
      else if (swap)
        {
          temp[-1] = lines[-1];
          lines[-1] = lines[-2];
          lines[-2] = temp[-1];
        }
    }
  else
    {
      size_t nlo = nlines / 2;
      size_t nhi = nlines - nlo;
      struct line *lo = lines;
      struct line *hi = lines - nlo;

      sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
      if (1 < nlo)
        sequential_sort (lo, nlo, temp, !to_temp);
      else if (!to_temp)
        temp[-1] = lo[-1];

      struct line *dest;
      struct line const *sorted_lo;
      if (to_temp)
        {
          dest = temp;
          sorted_lo = lines;
        }
      else
        {
          dest = lines;
          sorted_lo = temp;
        }
      mergelines (dest, nlines, sorted_lo);
    }
}
mergelines は以下の通り、ポインタによるスワッピング。 static void
mergelines (struct line *restrict t, size_t nlines,
            struct line const *restrict lo)
{
  size_t nlo = nlines / 2;
  size_t nhi = nlines - nlo;
  struct line *hi = t - nlo;

  while (true)
    if (compare (lo - 1, hi - 1) <= 0)
      {
        *--t = *--lo;
        if (! --nlo)
          {
            /* HI must equal T now, and there is no need to copy from
               HI to T. */
            return;
          }
      }
    else
      {
        *--t = *--hi;
        if (! --nhi)
          {
            do
              *--t = *--lo;
            while (--nlo);

            return;
          }
      }
}
さて、ここで気になるのがスレッド間の競合ですが sortlines の if 文の下側     {
      /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
         Sort with 1 thread. */
      size_t nlo = node->nlo;
      size_t nhi = node->nhi;
      struct line *temp = lines - total_lines;
      if (1 < nhi)
        sequential_sort (lines - nlo, nhi, temp - nlo / 2false);
      if (1 < nlo)
        sequential_sort (lines, nlo, temp, false);

      /* Update merge NODE. No need to lock yet. */
      node->lo = lines;
      node->hi = lines - nlo;
      node->end_lo = lines - nlo;
      node->end_hi = lines - nlo - nhi;

      queue_insert (queue, node);
      merge_loop (queue, total_lines, tfp, temp_output);
    }
で、2線共にシーケンシャルソートした後、queue_insert を呼び出しています。 static void
queue_insert (struct merge_node_queue *queue, struct merge_node *node)
{
  pthread_mutex_lock (&queue->mutex);
  heap_insert (queue->priority_queue, node);
  node->queued = true;
  pthread_mutex_unlock (&queue->mutex);
  pthread_cond_signal (&queue->cond);
}
ミューテックスでロックし、ヒープ領域に差し込んでいます。ねじ込んだアイテムを compare を使いながら移動位置を探します。2分探索ですね。 static void
heapify_up (void **array, size_t count,
            int (*compare) (void const *, void const *))
{
  size_t k = count;
  void *new_element = array[k];

  while (k != 1 && compare (array[k/2], new_element) <= 0)
    {
      array[k] = array[k/2];
      k /= 2;
    }

  array[k] = new_element;
}
merge_loop は以下の通り。 static void
merge_loop (struct merge_node_queue *queue,
            size_t total_lines, FILE *tfp, char const *temp_output)
{
  while (1)
    {
      struct merge_node *node = queue_pop (queue);

      if (node->level == MERGE_END)
        {
          unlock_node (node);
          /* Reinsert so other threads can pop it. */
          queue_insert (queue, node);
          break;
        }
      mergelines_node (node, total_lines, tfp, temp_output);
      queue_check_insert (queue, node);
      queue_check_insert_parent (queue, node);

      unlock_node (node);
    }
}
merge_lines は queue_insert でインサートされているマージ指示に対して実際にマージ処理を行い、親ノードを指定してマージ指示を依頼しています。 queue_pop で queue からマージ指示を取り出し static void
mergelines_node (struct merge_node *restrict node, size_t total_lines,
                 FILE *tfp, char const *temp_output)
{
  struct line *lo_orig = node->lo;
  struct line *hi_orig = node->hi;
  size_t to_merge = MAX_MERGE (total_lines, node->level);
  size_t merged_lo;
  size_t merged_hi;

  if (node->level > MERGE_ROOT)
    {
      /* Merge to destination buffer. */
      struct line *dest = *node->dest;
      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
        if (compare (node->lo - 1, node->hi - 1) <= 0)
          *--dest = *--node->lo;
        else
          *--dest = *--node->hi;

      merged_lo = lo_orig - node->lo;
      merged_hi = hi_orig - node->hi;

      if (node->nhi == merged_hi)
        while (node->lo != node->end_lo && to_merge--)
          *--dest = *--node->lo;
      else if (node->nlo == merged_lo)
        while (node->hi != node->end_hi && to_merge--)
          *--dest = *--node->hi;
      *node->dest = dest;
    }
  else
    {
      /* Merge directly to output. */
      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
        {
          if (compare (node->lo - 1, node->hi - 1) <= 0)
            write_unique (--node->lo, tfp, temp_output);
          else
            write_unique (--node->hi, tfp, temp_output);
        }

      merged_lo = lo_orig - node->lo;
      merged_hi = hi_orig - node->hi;

      if (node->nhi == merged_hi)
        {
          while (node->lo != node->end_lo && to_merge--)
            write_unique (--node->lo, tfp, temp_output);
        }
      else if (node->nlo == merged_lo)
        {
          while (node->hi != node->end_hi && to_merge--)
            write_unique (--node->hi, tfp, temp_output);
        }
    }

  /* Update NODE. */
  merged_lo = lo_orig - node->lo;
  merged_hi = hi_orig - node->hi;
  node->nlo -= merged_lo;
  node->nhi -= merged_hi;
}
大小比較しながら確保した領域内で並べ替えます。マージ対象がルートの線なら、マージ中に全ての並べ替えは完了しているはずなので、逐次アウトプットしていきます。
static void
write_unique (struct line const *line, FILE *tfp, char const *temp_output)
{
  static struct line saved;

  if (unique)
    {
      if (saved.text && ! compare (line, &saved))
        return;
      saved = *line;
    }

  write_line (line, tfp, temp_output);
}
static 変数を使って前回値を保持し、unique 出力できる様にもしてあります。

以上がソートの処理です。スレッドの使い方ですが、Wikipedia のマージソート にある基本手順で言うと
  1. データ列を分割する(通常、等分する)
  2. 各々をソートする
  3. 二つのソートされたデータ列をマージする
1、2の部分は排他無しに最小単位になるまでスレッドを作り、最小単位になった場合は後半戦として、sequential_sort を行ってそのマージを依頼する。
3は、1と2が行った処理結果 queue を抜き取りつつマージして行きながら、別スレッドが作ったキューもついでに処理する。fifo キューなので、最初に作ったキューが最後のキュー MERGE_ROOT となり、MERGE_ROOT ノードの場合に結果出力する仕組みです。

以上のソースを見た感想としては、非常にまどろっこしくて、データ量が少なけりゃ、とても無駄な事をやっている様にも見えます。sort という、一見単純そうな機能の割にはとてもしっかりしたジョブキューイングシステムが組まれているなーと思いました。1行あたりに保持するデータ量はIA32で16から20バイトなので、ちょっと大きい感じもしますが、Wikipedia のマージソート にあるgifアニメの処理を並列で行い、しかも行のテキストを別領域に確保している訳ではないので、トータルで言うとトントンくらいなのかもなーと思いました。
もしかすると、行数の少ないソートを数百回行う様な場合はスレッドを作らない -m の方が安定した速度を得られるのかもしれません。調べてませんが。
Posted at by



2012/06/26


何に関連して記事を書こうと思った訳でもないです。たんなる一人コードリーディングです。
GNU CoreUtils に入ってる rm を読みました。
GNU Project Archives
http://ftp.gnu.org/gnu/coreutils/
読んだのは coreutils-8.17.tar.xz に入ってる src/rm.c
preserve_root 変数は 203行目にある main で int
main (int argc, char **argv)
{
  bool preserve_root = true;
  struct rm_options x;
true に初期化されていて319行目   if (x.recursive && preserve_root)
    {
      static struct dev_ino dev_ino_buf;
      x.root_dev_ino = get_root_dev_ino (&dev_ino_buf);
      if (x.root_dev_ino == NULL)
        error (EXIT_FAILURE, errno, _("failed to get attributes of %s"),
               quote ("/"));
    }
で参照される。x.recursive は -r もしくは -R が指定された場合に true となる。つまり「--no-preserve-root」が指定されていない場合に入る。get_root_dev_ino は lib/root-dev-ino.c にある。   if (lstat ("/", &statbuf))
    return NULL;
ド頭で / を確認しているのでもしlstatに失敗したら上記の319行目のコードは       if (x.root_dev_ino == NULL)
        error (EXIT_FAILURE, errno, _("failed to get attributes of %s"),
               quote ("/"));
に遷移する。error 関数は lib/error.c にあり、そこから error_tail へ。   if (status)
    exit (status);
error 関数は EXIT_FAILURE (値1)で呼び出しているのでこの後はプログラムは終了。つまり --no-preserve-root を付けた場合で / のデバイス情報が取れる場合だけ削除処理に入る。
逆に上記の x.recursive が false つまり、-r や -R を指定しなかった場合、src/remove.c にある rm 関数に入る。rm 関数では引数 file は / として渡り、xfts_open 関数(fts_open の wrapper)へ渡り、ディレクトリリストの一部として rm_fts に渡る。/ はディレクトリなので rm_fts でいきなり入る switch 文   switch (ent->fts_info)
    {
    case FTS_D:         /* preorder directory */
      if (! x->recursive)
        {
          /* This is the first (pre-order) encounter with a directory.
             Not recursive, so arrange to skip contents.  */
          error (0EISDIR, _("cannot remove %s"), quote (ent->fts_path));
          mark_ancestor_dirs (ent);
          fts_skip_tree (fts, ent);
          return RM_ERROR;
        }
の FTS_D のケースに入る。ここには x.recursive は false で入ったはずなので、ブロック内のエラーメッセージが表示され、配下のファイルもスキップされる。関数の戻り値は RM_ERROR。rm 関数でエラーコード rm_status が s で更新される。ディレクトリリストとしては / だけなのでそのまま終了。戻り値は RM_ERROR。main に戻ってきて   exit (status == RM_ERROR ? EXIT_FAILURE : EXIT_SUCCESS);
EXIT_FAILURE で exit してプログラム終了。

-r --no-root-preserve の場合は       if (ent->fts_level == FTS_ROOTLEVEL)
        {
          if (strip_trailing_slashes (ent->fts_path))
            ent->fts_pathlen = strlen (ent->fts_path);

          /* If the basename of a command line argument is "." or "..",
             diagnose it and do nothing more with that argument.  */
          if (dot_or_dotdot (last_component (ent->fts_accpath)))
            {
              error (00, _("cannot remove directory: %s"),
                     quote (ent->fts_path));
              fts_skip_tree (fts, ent);
              return RM_ERROR;
            }

          /* If a command line argument resolves to "/" (and --preserve-root
             is in effect -- default) diagnose and skip it.  */
          if (ROOT_DEV_INO_CHECK (x->root_dev_ino, ent->fts_statp))
            {
              ROOT_DEV_INO_WARN (ent->fts_path);
              fts_skip_tree (fts, ent);
              return RM_ERROR;
            }
        }
上記で得た x.root_dev_ino を使いガードされている。
まとめると

GNU CoreUtils に入っている rm を使っている限り、--no-preserve-root を指定しないと / は消せない

事になる。冗長的に言うと # rm -r /$FOO は、対話型でないシェルスクリプトから起動したとしても、-f を付けたとしても、ユーザが root であったとしても関係無く、--no-preserve-root を付けないと消せない という事になり、さらに $FOO が宣言されていなくて # rm -r / になったとしても消せない、という事が分かった。

以上、コードリーディング終了。
Posted at by



2012/06/18


「JSX 書くなら Vim だよね」というのが当たり前になる明るい未来を目指して jsx.vim に何個か pull req を送りました!
jsx/jsx.vim - GitHub
https://github.com/jsx/jsx.vim
pull reqを送った機能は次のとおりです。
  • コンパイラプラグインの追加
  • 補完機能の追加
簡単に説明していきます。

コンパイラプラグインの追加

デフォルトではオンになっていないので、:compiler jsx にしておくか、vimrc で以下を実行する必要があります。 autocmd FileType jsx compiler jsx
この状態で :make を実行すると quickfix にエラーが表示されます。vim-hier と併用すると以下の様になります。
jsx-quickfix

補完機能の追加

manga_osyo さんが neocomplcache での実装を作ってますが、こちらは通常の omni 補完として実装しました。
jsx-complete
現在は kazuho/omnifunc ブランチに取り込まれてますが、jsx に補完機能が入ればmasterにマージされるかと思います。ただし kazuho さんが言うには jsx コマンドが出力するフォーマットが変わる可能性があるとの事なので、変わった際にもお手伝い出来ればと思っています。
以上、jsx.vim に pull req を送った説明でした!

何はともあれ、JSX 書くなら Vim ですよね!!

参考資料

JSX の Emacs 環境を整備してみた ~ jsx-mode.el 0.1.0 をリリースしました ~ - あらびき日記
Posted at by