読者です 読者をやめる 読者になる 読者になる

clojureでのref実装について(後編)

では、STMのコードを読んでいきます。ほとんどjavaですね。

機能別に読んでいくとして、下記の箇所を読んでいくことになります。ちなみに、コメントはほとんどついていなかったので、僕の解説は推測と気合いです。

1. refにオブジェクトをラップする
2. ラップしたオブジェクトを取得する
3. refが参照しているオブジェクトを変更する
4. refが参照しているオブジェクトに対して関数を適用する
5. 4. より高い平行性で関数を適用する



1. refにオブジェクトをラップする
clojure.lang.Refのコンストラクタ

public Ref(Object initVal) throws Exception{
	this(initVal, null);
}

別なコンストラクタをよんでます。

public Ref(Object initVal,IPersistentMap meta) throws Exception{
    super(meta);
    this.id = ids.getAndIncrement();
	this.faults = new AtomicInteger();
	this.lock = new ReentrantReadWriteLock();
	tvals = new TVal(initVal, 0, System.currentTimeMillis());
}

値が生成されます。普通ですね。


2. ラップしたオブジェクトを取得する
clojure.lang.IDeref#derefで宣言されていて、clojure.lang.Refが実態です。

public Object deref(){
	LockingTransaction t = LockingTransaction.getRunning();
	if(t == null)
		return currentVal();
	return t.doGet(this);
}

LockingTransaction#getRunningでは、生きているトランザクションを取得します。transactionはthread localなので、スレッドごとのトランザクションですね。

static LockingTransaction getRunning(){
	LockingTransaction t = transaction.get();
	if(t == null || t.info == null)
		return null;
	return t;
}

トランザクションが発生していなければ、Refの読み取りロックを取得して値を返します。Ref#currentValに入ります。

Object currentVal(){
	try
		{
		lock.readLock().lock();
		if(tvals != null)
			return tvals.val;
		throw new IllegalStateException(this.toString() + " is unbound.");
		}
	finally
		{
		lock.readLock().unlock();
		}
}

トランザクションが発生していれば、トランザクション中の値を取得しにいきます。複数のトランザクションが走っていても、ほかのトランザクションが未コミットの状態であればその値は見えません。clojureでは、トランザクションごとに値をコピーすることによって実現しています。トランザクションごとの値なので、LockingTransaction#doGetに聞きにいきます。

Object doGet(Ref ref){
	if(!info.running())
		throw retryex;
	if(vals.containsKey(ref))
		return vals.get(ref);
	try
		{
		ref.lock.readLock().lock();
		if(ref.tvals == null)
			throw new IllegalStateException(ref.toString() + " is unbound.");
		Ref.TVal ver = ref.tvals;
		do
			{
			if(ver.point <= readPoint)
				return ver.val;
			} while((ver = ver.prior) != ref.tvals);
		}
	finally
		{
		ref.lock.readLock().unlock();
		}
	//no version of val precedes the read point
	ref.faults.incrementAndGet();
	throw retryex;
}

トランザクションのread pointの値がそのTValの値より小さければ(writeが発生していなければ)、TValの値をそのまま返します。トランザクション中にその値を更新していれば、更新している値が欲しいので、TValを操作して値を探しにいきます。


x. トランザクション内でrefの操作を行う
3. 4. 5. ともにトランザクション内で実行する必要があるため、トランザクション内で何かを実行するコードを3. 4. 5. の前に読んでおきます。実装箇所は、clojure.lang.LockingTransaction#runInTransactionですね。

static public Object runInTransaction(Callable fn) throws Exception{
	LockingTransaction t = transaction.get();
	if(t == null)
		transaction.set(t = new LockingTransaction());

	if(t.info != null)
		return fn.call();

	return t.run(fn);
}

clojureでは、

(dosync (ここの中身))

というような書き方をすると、この中身がどこかでjava.util.concurrent.Callableに変換されて実行されている様です。clojure処理系の実装はみてないのであれですが、S式の中身をリフレクションでもりもり変換してるのかもしれません。

それで、dosyncの中身をCallableで受け取った後、 transactionを取得します。なければ生成します。
初回の実行時にはトランザクションの競合を考慮する必要がないため、おもむろにfn.callします。競合検知しながらfnを実行するには、t.run(fn)を呼び出します。


clojure.lang.LockingTransaction#runがトランザクション処理の本丸です。
ちょっと1メソッドが長いので、概要だけ先に整理しておきます。

