/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.socket.Common;

import hunt.collection.ByteBuffer;
import hunt.event.EventLoop;
import hunt.Exceptions;
import hunt.Functions;
import hunt.logging;

import hunt.util.Common;
import hunt.Version;

import core.atomic;
import core.stdc.stdint;

import std.bitmanip;
import std.datetime;
import std.exception;
import std.functional;
import std.socket;

version (HAVE_IOCP) import SOCKETOPTIONS = core.sys.windows.winsock2;

version (Posix) import SOCKETOPTIONS = core.sys.posix.sys.socket;

/// Callback receiving an opaque object.
alias ReadCallBack = void delegate(Object obj);
/// Callback receiving a slice of bytes.
alias DataReceivedHandler = void delegate(const ubyte[] data);
/// Callback receiving a buffer and a byte count (see SocketStreamBuffer.finish).
alias DataWrittenHandler = void delegate(const ubyte[] data, size_t size);
/// Callback receiving a socket.
alias AcceptHandler = void delegate(Socket socket);

/// Callback receiving the outcome of a connection attempt.
alias ConnectionHandler = void delegate(bool isSucceeded);

// dfmt off
alias UDPReadCallBack = void delegate(in ubyte[] data, Address addr);
alias AcceptCallBack = void delegate(Selector loop, Socket socket) ;
// dfmt on

/**
 * A node in a singly linked chain of pending outgoing data.
 */
interface StreamWriteBuffer {
    /// The data that has not been consumed by `pop` yet.
    const(ubyte)[] remaining();

    /// Advances the send offset by `size` bytes and
    /// returns true once nothing is left to send.
    bool pop(size_t size);

    /// Called when sending of this buffer is finished.
    void finish();

    /// The buffer that follows this one in the chain (may be null).
    StreamWriteBuffer next();
    /// Sets the buffer that follows this one in the chain.
    void next(StreamWriteBuffer);

    /// Total size of the underlying data.
    size_t capacity();
}


/**
 * Marker interface implemented by AbstractChannel.
 */
interface Channel {

}

/**
 * Base class of an event selector that channels are registered with.
 *
 * See_Also: http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    /// True while the loop started by `onLoop` should keep running.
    /// Always accessed through core.atomic.
    protected shared bool _running;
    // protected shared bool _isOpen = true;

    /// Adds a channel to this selector.
    abstract bool register(AbstractChannel channel);

    /// Re-adds a channel to this selector.
    abstract bool reregister(AbstractChannel channel);

    /// Removes a channel from this selector.
    abstract bool deregister(AbstractChannel channel);

    /// Clears the running flag so that `onLoop` exits after its
    /// current iteration.
    void stop() {
        atomicStore(_running, false);
        version (HUNT_DEBUG) trace("Selector stopped.");
    }

    /// Invoked by `onLoop` after the loop has ended.
    abstract void dispose();

    /**
     * Tells whether or not this selector is open.
     *
     * Returns: true if, and only if, the running flag is set.
     */
    bool isOpen() {
        return atomicLoad(_running);
    }

    /// Historical (misspelled) alias kept for existing callers.
    alias isRuning = isOpen;

    /**
     * Runs the select loop until `stop` is called, then disposes the selector.
     *
     * Params:
     *   weakup  = delegate invoked at the start of every iteration
     *   timeout = passed unchanged to `doSelect`; in milliseconds
     */
    protected void onLoop(scope void delegate() weakup, long timeout = -1) {
        // _running is shared: use the same atomic access as stop()/isOpen()
        // instead of a plain read/write.
        atomicStore(_running, true);
        do {
            // version (HUNT_DEBUG) trace("Selector rolled once.");
            weakup();
            lockAndDoSelect(timeout);
        } while (atomicLoad(_running));
        dispose();
    }

    /**
     * Performs one selection.
     *
     * Params:
     *   timeout = in milliseconds; 0 is translated to -1 before being handed
     *             to `doSelect` (presumably "no timeout", mirroring
     *             java.nio.channels.Selector - confirm against implementations)
     *
     * Throws: IllegalArgumentException if `timeout` is negative.
     */
    int select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

    /// Same as `select(0)`.
    int select() {
        return select(0);
    }

    /// Performs one selection with a timeout of 0 passed to `doSelect`.
    int selectNow() {
        return lockAndDoSelect(0);
    }

    /// Platform specific selection; `timeout` is in milliseconds.
    protected abstract int doSelect(long timeout);

    /// Calls `doSelect` while holding this object's monitor.
    private int lockAndDoSelect(long timeout) {
        synchronized (this) {
            // if (!isOpen())
            //     throw new ClosedSelectorException();
            // synchronized (publicKeys) {
            //     synchronized (publicSelectedKeys) {
            //         return doSelect(timeout);
            //     }
            // }
            return doSelect(timeout);
        }
    }
}

