前段时间在忙,这几天抽时间写一下阿里比赛总结,以往比赛都是前十名入围,这次只有复赛前五才能入围,竞争非常激烈,我和普哥一组,在普哥的帮助下学到不少优化技巧。先贴下成绩
初赛第五,有些遗憾复赛只搞到第九,重在参与哈哈。 2、初赛2.1、赛题回顾
自适应负载均衡:实现一套负载均衡算法,使系统吞吐量达到最大
要求:
实现接口LoadBalance,实现一套自适应负载均衡机制。要求能够具备以下能力:
1、Gateway(Consumer) 端能够自动根据服务处理能力变化动态最优化分配请求保证较低响应时间,较高吞吐量;
2、Provider 端能自动进行服务容量评估,当请求数量超过服务能力时,允许拒绝部分请求,以保证服务不过载;
3、当请求速率高于所有的 Provider 服务能力之和时,允许 Gateway( Consumer ) 拒绝服务新到请求。
LoadBalance接口(其他辅助接口不再一一列举):
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
2.2、赛题解析
在解析赛题之前我们先了解两个概念:
延迟
:处理单次事件所需时间。
吞吐
:单位时间内程序可以处理的事件数。
假如一个程序只有1个线程,这个线程每秒可以处理10次事件,那么我们说这个程序处理单次事件的延迟为100ms,吞吐为10次/秒
假如一个程序有4个线程,每个线程每秒可以处理5次事件,那么我们说这个程序处理单次事件的延迟为200ms,吞吐为20次/秒
假如一个程序有1个线程,每个线程每秒可以处理20次事件,那么我们说这个程序处理单次事件的延迟为50ms,吞吐为20次/秒
以上我们可以看出,在一个没有瓶颈的程序中,增加线程可以增加吞吐,降低延迟也可以增加吞吐,但低延迟不一定高吞吐,高延迟也不一定低吞吐
。
赛题要求实时根据Provider能力大小动态选择Provider来处理请求,这里说一下三个Provider的线程和延迟都是会变
,且单个Provider总的能力呈泊松分布
,也就是说多次请求的rtt方差会比较大
,但是rtt均值在时间段内波动较小
,所以只要我们每次请求都打到正确(延迟小且有剩余线程
)的Provider上就ok,前面在钉钉群里发现有不少选手有这样两个疑问这里解释一下,
选手A
:我一顿操作下来分数竟然比随机算法低很多,为什么随机算法会有那么高的分数?
解析
:其实随机算法的分数是可以计算出来的,举个例子,假如我们有三个容器,容量分别为1L,2L,3L,我们有6L水,每次随机往某个容器倒入1L水,容器里最终能够保持多少水呢?答案是5L,因为6L水随机倒入三个容器每个容器都会获得2L水,第一个容器因为容量只有1L所以溢出1L,假如满分120分的话我们可以拿到100分,如果三个容器容量分别是1.5L,2L,2.5L,那么我们的分数将会高达110分。(评测环境情况比这个要复杂一些,需要多考虑一些变量,原理差不多这里不再展开)
选手B:我的程序评测下来全程没有error出现可是分数还是上不去,咋回事呢?
解析
:不知道大家有没有注意到Provider的总连接数是大于请求数的,有些请求打到有剩余线程但是延迟不一定小的Provider上了,所以看起来没错误但是延迟很高RPS很难上去(因为评测程序中Provider总吞吐是大于请求总数的,所以一般情况下也很难出现错误)。
贴一张Provider请求数与RTT的图,
基于历史rtt为参考的选择策略
,因为rtt是呈泊松分布的,所以单看rtt(延迟低的)数量,能力大的Provider一定比能力小的Provider的多
,所以我们可以搞棵树把每次请求rtt存储起来,排序一下,这里需要为rtt特别的低的Provider提权,即当rtt小于某值(或百分比)时,往树里多add几个该Provider,同时从数尾移除相同数量的Provider,提权的目的探测Provider能力变化及探测Provider的真实能力(比如说在所有Provider都不拒绝服务的情况下,单凭树里的rtt并不能真实反映出Provider的实际处理能力),每次选择Provider是从树里移除rtt最小的那个Provider,把请求发送到该Provider,如果某个低延迟的Provider线程不够时,该Provider的rtt逐渐变大,直到出现拒绝服务,此时只要不把reject的res加入到树就ok。
3、复赛
3.1、赛题回顾
实现一个进程内基于队列的消息持久化存储引擎
,要求包含以下功能:
发送消息功能
- 根据一定的条件做查询或聚合计算,包括
A. 查询一定时间窗口内的消息
B. 对一定时间窗口内的消息属性某个字段求平均
例子:t表示时间,时间窗口[1000, 1002]表示: t>=1000 & t<=1002 (这里的t和实际时间戳没有任何关系, 只是一个模拟时间范围)
消息包括两个字段,一个是业务字段a,一个是时间戳,以及一个byte数组消息体。 程序接口如下:
public abstract class MessageStore {
/**
* 写入一个消息;
* 这个接口需要是线程安全的,也即评测程序会并发调用该接口进行put;
* @param message message,代表消息的内容,评测时内容会随机产生
*/
abstract void put(Message message);
/**
* 根据a和t的条件,返回符合条件的消息的集合. t是输入时间戳模拟值,和实际时间戳没有关系, 线程内升序
* 这个接口需要是线程安全的,也即评测程序会并发调用该接口
* 返回的List需要按照t升序排列 (a不要求排序). 如果没有符合的消息, 返回大小为0的List. 如果List里有null元素, 会当结果失败处理
* 单条线程最大返回消息数量不会超过8万
* @param aMin 代表a的最小值(包含此值)
* @param aMax 代表a的最大值(包含此值)
* @param tMin 代表t的最小值(包含此值)
* @param tMax 代表t的最大值(包含此值)
*/
abstract List<Message> getMessage(long aMin, long aMax, long tMin, long tMax);
/**
* 根据a和t的条件,返回符合条件消息的a值的求平均结果. t是输入时间戳模拟值,和实际时间戳没有关系, 线程内升序
* 这个接口需要是线程安全的,也即评测程序会并发调用该接口
* 结果忽略小数位,返回整数位即可. 如果没有符合的消息, 返回0
* 单次查询求和最大值不会超过Long.MAX_VALUE
* @param aMin 代表a的最小值(包含此值)
* @param aMax 代表a的最大值(包含此值)
* @param tMin 代表t的最小值(包含此值)
* @param tMax 代表t的最大值(包含此值)
*/
abstract long getAvgValue(long aMin, long aMax, long tMin, long tMax);
}
发送消息如下(忽略消息体):
消息1,消息属性{"a":1,"t":1001}
消息2,消息属性{"a":2,"t":1002}
消息3,消息属性{"a":3,"t":1003}
查询如下:
示例1-
输入:时间窗口[1001,9999],对a求平均
输出:2, 即:(1+2+3)/3=2
示例2-
输入:时间窗口[1002,9999],求符合的消息
输出:{"a":1,"t":1002},{"a":3,"t":1003}
示例3-
输入:时间窗口[1000,9999]&(a>=2),对a求平均
输出:2 (去除小数位)
语言限定
: JAVA
评测指标和规模
:
评测程序分为3个阶段: 发送阶段、查询聚合消息阶段、查询聚合结果阶段:
发送阶段:假设发送消息条数为N1,所有消息发送完毕的时间为T1;发送线程多个,消息属性为: a(随机整数), t(输入时间戳模拟值,和实际时间戳没有关系, 线程内升序).消息总大小为50字节,消息条数在20亿条左右,总数据在100G左右
查询聚合消息阶段:有多次查询,消息总数为N2,所有查询时间为T2; 返回以t和a为条件的消息, 返回消息按照t升序排列
查询聚合结果阶段: 有多次查询,消息总数为N3,所有查询时间为T3; 返回以t和a为条件对a求平均的值
若查询结果都正确,则最终成绩为N1/T1 + N2/T2 + N3/T3
3.2、赛题分析- 实现一个进程内消息持久化引擎,
不需要考虑程序崩溃及被kill情况
- 评测三个阶段相互独立,
不需要考虑同时读写情况
- 消息有三个属性,t(long),a(long),b(byte[34]),
消息为定长消息,简化索引设计及存储缓存设计
- t在线程内有序(可能重复),线程间无序
线程内无需排序,只需要做线程间消息排序
- 消息数在2G(20亿)条左右,消息总量约为100G,
其中t16G,a16G,b68G,
- 两次查询分别是msg和avg,
t,a,b,分别存储,有利于avg查询
3.3.1、存储
前面提到过,t在线程内有序,借助这一点我们可以少很多排序量,只需要做线程间排序就好,这里我们把每个发送线程当作一个Queue,把每个Queue当作一个整体,对多个Queue进行排序,得到一个有序的Queue队列,贴张图,
每次从Queue的头部取出一个消息,然后对Queue队列进行排序:MQueue sort_head = this.sort_head;
if (queue == sort_head) {
for (; ; ) {
Message message = sort_head.remove();
if (message == null) {
break;
}
put_message(message);
MQueue next = sort_head.next;
if (next == null) {
break;
}
if (sort_head.cur_t() > next.cur_t()) {
sort_head = sort(next, sort_head);
}
}
this.sort_head = sort_head;
}
这样我们得到一个全局有序的队列,buffer落盘即可。这里落盘之前我们需要对t和a进行压缩(delta),t压缩后接近3.8G可以全部放在内存,a压缩后大约9G出头。 因为t排序后再次对a排序,导致t在分片内乱序,所以t用zigzag压缩。 因为a排序是升序,所以对a使用vlong压缩,这里说下a压缩对vlong做下改进,采用定长(4字节)+变长的方式进行压缩,代码如下:
public long readVLong() {
long b1 = get() & 0xff;
long b2 = get() & 0xff;
long b3 = get() & 0xff;
long v = 0;
int off = 0;
for (; ; ) {
long b = get();
v |= (b & 0b0111_1111) << off;
if (b >= 0) {
break;
}
off += 7;
}
v <<= 24;
return v | ((b1 << 16) | (b2 << 8) | (b3));
}
public void writeVLong(long v) {
long w = v & (0xffffff);
v >>>= 24;
put((byte) (w >>> 16));
put((byte) (w >>> 8));
put((byte) (w));
for (; ; ) {
w = v & 0b0111_1111;
v >>>= 7;
if (v != 0) {
put((byte) (w | 0b1000_0000));
} else {
put((byte) w);
break;
}
}
}
有对b尝试使用lz4压缩,效果不理想,放弃对b压缩
3.3.2、查询先上张图,
(我们是按t的个数分segment的,按t值划分的也写了一版,试了下两个方案分数差不多,因为感觉按t个数划分的更好处理些,因为时间原因后面都是按个数划分来优化的,理论上按t值划分分数应该更好)从图中可以每隔16384做一次segment,segment内每隔128条消息做一个block,每个block内有128条消息,每隔128条消息做一次索引,索引分别是:
ByteBuffer index_t_sparse = allocateDirect(SPARSE_INDEX_COUNT * 8);
ByteBuffer index_t_data_pos = allocateDirect(COMPACT_INDEX_COUNT * 8);
long[] index_a_data_pos = new long[COMPACT_INDEX_COUNT];
long[] index_t_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_compact = new long[COMPACT_INDEX_COUNT];
long[] index_a_sum = new long[COMPACT_INDEX_COUNT];
long[] index_a_min_max = new long[COMPACT_INDEX_COUNT * 2];
index_t_sparse
:segment索引,记录t值,用于快速定位查询所在segment
index_t_data_pos
:block索引,t压缩后位置索引,记录t数据block起始位置,用于计算t真实值
index_a_data_pos
:同index_t_data_pos
index_t_compact
:block索引,记录t值,用于计算t真实值
index_a_compact
:同index_t_compact
index_a_sum
:block索引,记录block内a的sum值,用于avg时快速跳过block
index_a_min_max
:block索引,记录block内a的min,max值,用于avg时快速跳过block
查询msg时首先定位所在segment:
int t_index_cnt = (index_t_sparse.position() >>> 3) - 1; // size -1
int t_index_min = half_find(index_t_sparse_addr, 0, t_index_cnt, min_t);
int t_index_max = get_t_index_max(index_t_sparse_addr, t_index_min, t_index_cnt, max_t);
得到segment起始位置:t_index_min,t_index_max 这里需要处理下首尾segment,因为首尾segment的t值可能不在查询范围内需要单独处理, 遍历segment定位block范围:
int a_index_off = get_a_read_index_off(index_a_min_max, a_index_cnt, a_index_min, min_a);
int a_index_len = get_a_read_index_len(index_a_min_max, a_index_cnt, a_index_min, a_index_off, max_a);
得到某个segment内block的off和len:a_index_off,a_index_len 根据off和len定位a,b的读取pos和len
long a_read_pos = index_a_data_pos[a_index_off];
long b_read_pos = 1L * (a_index_off * A_BLOCK) * B_LEN;
int a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + a_index_len + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
b_read_buf.clear().limit(msg_size * B_LEN);
接着定位t的读取范围:
long t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
long t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
VIntBuf t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
接着遍历得到的数据:
for (int i = 0, part_i = 0; i < msg_size; i++) {
t += t_data_temp.readZigZag();
a += a_data_buf.readVLong();
if (t >= min_t && t <= max_t && a >= min_a && a <= max_a) {
byte[] body = new byte[B_LEN];
copy_data(b_read_buf_addr, i * B_LEN, body, B_LEN);
msg_sort_buf[msg_sort_buf_size++] = new Message(a, t, body);
}
part_i++;
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
t = index_t_compact[a_index_off];
a = index_a_compact[a_index_off];
t_read_index = get_long(index_t_data_pos_addr, a_index_off * 8);
t_write_index = get_limit_index(index_t_data_pos_addr, index_t_data_pos_limit, (a_index_off + 1) * 8, t_data_buf.write_index);
t_data_temp = t_data_buf.slice(t_read_index, t_write_index);
}
}
排序最终查询的数据:
QuickSort.sort(msg_sort_buf, QuickSort.SORT_VALUE_T, 0, msg_sort_buf_size - 1);
for (int i = 0; i < msg_sort_buf_size; i++) {
msg_list.add(msg_sort_buf[i]);
}
查询avg
:查询和avg和msg类似,这里说下跳过逻辑
if (part_i == A_BLOCK) {
part_i = 0;
a_index_off++;
for (; i + A_BLOCK < msg_size; ) {
int a_min_max_pos = (a_index_off + 1) << 1;
long a_min = index_a_min_max[a_min_max_pos];
long a_max = index_a_min_max[a_min_max_pos + 1];
if (a_min >= min_a && a_max <= max_a) {
sum += index_a_sum[a_index_off + 1];
count += A_BLOCK;
i += A_BLOCK;
a_index_off++;
continue;
} else {
break;
}
}
a_read_pos = index_a_data_pos[a_index_off];
a_read_len = (int) (index_a_data_pos[Math.min(a_index_off + 1, a_index_cnt)] - a_read_pos);
a_read_buf.clear().limit(a_read_len);
do_read(a_read_channel, a_read_buf, a_read_pos);
a = index_a_compact[a_index_off];
a_data_buf.set_read_index(index_a_data_pos[a_index_off] - a_read_pos);
}
i++;
当part_i等于128时说明接下来是一个完整的block,可以进行block的min_max判断, 如果min_max在查询范围内则直接累加sum, 如果min_max部分包含查询范围则遍历该block
3.3.3、可探索的改进跳读
,在avg查询时,部分a的block是可以直接跳过的,这部分跳读应该会有一定提升。
按t值划分segment
:一直是按照t的个数划分segment的,按t值划分理论上可以划分出更大的segment,理论上会有一定提升。
复赛源码地址:github.com/wangkaish/a…
4、总结这次比赛竞争太激烈了,大佬很多,没能入围确实很遗憾,但是也确实学到不少东西,因为前面在搞华为的比赛,后面参与复赛的时间不多,而且相信很多选手有这样的感觉,线下程序是好的,到线上总是跪,其实我们也是时间大多花在调bug上了(水平太菜),线下和线上评测数据样本相差太多,如果比赛能搞成线下线上评测程序比较相似就好了,这样可以多些时间尝试方案。
另附上华为比赛参赛总结,有兴趣可以阅读一下:华为云TaurusDB性能挑战赛参赛总结