/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.BufferedInputStream;

import hunt.io.Common;
import hunt.io.FilterInputStream;
import hunt.Exceptions;

version(HUNT_DEBUG) {
    import hunt.logging;
}

/**
 * A <code>BufferedInputStream</code> adds functionality to another input
 * stream, namely the ability to buffer the input and to support the
 * <code>mark</code> and <code>reset</code> methods. When the
 * <code>BufferedInputStream</code> is created, an internal buffer array is
 * created. As bytes from the stream are read or skipped, the internal buffer
 * is refilled as necessary from the contained input stream, many bytes at a
 * time. The <code>mark</code> operation remembers a point in the input stream
 * and the <code>reset</code> operation causes all the bytes read since the
 * most recent <code>mark</code> operation to be reread before new bytes are
 * taken from the contained input stream.
 *
 * This class performs no locking of its own.
 *
 * @author  Arthur van Hoff
 * @since   1.0
 */
class BufferedInputStream : FilterInputStream {

    private enum int DEFAULT_BUFFER_SIZE = 8192;

    /**
     * The maximum size of array to allocate; the internal buffer is never
     * grown beyond this many bytes.
     */
    private enum int MAX_BUFFER_SIZE = int.max - 8;

    /**
     * The internal buffer array where the data is stored. When necessary,
     * it may be replaced by another array of a different size.
     *
     * It is set to <code>null</code> by <code>close()</code>; nullness of
     * <code>buf</code> is the primary indicator that this stream is closed.
     * (The underlying <code>inputStream</code> field is also nulled out on
     * close.)
     */
    protected byte[] buf;

    /**
     * The index one greater than the index of the last valid byte in
     * the buffer.
     * This value is always in the range <code>0</code> through
     * <code>buf.length</code>; elements <code>buf[0]</code> through
     * <code>buf[count-1]</code> contain buffered input data obtained
     * from the underlying input stream.
     */
    protected int count;

    /**
     * The current position in the buffer. This is the index of the next
     * byte to be read from the <code>buf</code> array.
     * <p>
     * This value is always in the range <code>0</code> through
     * <code>count</code>. If it is less than <code>count</code>, then
     * <code>buf[pos]</code> is the next byte to be supplied as input;
     * if it is equal to <code>count</code>, then the next <code>read</code>
     * or <code>skip</code> operation will require more bytes to be
     * read from the contained input stream.
     */
    protected int pos;

    /**
     * The value of the <code>pos</code> field at the time the last
     * <code>mark</code> method was called.
     * <p>
     * This value is always in the range <code>-1</code> through
     * <code>pos</code>. If there is no marked position in the input
     * stream, this field is <code>-1</code>. If there is a marked position,
     * then <code>buf[markpos]</code> is the first byte to be supplied as
     * input after a <code>reset</code> operation. If <code>markpos</code>
     * is not <code>-1</code>, then all bytes from positions
     * <code>buf[markpos]</code> through <code>buf[pos-1]</code> must remain
     * in the buffer array (though they may be moved to another place in the
     * buffer array, with suitable adjustments to the values of
     * <code>count</code>, <code>pos</code>, and <code>markpos</code>); they
     * may not be discarded unless and until the difference between
     * <code>pos</code> and <code>markpos</code> exceeds
     * <code>marklimit</code>.
     */
    protected int markpos = -1;

    /**
     * The maximum read ahead allowed after a call to the <code>mark</code>
     * method before subsequent calls to the <code>reset</code> method fail.
     * Whenever the difference between <code>pos</code> and
     * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
     * may be dropped by setting <code>markpos</code> to <code>-1</code>.
     */
    protected int marklimit;

