New approach for handling stream update (#36)

This commit is contained in:
blotus 2022-11-25 14:02:56 +01:00 committed by GitHub
parent eb5de9c364
commit cad85ae199
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -17,6 +17,8 @@ runtime.remediations["1"] = "ban"
runtime.remediations["2"] = "captcha"
runtime.timer_started = false
local csmod = {}
@ -92,7 +94,7 @@ function csmod.validateCaptcha(g_captcha_res, remote_ip)
end
function get_http_request(link)
local function get_http_request(link)
local httpc = http.new()
httpc:set_timeout(runtime.conf['REQUEST_TIMEOUT'])
local res, err = httpc:request_uri(link, {
@ -107,7 +109,7 @@ function get_http_request(link)
return res, err
end
function parse_duration(duration)
local function parse_duration(duration)
local match, err = ngx.re.match(duration, "^((?<hours>[0-9]+)h)?((?<minutes>[0-9]+)m)?(?<seconds>[0-9]+)")
local ttl = 0
if not match then
@ -130,7 +132,7 @@ function parse_duration(duration)
return ttl, nil
end
function get_remediation_id(remediation)
local function get_remediation_id(remediation)
for key, value in pairs(runtime.remediations) do
if value == remediation then
return tonumber(key)
@ -139,7 +141,7 @@ function get_remediation_id(remediation)
return nil
end
function item_to_string(item, scope)
local function item_to_string(item, scope)
local ip, cidr, ip_version
if scope:lower() == "ip" then
ip = item
@ -169,39 +171,95 @@ function item_to_string(item, scope)
return ip_version.."_"..ip_netmask.."_"..ip_network_address
end
function stream_query()
-- As this function is running inside coroutine (with ngx.timer.every),
local function set_refreshing(value)
local succ, err, forcible = runtime.cache:set("refreshing", value)
if not succ then
error("Failed to set refreshing key in cache: "..err)
end
if forcible then
ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config")
end
end
local function stream_query(premature)
-- As this function is running inside coroutine (with ngx.timer.at),
-- we need to raise error instead of returning them
local is_startup = runtime.cache:get("startup")
ngx.log(ngx.DEBUG, "Stream Query from worker : " .. tostring(ngx.worker.id()) .. " with startup "..tostring(is_startup))
local link = runtime.conf["API_URL"] .. "/v1/decisions/stream?startup=" .. tostring(is_startup)
local res, err = get_http_request(link)
if not res then
if ngx.timer.every == nil then
ngx.log(ngx.DEBUG, "running timers: " .. tostring(ngx.timer.running_count()) .. " | pending timers: " .. tostring(ngx.timer.pending_count()))
if premature then
ngx.log(ngx.DEBUG, "premature run of the timer, returning")
return
end
local refreshing = runtime.cache:get("refreshing")
if refreshing == true then
ngx.log(ngx.DEBUG, "another worker is refreshing the data, returning")
local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
if not ok then
error("Failed to create the timer: " .. (err or "unknown"))
end
return
end
local last_refresh = runtime.cache:get("last_refresh")
if last_refresh ~= nil then
-- local last_refresh_time = tonumber(last_refresh)
local now = ngx.time()
if now - last_refresh < runtime.conf["UPDATE_FREQUENCY"] then
ngx.log(ngx.DEBUG, "last refresh was less than " .. runtime.conf["UPDATE_FREQUENCY"] .. " seconds ago, returning")
local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
if not ok then
error("Failed to create the timer: " .. (err or "unknown"))
end
return
end
end
set_refreshing(true)
local is_startup = runtime.cache:get("startup")
ngx.log(ngx.DEBUG, "Stream Query from worker : " .. tostring(ngx.worker.id()) .. " with startup "..tostring(is_startup) .. " | premature: " .. tostring(premature))
local link = runtime.conf["API_URL"] .. "/v1/decisions/stream?startup=" .. tostring(is_startup)
local res, err = get_http_request(link)
if not res then
local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
if not ok then
set_refreshing(false)
error("Failed to create the timer: " .. (err or "unknown"))
end
set_refreshing(false)
error("request failed: ".. err)
end
local succ, err, forcible = runtime.cache:set("last_refresh", ngx.time())
if not succ then
error("Failed to set last_refresh key in cache: "..err)
end
if forcible then
ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config")
end
local status = res.status
local body = res.body
ngx.log(ngx.DEBUG, "Response:" .. tostring(status) .. " | " .. tostring(body))
if status~=200 then
if ngx.timer.every == nil then
local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
if not ok then
set_refreshing(false)
error("Failed to create the timer: " .. (err or "unknown"))
end
end
set_refreshing(false)
error("HTTP error while request to Local API '" .. status .. "' with message (" .. tostring(body) .. ")")
end
local decisions = cjson.decode(body)
-- process deleted decisions
if type(decisions.deleted) == "table" then
if not is_startup then
for i, decision in pairs(decisions.deleted) do
if decision.type == "captcha" then
runtime.cache:delete("captcha_" .. decision.value)
@ -211,7 +269,6 @@ function stream_query()
ngx.log(ngx.DEBUG, "Deleting '" .. key .. "'")
end
end
end
-- process new decisions
if type(decisions.new) == "table" then
@ -247,17 +304,19 @@ function stream_query()
ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config")
end
-- re-occuring timer if there is no timer.every available
if ngx.timer.every == nil then
local ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
if not ok then
set_refreshing(false)
error("Failed to create the timer: " .. (err or "unknown"))
end
end
set_refreshing(false)
ngx.log(ngx.DEBUG, "end of stream_query")
return nil
end
function live_query(ip)
local function live_query(ip)
local link = runtime.conf["API_URL"] .. "/v1/decisions?ip=" .. ip
local res, err = get_http_request(link)
if not res then
@ -311,30 +370,14 @@ end
function csmod.SetupStream()
-- if it stream mode and startup start timer
if runtime.cache:get("first_run") == true and runtime.conf["MODE"] == "stream" then
ngx.log(ngx.DEBUG, "timer started: " .. tostring(runtime.timer_started) .. " in worker " .. tostring(ngx.worker.id()))
if runtime.timer_started == false and runtime.conf["MODE"] == "stream" then
local ok, err
if ngx.timer.every == nil then
ok, err = ngx.timer.at(runtime.conf["UPDATE_FREQUENCY"], stream_query)
else
ok, err = ngx.timer.every(runtime.conf["UPDATE_FREQUENCY"], stream_query)
end
if not ok then
local succ, err, forcible = runtime.cache:set("first_run", true)
if not succ then
ngx.log(ngx.ERR, "failed to set startup key in cache: "..err)
end
if forcible then
ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config")
end
return true, nil, "Failed to create the timer: " .. (err or "unknown")
end
local succ, err, forcible = runtime.cache:set("first_run", false)
if not succ then
ngx.log(ngx.ERR, "failed to set first_run key in cache: "..err)
end
if forcible then
ngx.log(ngx.ERR, "Lua shared dict (crowdsec cache) is full, please increase dict size in config")
end
runtime.timer_started = true
ngx.log(ngx.DEBUG, "Timer launched")
end
end