/**
 * Common state of every channel handled by a Selector: the native handle,
 * registration/closing state, a set of ChannelFlag bits and the links of a
 * doubly linked channel chain.
 */
abstract class AbstractChannel : Channel {
    socket_t handle = socket_t.init;
    ErrorEventHandler errorHandler;

    protected bool _isRegistered = false;
    protected bool _isClosing = false;
    protected bool _isClosed = false;

    /**
     * Params:
     *   loop = the selector this channel belongs to
     *   type = the kind of channel
     */
    this(Selector loop, ChannelType type) {
        this._inLoop = loop;
        _type = type;
        // hasFlag/setFlag index the bit array with the ChannelFlag value
        // itself, so the array must reach the largest member (ETMode == 16).
        // A fixed 16-bit array only covers indices 0..15.
        enum size_t flagCount = ChannelFlag.max + 1;
        _flags = BitArray(new bool[](flagCount));
    }

    /// Whether the channel is currently marked as registered.
    bool isRegistered() {
        return _isRegistered;
    }

    /// True when the channel is closed or in the process of closing.
    bool isClosed() {
        return _isClosing || _isClosed;
    }

    /// Marks the channel closed, deregisters it from the selector
    /// (except with HAVE_IOCP) and unlinks it from the channel chain.
    protected void onClose() {
        _isRegistered = false;
        _isClosed = true;
        _isClosing = false;
        version (HAVE_IOCP) {
        }
        else {
            _inLoop.deregister(this);
        }
        clear();

        version (HUNT_DEBUG) tracef("closed [fd=%d]...", this.handle);
    }

    /// Forwards `msg` to `errorHandler` when one is set.
    protected void errorOccurred(string msg) {
        debug warningf("isRegistered: %s, isClosed: %s, msg=%s", _isRegistered, _isClosed, msg);
        if (errorHandler !is null) {
            errorHandler(msg);
        }
    }

    /// Read event hook; must be overridden by channels that handle reads.
    void onRead() {
        assert(false, "not implemented");
    }

    /// Write event hook; must be overridden by channels that handle writes.
    void onWrite() {
        assert(false, "not implemented");
    }

    /// Returns the bit stored at the position given by the flag's value.
    final bool hasFlag(ChannelFlag index) {
        return _flags[index];
    }

    /// The kind of channel given at construction.
    @property ChannelType type() {
        return _type;
    }

    /// The selector given at construction.
    @property Selector eventLoop() {
        return _inLoop;
    }

    /// Closes the channel once; later calls only log a warning in debug builds.
    void close() {
        if (!_isClosed) {
            version (HUNT_DEBUG)
                tracef("channel[fd=%d] closing...", this.handle);
            onClose();
            version (HUNT_DEBUG)
                tracef("channel[fd=%d] closed...", this.handle);
        }
        else {
            debug warningf("The channel[fd=%d] has already been closed", this.handle);
        }
    }

