1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.EventLoop; 13 14 import hunt.io.socket.Common; 15 import hunt.event.selector; 16 import hunt.event.Task; 17 import hunt.logging; 18 19 import core.thread; 20 import std.parallelism; 21 22 /** 23 24 */ 25 final class EventLoop : AbstractSelector { 26 private long timeout = -1; // in millisecond 27 28 this() { 29 super(); 30 } 31 32 /** 33 timeout: in millisecond 34 */ 35 void run(long timeout = -1) { 36 this.timeout = timeout; 37 doRun(); 38 } 39 40 /** 41 timeout: in millisecond 42 */ 43 void runAsync(long timeout = -1) { 44 this.timeout = timeout; 45 version (HUNT_DEBUG) trace("runAsync ..."); 46 // BUG: Reported defects -@zxp at 12/3/2018, 8:17:58 PM 47 // The task may not be executed. 48 // auto runTask = task(&run, timeout); 49 // taskPool.put(runTask); // 50 Thread th = new Thread(&doRun); 51 th.start(); 52 } 53 54 private void doRun() { 55 if (_running) { 56 version (HUNT_DEBUG) warning("The current eventloop is running!"); 57 } else { 58 version (HUNT_DEBUG) trace("running eventloop..."); 59 _thread = Thread.getThis(); 60 onLoop(&onWeakUp, timeout); 61 } 62 } 63 64 override void stop() { 65 if(!_running) { 66 version (HUNT_DEBUG) trace("event loop has been stopped."); 67 return; 68 } 69 70 version (HUNT_DEBUG) trace("Stopping event loop..."); 71 if(isLoopThread()) { 72 auto stopTask = task(&stop); 73 taskPool.put(stopTask); 74 } else { 75 _thread = null; 76 super.stop(); 77 // dispose(); 78 } 79 } 80 81 // bool isRuning() { 82 // return (_thread !is null); 83 // } 84 85 bool isLoopThread() { 86 return _thread is Thread.getThis(); 87 } 88 89 EventLoop postTask(AbstractTask task) { 90 synchronized (this) { 91 _queue.enQueue(task); 92 } 93 return this; 94 } 95 96 static AbstractTask createTask(alias fun, Args...)(Args args) { 97 return newTask!(fun, Args)(args); 98 } 99 100 static AbstractTask createTask(F, Args...)(F delegateOrFp, Args args) 101 if (is(typeof(delegateOrFp(args)))) { 102 return newTask(F, Args)(delegateOrFp, args); 103 } 104 105 protected void onWeakUp() { 106 TaskQueue queue; 107 synchronized (this) { 108 queue = _queue; 109 _queue = TaskQueue(); 110 } 111 while (!queue.empty) { 112 auto task = queue.deQueue(); 113 task.job(); 114 } 115 } 116 117 private: 118 Thread _thread; 119 TaskQueue _queue; 120 }