1. トランザクションのコミットまでに競合を検知したら、再実行を試みます
2. 変更を実行するため、read pointとトランザクション開始時間を設定します
3. 関数適用または値の更新を行います。commuteの関数適用はここでは行いません
4. ステータスをCASでcommitingに変更します
5. ensureされているものについて再度ロックを取得し直しつつ、トランザクション内で実行するすべてのcommuteを処理します
6. 変更対象のrefの書き込みロックを取得します
7. バリデーターが設定されていれば、バリデーションを実行します
8. lastpointを+1します
9. ref.tvalsに新しい値を追加します。ここでトランザクションがの再実行時には、historyを保持できる許容量のレンジをオーバーしていれば、変更対象のref.tvalsのうち一番最後のものの値を更新しにいきます。トランザクションがうまくいっていれば値の履歴を持っておく必要があまりないので、(たぶん)楽観的に直接値を更新しにいきます
10. トランザクションのステータスをCOMMITEDに書き換えます
11. 取得したrefの書き込みロックを取得した逆順に解放します。
12. ensureしたものの読み込みロックを解放します

Object run(Callable fn) throws Exception{
	boolean done = false;
	Object ret = null;
	ArrayList<Ref> locked = new ArrayList<Ref>();
	ArrayList<Notify> notify = new ArrayList<Notify>();

	for(int i = 0; !done && i < RETRY_LIMIT; i++)
		{
		try
			{
			getReadPoint();
			if(i == 0)
				{
				startPoint = readPoint;
				startTime = System.nanoTime();
				}
			info = new Info(RUNNING, startPoint);
			ret = fn.call();
			//make sure no one has killed us before this point, and can't from now on
			if(info.status.compareAndSet(RUNNING, COMMITTING))
				{
				for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
					{
					Ref ref = e.getKey();
					if(sets.contains(ref)) continue;
					
					boolean wasEnsured = ensures.contains(ref);
					//can't upgrade readLock, so release it
					releaseIfEnsured(ref);
					tryWriteLock(ref);
					locked.add(ref);
					if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
						throw retryex;

					Info refinfo = ref.tinfo;
					if(refinfo != null && refinfo != info && refinfo.running())
						{
						if(!barge(refinfo))
							throw retryex;
						}
					Object val = ref.tvals == null ? null : ref.tvals.val;
					vals.put(ref, val);
					for(CFn f : e.getValue())
						{
						vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));
						}
					}
				for(Ref ref : sets)
					{
					tryWriteLock(ref);
					locked.add(ref);
					}

				//validate and enqueue notifications
				for(Map.Entry<Ref, Object> e : vals.entrySet())
					{
					Ref ref = e.getKey();
					ref.validate(ref.getValidator(), e.getValue());
					}

				//at this point, all values calced, all refs to be written locked
				//no more client code to be called
				long msecs = System.currentTimeMillis();
				long commitPoint = getCommitPoint();
				for(Map.Entry<Ref, Object> e : vals.entrySet())
					{
					Ref ref = e.getKey();
					Object oldval = ref.tvals == null ? null : ref.tvals.val;
					Object newval = e.getValue();
					int hcount = ref.histCount();

					if(ref.tvals == null)
						{
						ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
						}
					else if((ref.faults.get() > 0 && hcount < ref.maxHistory)
							|| hcount < ref.minHistory)
						{
						ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
						ref.faults.set(0);
						}
					else
						{
						ref.tvals = ref.tvals.next;
						ref.tvals.val = newval;
						ref.tvals.point = commitPoint;
						ref.tvals.msecs = msecs;
						}
					if(ref.getWatches().count() > 0)
						notify.add(new Notify(ref, oldval, newval));
					}

				done = true;
				info.status.set(COMMITTED);
				}
			}
		catch(RetryEx retry)
			{
			//eat this so we retry rather than fall out
			}
		finally
			{
			for(int k = locked.size() - 1; k >= 0; --k)
				{
				locked.get(k).lock.writeLock().unlock();
				}
			locked.clear();
			for(Ref r : ensures)
				{
				r.lock.readLock().unlock();
				}
			ensures.clear();
			stop(done ? COMMITTED : RETRY);
			try
				{
				if(done) //re-dispatch out of transaction
					{
					for(Notify n : notify)
						{
						n.ref.notifyWatches(n.oldval, n.newval);
						}
					for(Agent.Action action : actions)
						{
						Agent.dispatchAction(action);
						}
					}
				}
			finally
				{
				notify.clear();
				actions.clear();
				}
			}
		}
	if(!done)
		throw new Exception("Transaction failed after reaching retry limit");
	return ret;
}

結構大変ですね。refに対してnotifyWatchesしたりAgent.dispatchActionしたりしているのですが、ref以外のところのコードを読んでないので、まだこの辺はわかってません。


3. refが参照しているオブジェクトを変更する
clojure.lang.Ref#setで実装されています。

