3261N/A * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
893N/A *
893N/A * This code is free software; you can redistribute it and/or modify it
893N/A * under the terms of the GNU General Public License version 2 only, as
893N/A * published by the Free Software Foundation.
893N/A *
893N/A * This code is distributed in the hope that it will be useful, but WITHOUT
893N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
893N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
893N/A * version 2 for more details (a copy is included in the LICENSE file that
893N/A * accompanied this code).
893N/A *
893N/A * You should have received a copy of the GNU General Public License version
893N/A * 2 along with this work; if not, write to the Free Software Foundation,
893N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
893N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
893N/A */
893N/A/* @test
1580N/A * @bug 4607272 6822643 6830721 6842687
893N/A * @summary Unit test for AsynchronousFileChannel
893N/A */
893N/Aimport java.nio.file.*;
893N/Aimport java.nio.channels.*;
893N/Aimport java.nio.ByteBuffer;
893N/Aimport java.io.File;
893N/Aimport java.io.IOException;
893N/Aimport java.util.*;
893N/Aimport java.util.concurrent.*;
893N/Aimport java.util.concurrent.atomic.AtomicReference;
893N/Aimport static java.nio.file.StandardOpenOption.*;
893N/Apublic class Basic {
893N/A private static final Random rand = new Random();
893N/A public static void main(String[] args) throws IOException {
893N/A // create temporary file
893N/A File blah = File.createTempFile("blah", null);
893N/A blah.deleteOnExit();
2546N/A AsynchronousFileChannel ch = AsynchronousFileChannel
893N/A .open(blah.toPath(), READ, WRITE);
2546N/A try {
2546N/A // run tests
2546N/A testUsingCompletionHandlers(ch);
2546N/A testUsingWaitOnResult(ch);
2546N/A testInterruptHandlerThread(ch);
2546N/A } finally {
2546N/A ch.close();
2546N/A }
2546N/A // run test that expects channel to be closed
893N/A testClosedChannel(ch);
893N/A // these tests open the file themselves
1109N/A testLocking(blah.toPath());
893N/A testCustomThreadPool(blah.toPath());
893N/A testAsynchronousClose(blah.toPath());
893N/A testCancel(blah.toPath());
893N/A testTruncate(blah.toPath());
2546N/A // eagerly clean-up
2546N/A blah.delete();
893N/A }
893N/A /*
893N/A * Generate buffer with random contents
893N/A * Writes buffer to file using a CompletionHandler to consume the result
893N/A * of each write operation
893N/A * Reads file to EOF to a new buffer using a CompletionHandler to consume
893N/A * the result of each read operation
893N/A * Compares buffer contents
893N/A */
893N/A static void testUsingCompletionHandlers(AsynchronousFileChannel ch)
893N/A throws IOException
893N/A {
893N/A System.out.println("testUsingCompletionHandlers");
893N/A ch.truncate(0L);
893N/A // generate buffer with random elements and write it to file
893N/A ByteBuffer src = genBuffer();
893N/A writeFully(ch, src, 0L);
893N/A // read to EOF or buffer is full
893N/A ByteBuffer dst = (rand.nextBoolean()) ?
893N/A ByteBuffer.allocateDirect(src.capacity()) :
893N/A ByteBuffer.allocate(src.capacity());
893N/A readAll(ch, dst, 0L);
893N/A // check buffers are the same
893N/A src.flip();
893N/A dst.flip();
893N/A if (!src.equals(dst)) {
893N/A throw new RuntimeException("Contents differ");
893N/A }
893N/A }
893N/A /*
893N/A * Generate buffer with random contents
893N/A * Writes buffer to file, invoking the Future's get method to wait for
893N/A * each write operation to complete
893N/A * Reads file to EOF to a new buffer, invoking the Future's get method to
893N/A * wait for each write operation to complete
893N/A * Compares buffer contents
893N/A */
893N/A static void testUsingWaitOnResult(AsynchronousFileChannel ch)
893N/A throws IOException
893N/A {
893N/A System.out.println("testUsingWaitOnResult");
893N/A ch.truncate(0L);
893N/A // generate buffer
893N/A ByteBuffer src = genBuffer();
893N/A // write buffer completely to file
893N/A long position = 0L;
893N/A while (src.hasRemaining()) {
893N/A Future<Integer> result = ch.write(src, position);
893N/A try {
893N/A int n = result.get();
893N/A // update position
893N/A position += n;
893N/A } catch (ExecutionException x) {
893N/A throw new RuntimeException(x.getCause());
893N/A } catch (InterruptedException x) {
893N/A throw new RuntimeException(x);
893N/A }
893N/A }
893N/A // read file into new buffer
893N/A ByteBuffer dst = (rand.nextBoolean()) ?
893N/A ByteBuffer.allocateDirect(src.capacity()) :
893N/A ByteBuffer.allocate(src.capacity());
893N/A position = 0L;
893N/A int n;
893N/A do {
893N/A Future<Integer> result = ch.read(dst, position);
893N/A try {
893N/A n = result.get();
893N/A // update position
893N/A if (n > 0) position += n;
893N/A } catch (ExecutionException x) {
893N/A throw new RuntimeException(x.getCause());
893N/A } catch (InterruptedException x) {
893N/A throw new RuntimeException(x);
893N/A }
893N/A } while (n > 0);
893N/A // check buffers are the same
893N/A src.flip();
893N/A dst.flip();
893N/A if (!src.equals(dst)) {
893N/A throw new RuntimeException("Contents differ");
893N/A }
893N/A }
893N/A // exercise lock methods
1109N/A static void testLocking(Path file) throws IOException {
893N/A System.out.println("testLocking");
1109N/A AsynchronousFileChannel ch = AsynchronousFileChannel
1109N/A .open(file, READ, WRITE);
893N/A FileLock fl;
893N/A try {
1109N/A // test 1 - acquire lock and check that tryLock throws
1109N/A // OverlappingFileLockException
1109N/A try {
1109N/A fl = ch.lock().get();
1109N/A } catch (ExecutionException x) {
1109N/A throw new RuntimeException(x);
1109N/A } catch (InterruptedException x) {
1109N/A throw new RuntimeException("Should not be interrupted");
1109N/A }
1109N/A if (!fl.acquiredBy().equals(ch))
1109N/A throw new RuntimeException("FileLock#acquiredBy returned incorrect channel");
1109N/A try {
1109N/A ch.tryLock();
1109N/A throw new RuntimeException("OverlappingFileLockException expected");
1109N/A } catch (OverlappingFileLockException x) {
1109N/A }
1109N/A fl.release();
1109N/A // test 2 - acquire try and check that lock throws OverlappingFileLockException
1109N/A fl = ch.tryLock();
1109N/A if (fl == null)
1109N/A throw new RuntimeException("Unable to acquire lock");
1109N/A try {
1434N/A ch.lock((Void)null, new CompletionHandler<FileLock,Void> () {
1109N/A public void completed(FileLock result, Void att) {
1109N/A }
1109N/A public void failed(Throwable exc, Void att) {
1109N/A }
1109N/A });
1109N/A throw new RuntimeException("OverlappingFileLockException expected");
1109N/A } catch (OverlappingFileLockException x) {
1109N/A }
1109N/A } finally {
1109N/A ch.close();
893N/A }
1109N/A // test 3 - channel is closed so FileLock should no longer be valid
1109N/A if (fl.isValid())
1109N/A throw new RuntimeException("FileLock expected to be invalid");
893N/A }
893N/A // interrupt should not close channel
893N/A static void testInterruptHandlerThread(final AsynchronousFileChannel ch) {
893N/A System.out.println("testInterruptHandlerThread");
893N/A ByteBuffer buf = ByteBuffer.allocateDirect(100);
893N/A final CountDownLatch latch = new CountDownLatch(1);
1434N/A ch.read(buf, 0L, (Void)null, new CompletionHandler<Integer,Void>() {
893N/A public void completed(Integer result, Void att) {
893N/A try {
893N/A Thread.currentThread().interrupt();
893N/A long size = ch.size();
893N/A latch.countDown();
893N/A } catch (IOException x) {
893N/A x.printStackTrace();
893N/A }
893N/A }
893N/A public void failed(Throwable exc, Void att) {
893N/A }
893N/A });
893N/A // wait for handler to complete
893N/A await(latch);
893N/A }
893N/A // invoke method on closed channel
893N/A static void testClosedChannel(AsynchronousFileChannel ch) {
893N/A System.out.println("testClosedChannel");
893N/A if (ch.isOpen())
893N/A throw new RuntimeException("Channel should be closed");
893N/A ByteBuffer buf = ByteBuffer.allocateDirect(100);
893N/A // check read fails with ClosedChannelException
893N/A try {
893N/A ch.read(buf, 0L).get();
893N/A throw new RuntimeException("ExecutionException expected");
893N/A } catch (ExecutionException x) {
893N/A if (!(x.getCause() instanceof ClosedChannelException))
893N/A throw new RuntimeException("Cause of ClosedChannelException expected");
893N/A } catch (InterruptedException x) {
893N/A }
893N/A // check write fails with ClosedChannelException
893N/A try {
893N/A ch.write(buf, 0L).get();
893N/A throw new RuntimeException("ExecutionException expected");
893N/A } catch (ExecutionException x) {
893N/A if (!(x.getCause() instanceof ClosedChannelException))
893N/A throw new RuntimeException("Cause of ClosedChannelException expected");
893N/A } catch (InterruptedException x) {
893N/A }
893N/A // check lock fails with ClosedChannelException
893N/A try {
893N/A ch.lock().get();
893N/A throw new RuntimeException("ExecutionException expected");
893N/A } catch (ExecutionException x) {
893N/A if (!(x.getCause() instanceof ClosedChannelException))
893N/A throw new RuntimeException("Cause of ClosedChannelException expected");
893N/A } catch (InterruptedException x) {
893N/A }
893N/A }
893N/A // exercise custom thread pool
893N/A static void testCustomThreadPool(Path file) throws IOException {
893N/A System.out.println("testCustomThreadPool");
893N/A // records threads that are created
893N/A final List<Thread> threads = new ArrayList<Thread>();
893N/A ThreadFactory threadFactory = new ThreadFactory() {
893N/A @Override
893N/A public Thread newThread(Runnable r) {
893N/A Thread t = new Thread(r);
893N/A t.setDaemon(true);
893N/A synchronized (threads) {
893N/A threads.add(t);
893N/A }
893N/A return t;
893N/A }
893N/A };
893N/A // exercise tests with varied number of threads
893N/A for (int nThreads=1; nThreads<=5; nThreads++) {
893N/A synchronized (threads) {
893N/A threads.clear();
893N/A }
893N/A ExecutorService executor = Executors.newFixedThreadPool(nThreads, threadFactory);
893N/A Set<StandardOpenOption> opts = EnumSet.of(WRITE);
893N/A AsynchronousFileChannel ch = AsynchronousFileChannel.open(file, opts, executor);
893N/A try {
893N/A for (int i=0; i<10; i++) {
893N/A // do I/O operation to see which thread invokes the completion handler
893N/A final AtomicReference<Thread> invoker = new AtomicReference<Thread>();
893N/A final CountDownLatch latch = new CountDownLatch(1);
1434N/A ch.write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
893N/A public void completed(Integer result, Void att) {
893N/A invoker.set(Thread.currentThread());
893N/A latch.countDown();
893N/A }
893N/A public void failed(Throwable exc, Void att) {
893N/A }
893N/A });
893N/A await(latch);
893N/A // check invoker
893N/A boolean found = false;
893N/A synchronized (threads) {
893N/A for (Thread t: threads) {
893N/A if (t == invoker.get()) {
893N/A found = true;
893N/A break;
893N/A }
893N/A }
893N/A }
893N/A if (!found)
893N/A throw new RuntimeException("Invoker thread not found");
893N/A }
893N/A } finally {
893N/A ch.close();
1580N/A executor.shutdown();
893N/A }
893N/A }
1580N/A // test sharing a thread pool between many channels
1580N/A ExecutorService executor = Executors
1580N/A .newFixedThreadPool(1+rand.nextInt(10), threadFactory);
1580N/A final int n = 50 + rand.nextInt(50);
1580N/A AsynchronousFileChannel[] channels = new AsynchronousFileChannel[n];
1580N/A try {
1580N/A for (int i=0; i<n; i++) {
1580N/A Set<StandardOpenOption> opts = EnumSet.of(WRITE);
1580N/A channels[i] = AsynchronousFileChannel.open(file, opts, executor);
1580N/A final CountDownLatch latch = new CountDownLatch(1);
1580N/A channels[i].write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
1580N/A public void completed(Integer result, Void att) {
1580N/A latch.countDown();
1580N/A }
1580N/A public void failed(Throwable exc, Void att) {
1580N/A }
1580N/A });
1580N/A await(latch);
1580N/A // close ~half the channels
1580N/A if (rand.nextBoolean())
1580N/A channels[i].close();
1580N/A }
1580N/A } finally {
1580N/A // close remaining channels
1580N/A for (int i=0; i<n; i++) {
1580N/A if (channels[i] != null) channels[i].close();
1580N/A }
1580N/A executor.shutdown();
1580N/A }
893N/A }
893N/A // exercise asynchronous close
893N/A static void testAsynchronousClose(Path file) throws IOException {
893N/A System.out.println("testAsynchronousClose");
893N/A // create file
893N/A AsynchronousFileChannel ch = AsynchronousFileChannel
893N/A long size = 0L;
893N/A do {
893N/A ByteBuffer buf = genBuffer();
893N/A int n = buf.remaining();
893N/A writeFully(ch, buf, size);
893N/A size += n;
893N/A } while (size < (50L * 1024L * 1024L));
893N/A ch.close();
893N/A ch = AsynchronousFileChannel.open(file, WRITE, SYNC);
893N/A // randomize number of writers, buffer size, and positions
893N/A int nwriters = 1 + rand.nextInt(8);
893N/A ByteBuffer[] buf = new ByteBuffer[nwriters];
893N/A long[] position = new long[nwriters];
893N/A for (int i=0; i<nwriters; i++) {
893N/A buf[i] = genBuffer();
893N/A position[i] = rand.nextInt((int)size);
893N/A }
893N/A // initiate I/O
893N/A Future[] result = new Future[nwriters];
893N/A for (int i=0; i<nwriters; i++) {
893N/A result[i] = ch.write(buf[i], position[i]);
893N/A }
893N/A // close file
893N/A ch.close();
893N/A // write operations should complete or fail with AsynchronousCloseException
893N/A for (int i=0; i<nwriters; i++) {
893N/A try {
893N/A result[i].get();
893N/A } catch (ExecutionException x) {
893N/A Throwable cause = x.getCause();
893N/A if (!(cause instanceof AsynchronousCloseException))
893N/A throw new RuntimeException(cause);
893N/A } catch (CancellationException x) {
893N/A throw new RuntimeException(x); // should not happen
893N/A } catch (InterruptedException x) {
893N/A throw new RuntimeException(x); // should not happen
893N/A }
893N/A }
893N/A }
893N/A // exercise cancel method
893N/A static void testCancel(Path file) throws IOException {
893N/A System.out.println("testCancel");
893N/A for (int i=0; i<2; i++) {
893N/A boolean mayInterruptIfRunning = (i == 0) ? false : true;
893N/A // open with SYNC option to improve chances that write will not
893N/A // complete immediately
893N/A AsynchronousFileChannel ch = AsynchronousFileChannel
893N/A .open(file, WRITE, SYNC);
893N/A // start write operation
1580N/A Future<Integer> res = ch.write(genBuffer(), 0L);
893N/A // cancel operation
893N/A boolean cancelled = res.cancel(mayInterruptIfRunning);
893N/A // check post-conditions
893N/A if (!res.isDone())
893N/A throw new RuntimeException("isDone should return true");
893N/A if (res.isCancelled() != cancelled)
893N/A throw new RuntimeException("isCancelled not consistent");
893N/A try {
893N/A res.get();
1143N/A if (cancelled)
893N/A throw new RuntimeException("CancellationException expected");
893N/A } catch (CancellationException x) {
1143N/A if (!cancelled)
1143N/A throw new RuntimeException("CancellationException not expected");
893N/A } catch (ExecutionException x) {
893N/A throw new RuntimeException(x);
893N/A } catch (InterruptedException x) {
893N/A throw new RuntimeException(x);
893N/A }
893N/A try {
893N/A res.get(1, TimeUnit.SECONDS);
1143N/A if (cancelled)
1143N/A throw new RuntimeException("CancellationException expected");
893N/A } catch (CancellationException x) {
1143N/A if (!cancelled)
1143N/A throw new RuntimeException("CancellationException not expected");
893N/A } catch (ExecutionException x) {
893N/A throw new RuntimeException(x);
893N/A } catch (TimeoutException x) {
893N/A throw new RuntimeException(x);
893N/A } catch (InterruptedException x) {
893N/A throw new RuntimeException(x);
893N/A }
893N/A ch.close();
893N/A }
893N/A }
893N/A // exercise truncate method
893N/A static void testTruncate(Path file) throws IOException {
893N/A System.out.println("testTruncate");
893N/A // basic tests
893N/A AsynchronousFileChannel ch = AsynchronousFileChannel
893N/A try {
893N/A writeFully(ch, genBuffer(), 0L);
893N/A long size = ch.size();
893N/A // attempt to truncate to a size greater than the current size
893N/A if (ch.truncate(size + 1L).size() != size)
893N/A throw new RuntimeException("Unexpected size after truncation");
893N/A // truncate file
893N/A if (ch.truncate(size - 1L).size() != (size - 1L))
893N/A throw new RuntimeException("Unexpected size after truncation");
893N/A // invalid size
893N/A try {
893N/A ch.truncate(-1L);
893N/A throw new RuntimeException("IllegalArgumentException expected");
893N/A } catch (IllegalArgumentException e) { }
893N/A } finally {
893N/A ch.close();
893N/A }
893N/A // channel is closed
893N/A try {
893N/A ch.truncate(0L);
893N/A throw new RuntimeException("ClosedChannelException expected");
893N/A } catch (ClosedChannelException e) { }
893N/A // channel is read-only
893N/A ch = AsynchronousFileChannel.open(file, READ);
893N/A try {
893N/A try {
893N/A ch.truncate(0L);
893N/A throw new RuntimeException("NonWritableChannelException expected");
893N/A } catch (NonWritableChannelException e) { }
893N/A } finally {
893N/A ch.close();
893N/A }
893N/A }
893N/A // returns ByteBuffer with random bytes
893N/A static ByteBuffer genBuffer() {
893N/A int size = 1024 + rand.nextInt(16000);
893N/A byte[] buf = new byte[size];
893N/A boolean useDirect = rand.nextBoolean();
893N/A if (useDirect) {
893N/A ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
893N/A bb.put(buf);
893N/A bb.flip();
893N/A return bb;
893N/A } else {
893N/A return ByteBuffer.wrap(buf);
893N/A }
893N/A }
893N/A // writes all remaining bytes in the buffer to the given channel at the
893N/A // given position
893N/A static void writeFully(final AsynchronousFileChannel ch,
893N/A final ByteBuffer src,
893N/A long position)
893N/A {
893N/A final CountDownLatch latch = new CountDownLatch(1);
893N/A // use position as attachment
893N/A ch.write(src, position, position, new CompletionHandler<Integer,Long>() {
893N/A public void completed(Integer result, Long position) {
893N/A int n = result;
893N/A if (src.hasRemaining()) {
893N/A long p = position + n;
893N/A ch.write(src, p, p, this);
893N/A } else {
893N/A latch.countDown();
893N/A }
893N/A }
893N/A public void failed(Throwable exc, Long position) {
893N/A }
893N/A });
893N/A // wait for writes to complete
893N/A await(latch);
893N/A }
893N/A static void readAll(final AsynchronousFileChannel ch,
893N/A final ByteBuffer dst,
893N/A long position)
893N/A {
893N/A final CountDownLatch latch = new CountDownLatch(1);
893N/A // use position as attachment
893N/A ch.read(dst, position, position, new CompletionHandler<Integer,Long>() {
893N/A public void completed(Integer result, Long position) {
893N/A int n = result;
893N/A if (n > 0) {
893N/A long p = position + n;
893N/A ch.read(dst, p, p, this);
893N/A } else {
893N/A latch.countDown();
893N/A }
893N/A }
893N/A public void failed(Throwable exc, Long position) {
893N/A }
893N/A });
893N/A // wait for reads to complete
893N/A await(latch);
893N/A }
893N/A static void await(CountDownLatch latch) {
893N/A // wait until done
893N/A boolean done = false;
893N/A while (!done) {
893N/A try {
893N/A latch.await();
893N/A done = true;
893N/A } catch (InterruptedException x) { }
893N/A }
893N/A }