1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.io.channel.Common; 13 14 import hunt.io.IoError; 15 import hunt.collection.ByteBuffer; 16 import hunt.concurrency.SimpleQueue; 17 import hunt.concurrency.TaskPool; 18 import hunt.Functions; 19 import hunt.system.Memory; 20 21 import core.atomic; 22 import std.socket; 23 24 25 alias DataReceivedHandler = void delegate(ByteBuffer buffer); 26 alias AcceptHandler = void delegate(Socket socket); 27 alias ErrorEventHandler = Action1!(IoError); 28 29 alias ConnectionHandler = void delegate(bool isSucceeded); 30 alias UdpDataHandler = void delegate(const(ubyte)[] data, Address addr); 31 32 @property TaskPool workerPool() @trusted { 33 import std.concurrency : initOnce; 34 35 __gshared TaskPool pool; 36 return initOnce!pool({ 37 auto p = new TaskPool(defaultPoolThreads); 38 p.isDaemon = true; 39 return p; 40 }()); 41 } 42 43 // __gshared bool useWorkerThread = false; 44 45 private shared uint _defaultPoolThreads = 0; 46 47 /** 48 These properties get and set the number of worker threads in the `TaskPool` 49 instance returned by `taskPool`. The default value is `totalCPUs` - 1. 50 Calling the setter after the first call to `taskPool` does not changes 51 number of worker threads in the instance returned by `taskPool`. 52 */ 53 @property uint defaultPoolThreads() @trusted { 54 const local = atomicLoad(_defaultPoolThreads); 55 return local < uint.max ? local : totalCPUs - 1; 56 } 57 58 /// Ditto 59 @property void defaultPoolThreads(uint newVal) @trusted { 60 atomicStore(_defaultPoolThreads, newVal); 61 } 62 63 64 /** 65 */ 66 interface Channel { 67 68 } 69 70 71 72 enum ChannelType : ubyte { 73 Accept = 0, 74 TCP, 75 UDP, 76 Timer, 77 Event, 78 File, 79 None 80 } 81 82 enum ChannelFlag : ushort { 83 None = 0, 84 Read, 85 Write, 86 87 OneShot = 8, 88 ETMode = 16 89 } 90 91 final class UdpDataObject { 92 Address addr; 93 ubyte[] data; 94 } 95 96 final class BaseTypeObject(T) { 97 T data; 98 } 99 100 101 102 version (HUNT_IO_WORKERPOOL) { 103 alias WritingBufferQueue = MagedNonBlockingQueue!ByteBuffer; 104 } else { 105 alias WritingBufferQueue = SimpleQueue!ByteBuffer; 106 } 107 108 // alias WritingBufferQueue = MagedNonBlockingQueue!ByteBuffer; 109 // alias WritingBufferQueue = SimpleQueue!ByteBuffer; 110 // alias WritingBufferQueue = MagedBlockingQueue!ByteBuffer; 111 112 /** 113 */ 114 Address createAddress(AddressFamily family = AddressFamily.INET, 115 ushort port = InternetAddress.PORT_ANY) { 116 if (family == AddressFamily.INET6) { 117 // addr = new Internet6Address(port); // bug on windows 118 return new Internet6Address("::", port); 119 } else 120 return new InternetAddress(port); 121 }