1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| local new_tab = require "table.new" local semaphore = require "ngx.semaphore"
local ngx = ngx
local unpack = unpack
local _M = {}
local chan = { new = function(self) local chan_attrs = { _buffer = nil, _has_value = false, _write_sema = semaphore.new(), _waiting_producer = 0, _read_sema = semaphore.new(), _waiting_consumer = 0, } return setmetatable(chan_attrs, { __index = self }) end, send = function(self, value) if self._has_value == true then self._waiting_producer = self._waiting_producer + 1 while self._has_value == true do if self._waiting_consumer > 0 then self._read_sema:post() end self._write_sema:wait(1) end self._waiting_producer = self._waiting_producer - 1 end
self._buffer = value self._has_value = true
if self._waiting_consumer > 0 then self._read_sema:post() end end, receive = function(self) if self._has_value == false then self._waiting_consumer = self._waiting_consumer + 1 while self._has_value == false do if self._waiting_producer > 0 then self._write_sema:post() end self._read_sema:wait(1) end self._waiting_consumer = self._waiting_consumer - 1 end
local value = self._buffer self._has_value = false
if self._waiting_producer > 0 then self._write_sema:post() end return value end, }
local function worker(funcCh) while true do local v = funcCh:receive() if v == nil then return end v.valueCh:send(v.func(unpack(v.args, 1, #v.args))) end end
function _M:run(func, ...) local valueCh = chan:new() self.tasks:send({ func = func, args = {...}, valueCh = valueCh, }) return valueCh end
function _M:create(count) count = count or 10 local _obj = { pool_size = count, tasks = chan:new(), } _obj.pool = new_tab(32, 0) for i = 1, count, 1 do _obj.pool[i] = ngx.thread.spawn(worker, _obj.tasks) end
setmetatable(_obj, self) self.__index = self return _obj end
function _M:destroy() for i = 1, #self.pool, 1 do self.tasks:send(nil) end end
return _M
|