    /// Inserts `next` directly after this channel in the chain.
    /// Passing this channel itself is ignored.
    void setNext(AbstractChannel next) {
        if (next is this)
            return; // Can't set to self
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /// Unlinks this channel from the chain, joining its neighbours.
    void clear() {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    mixin OverrideErro;

protected:
    /// Stores `enable` at the position given by the flag's value.
    final void setFlag(ChannelFlag index, bool enable) {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    BitArray _flags;
    ChannelType _type;

    // Previous and next channel in the chain.
    AbstractChannel _priv;
    AbstractChannel _next;
}

/**
 * Channel of type ChannelType.Event.
 */
class EventChannel : AbstractChannel {
    this(Selector loop) {
        super(loop, ChannelType.Event);
    }

    /// Must be overridden; the base implementation always asserts.
    void call() {
        assert(false);
    }

    // override void close() {
    //     if(_isClosing)
    //         return;
    //     _isClosing = true;
    //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

    //     if(isBusy) {
    //         import std.parallelism;
    //         version (HUNT_DEBUG) warning("Close operation delayed");
    //         auto theTask = task(() {
    //             while(isBusy) {
    //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
    //                 // Thread.sleep(20.msecs);
    //             }
    //             super.close();
    //         });
    //         taskPool.put(theTask);
    //     } else {
    //         super.close();
    //     }
    // }
}

/**
 * Adds a simple error flag plus message to the class it is mixed into.
 */
mixin template OverrideErro() {
    /// Current value of the error flag.
    bool isError() {
        return _error;
    }

    /// Current error message.
    string erroString() {
        return _erroString;
    }

    /// Resets the error flag and message.
    void clearError() {
        _error = false;
        _erroString = "";
    }

    bool _error = false;
    string _erroString;
}

/// Kinds of channel.
enum ChannelType : ubyte {
    Accept = 0,
    TCP,
    UDP,
    Timer,
    Event,
    File,
    None
}

/// Flag positions used with AbstractChannel.hasFlag / setFlag.
/// The numeric value is used directly as a bit index.
enum ChannelFlag : ushort {
    None = 0,
    Read,
    Write,

    OneShot = 8,
    ETMode = 16
}

/// An address together with a block of bytes.
final class UdpDataObject {
    Address addr;
    ubyte[] data;
}

/// Boxes a single value of type T in a class instance.
final class BaseTypeObject(T) {
    T data;
}

/// Exception type declared for loop-related failures.
class LoopException : Exception {
    mixin basicExceptionCtors;
}


/**
 * Marker interface; declares no members.
 */
interface Stream {

}

/**
 * A channel backed by a std.socket.Socket.
 */
abstract class AbstractSocketChannel : AbstractChannel {

    protected bool _isWritting = false;

    this(Selector loop, ChannelType type) {
        super(loop, type);
    }

    /// Busy with reading or writing. The base implementation reports false.
    protected bool isBusy() { return false; }

    /// Adopts `s` as the underlying socket, copies its handle and,
    /// on Posix, switches it to non-blocking mode.
    protected @property void socket(Socket s) {
        this.handle = s.handle();
        version (Posix) {
            s.blocking = false;
        }
        _socket = s;
        version (HUNT_DEBUG)
            infof("new socket: fd=%d", this.handle);
    }

    /// The underlying socket.
    protected @property Socket socket() {
        return _socket;
    }

    protected Socket _socket;

    /**
     * Closes the channel once. When the channel is busy the actual close is
     * handed to the task pool and performed after `isBusy` turns false.
     */
    override void close() {
        if(_isClosing) {
            // warningf (not warning) so that %d is actually substituted.
            debug warningf("already closed [fd=%d]", this.handle);
            return;
        }
        _isClosing = true;
        version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

        if(isBusy) {
            import std.parallelism;
            version (HUNT_DEBUG) warning("Close operation delayed");
            auto theTask = task(() {
                // NOTE(review): this spins without yielding while the channel
                // is busy; the sleep below was disabled by the original author.
                while(isBusy) {
                    version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
                    // Thread.sleep(20.msecs);
                }
                super.close();
            });
            taskPool.put(theTask);
        } else {
            super.close();
        }
    }

    /// Get a socket option.
    /// Returns: The number of bytes written to $(D result).
    //returns the length, in bytes, of the actual result - very different from getsockopt()
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, void[] result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Common case of getting integer and boolean options.
    pragma(inline) final int getOption(SocketOptionLevel level,
            SocketOption option, ref int32_t result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Get the linger option.
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
            ref Linger result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Get a timeout (duration) option.
    pragma(inline) final void getOption(SocketOptionLevel level,
            SocketOption option, ref Duration result) @trusted {
        this._socket.getOption(level, option, result);
    }

    /// Set a socket option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, void[] value) @trusted {
        this._socket.setOption(forward!(level, option, value));
    }

    /// Common case for setting integer and boolean options.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, int32_t value) @trusted {
        this._socket.setOption(forward!(level, option, value));
    }

    /// Set the linger option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Linger value) @trusted {
        this._socket.setOption(forward!(level, option, value));
    }

    /// Set a timeout (duration) option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Duration value) @trusted {
        this._socket.setOption(forward!(level, option, value));
    }

    /// The stored remote address (may be null if never assigned).
    final @property @trusted Address remoteAddress() {
        return _remoteAddress;
    }

    protected Address _remoteAddress;

    /// The stored local address (may be null if never assigned).
    final @property @trusted Address localAddress() {
        return _localAddress;
    }

    protected Address _localAddress;

