阅读 89

kafka-log日志代码解析

本文是针对kong-plugin-kafka-log的代码进行简要解析,由于kong-plugin-kafka-log适配的kong的版本较老,我对源代码做了更新,适配kong的2.0.x版本,参考(github.com/tzssangglas…)

handler.lua

  1. 本地变量
local mt_cache = { __mode = "k" }
local producers_cache = setmetatable({}, mt_cache)
复制代码

setmetatable背景解析:

元表(metatable)是 Lua 中独有的概念,表现行为类似于操作符重载,比如我们可以重载 add,来计算两个 Lua 数组的并集;或者重载 tostring,来定义转换为字符串的函数。 而 Lua 提供了两个处理元表的函数:

  • 第一个是setmetatable(table, metatable), 用于为一个 table 设置元表;
  • 第二个是getmetatable(table),用于获取 table 的元表。

弱表(weak table),它是 Lua 中很独特的一个概念,和垃圾回收相关。 当一个 table 的元表中存在 mode 字段时,这个 table 就是弱表(weak table)了。

  • 如果 mode 的值是 k,那就意味着这个 table 的 键 是弱引用。
  • 如果 __mode 的值是 v,那就意味着这个 table 的 值 是弱引用。
  • 当然,你也可以设置为 kv,表明这个表的键和值都是弱引用。

这三者中的任意一种弱表,只要它的 键 或者 值 被回收了,那么对应的整个键值 对象都会被回收。 综上,这两行代码的意思是:mt_cache重载了producers_cache的对象回收策略,key是弱引用,只要key被回收了,producers_cache中key和对应的value整个对象都会被回收。

  1. local function log函数
if premature then
    return
end
复制代码

这段代码是计时器的回调。premature是个标记,指示计时器是否过早运行,仅在Nginx worker退出时才会发生(基本上说“我不执行此计时器,但由于关机/重装而取消了它”)

参考(github.com/Kong/kong/i…)

  1. local function log函数 原来的代码,本次改动主要是针对cache_key的
--- Computes a cache key for a given configuration.
local function cache_key(conf)
  -- here we rely on validation logic in schema that automatically assigns a unique id
  -- on every configuartion update
  return conf.uuid
end

--- Publishes a message to Kafka.
-- Must run in the context of `ngx.timer.at`.
local function log(premature, conf, message)
  if premature then
    return
  end

  local cache_key = cache_key(conf)
  if not cache_key then
    ngx.log(ngx.ERR, "[kafka-log] cannot log a given request because configuration has no uuid")
    return
  end

  local producer = producers_cache[cache_key]
  if not producer then
    kong.log.notice("creating a new Kafka Producer for cache key: ", cache_key)

    local err
    producer, err = producers.new(conf)
    if not producer then
      ngx.log(ngx.ERR, "[kafka-log] failed to create a Kafka Producer for a given configuration: ", err)
      return
    end

    producers_cache[cache_key] = producer
  end

  local ok, err = producer:send(conf.topic, nil, cjson_encode(message))
  if not ok then
    ngx.log(ngx.ERR, "[kafka-log] failed to send a message on topic ", conf.topic, ": ", err)
    return
  end
end
复制代码

kafka-log插件原来的逻辑是:

  1. 新增/更新插件时,随机生成uuid,用uuid作为producers_cache的key,value是根据当前配置新建的producer
  2. 由于producers_cache是一个key为弱引用的表,因此每次更新插件后,uuid更新,producers_cache中旧producer会被GC,然后用更新后的uuid作为key,根据更新后的配置新建producer作为value,放入producers_cache中;
  3. 这样做的用处是把插件的最新配置同步到producers_cache中,并进行日志推送,保证了配置与producers_cache同步,避免了每次推送日志都生成producer的开销;

理顺了这层逻辑之后,进行改造就比较好下手了,同样应该在新增或者更新插件的时机来更新uuid;kafka-log刚出来的时候,kong的版本还是0.1.x版本,那时候可能是可以手动配置插件中未声明的属性(uuid)的,所以作者这样写:

