diff --git a/lib/crowdsec.lua b/lib/crowdsec.lua index 7f73849..cf62d31 100644 --- a/lib/crowdsec.lua +++ b/lib/crowdsec.lua @@ -17,6 +17,8 @@ runtime.remediations["1"] = "ban" runtime.remediations["2"] = "captcha" +runtime.timer_started = false + local csmod = {} @@ -92,7 +94,7 @@ function csmod.validateCaptcha(g_captcha_res, remote_ip) end -function get_http_request(link) +local function get_http_request(link) local httpc = http.new() httpc:set_timeout(runtime.conf['REQUEST_TIMEOUT']) local res, err = httpc:request_uri(link, { @@ -107,7 +109,7 @@ function get_http_request(link) return res, err end -function parse_duration(duration) +local function parse_duration(duration) local match, err = ngx.re.match(duration, "^((?[0-9]+)h)?((?[0-9]+)m)?(?[0-9]+)") local ttl = 0 if not match then @@ -130,7 +132,7 @@ function parse_duration(duration) return ttl, nil end -function get_remediation_id(remediation) +local function get_remediation_id(remediation) for key, value in pairs(runtime.remediations) do if value == remediation then return tonumber(key) @@ -139,7 +141,7 @@ function get_remediation_id(remediation) return nil end -function item_to_string(item, scope) +local function item_to_string(item, scope) local ip, cidr, ip_version if scope:lower() == "ip" then ip = item @@ -169,39 +171,95 @@ function item_to_string(item, scope) return ip_version.."_"..ip_netmask.."_"..ip_network_address end -function stream_query() - -- As this function is running inside coroutine (with ngx.timer.every), +local function set_refreshing(value) + local succ, err, forcible = runtime.cache:set("refreshing", value) + if not succ then + error("Failed to set refreshing key in cache: "..err) + end + if forcible then + ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config") + end +end + +local function stream_query(premature) + -- As this function is running inside coroutine (with ngx.timer.at), -- we need to raise error instead of returning them + + + ngx.log(ngx.DEBUG, "running timers: " .. tostring(ngx.timer.running_count()) .. " | pending timers: " .. tostring(ngx.timer.pending_count())) + + if premature then + ngx.log(ngx.DEBUG, "premature run of the timer, returning") + return + end + + local refreshing = runtime.cache:get("refreshing") + + if refreshing == true then + ngx.log(ngx.DEBUG, "another worker is refreshing the data, returning") + local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) + if not ok then + error("Failed to create the timer: " .. (err or "unknown")) + end + return + end + + local last_refresh = runtime.cache:get("last_refresh") + if last_refresh ~= nil then + -- local last_refresh_time = tonumber(last_refresh) + local now = ngx.time() + if now - last_refresh < runtime.conf["UPDATE_FREQUENCY"] then + ngx.log(ngx.DEBUG, "last refresh was less than " .. runtime.conf["UPDATE_FREQUENCY"] .. " seconds ago, returning") + local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) + if not ok then + error("Failed to create the timer: " .. (err or "unknown")) + end + return + end + end + + set_refreshing(true) + local is_startup = runtime.cache:get("startup") - ngx.log(ngx.DEBUG, "Stream Query from worker : " .. tostring(ngx.worker.id()) .. " with startup "..tostring(is_startup)) + ngx.log(ngx.DEBUG, "Stream Query from worker : " .. tostring(ngx.worker.id()) .. " with startup "..tostring(is_startup) .. " | premature: " .. tostring(premature)) local link = runtime.conf["API_URL"] .. "/v1/decisions/stream?startup=" .. tostring(is_startup) local res, err = get_http_request(link) if not res then - if ngx.timer.every == nil then - local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) - if not ok then - error("Failed to create the timer: " .. (err or "unknown")) - end - end + local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) + if not ok then + set_refreshing(false) + error("Failed to create the timer: " .. (err or "unknown")) + end + set_refreshing(false) error("request failed: ".. err) end + local succ, err, forcible = runtime.cache:set("last_refresh", ngx.time()) + if not succ then + error("Failed to set last_refresh key in cache: "..err) + end + if forcible then + ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config") + end + local status = res.status local body = res.body + + ngx.log(ngx.DEBUG, "Response:" .. tostring(status) .. " | " .. tostring(body)) + if status~=200 then - if ngx.timer.every == nil then - local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) - if not ok then - error("Failed to create the timer: " .. (err or "unknown")) - end + local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) + if not ok then + set_refreshing(false) + error("Failed to create the timer: " .. (err or "unknown")) end + set_refreshing(false) error("HTTP error while request to Local API '" .. status .. "' with message (" .. tostring(body) .. ")") end local decisions = cjson.decode(body) -- process deleted decisions if type(decisions.deleted) == "table" then - if not is_startup then for i, decision in pairs(decisions.deleted) do if decision.type == "captcha" then runtime.cache:delete("captcha_" .. decision.value) @@ -210,7 +268,6 @@ function stream_query() runtime.cache:delete(key) ngx.log(ngx.DEBUG, "Deleting '" .. key .. "'") end - end end -- process new decisions @@ -247,17 +304,19 @@ function stream_query() ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config") end - -- re-occuring timer if there is no timer.every available - if ngx.timer.every == nil then - local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) - if not ok then - error("Failed to create the timer: " .. (err or "unknown")) - end + + local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) + if not ok then + set_refreshing(false) + error("Failed to create the timer: " .. (err or "unknown")) end + + set_refreshing(false) + ngx.log(ngx.DEBUG, "end of stream_query") return nil end -function live_query(ip) +local function live_query(ip) local link = runtime.conf["API_URL"] .. "/v1/decisions?ip=" .. ip local res, err = get_http_request(link) if not res then @@ -311,30 +370,14 @@ end function csmod.SetupStream() -- if it stream mode and startup start timer - if runtime.cache:get("first_run") == true and runtime.conf["MODE"] == "stream" then + ngx.log(ngx.DEBUG, "timer started: " .. tostring(runtime.timer_started) .. " in worker " .. tostring(ngx.worker.id())) + if runtime.timer_started == false and runtime.conf["MODE"] == "stream" then local ok, err - if ngx.timer.every == nil then - ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) - else - ok, err = ngx.timer.every(runtime.conf["UPDATE_FREQUENCY"], stream_query) - end + ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query) if not ok then - local succ, err, forcible = runtime.cache:set("first_run", true) - if not succ then - ngx.log(ngx.ERR, "failed to set startup key in cache: "..err) - end - if forcible then - ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config") - end return true, nil, "Failed to create the timer: " .. (err or "unknown") end - local succ, err, forcible = runtime.cache:set("first_run", false) - if not succ then - ngx.log(ngx.ERR, "failed to set first_run key in cache: "..err) - end - if forcible then - ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config") - end + runtime.timer_started = true ngx.log(ngx.DEBUG, "Timer launched") end end