「Project」BCP资金一致性服务

BCP服务是专门针对交易系统的一个旁路校验服务;通过交易链路中产生的日志,进行关联核对关键信息,保证每次交易链路的资金一致性;

1. 项目背景

在实时交易系统中,资金的一致性要求很高,如果出现商家资损、用户资损都是比较严重的问题,在支付链路中防范是最基本的,但是在支付链路之外也需要一定的旁路校验方案:

  • T+1对账;(延时较高,如果出现漏洞,发现时可能已经出现大面积损失)
  • 准实时规则校验;(延时较低,可以在分钟级别发现问题,从而介入止损)

BCP项目(Business Check Service)所提供的服务:

  • 用户将根据自身服务中需要核对的关键日志,在BCP平台配置元数据解析规则;
  • 用户可以将多个关键日志关联,并配置不同日志之间的核对规则;
  • 根据用户配置的规则,准实时执行核对规则,当核对失败时,发出告警;

核心目标就是通过可配置化的细粒度规则,来准实时地校验支付链路是否正常

  • 实时性:分钟级异常检测(对比传统T+1);
  • 准确性:覆盖全链路交易类型,规则可灵活配置;
  • 自动化:异常自动告警,减少人工介入。

2. 整体技术方案

  • 架构:基于Flink的实时流处理 + 分布式任务调度 + 规则配置 + 规则执行引擎;
  • 核心组件
    • Flink集群:实时清洗日志,过滤出已有的规则所关心的日志,并清洗为特定的易解析的格式;
    • Redis:存储高频访问的待核对消息;
    • MySQL:存储开发运营人员配置的规则元数据、工单任务及结果;
    • Kafka:日志数据消息队列。
  • 数据流
    交易日志 → Flink清洗 → 标准化消息 → Redis/MySQL → 规则引擎核对 → 告警/结果存储。

3. 业务核心模块

3.1 元数据管理模块

功能性服务:

  • 用户定义消息解析规则;用于将用户服务的特定日志解析为JSON格式(清洗);
  • 消息增删改查管理;
  • 元数据分类:
    • 触发消息元数据:触发消息通常为一条支付链路中完成的消息;如支付的回调消息;
    • 核对消息元数据:通常为一条支付链路中的支付、退款等起始消息;

后续用户的服务日志接入后,日志清洗模块则根据消息的清洗配置,执行数据过滤、清洗;

3.2 日志清洗模块

用户服务的日志通过Kafka导流后,提供Flink实例,消费用户的日志数据,根据用户服务的元数据规则,执行:

  1. 过滤:过滤出元数据关心的日志;
  2. 解析:按照用户的配置,解析日志为JSON格式;
  3. 存储:将解析完成的消息,存储在MySQL或Redis中;
    1. 触发消息存储在MySQL:一个触发消息对应着一次核对任务的开始,需要更高的可靠性;
    2. 核对消息存储在Redis:

3.2 规则管理模块

功能性服务:

  • 在元数据消息的基础上,用户需要规定:消息之间以什么规则进行匹配关联;通常指定2个key之间相同,则认为是同一条链路的消息;在支付链路中通常为支付流水号;
  • 关联的消息,其余字段的匹配规则,多个字段之间组合计算;如订单总金额通过优惠券扣减后,应该等于最终支付金额,如果不太相同,则告警;
  • 规则告警等级;

规则计算方式:

  • if
    条件
  • 计算符号:
    +
    -
    *
    ÷
    =
    >
    <
    、等简单计算;
  • 区间值;
  • 字典值;如交易状态、结果、渠道枚举等; 假设现在有payActionMessage、callbackMessage2个消息,设置了一个规则:回调支付消息成功,则比对
    支付消息 - 优惠券金额 = 用户支付金额
if (callbackMessage.status == "SUCCESS") {
	payActionMessage.totalAmout - payActionMessage.couponAmount == callbackMessage.amout;
}

规则数据会在项目启动时,缓存在本地节点中;

4. 基于DB的分布式工单调度

本质就是多节点同时抢占DB的中任务,抢到则执行,执行失败则将任务更新后释放,可再次被抢占执行。

业务的工作流程

工作流程:参考下图

  1. Flink将消息清洗完成后,如果发现有元数据中需要核对的消息,会创建一个任务到DB中。
  2. 消息校验服务会定时轮询DB中的未完成的任务。通过抢占式的方式抢占DB中的任务,抢占成功,则执行所抢占的核对任务;
  3. 校验服务根据任务中的元数据配置、真实元数据的匹配规则,从Redis中拉取所需要核对的消息。
  4. 存中,根据具体的核对规则,完成对应的字段的计算,对于计算结果不满足的case,执行对应的告警;
  5. 消息不存在,则更新任务的状态:执行失败、重试次数等,并释放任务,等待节点再次调度抢占再次执行;

工单调度的核心流程

首先定义什么是一个工单任务:

  1. 工单任务是一个单独的消息核对任务,对应单次的日志;比如一条支付链路,对应一个任务
  2. 工单任务定义了类型,比如支付链路任务、退款、营销活动等等;定义类型的目的是区分不同任务的业务体量,方便针对性的对特定类型的任务增加调度资源;

