PostgreSQL 与 12306 抢火车票的思考

1,338 阅读4分钟
原文链接: click.aliyun.com
背景

抢火车票是很有意思的一个课题,对IT人的智商以及IT系统的健壮性,尤其是数据库的功能和性能都是一种挑战。

我记得很多年前写过一篇PostgreSQL varbit和火车票相关的文章,翻出来看看,发现又和BIT有关。

回顾一下我之前写的

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统》

《门禁广告销售系统需求剖析 与 PostgreSQL数据库实现》

PostgreSQL的bit功能还是很强大的,阿里云RDS PostgreSQL的bitpack也是用户实际应用中的需求提炼的新功能,大伙一起来给阿里云提需求。

打造属于国人的PostgreSQL.

正文

在PostgreSQL 中可以使用varbit存储比特位, 下面模拟一个简单的应用场景.

马上春节了, 火车票又到了销售旺季, 一票难求依旧.

下面就以火车票销售为例来介绍一下PostgreSQL varbit类型的用法.

测试环境 :

PostgreSQL 9.2.1

测试表 :

列车信息表 :

create table train   
(id int primary key, --主键  
go_date date, -- 发车日期  
train_num name, -- 车次  
station text[] -- 途径站点  
);   

车厢或bucket信息表 :

create table train_bucket   
(id int primary key, --主键  
tid int references train (id), -- 关联列车ID  
bno int, -- 车厢或bucket号  
sit_level text, -- 席别  
sit_cnt int, -- 该车厢或bucket的座位总数  
sit_remain int, -- 剩余座位  
sit_bit varbit -- 座位BIT位, 已售座位用1表示, 未售座位用0表示  
);  

位置信息表 :

create table train_sit   
(id serial8 primary key, -- 主键  
tid int references train (id), --关联列车ID  
tbid int references train_bucket(id), --关联bucket表ID  
sit_no int,  -- 座位号, 来自train_bucket.sit_bit的位置信息.  
station_bit varbit  -- 途径站点组成的BIT位信息, 已售站点用1表示, 未售站点用0表示.  
);  

创建索引 :

create index idx_train_bucket_sit_remain on train_bucket(sit_remain) where sit_remain>0;  
create index idx_train_sit_station_bit on train_sit (station_bit) where station_bit<>repeat('1', 13)::varbit;  

插入测试数据, 1趟火车, 途径14个站点.

insert into train values (1, '2013-01-20', 'D645', array['上海南','嘉兴','杭州南','诸暨','义乌','金华','衢州','上饶','鹰潭','新余','宜春','萍乡','株洲','长沙']);  

插入测试数据, 共计200W个车厢或bucket, 每个车厢98个位置.

insert into train_bucket values (generate_series(1,1000000), 1, generate_series(1,1000000), '一等座', 98, 98, repeat('0',98)::varbit);  
insert into train_bucket values (generate_series(1000001,2000000), 1, generate_series(1000001,2000000), '二等座', 98, 98, repeat('0',98)::varbit);  

创建取数组中元素位置的函数 :

create or replace function array_pos (a anyarray, b anyelement) returns int as $  
declare  
  i int;  
begin  
  for i in 1..array_length(a,1) loop  
    if b=a[i] then  
      return i;  
    end if;  
    i := i+1;  
  end loop;  
  return null;  
end;  
$ language plpgsql;  

创建购票函数 :

create or replace function buy   
(  
inout i_train_num name,   
inout i_fstation text,   
inout i_tstation text,  
inout i_go_date date,  
out o_slevel text,  
out o_bucket_no int,  
out o_sit_no int,  
out o_order_status boolean  
)   
returns record as $  
declare  
  curs1 refcursor;  
  curs2 refcursor;  
  v_row int;  
  v_station text[];  
  v_train_id int;  
  v_train_bucket_id int;  
  v_train_sit_id int;  
  v_from_station_idx int;  
  v_to_station_idx int;  
  v_station_len int;  