public Object set(Object val){
	return LockingTransaction.getEx().doSet(this, val);
}

getExはgetRunningとほぼ同じですが、トランザクションが発生していなければ例外をスローします。名前びみょいですが。

static LockingTransaction getEx(){
	LockingTransaction t = transaction.get();
	if(t == null || t.info == null)
		throw new IllegalStateException("No transaction running");
	return t;
}

で、LockingTransaction#doSetでトランザクション内の値にセットしにいきます。

Object doSet(Ref ref, Object val){
	if(!info.running())
		throw retryex;
	if(commutes.containsKey(ref))
		throw new IllegalStateException("Can't set after commute");
	if(!sets.contains(ref))
		{
		sets.add(ref);
		lock(ref);
		}
	vals.put(ref, val);
	return val;
}

1. トランザクションが変更対象とするrefの集合(sets)に変更隊h層を追加
2. LockingTransaction#lockで書き込みロックを取得する
3. 変更対象とするrefと上書きする値を変更対象のマップ(vals)追加します

この時点では値の書き換えは(当然)行われず、LockingTransaction#runで値の確定をしにいきます。トランザクションローカルな値を確保しにいってる感じですね。LockingTransaction#doGetではvalsに実態を聞きにいくので、トランザクション内ではここで適用した値が正なものとして取り扱われるようになっています。
また、commuteの後は値を書き換えられないこと(というかトランザクションでcommuteでの関数適用は実行順序を制御するので、clojureの整合性の制御の問題ではあるけど)、言い換えれば、値が更新された後じゃないとcommuteの関数適用をしないような実装になっていることが見て取れます。

LockingTransaction#lockでは、書き込みロックを取得しにいきます。
//returns the most recent val

Object lock(Ref ref){
	//can't upgrade readLock, so release it
	releaseIfEnsured(ref);

	boolean unlocked = true;
	try
		{
		tryWriteLock(ref);
		unlocked = false;

		if(ref.tvals != null && ref.tvals.point > readPoint)
			throw retryex;
		Info refinfo = ref.tinfo;

		//write lock conflict
		if(refinfo != null && refinfo != info && refinfo.running())
			{
			if(!barge(refinfo))
				{
				ref.lock.writeLock().unlock();
				unlocked = true;
				return blockAndBail(refinfo);
				}
			}
		ref.tinfo = info;
		return ref.tvals == null ? null : ref.tvals.val;
		}
	finally
		{
		if(!unlocked)
			ref.lock.writeLock().unlock();
		}
}


4. refが参照しているオブジェクトに対して関数を適用する
clojure.lang.Ref#alterですね。

public Object alter(IFn fn, ISeq args) throws Exception{
	LockingTransaction t = LockingTransaction.getEx();
	return t.doSet(this, fn.applyTo(RT.cons(t.doGet(this), args)));
}

実態はLockingTransaction.doSetです。割とシンプルに、t.doGetしたin-transaction-valueに対して関数を適用し、doSetしにいきます。


5. 4. より高い平行性で関数を適用する
clojure.lang.Ref#commuteです。LockingTransaction#runでもちらっと出てきましたが、alterとはだいぶ違うことをします。

public Object commute(IFn fn, ISeq args) throws Exception{
	return LockingTransaction.getEx().doCommute(this, fn, args);
}

LockingTransaction#doCommuteの呼び出しが実行されます。

Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{
	if(!info.running())
		throw retryex;
	if(!vals.containsKey(ref))
		{
		Object val = null;
		try
			{
			ref.lock.readLock().lock();
			val = ref.tvals == null ? null : ref.tvals.val;
			}
		finally
			{
			ref.lock.readLock().unlock();
			}
		vals.put(ref, val);
		}
	ArrayList<CFn> fns = commutes.get(ref);
	if(fns == null)
		commutes.put(ref, fns = new ArrayList<CFn>());
	fns.add(new CFn(fn, args));
	Object ret = fn.applyTo(RT.cons(vals.get(ref), args));
	vals.put(ref, ret);
	return ret;
}

ここでも、doSetと同様に、値の更新はせずに、変更対象としてvalsで管理、commutesに適用する関数をおいておく、という感じです。

1. 読み込みロックを取得して関数の適用対象となるTValを取得し、変更対象に追加します
2. commuteの再実行時にはcommuteだけ実行するので、値に適用するcommuteを保持しておきます。commutesに入れます。
3. commutesに値を入れたら、トランザクション内だけで見えるrefに関数を適用して値を返します。vals.get(ref)の部分が、そのトランザクションでしか見えないrefです。

3. があるので、setsとvalsは別に管理しているんだろうなということがわかります。



とりあえずこんな感じですね。結構がんばりました。