调度的执行过程:

  1. 每个后端阶节点启动上线时,会到「心跳注册表」中注册上线信息;主要就是将hostname注册到这张表中,并设置状态为上线状态
  2. 每个节点同时会开启2个定时任务:
    1. 心跳更新定时线程任务:定时5min向「心跳注册表」更新自己的最新状态、更新时间;表明在线;
    2. 下线检测定时线程任务:可能存在某些机器宕机,那么就需要定时任务(10min)实时检查这个心跳表中有没有超时未更新的节点,如果有则置为「下线状态」,并将这个节点所持有的所有工单任务释放;
  3. 完成注册后,节点就会开始执行核对任务,会有3个单线程任务执行:
    1. 单线程抢占任务:节点每隔2min,执行一次抢占任务,抢占
      lock_flag
      为空的任务,最大抢占任务量为100;
    2. 单线程加载任务:节点每隔2min,执行一次任务拉取,拉取自己节点抢占了的任务,加载进内存中的任务队列中;
    3. 单线程分发任务:此线程会不断轮询消费任务队列,根据任务的类型,分发给指定的业务线程池,由对应的线程池执行真正的核对任务的处理;
  4. 业务线程池:执行核对任务;
    1. 任务执行成功,则将对应的工单设置为成功;
    2. 任务执行失败,则将对应的工单设置为重试状态,并释放此工单,由所有节点再次抢占执行;
    3. 任务到达最大重试次数,则将工单设置为失败;
  5. 核对任务:
    1. 核对任务会拉取工单中的
      biz_data
      获取要执行的数据,根据其中的匹配键,从Redis中拉取对应的消息。
    2. 根据此任务的元数据所配置的核对规则,匹配对应的字段,执行计算。

为什么选择此方案

  1. 高可靠性:
    1. 工单持久化存储,不会数据丟失;
    2. 工单支持重试;
    3. 节点宕机自动故障转移,分布式节点调度,不依赖任何单一的节点;节点宕机也不会丢数据,不会影响被其他节点调度;
  2. 灵活性:
    1. 工单支持延时重试,对于实时性高的任务,延时时间可以尽可能段,对于实时性低的任务,可以增加延时时间;
    2. 重试次数、重试间隔,都可以根据任务类型设置;

5. BCP项目业务量

BCP的性能考量只要是2点:

  1. TPS:BCP的业务是按完整的日志产生、清洗、存储、执行核对,这一整条链路算做一次事务,因此使用TPS作为衡量指标;
  2. 延时:作为准实时的旁路校验系统,要尽可能快的发现问题;

5.1 TPS

交易中台每天大概有3亿的日志量,Flink根据配置的消息规则,每日会匹配出1000w+的消息量;

这1000w的消息,可能有多条消息是同一个链路的消息,同一个链路的消息会创建一个工单任务,最终每日的工单数量在600w+左右;对应着交易中台每日300w+的订单支付数量;

那么平均的TPS在:

600w / 10 * 60 * 60 = 166TPS
(10h是因为业务低峰期基本没有支付消息),白天的峰值可以到到200TPS;

5.2 核对任务时延

BCP作为准实时的核对系统,那么延时也是不能太高的。整个链路是从日志创建,Flink清洗,落库,任务核对。延时控制在2min内;也就是交易如果存在异常链路,可以在2min内识别并发出告警。

业界监控告警的延时一般在1min内;像阿里的Sunfire监控告警平台是实时监控日志,并没有过多的业务语义,单条日志只要不满足特定规则,则触发告警。

BCP业务有复杂的清洗、计算逻辑,2min是可以接受的。

6. 期间遇到的问题

6.1 Redis的大key发现与处理

我们的非核心消息是存储在Redis中,这类消息一般存储不会超过5min,最快2min内消费完成后,就会删除。

  1. 问题发现:但我们的Redis内存在一次扩容的过程中,CPU突然飙升到80%,随后扩容完成后,又恢复正常。

对于这次异常的CPU飙升,我们怀疑有大key存在,通过Redis的命令

bigkeys
,查到了一个200w大小的list数据;

# --i 可以指定扫描的频率,100ms执行一次,防止阻塞业务
/redis-cli --bigkeys --i 0.1
  1. 问题定位:查到大key后,我们并没有立即删除,因为当前的业务并没有受到影响,因此判断此key并不会影响业务,保留这个key更好的查找原因;

最终查到这个key是一个key中存在

null
字符串的大key;和交易的同学一起排查到,交易的订单状态轮询任务一直在轮询一个空的订单号,导致产生大量存在null订单的日志,Flink清洗后,拼接成key,存储到了Redis中,不断的push后,导致一个大list产生;

  1. 问题解决:我们在业务低峰期,使用Redis的
    unlink
    命令删除大key,此命令会立即返回,后台异步删除key,减少阻塞
unlink large_string_key  # 优先异步删除