読者です 読者をやめる 読者になる 読者になる

clojureでのrefの実装について(前半)

今日はtokyoclj#3なので、前回に引き続きclojureでの平行性実装についてみています。
前回もjavaの実装はチラ見していたのですが、clojure側の実装をみていなかったので、機能の復習もかねて整理しておきます。

clojurejavaでSTMを実装しています。java.util.concurrent.*が大活躍です。なので、clojure側の実装はあまりみても重要ではないのですが、機能を整理しておきたいのでちゃんと読んでみました。

refというのはSTMで扱うことができるオブジェクトです。STMというのはsoftware transactional memoryのことです。ロックフリーなセマンティクスで、複数の処理をatomicにできる、便利なものです。平行処理がロックフリーであるということは、複数のロックを使用して平行性を高めるとか、難しいアルゴリズムをつかうとかしなくてよいということで、楽です。また、atomincな処理にはCompare And Swapなどを使用すると思います。が、複数の値を一気にCASすることはサポートされていないので、これもやはり複数のCASを組み合わせてということをしなければならないので、大変です(the art of multiprocesser programing 461p)。

疑似言語で説明すると、
atomic {
何かの処理・・・
何かの処理・・・
何かの処理・・・
}

としたときに、atomicの内部が全体としてall or nothingになる黒魔術的なものです。

clojureのSTM実装ではMVCCを使っています(プログラミングclojure163p)。トランザクションごとにプライベートコピーをもちます。トランザクション中に値の変更に失敗すると処理が再実行されますが、例外が発生するとアボートされます。値の変更に失敗した際の再実行処理は、常に最初からやりなおすのと、失敗したら処理だけやり直すのと、好きな方が選べます。


refの機能は大まかには下記の通りです。
1. refにオブジェクトをラップする
2. ラップしたオブジェクトを取得する
3. refが参照しているオブジェクトを変更する
4. refが参照しているオブジェクトに対して関数を適用する
5. 4. より高い平行性で関数を適用する


clojureのコードは下記のような感じで書けます。

1. refにオブジェクトをラップする

(ref オブジェクト)


2. ラップしたオブジェクトを取得する

(deref refのオブジェクト)
@refのオブジェクト

3. refが参照しているオブジェクトを変更する

