背景
有赞会员系统主要承载着有赞的客户经营领域,致力于给商家提供全渠道客户经营的能力。随着社交网络的普及,其社会化、多元化和创新化特质让商家与消费者之间的联系方式更加丰富,互动更加频繁,相应的运营需求也大大增加。除了传统的会员经营手段之外,会员系统需要提供能力,来帮助商家定义客户的生命周期,构建精准的消费画像:商家可以由此全面、及时地了解客户的喜好、行为轨迹、消费能力等属性,定义进而进行差异化的客户经营。我们需要构建一套实时的客户行为收集处理系统,来满足上述业务需求。本文就简单聊聊客户行为收集系统的设计。
行为模型
我们把客户行为事件定义为客户与业务系统间的交互,客户行为事件模型则描述了客户在业务系统中的轨迹。它记录了某个业务场景下一类或多类的客户行为事件,并能够反映事件的先后顺序。通过对客户事件的研究,我们可以评估客户事件的发生以及它对企业价值的影响程度,预测相关事件的发生;或通过追踪客户行为或业务过程,研究与事件发生关联的所有因素,来挖掘用户行为事件背后的原因、交互影响等。
对于客户行为事件,除客户本身的标识外,我们还要定义关注的事件的业务属性(如点击商品事件中的商品信息、下单下单商品及交易属性等),以及事件窗口的长度。其实体关系大致描述如下:
逻辑结构
逻辑架构共分为三层:
客户端 (Client)
客户端主要包括两类角色:一是客户行为事件的产生源,另一类是客户行为的输出方。
收集器 (Collector)
收集器的主要职责是从客户端获取客户事件,并按照行为模型的定义转换数据格式。考虑到客户端的差异,收集器需要支持“推”和“拉”两种模式
获取到客户事件之后,收集器根据预设或自定义的路由规则,将事件发布到分布式队列服务中。在有赞我们使用的是 NSQ (关于 NSQ,可以移步重塑 NSQ 之路 系列了解更多详情)。
队列的消费端是流式计算引擎,通过引擎处理,最终将事件数据发送给存储层。
推模式:由收集器提供收集接口,由客户端调用;或客户端嵌入 SDK 的方式,将行为事件推送给收集器
拉模式:由收集器通过定时任务或消息队列,从业务方系统获取客户事件
存储层 (Storage)
存储层会根据业务需要选择 MySQL 或者 HBase 来事件数据的持久化。目前我们使用的是 HBase,主要考虑因素是:1. HBase 具有相对灵活的 Schema 2. 与 Hadoop 集群的集成的便捷,使得事件数据不仅仅能够支持实时处理,也能轻松地用于离线分析。
服务层 (Service)
服务层用以支撑客户端 / 外部系统对于客户行为的查询,目前只支持以随机读。
数据流
整个客户行为收集系统主要有两个方向的数据流:
处理流
处理流将来自各个系统(包括业务系统、H5 页 /App 等)的客户行为,按照行为模型定义的消息格式,发布到分布式队列服务 (NSQ) ,由流式计算框架 (Storm) 对消息进行消费,并持久化到存储层 (HBase 或 MySQL) 中。
输出流
输出流的逻辑相对简单,将存储的客户行为从存储层读取出来,通过查询服务提供给使用方。目前我们的使用场景只涉及到随机读取。另外如果需要在离线分析(如 Hive)中使用的话,则可以通过 Hive 的 External Table 集成。
系统设计要求
实时性
作为客户行为收集系统,实时性越高,对于商家就能够更为及时地挖掘客户特征、进行实时推荐或发现一些突发的状况。因此,实时性是该系统的重要非功能性指标之一。在构建实时系统时,我们常常需要解决如下问题:
突发的流量
部分组件故障导致大量消息需要重试
数据积压
业务逻辑的 bug 需要进行数据的重新处理
我们引入了 Storm 作为支撑整个客户行为系统实时性的组件。Storm 作为最早的开源分布式实时计算框架,被行业广泛地应用于生产环境。它能够支持到消息粒度的控制与处理,具有很好的容错性、扩展性;从模型上来说,Storm 的 Continueous Streaming 模型相对于 Micro Batch 模型能够满足更严格的时延要求(当然相对更低的时延的带来了更高的开销,在吞吐方面的表现较 Micro Batch 模型逊色)。
Storm 的向外扩展 (Scale Out) 能力强大,能够通过调节 worker 数量并重启 Topology (拓扑,Storm 的计算任务)来完成计算能力的扩展。
当有基础组件发生故障时,Storm Topology 的对应部分 (Spout/Bolt) 无法在 TOPOLOGY.MESSAGE.TIMEOUT 内处理完消息,会触发 Storm 的重试;如果短时间内重试消息过多,势必会影响新生产的消息的消费,从而造成数据的延迟。因此,我们使用两组 Topic 来应对这种场景。
如上图所示,正常的生产客户行为事件消息由生产者发布到 Normal Topic,由对应的 Storm Spout 来消费。当 Topology 的部分业务异常时,会触发到 Spout 的失败处理,此时 Spout 将消息 publish 到 Retry Topic ,由重试的 Spout 按照一定的策略进行重试;或者在超过一定重试次数后,Ack 此消息,并发布一个延时的离线补偿任务进行该业务单元的全量计算。下图展示了一种重试的策略。
对于需要重新处理数据的场景,我们可以使用分布式队列服务的消息重放功能,由 Spout 进行重新消费。
消息抵达保证
Storm 本身支持多种消息抵达保证的语义:通过 Acker 和 Reliability API 来实现 At Least Once 语义;通过 Trident 来实现 Exactly Once 语义。对于消息抵达,我们有如下几个要求:
业务对于实时数据的要求是保证消息不丢失
部分业务场景需要我们支持 Exactly Once 语义。
此外,作为 Storm 的 Source,NSQ 本身会保证消息一定推给 consumer —— 即在极小的机率下 NSQ Server 会重复推送消息给 Consumer。综合上述情况,结合前面提到的多种重试场景的支持,我们使用 At Least Once 语义,而由具体的处理逻辑来保证幂等性。
public class NSQSpout extends BaseRichSpout {
public void nextTuple() {
String messageId = UUID.randomUUID().toString();
try {
...
// 消息处理逻辑
this.collector.emit("stream_id", new Values(message), messageId);
} catch (Exception e) {
logger.error("Emit message failed. ", e);
}
}
@Override
public void ack(Object msgId) {
// Get origin NSQMessage and finish it
try {
this.consumer.finish(NSQMessage message);
} catch(NSQException e) {
logger.error("Failed to ack message. id: {}", msgId, e);
}
}
@Override
public void fail(Object msgId) {
logger.info("msg failed: {}", msgId);
// 按照重试策略处理
}
}
public class BizBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
Object obj = tuple.getValues();
// Do your business
...
this.collector.emit("another-stream-id", tuple, new Values(obj));
this.collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// 声明 bolt 的输出
outputFieldsDeclarer.declareStream("log-stream", new Fields("content"));
}
}
存储
我们主要选择 HBase 来存储客户的行为。在 HBase 中,是通过 Rowkey, ColumnFamily+Qualifier 及 Timestamp 来定位数据的。Rowkey 作为唯一标识,在设计业务表 Schema 时主要需要考虑以下几点:
长度:尽可能短,HBase 的持久化文件 HFile 是按照 Key-Value 存储的,如果 Rowkey 过长,会影响 HFile 的存储效率。
散列:针对随机读取的场景,需要散列 Rowkey 来避免查询热点集中到一个 RegionServer 上。我们采用了两种方式:
随机化(如 MD5)
当 Rowkey 中需要保存递增的序列(如:时间戳),同时要求 Rowkey 可读时。可将其他的 ID(如:客户 ID) 截取后 N 位 + 递增序列拼接成 Rowkey 。一种参考的实现方式如下。
// 方式一
byte rowkey = MessageDigest.getInstance("MD5").digest(identifier.getBytes());
// 方式二
byte rowkey = String.format("d%d", prefix, timestamp).getBytes();
唯一性:在该业务上必须是唯一的。
部署
Storm Topology 的部署相当容易,只需要上传新的 JAR 包即可。NSQ Server 保存了 Consumer 当前的 Offset,只要我们通过相同的 Consumer Name (Channel) 重连 NSQ Server,即可获得之前消费的 Offset。当然,如果需要强制重新消费,调整对应 Channel 的 Offset 即可。
总结
实时系统能够有效弥补离线”T+1″的短板,同时也有更为严格的时效性和容错要求,其实时性、可用性、可扩展性各个方面值得去仔细推敲和打磨。支撑更多的业务场景,改善数据收集效率是我们持续改进的动力,欢迎有兴趣的同学勾搭。liyumeng@youzan.com
参考资料
https://storm.apache.org/relea … .html
https://community.hortonworks. … .html
https://hbase.apache.org/0.94/ … .html
https://storm.apache.org/relea … .html
作者:有赞技术团队
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/256495.html