    /**
     * Creates a <code>BufferedInputStream</code> with the default buffer
     * size and saves its argument, the input stream
     * <code>inputStream</code>, for later use.
     *
     * @param   inputStream   the underlying input stream.
     */
    this(InputStream inputStream) {
        this(inputStream, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a <code>BufferedInputStream</code> with the specified buffer
     * size, and saves its argument, the input stream
     * <code>inputStream</code>, for later use. An internal buffer array of
     * length <code>size</code> is created and stored in <code>buf</code>.
     *
     * @param   inputStream   the underlying input stream.
     * @param   size          the buffer size.
     * @exception IllegalArgumentException if {@code size <= 0}.
     */
    this(InputStream inputStream, int size) {
        super(inputStream);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /**
     * Returns the underlying input stream, or throws an
     * <code>IOException</code> if it has been nulled out by close.
     */
    private InputStream getInIfOpen() {
        InputStream input = inputStream;
        if (input is null)
            throw new IOException("Stream closed");
        return input;
    }

    /**
     * Returns the internal buffer, or throws an <code>IOException</code>
     * if it has been nulled out by close.
     */
    private byte[] getBufIfOpen() {
        byte[] buffer = buf;
        if (buffer is null)
            throw new IOException("Stream closed");
        return buffer;
    }

    /**
     * Fills the buffer with more data, taking into account shuffling and
     * other tricks for dealing with marks.
     * This method assumes that all buffered data has already been consumed,
     * i.e. <code>pos >= count</code>.
     *
     * @exception IOException if the stream has been closed.
     */
    private void fill() {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0) {
            pos = 0;            /* no mark: throw away the buffer */
        } else if (pos >= buffer.length) {  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                // Element-wise forward copy: source and destination ranges
                // may overlap, which a D slice assignment does not permit.
                for (int i = 0; i < sz; i++) {
                    buffer[i] = buffer[markpos + i];
                }
                pos = sz;
                markpos = 0;
            } else if (cast(int) buffer.length >= marklimit) {
                // Signed comparison on purpose: a negative marklimit must
                // invalidate the mark rather than be promoted to a huge
                // unsigned value.
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else if (buffer.length >= MAX_BUFFER_SIZE) {
                throw new OutOfMemoryError("Required array size too large");
            } else {            /* grow buffer */
                int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                        pos * 2 : MAX_BUFFER_SIZE;
                if (nsz > marklimit)
                    nsz = marklimit;
                // Allocate the enlarged buffer and carry over the bytes that
                // must be retained for a later reset().
                byte[] nbuf = new byte[nsz];
                nbuf[0 .. pos] = buffer[0 .. pos];
                if (this.buf !is buffer) {
                    // buf was replaced or nulled (closed) since it was
                    // fetched above; do not install the new buffer.
                    throw new IOException("Stream closed");
                }
                // Publish the new buffer so later reads index into it.
                this.buf = nbuf;
                buffer = nbuf;
            }
        }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, cast(int) buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

    /**
     * See the general contract of the <code>read</code> method of
     * <code>InputStream</code>.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if this input stream has been closed by
     *                          invoking its {@link #close()} method,
     *                          or an I/O error occurs.
     */
    override int read() {
        if (pos >= count) {
            fill();
            if (pos >= count)
                return -1;
        }
        // Mask to return the byte as an unsigned value in 0..255.
        return getBufIfOpen()[pos++] & 0xff;
    }

    /**
     * Reads bytes into a portion of an array, reading from the underlying
     * stream at most once if necessary.
     */
    private int read1(byte[] b, int off, int len) {
        int avail = count - pos;
        if (avail <= 0) {
            /* If the requested length is at least as large as the buffer, and
               if there is no mark/reset activity, do not bother to copy the
               bytes into the local buffer.  In this way buffered streams will
               cascade harmlessly. */
            if (len >= cast(int) getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            fill();
            avail = count - pos;
            if (avail <= 0)
                return -1;
        }
        int cnt = (avail < len) ? avail : len;
        b[off .. off + cnt] = getBufIfOpen()[pos .. pos + cnt];
        pos += cnt;
        return cnt;
    }

    /**
     * Reads bytes from this byte-input stream into the specified byte array,
     * starting at the given offset.
     *
     * <p> This method implements the general contract of the corresponding
     * <code>read</code> method of the <code>InputStream</code> class. As an
     * additional convenience, it attempts to read as many bytes as possible
     * by repeatedly invoking the <code>read</code> method of the underlying
     * stream. This iterated <code>read</code> continues until one of the
     * following conditions becomes true: <ul>
     *
     *   <li> The specified number of bytes have been read,
     *
     *   <li> The <code>read</code> method of the underlying stream returns
     *   <code>-1</code>, indicating end-of-file, or
     *
     *   <li> The <code>available</code> method of the underlying stream
     *   returns zero, indicating that further input requests would block.
     *
     * </ul> If the first <code>read</code> on the underlying stream returns
     * <code>-1</code> to indicate end-of-file then this method returns
     * <code>-1</code>. Otherwise this method returns the number of bytes
     * actually read.
     *
     * @param      b     destination buffer.
     * @param      off   offset at which to start storing bytes.
     * @param      len   maximum number of bytes to read.
     * @return     the number of bytes read, or <code>-1</code> if the end of
     *             the stream has been reached.
     * @exception  IndexOutOfBoundsException if <code>off</code> or
     *             <code>len</code> is negative, or <code>off + len</code>
     *             exceeds <code>b.length</code>.
     * @exception  IOException  if this input stream has been closed by
     *                          invoking its {@link #close()} method,
     *                          or an I/O error occurs.
     */
    override int read(byte[] b, int off, int len) {
        getBufIfOpen(); // Check for closed stream
        // Explicit signed checks: b.length is unsigned, so folding it into a
        // single "(...) < 0" expression would make the test always false.
        if (off < 0 || len < 0
                || cast(size_t) off + cast(size_t) len > b.length) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0)
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = inputStream;
            if (input !is null && input.available() <= 0)
                return n;
        }
    }

    /**
     * See the general contract of the <code>skip</code> method of
     * <code>InputStream</code>.
     *
     * @param  n  the number of bytes to skip.
     * @return the number of bytes actually skipped.
     * @throws IOException  if this input stream has been closed by
     *                      invoking its {@link #close()} method,
     *                      {@code inputStream.skip(n)} throws an IOException,
     *                      or an I/O error occurs.
     */
    override long skip(long n) {
        getBufIfOpen(); // Check for closed stream
        if (n <= 0) {
            return 0;
        }
        long avail = count - pos;

        if (avail <= 0) {
            // If no mark position set then don't keep in buffer
            if (markpos < 0)
                return getInIfOpen().skip(n);

            // Fill in buffer to save bytes for reset
            fill();
            avail = count - pos;
            if (avail <= 0)
                return 0;
        }

        long skipped = (avail < n) ? avail : n;
        // skipped <= avail, which was computed from two ints, so it fits.
        pos += cast(int) skipped;
        return skipped;
    }

    /**
     * Returns an estimate of the number of bytes that can be read (or
     * skipped over) from this input stream without blocking by the next
     * invocation of a method for this input stream.
     * <p>
     * This method returns the sum of the number of bytes remaining to be read
     * in the buffer (<code>count - pos</code>) and the result of calling
     * <code>available()</code> on the underlying input stream, saturating at
     * <code>int.max</code>.
     *
     * @return     an estimate of the number of bytes that can be read (or
     *             skipped over) from this input stream without blocking.
     * @exception  IOException  if this input stream has been closed by
     *                          invoking its {@link #close()} method,
     *                          or an I/O error occurs.
     */
    override int available() {
        int n = count - pos;
        int avail = getInIfOpen().available();
        // Compare before adding so the sum cannot overflow.
        return n > (int.max - avail) ? int.max : n + avail;
    }

    /**
     * See the general contract of the <code>mark</code> method of
     * <code>InputStream</code>.
     *
     * @param   readlimit   the maximum limit of bytes that can be read before
     *                      the mark position becomes invalid.
     */
    override void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    /**
     * See the general contract of the <code>reset</code> method of
     * <code>InputStream</code>.
     * <p>
     * If <code>markpos</code> is <code>-1</code> (no mark has been set or the
     * mark has been invalidated), an <code>IOException</code> is thrown.
     * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
     *
     * @exception  IOException  if this stream has not been marked or,
     *                  if the mark has been invalidated, or the stream
     *                  has been closed by invoking its {@link #close()}
     *                  method, or an I/O error occurs.
     */
    override void reset() {
        getBufIfOpen(); // Cause exception if closed
        if (markpos < 0)
            throw new IOException("Resetting to invalid mark");
        pos = markpos;
    }

    /**
     * Tests if this input stream supports the <code>mark</code> and
     * <code>reset</code> methods. The <code>markSupported</code> method of
     * <code>BufferedInputStream</code> returns <code>true</code>.
     *
     * @return  a <code>bool</code> indicating if this stream type supports
     *          the <code>mark</code> and <code>reset</code> methods.
     */
    override bool markSupported() {
        return true;
    }

    /**
     * Closes this input stream and releases any system resources
     * associated with the stream.
     * Once the stream has been closed, further read(), available(), reset(),
     * or skip() invocations will throw an IOException.
     * Closing a previously closed stream has no effect.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    override void close() {
        byte[] buffer;
        while ((buffer = buf) !is null) {
            if (this.buf is buffer) {
                this.buf = null;
                InputStream input = inputStream;
                inputStream = null;
                if (input !is null)
                    input.close();
                return;
            }
            // Else retry in case a new buf was installed by fill()
        }
    }
}