begin  
  set enable_seqscan=off;  
  v_row := 0;  
  o_order_status := false;  

  select array_length(station,1), station, id, array_pos(station, i_fstation), array_pos(station, i_tstation)   
    into v_station_len, v_station, v_train_id, v_from_station_idx, v_to_station_idx   
    from train where train_num=i_train_num and go_date = i_go_date;  
  if ( found and array_pos(v_station, i_fstation) is not null   
       and array_pos(v_station, i_tstation) is not null   
       and array_pos(v_station, i_fstation) < array_pos(v_station, i_tstation)   
     ) then  
  else  
    o_order_status := false;  
    return;  
  end if;  

  open curs2 for select tid,tbid,sit_no from train_sit  
    where (station_bit & bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)) = repeat('0', v_station_len-1)::varbit   
    and station_bit <> repeat('1', v_station_len-1)::varbit  
    -- and ctid not in (select locked_row from pgrowlocks('train_sit')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.  
    limit 1  
    for update nowait; -- 也可不加nowait, 加了的话如果获取锁失败将返回55P03异常, 需要程序重新提交  
  loop  
    fetch curs2 into v_train_id,v_train_bucket_id,o_sit_no;  
    if found then  
      update train_sit set station_bit=bitsetvarbit(station_bit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)   
        where current of curs2;  
      GET DIAGNOSTICS v_row = ROW_COUNT;  
      if (v_row = 1) then  
        select sit_level, bno into o_slevel, o_bucket_no from train_bucket where id=v_train_bucket_id;  
 close curs2;  
 o_order_status := true;  
 return;  
      end if;  
    else   
      close curs2;  
      exit;  
    end if;  
  end loop;  

  v_row := 0;  

  open curs1 for select id, tid, strpos(sit_bit::text,'0'), sit_level, bno from train_bucket   
    where sit_remain>0  
    -- and ctid not in (select locked_row from pgrowlocks('train_bucket')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.  
    limit 1   
    for update nowait; -- 也可不加nowait, 加了的话如果获取锁失败将返回55P03异常, 需要程序重新提交.  
  loop  
    fetch curs1 into v_train_bucket_id, v_train_id, o_sit_no, o_slevel, o_bucket_no;  
    if found then  
      update train_bucket set sit_bit = set_bit(sit_bit, strpos(sit_bit::text,'0')-1, 1), sit_remain = sit_remain-1  
        where current of curs1;  
      GET DIAGNOSTICS v_row = ROW_COUNT;  
      if (v_row = 1) then  
        close curs1;  
 exit;  
      end if;  
    else   
      close curs1;  
      exit;  
    end if;  
  end loop;  

  if v_row = 1 then  
    insert into train_sit(tid,tbid,sit_no,station_bit)  
    values (  
      v_train_id,   
      v_train_bucket_id,   
      o_sit_no,  
      bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)  
      );  
    o_order_status := true;  
    return;  
  else  
    o_order_status := false;  
    return;  
  end if;  

  exception   
  when others then  
    o_order_status := false;  
    return;  
end;  
$ language plpgsql;  

测试 :

digoal=# select * from buy('D645','杭州南','宜春','2013-01-20');  
 i_train_num | i_fstation | i_tstation | i_go_date  | o_slevel | o_bucket_no | o_sit_no | o_order_status   
-------------+------------+------------+------------+----------+-------------+----------+----------------  
 D645        | 杭州南     | 宜春       | 2013-01-20 | 一等座   |       35356 |        9 | t  
(1 row)  

压力测试

vi test.sql  
select * from buy('D645','上海南','长沙','2013-01-20');  

不加nowait测试结果 :

ocz@db-172-16-3-150-> pgbench -M prepared -f ./test.sql -n -r -c 16 -j 8 -T 1200 -U postgres digoal  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 8  
duration: 1200 s  
number of transactions actually processed: 2197407  
tps = 1831.143708 (including connections establishing)  
tps = 1831.169308 (excluding connections establishing)  
statement latencies in milliseconds:  
        8.734424        select * from buy('D645','上海南','长沙','2013-01-20');  

加nowait测试结果 :

ocz@db-172-16-3-150-> pgbench -M prepared -f ./test.sql -n -r -c 16 -j 16 -T 12 -U postgres digoal  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 16  
duration: 12 s  
number of transactions actually processed: 93632  
tps = 7800.056248 (including connections establishing)  
tps = 7818.803904 (excluding connections establishing)  
statement latencies in milliseconds:  
        2.042862        select * from buy('D645','上海南','长沙','2013-01-20');  

小结

1. 需要解决更新热点, 降低等待, 提高并行处理几率.

本例的热点在 :   
update train_bucket set sit_bit = set_bit(sit_bit, strpos(sit_bit::text,'0')-1, 1), sit_remain = sit_remain-1  
   where current of curs1;  

以及

 update train_sit set station_bit=bitsetvarbit(station_bit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)   
    where current of curs2;  

对应的游标 :

  open curs2 for select tid,tbid,sit_no from train_sit  
    where (station_bit & bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)) = repeat('0', v_station_len-1)::varbit   
    and station_bit <> repeat('1', v_station_len-1)::varbit  
    -- and ctid not in (select locked_row from pgrowlocks('train_sit')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.  
    limit 1  
    for update;  

以及

  open curs1 for select id, tid, strpos(sit_bit::text,'0'), sit_level, bno from train_bucket   
    where sit_remain>0  
    -- and ctid not in (select locked_row from pgrowlocks('train_bucket')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.  
    limit 1   
    for update;  

解决的关键在这里.

如果不能解决热点的问题, 那就提高处理速度, 精简字段数量和长度, 精简索引. 提高更新速度.

2. 减少数据扫描的量.

partial index, 避免满座车厢的扫描, 以及全程占位位子的扫描.  

3. 先查bucket 是否空闲, 再查sit是否空闲.

4. 还需要考虑优先级的问题 :

例如有111000和111100两个位子, 如果请求要最后两个站的票, 应该优先匹配111100, 这样更不容易浪费。如下 :

111000 | 000011 = 111011  
111100 | 000011 = 111111  

参考

1. setbitvarbit
blog.163.com/digoal@126/…