--- (Re)assigns a unique id on every configuration update.
-- since `uuid` is not a part of the `fields`, clients won't be able to change it
local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end
复制代码

即uuid不属于插件配置中已声明的属性,所以不需要用户关心,在self_check的时机去给插件配置新增一个uuid属性。这样用户无感知,但是每次更新的时候,执行self_check悄悄更新uuid属性,完成producers_cache更新。 在kong的2.0.x版本中,self_check被删除,但是有entity_check属性,我修改如下:

entity_checks = {
        { custom_entity_check = {
            field_sources = { "config" },
            fn = function(entity)
                local config = entity.config
                 ……
                --更新配置的时候同时更新uuid属性
                config.uuid = utils.uuid()
                return true
            end
        } },
    },
复制代码

剩下的代码比较好理解 producerproducers.new(conf)构建出来的对象 producers来自于插件中另一个代码 local producers = require "kong.plugins.kafka-log.producers" 拿到缓存中的producer,执行send函数,执行失败则记录本地日志。

  1. function KafkaLogHandler:log
function KafkaLogHandler:log(conf, other)
    KafkaLogHandler.super.log(self)
    local message = basic_serializer.serialize(ngx)
    local ok, err = ngx.timer.at(0, log, conf, message)
    if not ok then
        ngx.log(ngx.ERR, "[kafka-log] failed to create timer: ", err)
    end
end
复制代码

这是kong提供的插件执行声明周期中的一个阶段,log阶段,在请求接收到来自upstream响应之后,返回给下游客户端之前,这个阶段执行。 其中,kong推送出去的日志来源是 basic_serializer.serialize(ngx),即序列化后的当前请求在nginx中的上下文,后面会转成json格式。 触发local function log函数执行的代码是

ngx.timer.at(0, log, conf, message)
复制代码

ngx.timer.at背景解析:

在 OpenResty 中,我们有时候需要在后台定期地执行某些任务,比如同步数据、清理日志等。如果让你来设计,你会怎么做呢?最容易想到的方法,便是对外提供一个 API 接口,在接口中完成这些任务;然后用系统的 crontab 定时调用 curl,来访问这个接口,进而曲线地实现这个需求。不过,这样一来不仅会有割裂感,也会给运维带来更高的复杂度。所以, OpenResty 提供了 ngx.timer 来解决这类需求。你可以把ngx.timer ,看作是 OpenResty 模拟的客户端请求,用以触发对应的回调函数。其实,OpenResty 的定时任务可以分为下面两种:

  • ngx.timer.at,用来执行一次性的定时任务;
  • ngx.time.every,用来执行固定周期的定时任务。 (以上引用来自温铭)

为什么要用ngx.timer.at来执行local function log函数呢? 因为 cosocket API 在 set_by_lua, log_by_lua, header_filter_by_lua* 和 body_filter_by_lua* 中是无法使用的。而在 init_by_lua*init_worker_by_lua* 中暂时也不能用。 而local function log函数就是log_by_lua*阶段,无法直接使用cosocket API。而用ngx.timer.at(0, log, conf, message)的方式可以绕过这种限制。这种绕过方式也是OpenResty应用开发中类似case的主流方式。

producers.lua

  1. create_producer函数
--- Creates a new Kafka Producer.
local function create_producer(conf)
    ……
end
return { new = create_producer }
复制代码

先从最下面看 return { new = create_producer } 这行代码呼应了handler.lua中的 producers.new(conf),相当于调用producers代码中的create_producer()函数;

  1. create_producer函数
local function create_producer(conf)
  local broker_list = {}
  for idx, value in ipairs(conf.bootstrap_servers) do
    local server = types.bootstrap_server(value)
    if not server then
      return nil, "invalid bootstrap server value: " .. value
    end
    broker_list[idx] = server
  end
复制代码

