Hacking Limbo

Reading / Coding / Hacking

用 Heka 实现简易的日志收集

帮朋友的一个小项目搭建日志收集系统,由于资源有限没法用 ELK 全家桶,就入了 Heka 的坑。

设计

Heka 同时负责 producer 和 consumer 的角色,通过配置来区分:

  1. 每台机器上有一个 Heka Agent 实例,以 UDP Input (AppLogsInput) 和 File Polling 两种途径收集本机的日志。其中 AppLogsInput 使用统一的 JSON schema. File Polling 主要针对 Nginx access logs 的收集。

  2. Heka Agent 通过 TCP Output (AppLogsOutput) 将收集到的日志发送给 Heka Master(暂时是全局单点),encoder 默认为 Heka 内置的 ProtobufEncoder.

  3. Heka Master 收集到日志后,转成 JSON 落地到本地磁盘。

遇到的问题:

  1. 在同一个 Heka 进程里配置 Agent + Master 会触发消息的无限循环转发——因为 input / output 都无法修改 Type 字段,导致 Heka Master 实例的 AppLogsOutput 同样会匹配到 MasterInput 接收到的消息,再次发送给 Heka Master, 触发死循环。

    解决方法:拆分成两个实例,在某台机器上同时部署 heka-agent 和 heka-master.

  2. Heka Message 默认的 timestamp 格式是 epoch nanoseconds, 也就是 second * 1000^3,换算出一个巨大的数字,再三确认,应该是没有算错。

  3. 没有通用的 JSON Encoder. Heka 自带的 ES Payload Encoder 只会输出 payload 部分,而且每一条日志前面都会带一行 ES Index 信息,只好自己写 Lua 脚本。在 Heka 里调试 Lua 脚本非常痛苦:修改代码,重启 Heka,发送消息,查日志,连 print 都没有。

由于 Heka 脑残的设定,sandbox output 无法搭配 encoder 使用,只好偷懒用 Heka 自带的 FileOutput, 把应用日志都输出到 app-logs 里。想要快速查询,大不了再写个脚本实现 tail + filter-by-field 的功能。

JSON Encoder 实现要点

  1. 将完整的 Heka Message 转换成 Lua table:

    local msg = decode_message(read_message('raw'))
  2. msg 里貌似包含了一些无法被 JSON 编码的元素,所以直接调用 cjson.encode(msg) 只会得到不可读的 blob data,必须重新构造一个「干净的」table——先遍历 Heka Message 固有的几个字段(Type / Payload 什么的),然后将 msg.Fields 转换成 key-value mapping(见下文)。

  3. msg.Fields 的元素 schema 非常别扭,每个 field 都能取到 field.name,但是 field.value 却全都是 nil,而整个 field 直接转成 string 却能看到具体的值……看了半天文档,总算搞清楚读取的正确姿势:

    -- 参数:
    -- 1. field name
    -- 2. field index,只能取 0
    -- 3. value index,还是只能取 0
    read_message(string.format('Fields[%s]', field.name), 0, 0)