    version (HAVE_IOCP) {
        /// Records the number of bytes read.
        void setRead(size_t bytes) {
            readLen = bytes;
        }

        protected size_t readLen;
    }

    // NOTE(review): declared without a body and without `abstract`;
    // presumably every concrete subclass overrides it - confirm.
    void start();

    /// Must be overridden by channels that are notified of finished writes.
    void onWriteDone() {
        assert(false, "unimplemented");
    }

}

/**
 * StreamWriteBuffer over an immutable-view byte slice with a send offset
 * and an optional completion handler.
 */
class SocketStreamBuffer : StreamWriteBuffer {

    /**
     * Params:
     *   data    = the bytes to send
     *   handler = invoked once from `finish`; may be null
     */
    this(const(ubyte)[] data, DataWrittenHandler handler = null) {
        _buffer = data;
        _pos = 0;
        _sentHandler = handler;
    }

    /// The part of the data after the current send offset.
    const(ubyte)[] remaining() {
        return _buffer[_pos .. $];
    }

    /// Advances the send offset by `size`.
    /// Returns: true once the offset has reached the end of the data.
    bool pop(size_t size) {
        // NOTE(review): the offset is not clamped; callers are expected to
        // pass at most remaining().length.
        _pos += size;
        return _pos >= _buffer.length;
    }

    /// Reports the buffer and the current offset to the handler (if any),
    /// then drops both the handler and the data.
    void finish() {
        if (_sentHandler)
            _sentHandler(_buffer, _pos);
        _sentHandler = null;
        _buffer = null;
    }

    /// The buffer linked after this one.
    StreamWriteBuffer next() {
        return _next;
    }

    /// Links `v` after this buffer.
    void next(StreamWriteBuffer v) {
        _next = v;
    }

    /// Length of the data held by this buffer.
    size_t capacity() {
        return _buffer.length;
    }

private:
    StreamWriteBuffer _next;
    size_t _pos = 0;
    const(ubyte)[] _buffer;
    DataWrittenHandler _sentHandler;
}

/**
 * FIFO queue of StreamWriteBuffer nodes linked through their `next` property.
 */
struct WriteBufferQueue {
    /// The oldest queued buffer, or null when the queue is empty.
    StreamWriteBuffer front() nothrow @safe {
        return _first;
    }

    /// True when nothing is queued.
    bool empty() nothrow @safe {
        return _first is null;
    }

    /// Unlinks every queued buffer and empties the queue.
    void clear() {
        StreamWriteBuffer current = _first;
        while (current !is null) {
            _first = current.next;
            current.next = null;
            current = _first;
        }

        _first = null;
        _last = null;
    }

    /// Appends `wsite` (must not be null) to the end of the queue.
    void enQueue(StreamWriteBuffer wsite) {
        assert(wsite);
        if (_last) {
            _last.next = wsite;
        } else {
            _first = wsite;
        }
        wsite.next = null;
        _last = wsite;
    }

    /// Removes and returns the oldest buffer, or null when the queue is empty.
    StreamWriteBuffer deQueue() {
        // assert(_first && _last);
        StreamWriteBuffer wsite = _first;
        if (_first !is null)
            _first = _first.next;

        if (_first is null)
            _last = null;

        return wsite;
    }

private:
    StreamWriteBuffer _last = null;
    StreamWriteBuffer _first = null;
}


/**
 * Creates a wildcard address for the given family.
 *
 * Params:
 *   family = AddressFamily.INET6 yields an Internet6Address for "::";
 *            any other value yields an InternetAddress
 *   port   = port number, defaults to InternetAddress.PORT_ANY
 */
Address createAddress(AddressFamily family = AddressFamily.INET,
        ushort port = InternetAddress.PORT_ANY) {
    if (family == AddressFamily.INET6) {
        // addr = new Internet6Address(port); // bug on windows
        return new Internet6Address("::", port);
    }
    else
        return new InternetAddress(port);
}


// Everything below is compiled on Linux only.
// dfmt off
version(linux):
// dfmt on
// Defines SO_REUSEPORT per architecture when CompilerHelper.isLessThan(2078) holds.
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    }
    else version (X86_64) {
        enum SO_REUSEPORT = 15;
    }
    else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    }
    else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    }
    else version (PPC) {
        enum SO_REUSEPORT = 15;
    }
    else version (PPC64) {
        enum SO_REUSEPORT = 15;
    }
    else version (ARM) {
        enum SO_REUSEPORT = 15;
    }
}