(dosync (ref-set refのオブジェクト 変更後のオブジェクト)

4. refが参照しているオブジェクトに対して関数を適用する

(dosync (alter refのオブジェクト 適用する関数 適用する関数に対する引数)

5. 4. より高い平行性で関数を適用する

(dosync (commute refのオブジェクト 適用する関数 適用する関数に対する引数)

commuteの更新順序はトランザクションが制御するというセマンティクスとなっているため、トランザクション間での実行順序に依存する処理はalterを使用する必要があります。また、commuteでトランザクションのコミットに失敗すると、トランザクション全体ではなく失敗したcommuteの再実行のみを行う場合があります。再実行時にcommuteがrefを読み取る場合は、コミット済みの最新の値を読み取る可能性があります。

失敗しやすい処理と成功しやすい処理を適用するのであれば、alterとcommuteを同じdosyncの中で書けばパフォーマンスが向上する可能性はありますが、トランザクションが失敗したあとの再実行のパフォーマンスなので、あまりこだわる必要はなさそうです(プログラミングclojure 165p)。


では、それぞれの機能の実装をみていきます。コードをコピーしてきていますが、もとにしているソースは今朝時点でgithubにあがっていたものなので、多分1.2.系のものだと思います。http://github.com/richhickey/clojure/blob/master/src/clj/clojure/core.clj で参照できます。

1. refにオブジェクトをラップする

 (defn ref
  "Creates and returns a Ref with an initial value of x and zero or
  more options (in any order):

  :meta metadata-map

  :validator validate-fn

  :min-history (default 0)
  :max-history (default 10)

  If metadata-map is supplied, it will be come the metadata on the
  ref. validate-fn must be nil or a side-effect-free fn of one
  argument, which will be passed the intended new state on any state
  change. If the new state is unacceptable, the validate-fn should
  return false or throw an exception. validate-fn will be called on
  transaction commit, when all refs have their final values.

  Normally refs accumulate history dynamically as needed to deal with
  read demands. If you know in advance you will need history you can
  set :min-history to ensure it will be available when first needed (instead
  of after a read fault). History is limited, and the limit can be set
  with :max-history."
  {:added "1.0"}
  ([x] (new clojure.lang.Ref x))
  ([x & options] 
   (let [r  ^clojure.lang.Ref (setup-reference (ref x) options)
         opts (apply hash-map options)]
    (when (:max-history opts)
      (.setMaxHistory r (:max-history opts)))
    (when (:min-history opts)
      (.setMinHistory r (:min-history opts)))
    r)))

metadataは型情報などを保持することができます。詳細はプログラミングclojureの54pを参照してください。
validatorを使うと、refの値についてバリデーションをかけることができます。バリデーションに失敗すると例外がスローされるので、トランザクションはアボートされます。

historyは、値の履歴です。おそらく、トランザクションが再実行される可能性に応じてmin historyを設定し、使用してもよいリソースに応じてmax historyを設定すればいいはずです。競合を想定しなければ、これもあまり考慮しなくてよいと思います。

refの関数ではclojure.lang.Refを生成しているので、そこを追いかければいいことがわかります。


2. ラップしたオブジェクトを取得する

(defn deref
  "Also reader macro: @ref/@agent/@var/@atom/@delay/@future. Within a transaction,
  returns the in-transaction-value of ref, else returns the
  most-recently-committed value of ref. When applied to a var, agent
  or atom, returns its current state. When applied to a delay, forces
  it if not already forced. When applied to a future, will block if
  computation not complete"
  {:added "1.0"}
  [^clojure.lang.IDeref ref] (.deref ref))

refにラップされた値を取得します。リーダーマクロの機能で@でも代替できます。

トランザクション内ではそのトランザクションが参照する値を返し、トランザクション外では直近でコミットされた値を返します。


3. refが参照しているオブジェクトを変更する

(defn ref-set
  "Must be called in a transaction. Sets the value of ref.
  Returns val."
  {:added "1.0"}
  [^clojure.lang.Ref ref val]
    (. ref (set val)))

refのsetを呼び出しています。

(defmacro dosync
  "Runs the exprs (in an implicit do) in a transaction that encompasses
  exprs and any nested calls.  Starts a transaction if none is already
  running on this thread. Any uncaught exception will abort the
  transaction and flow out of dosync. The exprs may be run more than
  once, but any effects on Refs will be atomic."
  {:added "1.0"}
  [& exprs]
  `(sync nil ~@exprs))

ref-set関数は、dosyncのマクロ内で実行する必要があります。
実行したスレッドでトランザクションが実行されていなければ、トランザクションは自動的に開始します。
dosyncで例外がスローされると、トランザクションはアボートされ、制御はdosyncの呼び出し元にかえります。

別のマクロsyncに処理を委譲しているので、そちらをみてみます。

(defmacro sync
  "transaction-flags => TBD, pass nil for now

  Runs the exprs (in an implicit do) in a transaction that encompasses
  exprs and any nested calls.  Starts a transaction if none is already
  running on this thread. Any uncaught exception will abort the
  transaction and flow out of sync. The exprs may be run more than
  once, but any effects on Refs will be atomic."
  {:added "1.0"}
  [flags-ignored-for-now & body]
  `(. clojure.lang.LockingTransaction
      (runInTransaction (fn [] ~@body))))

コメントはだいたいにたような感じですが、clojure.lang.LockingTransactionを読まないといけなさそうです。


4. refが参照しているオブジェクトに対して関数を適用する

(defn alter
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:

  (apply fun in-transaction-value-of-ref args)

  and returns the in-transaction-value of ref."
  {:added "1.0"}
  [^clojure.lang.Ref ref fun & args]
    (. ref (alter fun args)))

これもトランザクション内で実行する必要があります。
clojure.lang.Refのalterを呼び出しています。

5. 4. より高い平行性で関数を適用する

(defn commute
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:

  (apply fun in-transaction-value-of-ref args)

  and returns the in-transaction-value of ref.

  At the commit point of the transaction, sets the value of ref to be:

  (apply fun most-recently-committed-value-of-ref args)

  Thus fun should be commutative, or, failing that, you must accept
  last-one-in-wins behavior.  commute allows for more concurrency than
  ref-set."
  {:added "1.0"}

  [^clojure.lang.Ref ref fun & args]
    (. ref (commute fun args)))

これもトランザクション内で実行する必要があります。
clojure.lang.Refのcommuteを呼び出しています。トランザクション内でのrefの読み出しは、「そのトランザクションの値」でしたが、トランザクションのコミットポイント以降は「直近コミットされた値」となります。複数トランザクションがあったときに、実行順序がかわる可能性があり、いわゆる後勝ちのような振る舞いを許容する必要があります。

ということで、
clojure.lang.Ref
clojure.lang.LockingTransaction

を読んでいきます。(後半へ続く)