1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.BufferedInputStream;
13 
14 import hunt.io.Common;
15 import hunt.io.FilterInputStream;
16 import hunt.Exceptions;
17 
18 version(HUNT_DEBUG) {
19     import hunt.logging;
20 }
21 
22 /**
23  * A <code>BufferedInputStream</code> adds
24  * functionality to another input stream-namely,
25  * the ability to buffer the input and to
26  * support the <code>mark</code> and <code>reset</code>
27  * methods. When  the <code>BufferedInputStream</code>
28  * is created, an internal buffer array is
29  * created. As bytes  from the stream are read
30  * or skipped, the internal buffer is refilled
31  * as necessary  from the contained input stream,
32  * many bytes at a time. The <code>mark</code>
33  * operation  remembers a point in the input
34  * stream and the <code>reset</code> operation
35  * causes all the  bytes read since the most
36  * recent <code>mark</code> operation to be
37  * reread before new bytes are  taken from
38  * the contained input stream.
39  *
40  * @author  Arthur van Hoff
41  * @since   1.0
42  */
43 class BufferedInputStream : FilterInputStream {
44 
45     private enum int DEFAULT_BUFFER_SIZE = 8192;
46 
47     /**
48      * The maximum size of array to allocate.
49      * Some VMs reserve some header words in an array.
50      * Attempts to allocate larger arrays may result in
51      * OutOfMemoryError: Requested array size exceeds VM limit
52      */
53     private enum int MAX_BUFFER_SIZE = int.max - 8;
54 
55     /**
56      * As this class is used early during bootstrap, it's motivated to use
57      * Unsafe.compareAndSetObject instead of AtomicReferenceFieldUpdater
58      * (or VarHandles) to reduce dependencies and improve startup time.
59      */
60     // private static final Unsafe U = Unsafe.getUnsafe();
61 
62     // private static final long BUF_OFFSET
63     //         = U.objectFieldOffset(BufferedInputStream.class, "buf");
64 
65     /**
66      * The internal buffer array where the data is stored. When necessary,
67      * it may be replaced by another array of
68      * a different size.
69      */
70     /*
71      * We null this out with a CAS on close(), which is necessary since
72      * closes can be asynchronous. We use nullness of buf[] as primary
73      * indicator that this stream is closed. (The "in" field is also
74      * nulled out on close.)
75      */
76     protected byte[] buf;
77 
78     /**
79      * The index one greater than the index of the last valid byte in
80      * the buffer.
81      * This value is always
82      * in the range <code>0</code> through <code>buf.length</code>;
83      * elements <code>buf[0]</code>  through <code>buf[count-1]
84      * </code>contain buffered input data obtained
85      * from the underlying  input stream.
86      */
87     protected int count;
88 
89     /**
90      * The current position in the buffer. This is the index of the next
91      * character to be read from the <code>buf</code> array.
92      * <p>
93      * This value is always in the range <code>0</code>
94      * through <code>count</code>. If it is less
95      * than <code>count</code>, then  <code>buf[pos]</code>
96      * is the next byte to be supplied as input;
97      * if it is equal to <code>count</code>, then
98      * the  next <code>read</code> or <code>skip</code>
99      * operation will require more bytes to be
100      * read from the contained  input stream.
101      *
102      * @see     java.io.BufferedInputStream#buf
103      */
104     protected int pos;
105 
106     /**
107      * The value of the <code>pos</code> field at the time the last
108      * <code>mark</code> method was called.
109      * <p>
110      * This value is always
111      * in the range <code>-1</code> through <code>pos</code>.
112      * If there is no marked position in  the input
113      * stream, this field is <code>-1</code>. If
114      * there is a marked position in the input
115      * stream,  then <code>buf[markpos]</code>
116      * is the first byte to be supplied as input
117      * after a <code>reset</code> operation. If
118      * <code>markpos</code> is not <code>-1</code>,
119      * then all bytes from positions <code>buf[markpos]</code>
120      * through  <code>buf[pos-1]</code> must remain
121      * in the buffer array (though they may be
122      * moved to  another place in the buffer array,
123      * with suitable adjustments to the values
124      * of <code>count</code>,  <code>pos</code>,
125      * and <code>markpos</code>); they may not
126      * be discarded unless and until the difference
127      * between <code>pos</code> and <code>markpos</code>
128      * exceeds <code>marklimit</code>.
129      *
130      * @see     java.io.BufferedInputStream#mark(int)
131      * @see     java.io.BufferedInputStream#pos
132      */
133     protected int markpos = -1;
134 
135     /**
136      * The maximum read ahead allowed after a call to the
137      * <code>mark</code> method before subsequent calls to the
138      * <code>reset</code> method fail.
139      * Whenever the difference between <code>pos</code>
140      * and <code>markpos</code> exceeds <code>marklimit</code>,
141      * then the  mark may be dropped by setting
142      * <code>markpos</code> to <code>-1</code>.
143      *
144      * @see     java.io.BufferedInputStream#mark(int)
145      * @see     java.io.BufferedInputStream#reset()
146      */
147     protected int marklimit;
148 
149     /**
150      * Creates a <code>BufferedInputStream</code>
151      * and saves its  argument, the input stream
152      * <code>inputStream</code>, for later use. An internal
153      * buffer array is created and  stored in <code>buf</code>.
154      *
155      * @param   inputStream   the underlying input stream.
156      */
157     this(InputStream inputStream) {
158         this(inputStream, DEFAULT_BUFFER_SIZE);
159     }
160 
161     /**
162      * Creates a <code>BufferedInputStream</code>
163      * with the specified buffer size,
164      * and saves its  argument, the input stream
165      * <code>inputStream</code>, for later use.  An internal
166      * buffer array of length  <code>size</code>
167      * is created and stored in <code>buf</code>.
168      *
169      * @param   inputStream     the underlying input stream.
170      * @param   size   the buffer size.
171      * @exception IllegalArgumentException if {@code size <= 0}.
172      */
173     this(InputStream inputStream, int size) {
174         super(inputStream);
175         if (size <= 0) {
176             throw new IllegalArgumentException("Buffer size <= 0");
177         }
178         buf = new byte[size];
179     }
180 
181     /**
182      * Check to make sure that underlying input stream has not been
183      * nulled out due to close; if not return it;
184      */
185     private InputStream getInIfOpen() {
186         InputStream input = inputStream;
187         if (input is null)
188             throw new IOException("Stream closed");
189         return input;
190     }
191 
192     /**
193      * Check to make sure that buffer has not been nulled out due to
194      * close; if not return it;
195      */
196     private byte[] getBufIfOpen() {
197         byte[] buffer = buf;
198         if (buffer is null)
199             throw new IOException("Stream closed");
200         return buffer;
201     }
202 
203     /**
204      * Fills the buffer with more data, taking into account
205      * shuffling and other tricks for dealing with marks.
206      * Assumes that it is being called by a synchronized method.
207      * This method also assumes that all data has already been read in,
208      * hence pos > count.
209      */
210     private void fill() {
211         byte[] buffer = getBufIfOpen();
212         if (markpos < 0)
213             pos = 0;            /* no mark: throw away the buffer */
214         else if (pos >= buffer.length) {  /* no room left in buffer */
215             if (markpos > 0) {  /* can throw away early part of the buffer */
216                 int sz = pos - markpos;
217                 // System.arraycopy(buffer, markpos, buffer, 0, sz);
218                 for(int i=0; i<sz; i++) {
219                     buffer[i] = buffer[markpos+i];
220                 }
221                 pos = sz;
222                 markpos = 0;
223             } else if (buffer.length >= marklimit) {
224                 markpos = -1;   /* buffer got too big, invalidate mark */
225                 pos = 0;        /* drop buffer contents */
226             } else if (buffer.length >= MAX_BUFFER_SIZE) {
227                 throw new OutOfMemoryError("Required array size too large");
228             } else {            /* grow buffer */
229                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
230                         pos * 2 : MAX_BUFFER_SIZE;
231                 if (nsz > marklimit)
232                     nsz = marklimit;
233                 byte[] nbuf = buffer[0..pos].dup;
234                 // byte[] nbuf = new byte[nsz];
235                 // System.arraycopy(buffer, 0, nbuf, 0, pos);
236                 // if (!U.compareAndSetObject(this, BUF_OFFSET, buffer, nbuf))
237                 if(this.buf !is buffer) {
238                     // Can't replace buf if there was an async close.
239                     // Note: This would need to be changed if fill()
240                     // is ever made accessible to multiple threads.
241                     // But for now, the only way CAS can fail is via close.
242                     // assert buf is null;
243                     throw new IOException("Stream closed");
244                 }
245                 buffer = nbuf;
246             }
247         }
248         count = pos;
249         int n = getInIfOpen().read(buffer, pos, cast(int)buffer.length - pos);
250         if (n > 0)
251             count = n + pos;
252     }
253 
254     /**
255      * See
256      * the general contract of the <code>read</code>
257      * method of <code>InputStream</code>.
258      *
259      * @return     the next byte of data, or <code>-1</code> if the end of the
260      *             stream is reached.
261      * @exception  IOException  if this input stream has been closed by
262      *                          invoking its {@link #close()} method,
263      *                          or an I/O error occurs.
264      * @see        java.io.FilterInputStream#inputStream
265      */
266     override int read() {
267         if (pos >= count) {
268             fill();
269             if (pos >= count)
270                 return -1;
271         }
272         return getBufIfOpen()[pos++] & 0xff;
273     }
274 
275     /**
276      * Read characters into a portion of an array, reading from the underlying
277      * stream at most once if necessary.
278      */
279     private int read1(byte[] b, int off, int len) {
280         int avail = count - pos;
281         if (avail <= 0) {
282             /* If the requested length is at least as large as the buffer, and
283                if there is no mark/reset activity, do not bother to copy the
284                bytes into the local buffer.  In this way buffered streams will
285                cascade harmlessly. */
286             if (len >= getBufIfOpen().length && markpos < 0) {
287                 return getInIfOpen().read(b, off, len);
288             }
289             fill();
290             avail = count - pos;
291             if (avail <= 0) return -1;
292         }
293         int cnt = (avail < len) ? avail : len;
294         b[off.. off+cnt] = getBufIfOpen()[pos .. pos+cnt];
295         // System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
296         pos += cnt;
297         return cnt;
298     }
299 
300     /**
301      * Reads bytes from this byte-input stream into the specified byte array,
302      * starting at the given offset.
303      *
304      * <p> This method implements the general contract of the corresponding
305      * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
306      * the <code>{@link InputStream}</code> class.  As an additional
307      * convenience, it attempts to read as many bytes as possible by repeatedly
308      * invoking the <code>read</code> method of the underlying stream.  This
309      * iterated <code>read</code> continues until one of the following
310      * conditions becomes true: <ul>
311      *
312      *   <li> The specified number of bytes have been read,
313      *
314      *   <li> The <code>read</code> method of the underlying stream returns
315      *   <code>-1</code>, indicating end-of-file, or
316      *
317      *   <li> The <code>available</code> method of the underlying stream
318      *   returns zero, indicating that further input requests would block.
319      *
320      * </ul> If the first <code>read</code> on the underlying stream returns
321      * <code>-1</code> to indicate end-of-file then this method returns
322      * <code>-1</code>.  Otherwise this method returns the number of bytes
323      * actually read.
324      *
325      * <p> Subclasses of this class are encouraged, but not required, to
326      * attempt to read as many bytes as possible in the same fashion.
327      *
328      * @param      b     destination buffer.
329      * @param      off   offset at which to start storing bytes.
330      * @param      len   maximum number of bytes to read.
331      * @return     the number of bytes read, or <code>-1</code> if the end of
332      *             the stream has been reached.
333      * @exception  IOException  if this input stream has been closed by
334      *                          invoking its {@link #close()} method,
335      *                          or an I/O error occurs.
336      */
337     override int read(byte[] b, int off, int len) {
338         getBufIfOpen(); // Check for closed stream
339         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
340             throw new IndexOutOfBoundsException();
341         } else if (len == 0) {
342             return 0;
343         }
344 
345         int n = 0;
346         for (;;) {
347             int nread = read1(b, off + n, len - n);
348             if (nread <= 0)
349                 return (n == 0) ? nread : n;
350             n += nread;
351             if (n >= len)
352                 return n;
353             // if not closed but no bytes available, return
354             InputStream input = inputStream;
355             if (input !is null && input.available() <= 0)
356                 return n;
357         }
358     }
359 
360     /**
361      * See the general contract of the <code>skip</code>
362      * method of <code>InputStream</code>.
363      *
364      * @throws IOException  if this input stream has been closed by
365      *                      invoking its {@link #close()} method,
366      *                      {@code inputStream.skip(n)} throws an IOException,
367      *                      or an I/O error occurs.
368      */
369     override long skip(long n) {
370         getBufIfOpen(); // Check for closed stream
371         if (n <= 0) {
372             return 0;
373         }
374         long avail = count - pos;
375 
376         if (avail <= 0) {
377             // If no mark position set then don't keep in buffer
378             if (markpos <0)
379                 return getInIfOpen().skip(n);
380 
381             // Fill in buffer to save bytes for reset
382             fill();
383             avail = count - pos;
384             if (avail <= 0)
385                 return 0;
386         }
387 
388         long skipped = (avail < n) ? avail : n;
389         pos += skipped;
390         return skipped;
391     }
392 
393     /**
394      * Returns an estimate of the number of bytes that can be read (or
395      * skipped over) from this input stream without blocking by the next
396      * invocation of a method for this input stream. The next invocation might be
397      * the same thread or another thread.  A single read or skip of this
398      * many bytes will not block, but may read or skip fewer bytes.
399      * <p>
400      * This method returns the sum of the number of bytes remaining to be read in
401      * the buffer (<code>count&nbsp;- pos</code>) and the result of calling the
402      * {@link java.io.FilterInputStream#inputStream inputStream}.available().
403      *
404      * @return     an estimate of the number of bytes that can be read (or skipped
405      *             over) from this input stream without blocking.
406      * @exception  IOException  if this input stream has been closed by
407      *                          invoking its {@link #close()} method,
408      *                          or an I/O error occurs.
409      */
410     override int available() {
411         int n = count - pos;
412         int avail = getInIfOpen().available();
413         return n > (int.max - avail) ? int.max : n + avail;
414     }
415 
416     /**
417      * See the general contract of the <code>mark</code>
418      * method of <code>InputStream</code>.
419      *
420      * @param   readlimit   the maximum limit of bytes that can be read before
421      *                      the mark position becomes invalid.
422      * @see     java.io.BufferedInputStream#reset()
423      */
424     override void mark(int readlimit) {
425         marklimit = readlimit;
426         markpos = pos;
427     }
428 
429     /**
430      * See the general contract of the <code>reset</code>
431      * method of <code>InputStream</code>.
432      * <p>
433      * If <code>markpos</code> is <code>-1</code>
434      * (no mark has been set or the mark has been
435      * invalidated), an <code>IOException</code>
436      * is thrown. Otherwise, <code>pos</code> is
437      * set equal to <code>markpos</code>.
438      *
439      * @exception  IOException  if this stream has not been marked or,
440      *                  if the mark has been invalidated, or the stream
441      *                  has been closed by invoking its {@link #close()}
442      *                  method, or an I/O error occurs.
443      * @see        java.io.BufferedInputStream#mark(int)
444      */
445     override void reset() {
446         getBufIfOpen(); // Cause exception if closed
447         if (markpos < 0)
448             throw new IOException("Resetting to invalid mark");
449         pos = markpos;
450     }
451 
452     /**
453      * Tests if this input stream supports the <code>mark</code>
454      * and <code>reset</code> methods. The <code>markSupported</code>
455      * method of <code>BufferedInputStream</code> returns
456      * <code>true</code>.
457      *
458      * @return  a <code>bool</code> indicating if this stream type supports
459      *          the <code>mark</code> and <code>reset</code> methods.
460      * @see     java.io.InputStream#mark(int)
461      * @see     java.io.InputStream#reset()
462      */
463     override bool markSupported() {
464         return true;
465     }
466 
467     /**
468      * Closes this input stream and releases any system resources
469      * associated with the stream.
470      * Once the stream has been closed, further read(), available(), reset(),
471      * or skip() invocations will throw an IOException.
472      * Closing a previously closed stream has no effect.
473      *
474      * @exception  IOException  if an I/O error occurs.
475      */
476     override void close() {
477         byte[] buffer;
478         while ( (buffer = buf) !is null) {
479             // if (U.compareAndSetObject(this, BUF_OFFSET, buffer, null))
480             if(this.buf is buffer) {
481                 this.buf = null;
482                 InputStream input = inputStream;
483                 inputStream = null;
484                 if (input !is null)
485                     input.close();
486                 return;
487             }
488             // Else retry in case a new buf was CASed in fill()
489         }
490     }
491 }