先循环校验插件配置中的bootstrap_servers参数合法性,即kafka的broker的ip+port,这是kafka的推送消息的端口。

  1. producer_config
local producer_config = {
    -- settings affecting all Kafka APIs (including Metadata API, Produce API, etc)
    socket_timeout = conf.timeout,
    keepalive_timeout = conf.keepalive,
    -- settings specific to Kafka Produce API
    required_acks = conf.producer_request_acks,
    request_timeout = conf.producer_request_timeout,
    batch_num = conf.producer_request_limits_messages_per_request,
    batch_size = conf.producer_request_limits_bytes_per_request,
    max_retry = conf.producer_request_retries_max_attempts,
    retry_backoff = conf.producer_request_retries_backoff_timeout,
    producer_type = conf.producer_async and "async" or "sync",
    flush_time = conf.producer_async_flush_timeout,
    max_buffering = conf.producer_async_buffering_limits_messages_in_memory,
  }
  local cluster_name = conf.uuid
  return kafka_producer:new(broker_list, producer_config, cluster_name)
复制代码

producer_config是一个table,在这个table中设置插件配置里面的各种tcp连接和kafka相关的配置。

local cluster_name = conf.uuid
return kafka_producer:new(broker_list, producer_config, cluster_name)
复制代码

配置cluster_name集群名,这里又使用了conf.uuid,后续可能需要优化,找到提取conf的唯一标识的方法,代替uuid。 调用lua-resty-kafka库中的resty.kafka.producer对象,执行真正的推送消息到kafka的底层方法。

schema.lua

local types = require "kong.plugins.kafka-log.types"
local utils = require "kong.tools.utils"
--- Validates value of `bootstrap_servers` field.
local function check_bootstrap_servers(values)
  if values and 0 < #values then
    for _, value in ipairs(values) do
      local server = types.bootstrap_server(value)
      if not server then
        return false, "invalid bootstrap server value: " .. value
      end
    end
    return true
  end
  return false, "bootstrap_servers is required"
end
--- (Re)assigns a unique id on every configuration update.
-- since `uuid` is not a part of the `fields`, clients won't be able to change it
local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end
return {
  fields = {
    bootstrap_servers = { type = "array", required = true, func = check_bootstrap_servers },
    topic = { type = "string", required = true },
    timeout = { type = "number", default = 10000 },
    keepalive = { type = "number", default = 60000 },
    producer_request_acks = { type = "number", default = 1, enum = { -1, 0, 1 } },
    producer_request_timeout = { type = "number", default = 2000 },
    producer_request_limits_messages_per_request = { type = "number", default = 200 },
    producer_request_limits_bytes_per_request = { type = "number", default = 1048576 },
    producer_request_retries_max_attempts = { type = "number", default = 10 },
    producer_request_retries_backoff_timeout = { type = "number", default = 100 },
    producer_async = { type = "boolean", default = true },
    producer_async_flush_timeout = { type = "number", default = 1000 },
    producer_async_buffering_limits_messages_in_memory = { type = "number", default = 50000 },
  },
  self_check = regenerate_uuid,
}
复制代码

这是插件的配置页面相关的代码,其中

local function regenerate_uuid(schema, plugin_t, dao, is_updating)
  plugin_t.uuid = utils.uuid()
  return true
end
复制代码

即是上面conf.uuid相关问题的代码,self_check属性在kong的新版本中被删除了,替换为entity_checks,因此将不会执行regenerate_uuid函数,所以解决conf.uuid的问题最好是在这里入手。

types.lua

--- Parses `host:port` string into a `{host: ..., port: ...}` table.
function _M.bootstrap_server(string)
  local m = re_match(string, bootstrap_server_regex, "jo")
  if not m then
    return nil, "invalid bootstrap server value: " .. string
  end
  return { host = m[1], port = m[2] }
end
return _M
复制代码

这个就是前面producers.lua中调用的types.bootstrap_server(value),用于校验参数配置的bootstrap_server是否是合法的ip+port属性。