Hacking Limbo

Reading / Coding / Hacking

Kafka Partition Reassignment

第一次操作 topic partition 的迁移,有点紧张,实际过程很简单,而且迁移也很快(可能是因为数据不多)。要吐槽的是 Kafka 官方文档里对 partition / replica / broker 的编号规则有点混乱,一会是 0-based, 一会又变成 1-based 🙄. 具体操作过程如下:

  1. 运行 kafka-topics.sh --zookeeper zk:2181 --describe --topic <topic> 确认 topic partition 分布情况(集群操作只支持 zookeeper 定位,不能传 bootstrap server 地址),可以看到这个 topic 的 partition / replica 数量和所在节点。

  2. 构造 reassignment.json, 内容是:

    {
      "version": 1,
      "partitions": [
        {
          "topic": "<topic>",
          "partition": 0,
          "replicas": [2, 3]
        }
      ]
    }
  3. 运行 kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file reassignment.json —verify 看看 JSON 有没有写对──写对了的话会看到报错(是的)ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (2,3) for partition [<topic>,0].

  4. 运行 kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file reassignment.json —execute 发起变更,然后再用 —verify 确认是否生效。

从官方文档里抄几个名词解释:

Replicas is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

Isr is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

对于 PartitionCount == 1 && ReplicationFactor == 1 的 topic partition, 在 kafka-topics.sh --describe 输出里看到的 ReplicasIsr 应该只有(相同的)一个 broker 编号(假定 replica 和 broker 编号是一一对应的)。如果想列出只在 broker-1 上才有的 topics, 可以用命令 kafka-topics.sh --zookeeper zk:2181 --describe | grep -E 'Isr: 1$' 来过滤。

参考:

  1. Step 6: Setting up a multi-broker cluster
  2. Kafka Replication - Replica placements

Octave in Docker

试图在 macOS 上用 Homebrew 安装 Octave, 然而依赖多得有点可怕,居然还要从源码编译 GCC, 吓得赶紧 Ctrl-C. 在 Docker Hub 上找了一个镜像,凑合能用,但觉得 plot() 输出的 ASCII Art 太惨,就想折腾一下 X Server 的转发。

参考 Google 到的信息,成功地在 macOS 上运行了 Octave in Docker 的 QT GUI. 关键步骤如下:

  1. 启用 XQuartz 的 "Allow connections from network clients" 设置。
  2. 运行命令 env DISPLAY=:0 /opt/X11/bin/xhost + $(ipconfig getifaddr en0), 将本机 IP 加入白名单(必须加上 env DISPLAY=:0, 否则 xhost 会报错)。
  3. 启动容器时传入 DISPLAY 环境变量, docker run -it -e DISPLAY="$(ipconfig getifaddr en0):0" octave:latest.

还有一个小问题:使用 plot 之前要先执行 graphics_toolkit gnuplot, 不然会闪退,原因不明。于是我把这个命令加到 Startup File 里了。

参考:

Immutable.js 的坑

在 Redux reducers 里尝试使用 Immutable.js, 整体印象是这个项目试图以库的方式解决语言(运行时)层面的问题,但是整合得并不够好,用起来很别扭。遇到的问题有:

  1. ListMap 与 JavaScript 原生的 ArrayObject 接口 & 语法都不兼容,比如不支持 xs[0] / obj.key 这种取值方式,更没法用 ES 2015 新增的 deconstructing, 导致切换到 Immutable 之后代码啰嗦了很多。
  2. 由于问题 1 的存在,在不引入 TypeScript / Flow 的情况下,很容易因为混淆变量类型而遇到各种奇怪的错误,比如 TypeError: xs.count is not a function.
  3. JavaScript 原生对象转换成 Immutable 对象的过程中,会陷入 "all-or-nothing" 的尴尬处境。

第 3 点再展开说一下。假设前端通过 AJAX API 请求(比如用 axios 库)拿到了一堆 JSON, 可能会这样处理:

// one of many reducers...
function onFetchComplete(state, { payload }) {
  const ids = []
  const itemsById = {}

  for (const item of payload.items) {
    const { id } = item
    ids.push(id)
    itemsById[id] = item
  }

  // state 是个 Immutable Map
  // 此处有坑!
  return state.merge({
    ids,
    itemsById,
  })
}

以上代码有个隐藏的坑,即 state.merge()idsitemsById 转换成了 Immutable 对象。文档里是这样说的(一开始没注意,后来 debug 的时候才发现):

If any of the values provided to merge are not Collection (would return false for isCollection) then they are deeply converted via fromJS before being merged.

考虑到短期的成本 / 收益,尝试不到一个星期就放弃了(还好前端项目暂时只有我一个人写 💩),相关问题也没有再深究,之后有精力会再重新考虑一下。

用 Heka 实现简易的日志收集

帮朋友的一个小项目搭建日志收集系统,由于资源有限没法用 ELK 全家桶,就入了 Heka 的坑。

设计

Heka 同时负责 producer 和 consumer 的角色,通过配置来区分:

  1. 每台机器上有一个 Heka Agent 实例,以 UDP Input (AppLogsInput) 和 File Polling 两种途径收集本机的日志。其中 AppLogsInput 使用统一的 JSON schema. File Polling 主要针对 Nginx access logs 的收集。

  2. Heka Agent 通过 TCP Output (AppLogsOutput) 将收集到的日志发送给 Heka Master(暂时是全局单点),encoder 默认为 Heka 内置的 ProtobufEncoder.

  3. Heka Master 收集到日志后,转成 JSON 落地到本地磁盘。

