countUp できる CountDownLatch

先日、CountDownLatch の Count をインスタンス生成してから決定したいというケースがありました。
JDK標準で提供されているモノでなんとかならないかと思ったのですが、要望に合致するものが見つけられなかったので書いてみました。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class IncreasableCountDownLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 5190125766813327468L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0 && releases > 0) return false;
                int nextc = c - releases;
                if (compareAndSetState(c, nextc)) return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public IncreasableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countUp() {
        sync.releaseShared(-1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

}

殆ど CountDownLatch のソースと同じです。tryReleaseShared が releases を使うようになった位ですね。

書いたはいいものの、もっと良い方法があるような気もしています。何か上手いアイデアをご存知の方がいらっしゃいましたらご教授下さい。

とりあえず使用サンプルは以下に。
次のように CountDownLatch の生成時に Count が決定できないケースに利用できます。

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class IncreasableCountDownLatchTest {

    private static class Worker implements Runnable {
        private IncreasableCountDownLatch latch;
        private String text;
        Worker(IncreasableCountDownLatch latch, String text) {
            this.latch = latch;
            this.text = text;
        }
        public void run() {
            try {
                Thread.sleep(3000);
                System.out.println("do work: " + text);
            } catch (InterruptedException e) {
                Thread.interrupted();
            } finally {
                latch.countDown();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException, FileNotFoundException, IOException {
        Reader reader = new FileReader(args[0]);
        try {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            try {
                BufferedReader bReader = new BufferedReader(reader, 4096);
                IncreasableCountDownLatch latch = new IncreasableCountDownLatch(0);
                for (String line = bReader.readLine(); line != null; line = bReader.readLine()) {
                    latch.countUp();
                    executor.submit(new Worker(latch, line));
                }
                latch.await();
                System.out.println("done!");
            } finally {
                executor.shutdown();
            }
        } finally {
            reader.close();
        }
    }
    
}

2009-04-10 04:40 追記

ぶ、この要求なら ExecutorService#shutdown と ExecutorService#awaitTermination で充分なことに気付きました。

ExecutorService#shutdown て過去に submit したものはちゃんと実行してくれるんですね。
ExecutorService#shutdownNow と二種類あるって把握してたはずなのになぁ。まだまだですね。

サンプルコードは以下に直せます。

public class IncreasableCountDownLatchTest {

    private static class Worker implements Runnable {
        private String text;
        Worker(String text) {
            this.text = text;
        }
        public void run() {
            try {
                Thread.sleep(3000);
                System.out.println("do work: " + text);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException, FileNotFoundException, IOException {
        Reader reader = new FileReader(args[0]);
        try {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            try {
                BufferedReader bReader = new BufferedReader(reader, 4096);
                for (String line = bReader.readLine(); line != null; line = bReader.readLine()) {
                    executor.submit(new Worker(line));
                }
            } finally {
                executor.shutdown();
            }
        } finally {
            reader.close();
        }
        executor.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("done!");
    }
    
}

CountDownLatch は必要ないですねorz