Mega Code Archive

 
Categories / Java / File Input Output
 

An input stream which reads sequentially from multiple sources

/*  * Copyright (C) 2004 Stephen Ostermiller  * http://ostermiller.org/contact.pl?regarding=Java+Utilities  *  * This program is free software; you can redistribute it and/or modify  * it under the terms of the GNU General Public License as published by  * the Free Software Foundation; either version 2 of the License, or  * (at your option) any later version.  *  * This program is distributed in the hope that it will be useful,  * but WITHOUT ANY WARRANTY; without even the implied warranty of  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the  * GNU General Public License for more details.  *  * See COPYING.TXT for details.  */ import java.io.*; import java.util.ArrayList; /**  * An input stream which reads sequentially from multiple sources.  * More information about this class is available from <a target="_top" href=  * "http://ostermiller.org/utils/">ostermiller.org</a>.  *  * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities  * @since ostermillerutils 1.04.00  */ public class ConcatInputStream extends InputStream {   /**    * Current index to inputStreamQueue    *    * @since ostermillerutils 1.04.01    */   private int inputStreamQueueIndex = 0;   /**    * Queue of inputStreams that have yet to be read from.    *    * @since ostermillerutils 1.04.01    */   private ArrayList<InputStream> inputStreamQueue = new ArrayList<InputStream>();   /**    * A cache of the current inputStream from the inputStreamQueue    * to avoid unneeded access to the queue which must    * be synchronized.    *    * @since ostermillerutils 1.04.01    */   private InputStream currentInputStream = null;   /**    * true iff the client may add more inputStreams.    *    * @since ostermillerutils 1.04.01    */   private boolean doneAddingInputStreams = false;   /**    * Causes the addInputStream method to throw IllegalStateException    * and read() methods to return -1 (end of stream)    * when there is no more available data.    * <p>    * Calling this method when this class is no longer accepting    * more inputStreams has no effect.    *    * @since ostermillerutils 1.04.01    */   public void lastInputStreamAdded(){     doneAddingInputStreams = true;   }   /**    * Add the given inputStream to the queue of inputStreams from which to    * concatenate data.    *    * @param in InputStream to add to the concatenation.    * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.    *    * @since ostermillerutils 1.04.01    */   public void addInputStream(InputStream in){     synchronized(inputStreamQueue){       if (in == null) throw new NullPointerException();       if (closed) throw new IllegalStateException("ConcatInputStream has been closed");       if (doneAddingInputStreams) throw new IllegalStateException("Cannot add more inputStreams - the last inputStream has already been added.");       inputStreamQueue.add(in);     }   }   /**    * Add the given inputStream to the queue of inputStreams from which to    * concatenate data.    *    * @param in InputStream to add to the concatenation.    * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.    * @throws NullPointerException the array of inputStreams, or any of the contents is null.    *    * @since ostermillerutils 1.04.01    */   public void addInputStreams(InputStream[] in){     for (InputStream element: in) {       addInputStream(element);     }   }   /**    * Gets the current inputStream, looking at the next    * one in the list if the current one is null.    *    * @since ostermillerutils 1.04.01    */   private InputStream getCurrentInputStream(){     if (currentInputStream == null && inputStreamQueueIndex < inputStreamQueue.size()){       synchronized(inputStreamQueue){         // inputStream queue index is advanced only by the nextInputStream()         // method.  Don't do it here.         currentInputStream = inputStreamQueue.get(inputStreamQueueIndex);       }     }     return currentInputStream;   }   /**    * Indicate that we are done with the current inputStream and we should    * advance to the next inputStream.    *    * @since ostermillerutils 1.04.01    */   private void advanceToNextInputStream(){     currentInputStream = null;     inputStreamQueueIndex++;   }   /**    * True iff this the close() method has been called on this stream.    *    * @since ostermillerutils 1.04.00    */   private boolean closed = false;   /**    * Create a new input stream that can dynamically accept new sources.    * <p>    * New sources should be added using the addInputStream() method.    * When all sources have been added the lastInputStreamAdded() should    * be called so that read methods can return -1 (end of stream).    * <p>    * Adding new sources may by interleaved with read calls.    *    * @since ostermillerutils 1.04.01    */   public ConcatInputStream(){     // Empty constructor   }   /**    * Create a new InputStream with one source.    *    * @param in InputStream to use as a source.    *    * @throws NullPointerException if in is null    *    * @since ostermillerutils 1.04.00    */   public ConcatInputStream(InputStream in){     addInputStream(in);     lastInputStreamAdded();   }   /**    * Create a new InputStream with two sources.    *    * @param in1 first InputStream to use as a source.    * @param in2 second InputStream to use as a source.    *    * @throws NullPointerException if either source is null.    *    * @since ostermillerutils 1.04.00    */   public ConcatInputStream(InputStream in1, InputStream in2){     addInputStream(in1);     addInputStream(in2);     lastInputStreamAdded();   }   /**    * Create a new InputStream with an arbitrary number of sources.    *    * @param in InputStreams to use as a sources.    *    * @throws NullPointerException if the input array on any element is null.    *    * @since ostermillerutils 1.04.00    */   public ConcatInputStream(InputStream[] in){     addInputStreams(in);     lastInputStreamAdded();   }   /**    * Reads the next byte of data from the underlying streams. The value byte is    * returned as an int in the range 0 to 255. If no byte is available because    * the end of the stream has been reached, the value -1 is returned. This method    * blocks until input data is available, the end of the stream is detected, or    * an exception is thrown.    * <p>    * If this class in not done accepting inputstreams and the end of the last known    * stream is reached, this method will block forever unless another thread    * adds an inputstream or interrupts.    *    * @return the next byte of data, or -1 if the end of the stream is reached.    *    * @throws IOException if an I/O error occurs.    */   @Override public int read() throws IOException {     if (closed) throw new IOException("InputStream closed");     int r = -1;     while (r == -1){       InputStream in = getCurrentInputStream();       if (in == null){         if (doneAddingInputStreams) return -1;         try {           Thread.sleep(100);         } catch (InterruptedException iox){           throw new IOException("Interrupted");         }       } else {         r = in.read();         if (r == -1) advanceToNextInputStream();       }     }     return r;   }   /**    * Reads some number of bytes from the underlying streams and stores them into    * the buffer array b. The number of bytes actually read is returned as an    * integer. This method blocks until input data is available, end of file is    * detected, or an exception is thrown.    * <p>    * If the length of b is zero,    * then no bytes are read and 0 is returned; otherwise, there is an attempt    * to read at least one byte.    * <p>    * The read(b) method for class InputStream has the same effect as:<br>    * read(b, 0, b.length)    * <p>    * If this class in not done accepting inputstreams and the end of the last known    * stream is reached, this method will block forever unless another thread    * adds an inputstream or interrupts.    *    * @param b - Destination buffer    * @return The number of bytes read, or -1 if the end of the stream has been reached    *    * @throws IOException - If an I/O error occurs    * @throws NullPointerException - If b is null.    *    * @since ostermillerutils 1.04.00    */   @Override public int read(byte[] b) throws IOException {     return read(b, 0, b.length);   }   /**    * Reads up to length bytes of data from the underlying streams into an array of bytes.    * An attempt is made to read as many as length bytes, but a smaller number may be read,    * possibly zero. The number of bytes actually read is returned as an integer.    * <p>    * If length is zero,    * then no bytes are read and 0 is returned; otherwise, there is an attempt    * to read at least one byte.    * <p>    * This method blocks until input data is available    * <p>    * If this class in not done accepting inputstreams and the end of the last known    * stream is reached, this method will block forever unless another thread    * adds an inputstream or interrupts.    *    * @param b Destination buffer    * @param off Offset at which to start storing bytes    * @param len Maximum number of bytes to read    * @return The number of bytes read, or -1 if the end of the stream has been reached    *    * @throws IOException - If an I/O error occurs    * @throws NullPointerException - If b is null.    * @throws IndexOutOfBoundsException - if length or offset are not possible.    */   @Override public int read(byte[] b, int off, int len) throws IOException {     if (off < 0 || len < 0 || off + len > b.length) throw new IllegalArgumentException();     if (closed) throw new IOException("InputStream closed");     int r = -1;     while (r == -1){       InputStream in = getCurrentInputStream();       if (in == null){         if (doneAddingInputStreams) return -1;         try {           Thread.sleep(100);         } catch (InterruptedException iox){           throw new IOException("Interrupted");         }       } else {         r = in.read(b, off, len);         if (r == -1) advanceToNextInputStream();       }     }     return r;   }   /**    * Skips over and discards n bytes of data from this input stream. The skip method    * may, for a variety of reasons, end up skipping over some smaller number of bytes,    * possibly 0. This may result from any of a number of conditions; reaching end of    * file before n bytes have been skipped is only one possibility. The actual number    * of bytes skipped is returned. If n is negative, no bytes are skipped.    * <p>    * If this class in not done accepting inputstreams and the end of the last known    * stream is reached, this method will block forever unless another thread    * adds an inputstream or interrupts.    *    * @param n he number of characters to skip    * @return The number of characters actually skipped    *    * @throws IOException If an I/O error occurs    *    * @since ostermillerutils 1.04.00    */   @Override public long skip(long n) throws IOException {     if (closed) throw new IOException("InputStream closed");     if (n <= 0) return 0;     long s = -1;     while (s <= 0){       InputStream in = getCurrentInputStream();       if (in == null){         if (doneAddingInputStreams) return 0;         try {           Thread.sleep(100);         } catch (InterruptedException iox){           throw new IOException("Interrupted");         }       } else {         s = in.skip(n);         // When nothing was skipped it is a bit of a puzzle.         // The most common cause is that the end of the underlying         // stream was reached.  In which case calling skip on it         // will always return zero.  If somebody were calling skip         // until it skipped everything they needed, there would         // be an infinite loop if we were to return zero here.         // If we get zero, let us try to read one character so         // we can see if we are at the end of the stream.  If so,         // we will move to the next.         if (s <= 0) {           // read() will advance to the next stream for us, so don't do it again           s = ((read()==-1)?-1:1);         }       }     }     return s;   }   /**    * Returns the number of bytes that can be read (or skipped over) from this input    * stream without blocking by the next caller of a method for this input stream.    * The next caller might be the same thread or or another thread.    *    * @throws IOException If an I/O error occurs    *    * @since ostermillerutils 1.04.00    */   @Override public int available() throws IOException {     if (closed) throw new IOException("InputStream closed");     InputStream in = getCurrentInputStream();     if (in == null) return 0;     return in.available();   }   /**    * Closes this input stream and releases any system resources associated with the stream.    *    * @since ostermillerutils 1.04.00    */   @Override public void close() throws IOException {     if (closed) return;     for (Object element: inputStreamQueue) {       ((InputStream)element).close();     }     closed = true;   }   /**    * Mark not supported    *    * @since ostermillerutils 1.04.00    */   @Override public void mark(int readlimit){     // Mark not supported -- do nothing   }   /**    * Reset not supported.    *    * @throws IOException because reset is not supported.    *    * @since ostermillerutils 1.04.00    */   @Override public void reset() throws IOException {     throw new IOException("Reset not supported");   }   /**    * Does not support mark.    *    * @return false    *    * @since ostermillerutils 1.04.00    */   @Override public boolean markSupported(){     return false;   } }