遇到的问题:

  1. 在同一个 Heka 进程里配置 Agent + Master 会触发消息的无限循环转发——因为 input / output 都无法修改 Type 字段,导致 Heka Master 实例的 AppLogsOutput 同样会匹配到 MasterInput 接收到的消息,再次发送给 Heka Master, 触发死循环。

    解决方法:拆分成两个实例,在某台机器上同时部署 heka-agent 和 heka-master.

  2. Heka Message 默认的 timestamp 格式是 epoch nanoseconds, 也就是 second * 1000^3,换算出一个巨大的数字,再三确认,应该是没有算错。

  3. 没有通用的 JSON Encoder. Heka 自带的 ES Payload Encoder 只会输出 payload 部分,而且每一条日志前面都会带一行 ES Index 信息,只好自己写 Lua 脚本。在 Heka 里调试 Lua 脚本非常痛苦:修改代码,重启 Heka,发送消息,查日志,连 print 都没有。

由于 Heka 脑残的设定,sandbox output 无法搭配 encoder 使用,只好偷懒用 Heka 自带的 FileOutput, 把应用日志都输出到 app-logs 里。想要快速查询,大不了再写个脚本实现 tail + filter-by-field 的功能。

JSON Encoder 实现要点

  1. 将完整的 Heka Message 转换成 Lua table:

    local msg = decode_message(read_message('raw'))
  2. msg 里貌似包含了一些无法被 JSON 编码的元素,所以直接调用 cjson.encode(msg) 只会得到不可读的 blob data,必须重新构造一个「干净的」table——先遍历 Heka Message 固有的几个字段(Type / Payload 什么的),然后将 msg.Fields 转换成 key-value mapping(见下文)。

  3. msg.Fields 的元素 schema 非常别扭,每个 field 都能取到 field.name,但是 field.value 却全都是 nil,而整个 field 直接转成 string 却能看到具体的值……看了半天文档,总算搞清楚读取的正确姿势:

    -- 参数:
    -- 1. field name
    -- 2. field index,只能取 0
    -- 3. value index,还是只能取 0
    read_message(string.format('Fields[%s]', field.name), 0, 0)

GlusterFS 增量备份

GlusterFS 没有提供现成的增量备份方案,但稍微折腾一下还是有希望实现的,用到了一个没有文档说明的特性:changelog. 官方文档完全没有提到怎样使用,所幸在邮件列表里翻到一个回复,提到了 glusterfind 这个工具,大致的用法是:

# 首次运行时创建 session
glusterfind create SESSION_NAME VOLUME_NAME

# 获取完整的文件列表
glusterfind pre --full SESSION_NAME VOLUME_NAME all-files.txt

# 获取增量列表
glusterfind pre SESSION_NAME VOLUME_NAME files.txt
glusterfind post SESSION_NAME VOLUME_NAME

基于 glusterfind 设计出来的增量备份方案就是,用 glusterfind pre 导出 changelog, 排除掉排除不需要的文件后,生成文件列表,扔给 rsync 执行文件增量同步。

比较棘手的部分:区分 NEW, MODIFY, RENAMEDELETE 操作。NEWMODIFY 都好说,直接复制就完事,DELETE 也不难,正常情况下很少删除,可以攒起来隔一段时间集中删除一次(注意:要考虑删除后重新写入原路径的情况),但是 RENAME 就有点麻烦了。

RENAME 涉及到的情况:

  1. 最简单的情况,mv /dir/file1 /dir/file2,按 NEW + DELETE 处理即可。
  2. 难点 1, mv /dir /newdir,把整个新目录重新同步一遍?由于 RENAME 只有一条记录(被更名的那个目录),如果生成文件列表时排除了目录类型的路径,就会漏掉这个更新。
  3. 难点 2, date >> /dir/file3 && mv /dir /newdir,源路径和备份路径不一致,要么追踪目录的 RENAME 事件,从 /newdir 里复制 file3;要么先执行 RENAME,再复制文件。
  4. 难点 3, 跨目录的移动,比如 mv /dir1/subdir /dir2/abc.

如果不考虑 RENAME 事件,整个备份过程大致如下:

  1. 在 GlusterFS 机器上导出 changelogs - 调用 glusterfind pre 输出 changelogs 到按日期划分的目录里,顺便备份 status 文件(记录了前一次增量的时间戳)。

  2. 同步 changelogs 到备份机器。

  3. 解析 changelogs 事件生成文件列表,并记录最后读取的位置。这一步涉及到的逻辑有:

    • 根据最后一次读取的 changelogs 文件位置,按日期向后遍历所有 changelogs.
    • 校验 states.json 记录的 last_log_md5sum
    • 将所有事件加载进内存,按文件顺序(即时间顺序)遍历。
    • 忽略所有目录类型的路径,并跳过符合 exclude patterns 的路径。
    • 保证在各种情况下,程序退出时都记录下最后处理成功的事件。
    • 检查上一次执行的状态,如果上一次执行出错,必须手动跳过出错的事件才能继续执行。
    • 遇到非预期的事件 (RENAME) 时,抛异常终止程序。
  4. 调用 rsync 完成文件同步。用 rsync 是为了简化 file metadata 的备份,除了文件本身的 owner / group / permission 之外,其上层目录的 metadata 也要备份。

如果有的选,还是别用 GlusterFS 比较好。

- More Articles in Archives -