Citus:仅作用于 PostgreSQL 的分布式数据库

可先参读:

问题演进

简单来讲:只适用于PG的分布式数据库,以插件的形式集成 PG。

单机

用的最熟悉最多的是单库:比如:MySQL、Postgresql等
业务量大的时候,单机会有瓶颈:数据容量瓶颈,或 性能瓶颈,而且单机扩容会比较麻烦。

分库分表

解析SQL,确定数据在哪一台节点上。把SQL分发到对应的实例上。

问题1:不适合跨库的SQL,有些可以支持但是比较弱,跨库的事务也是。

问题2:维护比较麻烦
分库分表的中间件给用户提供的是两套东西:一套是分库分表的中间件,一套是底下的各个分散的数据库(那么多于业务开发者来讲,他必须要维护这两套东西)。

假设我们要做表结构的变更:加个索引
这样的操作必须是先到数据库里面,去把所有的库的索引都要加好,然后再改中间件上的配置。

分布式数据库

上面这一种就比较麻烦,因此我们有更方便的解决方案:分布式数据库。

像 Citus 属于其中一种,Citus 比较特殊,它是建立在 PG 基础上的(最开始已经提到这一点)。
它在 PG 上安装一个插件,叫 Citus 插件,安装完之后,做一些简单的配置,这样一些普通的 PG 就会变成 Citus 集群。
然后这些业务要和 coordinator(协调节点)打交道,然后 coordinator(协调节点)再把 SQL 分发到底下的各个 worker(工作节点)上。

总结图

在这里插入图片描述

分片及工作原理

我们首先看,在这一套架构:Citus 怎么管理数据,数据是怎么分布的。

适用场景:HTAP

数据分布:
1、分片表(distributed table):rows会分布在 worker节点中。主要用于大量数据的事实表。
hash:按分片字段 hash 值分布数据
》》》按照某个字段进行分片,每个 worker 来管理多个分片。

append:分片 size 增加到一定量自动创建新的分片

2、参考表、广播表(reference table):每个 worker 节点都保存一模一样的数据。主要用于维度表。
仅一个分片,每个 worker 一个副本,主要用于维度表。
》》》主要是维度表,协助分片表做 join。(参考下面案例)

3、本地表
建一个表在上面,不做任何配置。

例如下面:
把 原SQL 改写(逻辑表名 改为 分片表名),把 SQL 分发到各个 SQL 上面执行。
最后再把 worker 上的结果进行收集汇总返回给应用。

在这里插入图片描述

主要特性和性能参考

主要特性:
1、PostgreSQL兼容
》》》因为它本身就是工作在PostgreSQL里面(作为插件)。

2、水平扩展
》》》2个 worker 不够可以扩容到 4个 worker。coordinator(协调节点)默认 1个,不够的话也可以扩容到 2个。

3、高并发增删改查
》》》可以作为交易系统,OLTP的交易系统。

4、实时数据分析
》》》分析性的SQL可以快速的给到结果。

5、支持分布式事务

6、支持常用DDL

基于以上:我们可以把 Citus 集群,可以当做一个单库来看待(使用层面)。

性能方面:
OLTP场景:基于主键的随机点查询SQL。
单个 coordinator(协调节点)架构的 QPS 是 PG性能的三分之一 ~ 二分之一。
》》》Citus coordinator(协调节点)1w(单核); PG非预编译语句 1w(单核); PG预编译语句 2.5w(单核);
》》》测试是 Citus 7.4,后面 Citus9以后性能和 PG 就差不多了。

上面是理想场景下,单个小表的随机点查询,单个PG没有多个IO,基本上都是基于内存的情况下。

但是真实场景下:
第一、表变大了,缓存的命中率不一定那么高;
第二、我们的SQL不一定都是简单的单个表查询,可能是复杂的关联查询,Citus的优势就来了,Citus coordinator 节点就简单的做一些计算,它没有 IO。

所以说,真实的场景下,单机的 Citus 可能比单机的 PG 要快。

OLAP场景:最典型的就是聚合分组计算,这样的场景下,Citus 有个很大的好处,它可以做并行,它是做分片并行的。

理想情况下,聚合SQL执行时间和分片数量成反比。

举个例子:
日常维护性能参考(1个 coordinator节点 和 8个 worker节点,96个分片,PG 10 + Citus 7.4)
PG 默认两个并发:count(*)
在这里插入图片描述

分布式方案对比

在这里插入图片描述

Citus 苏宁的应用案例

在这里插入图片描述

Citus 环境搭建

1、准备集群资源(一个节点指一套 PG HA 集群,测试环境可以用单实例PG)。
》》》一个coordinator(协调节点);条件允许的话,可以选择高配 CPU 的主机。
》》》2^n (2,4,8,16,…)个 worker (工作节点);(2的幂是为了方便扩容)。

2、安装配置 PostgreSQL:省略
Citus 的日常运维,除了扩缩容,节点迁移外。
其他方面和普通 PG是一样的,例如:HA部署,备份恢复,监控告警…等

3、安装插件

1
2
3
4
5
6
7
8
9
tar xzf citus-9.2.3.tar.gz

cd citus-9.2.3/

./configure

make

make install

注意:也可以通过社区预编译包安装:postgresql 官方下载地址

4、创建扩展

1
2
3
4
5
6
7
8
9
10
11
# 必须将 citus 作为第一个库,作为预加载
vi postgresql.conf
shared_preload_libraries='citus,...'

su - postgres

# 配置文件修改完毕之后,重启 PG
pg_ctl restart

# 执行命令,来创建扩展
psql -c "create extension citus"

5、添加 worker (工作节点) ===> 只在 coordinator (协调节点)执行

1
2
3
4
5
# 把下面的替换成对应的 IP、PORT 就好了。
# 下面是 citus 的函数
SELECT * FROM master_add_node('$wk1_ip', $wk1_port);
SELECT * FROM master_add_node('$wk2_ip', $wk2_port);
...

案例演示:建表

1
2
3
4
5
6
7
创建分片表 sale_order

create table sale_order(oid int PRIMARY KEY,cid int, other text);
set citus.shard_count = 8;

// oid 作为分片字段,默认分片方法是 hash
select create_distributed_table('sale_order, 'oid');
1
2
3
4
创建参考表 customer

create table customer(cid int, age int, other text);
select create_reference_table('customer');

1、在每个 worker 上创建分片
2、更新元数据:pg_list_partition、pg_dist_shard、pg_dist_shard_placement

元数据:三个表

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

带分片字段的SQL查询

在 citus 里面,所有的查询,为了方便我们理解,所有查询分成两类:一类是带有分片字段的查询,还有一类是不带分片字段的查询。

例如:下面的带有分片字段的SQL,下面的分片字段 oid = 100,coordinator 节点拿到这个 SQL 之后,会解析这个 100,会把 100 算成 hash 值,然后拿这个 hash 值算它属于哪个分片,再看分片的位置,在分片所在的 worker 上执行这条 SQL。

关键点在于 Task Count: 1,这样的执行方式是比较适合高并发的交易系统,交易类的业务的。
为了提升高并发场景的性能,coordinator 会对每个 worker 上的连接做缓存,进行复用提高性能。
在这里插入图片描述

不带分片字段的SQL查询

例如下面案例,这里有一个典型的分组的聚合 join。

在这个 SQL 执行的时候,citus 会把这个 SQL 做成子的 join SQL,然后在每个 worker 上按照分组字段先做一次聚集,然后再把所有的结果返回到 coordinator 上,coordinator 收到 worker 的结果之后,再做一次最终的聚合。

这里面,coordinator在执行的时候,它是按照分片在每个 worker 上并发去执行的,并发数主要受分片数影响。
比如,我有 8 个分片,这里就有八个并发同时在跑。
这里有一个参数 max_adaptive_executor_pool_size 可以控制单个 worker 的并发数,防止单个 worker 负载过高。

在这里插入图片描述

亲和性概念

前面只是简单的入门,我们想要用好 citus 的话,还要更深入了解它的一些概念。

例如:最基本的亲和的概念。

亲和什么意思?
我们在执行 SQL的时候,会涉及到多张表,最典型的例子就是涉及到 join。
假设两个表做 join,效率最高的就是把每个 worker 上进行两张表进行 join,要做到这一点,就要把这两个表的分片逻辑(分片位置)是一致的,也就是两个表做亲和。

在这里插入图片描述

亲和性定义

2022-03-15 08:02:19
在这里插入图片描述

亲和性的 join 案例

  • 表定义
    在这里插入图片描述

  • 亲和 join
    在这里插入图片描述

  • 非亲和 join
    在这里插入图片描述

分布式事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1. Citus对各种跨库操作采用2PC保障事务一致
# 以下操作 Citus 会自动包装成 2PC 事务来执行
+ DDL
+ Copy
+ 参考表的更新
+ 应用发起的跨库事务
+ 其他

# 2. "Citus Maintenance Daemon"
它是一个工作进程,会自动检测和处理分布式死锁

# 3. "Citus Maintenance Daemon"
进程会根据记录在pg_dist_transaction中的事务信息自动处理因故障残留的未决事务
(节点鸡掰,恢复之后会根据 pg_dist_transaction 来继续处理事务)

跨库事务案例

2022-03-15 09:22:03
在这里插入图片描述
这里会有两个问题:
问题1、9002 COMMIT 完成,此时另外一个来进行 sum 计算,计算完成之后,9001 COMMIT 完成。
》》》这样就会产生全局的不一致性读问题,所以说 Citus 不支持 全局的MVCC。
》》》这种缺陷,所有基于分库分表的中间件方案,也都是有这个问题的,只不过大家分库分表用的那么多,也都没人提这个事情。

问题2、9002 COMMIT 完成,此时9001节点鸡掰,恢复之后,”Citus Maintenance Daemon”进程会自动恢复事务,9001会继续 COMMIT,然后整个事务完成。

表设计

我们一张业务表,究竟要做成那种类型的???分片表、参考表、本地表

在这里插入图片描述

  • 分片表
    Citus 的核心肯定是分片表,这个是它的核心价值。它的作用就是把大表拆小,然后把负载分散开来。
    但是要注意:分片字段,分片数,表亲和 这些东西。

    • 参考表
      参考表就是为了配合分片表做 join。
      为什么小表?因为参考表在每个 worker 上都有,大表的话,worker 占用的空间太大了。
      另外每个参考表的更新都会产生分布式事务,都会在每个 worker 上都会更新,这样它也不适合频繁更新的表,那样性能肯定不会好。
  • 本地表
    本地表尽量不要用,除非上面分片表,参考表都不适合,又想做小表的高并发查询。

分片字段

  • 每个SQL中都带等值条件的字段(比如用户ID)
  • join 关联的字段
  • 高基数且值分布均匀的字段
  • 日期(按天)通常不适合作为分片字段,业务往往倾向于访问最近的数据,容易形成热点

分片数

  • 建议2的乘方。比如: 条件好的话直接 32,64,128…(条件不好的话 2、4、8、16 也可以,顶不住的适合,也可以后续扩容)
    》》》前面已经说了,为了方便扩容

  • 控制单分片最大记录数和大小(比如1亿,10GB)
    》》》超过限制,会影响性能

  • 建议不超过worker总core数的2~4倍
    》》》防止单 worker 负载过高。
    》》》前面已经说了,新版本可以使用:max_adaptive_executor_pool_size 可以控制单个 worker 的并发数,防止单个 worker 负载过高。

  • 为今后 worker 扩容保留一定余量
    》》》防止意外,要留有余地

分片数影响查询性能

第一类是最简单的带有主键的查询 SQL。
数据量大的时候,分片数越多,数据分布的越散,性能更好(前提是数据量大哟)。

不带分片的话,要到每个 worker 上去执行,所以说,分片越多,性能越差。
在这里插入图片描述

第二类分析类的,分组聚合的 SQL。
也要看场景,分组数少的话,性能差不多。但是分组数多的话,性能差异就会很明显。
因为执行这样 SQL 的时候,每个 worker 上先做 group by,再把结果集返回到 coordinator 上,再做聚合的结果。
但当分片数多的话,结果集就越多,导致 coordinator 的压力越大,执行时间就越长。
在这里插入图片描述

SQL限制

2022-03-15 10:06:23

Citus 的 SQL 兼容性在不断的提升,但我们要用的话,还是要了解它是有哪些 SQL 的不支持的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# join的限制
+ 不支持2个非亲和分片表的outer join
+ 仅citus.enable_repartition_joins = on时支持2个非亲和分片表的inner join
+ 对分片表和参考表的outer join,参考表只能出现在left join的右边或right join的左边

# 其他部分限制
+ 不支持按非分片字段分组的Window函数
+ 本地表不能和分片表(参考表)混用
+ "insert into ... select ... from" 不支持分片表->本地表

# 回避方法
+ CTE或子查询
》》》把 SQL 中一部分内容改为子查询,把子查询从 coordinator 分发到各 worker 上执行,注意性能
+ 通过临时表(或dblink,postgres_fdw)中转
》》》把 SQL中涉及到不同的表的结果拆成临时表,把临时表拉到 coordinator 本地做处理。

注:更多限制参考:Citus 不支持 PostgreSQL 哪些功能?

SQL改写

  • SQL 改写

    1
    2
    3
    4
    5
    6
    7
    8
    select a.userid,c1,c3 from tb1 a join tb3 b on(a.userid=b.userid);

    -- 改写 ↓↓↓

    with b as(
    select * from tb3
    )select a.userid,c1,c3 from tb1 a join b on(a.userid=b.userid);
    -- with 就相同于一个子查询,然后再把子查询结果与 tb1 进行关联。
  • 执行计划
    在这里插入图片描述

    FDW改写

    FDW是一种变相的”临时表”,如果直接通过临时表回避SQL限制,需要应用把一个SQL拆成多个SQL,增加业务改造成本。

1、在CN上创建本地FDW Server

1
2
3
4
5
6
7
create extension postgres_fdw;

create server server_cn_local
FOREIGN data wrapper postgres_fdw
OPTIONS(host '127.0.0.1', port '9000', dbname 'postgres');

create user mapping for postgres server server_cn_local options(user 'postgres');

2、封装FDW表

1
2
3
4
5
6
7
CREATE FOREIGN TABLE tb1_fdw(userid integer,c1 text)
server server_cn_local
options (table_name 'tb1');

CREATE FOREIGN TABLE tb3_fdw(productid integer,userid integer,c3 text)
server server_cn_local
options (table_name 'tb3');

3、SQL改写

1
2
3
4
5
6
7
8
9
10
select a.userid,c1,c3 from tb1 a join tb3 b on(a.userid=b.userid);

-- 改写 ↓↓↓

with a as MATERIALIZED(
select * from tb1_fdw),
b as MATERIALIZED(
select * from tb3_fdw
)
select a.userid,c1,c3 from a join b on(a.userid=b.userid);

4、执行计划
在这里插入图片描述

非亲和 inner join 执行性能对比

在这里插入图片描述

  • FDW改写适用于两个小表 join
  • CTE改写适用于大表和小表 inner join或大表对小表的 left join
  • 重分布适用于大表和大表 inner join

注:大表小表指的是WHERE条件过滤后的结果集大小

DDL支持

在这里插入图片描述

DDL执行案例

  • Citus不允许修改分片字段类型

    1
    2
    3
    4
    5
    create table tbdist(id int,c1 varchar(20));
    select create_distributed_table('tbdist','c1');

    postgres=# alter table tbdist alter c1 type varchar(50);
    ERROR: cannot execute ALTER TABLE command involving partition column
  • varchar(20)改成varchar(50)不影响分片字段hash值计算,可以通过修改元数据实现,避免重导数据。
    反之:若修改的是字段类型,就要重导数据了。

    1
    2
    3
    4
    5
    6
    SELECT run_command_on_placements('tbdist','ALTER TABLE %s ALTER COLUMN c1 TYPE varchar(50)');
    BEGIN;
    SET LOCAL citus.enable_ddl_propagation TO off;
    ALTER TABLE tbdist ALTER COLUMN c1 TYPE varchar(50);
    UPDATE pg_dist_partition SET partkey = column_name_to_column(logicalrelid, 'c1') WHERE logicalrelid = 'tbdist'::regclass;
    END;

SQL的连接数控制

不带分片字段SQL的连接数控制

  • 设置 citus 任务执行器

    1
    SET citus.task_executor_type = 'adaptive'|'task-tracker';
  • adaptive执行器(默认执行器)

    1
    2
    3
    4
    5
    6
    7
    执行不带分片字段SQL时,CN对所有worker上的所有shard同时发起连接,并执行SQL收集结果。(原real_time执行器的增强版。)

    citus.max_adaptive_executor_pool_size:
    执行不带分片字段SQL时,每worker发起的最大并发连接数,默认值16。

    citus.max_cached_conns_per_worker
    每worker缓存的连接数,默认值为1。
  • task-tracker执行器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    执行不带分片字段SQL时,CN只和worker上的`task-tracker`进程交互,
    调用worker上的`task_tracker_assign_task()`函数将任务分配给`task-tracker`,
    然后轮询任务的完成状况,待任务结束后再开一个连接从worker取回结果。

    citus.max_running_tasks_per_node:
    每个worker同时执行的最大并发数,默认8


    》》》通过调参合理控制并发数,避免并发连接过大影响系统稳定。
    比如:对数据导出任务,可以设置citus.max_adaptive_executor_pool_size为1
    在这里插入图片描述

高并发点查询SQL优化示例

不带分片字段的高并发点查询SQL优化示例

对于高并发的业务,我们要尽可能做到这个 SQL 是带分片字段,并且是能够分发到单个分片上去的。
但是,我们有些业务它没办法做到这一点,那么怎么去做优化呢?

  • 表定义
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    postgres=# \d tb3
    Table "public.tb3"
    Column | Type | Collation | Nullable | Default
    -----------+---------+-----------+----------+---------
    productid | integer | | |
    userid | integer | | |
    c3 | text | | |
    Indexes:
    "tb3_productid" btree (productid)
    "tb3_userid" UNIQUE, btree (userid)
    在这里插入图片描述
  • 原始SQL
    1
    select * from tb3 where userid=9
    执行计划(原始SQL)
    1
    2
    3
    4
    5
    6
    7
    8
    9
     QUERY PLAN
    -----------------------------------------------------------------------------------------------------
    Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
    Task Count: 32
    Tasks Shown: One of 32
    -> Task
    Node: host=127.0.0.1 port=9001 dbname=postgres
    -> Index Scan using tb3_userid_102081 on tb3_102081 tb3 (cost=0.28..2.50 rows=1 width=11)
    Index Cond: (userid = 9)

↓↓↓优化↓↓↓:拆分成2次带分片字段的查询

  • 创建分片字段索引表

    1
    2
    3
    4
    create table tb3map_userid2productid(userid int primary key, productid int);
    select create_distributed_table('tb3map_userid2productid', 'userid');

    insert to tb3map_userid2productid select userid,productid from tb3;
  • SQL 改写一:应用发起2次SQL

    1
    2
    select productid from tb3map_userid2productid where userid=9;
    select * from tb3 where productid=? and userid=9;
  • SQL 改写二:通过FDW包装成一个SQL
    (改写二只需要改SQL不需要改应用程序,但QPS性能只有方法一的1/3)

    1
    2
    3
    4
    5
    6
    7
    create foreign table tb3map_userid2productid_fdw(userid int, productid int)
    server server_cn_local
    options (table_name 'tb3map_userid2productid');

    select * from tb3_fdw where
    productid = (select productid from tb3map_userid2productid_fdw where userid=9)
    and userid=9;

    执行计划(SQL 改写二)

    1
    2
    3
    4
    5
     QUERY PLAN                                         
    --------------------------------------------------------------------------------------------
    Foreign Scan on tb3_fdw (cost=246.86..276.08 rows=1 width=40)
    InitPlan 1 (returns $0)
    -> Foreign Scan on tb3map_userid2productid_fdw (cost=100.00..146.86 rows=15 width=4)

数据导入

  • INSERT
    1、批量插入的数据被CN拆成单条非预编译INSERT发送给Worker
    (即使你插入的是预编译SQL发给CN,CN也会拆分非预编译发给Worker)
    2、单个insert中如有多个value,属于同一分片的值仍以多value的形式发送给worker
    3、JDBC的批更新和reWriteBatchedInserts=true优化参数可一定程度提升插入性能,但整体上INSERT的性能偏低

  • COPY(速度较快,取决于索引和数据量)
    1、典型业务表Copy导入速度:大约10w/s(速度依赖部署环境和表定义)
    2、索引较多的大表导入速度慢。每个Worker上的所有分片同时导入,容易导致缓存不足。可以通过分区进行优化。

  • 官方测试

    1
    2
    3
    Insert and Update:  10-50k/s
    Bulk Copy: 250k/s-2M/s
    Citus MX: 50k/s-500k/s

参考表负载不均

应用读参考表时默认始终访问第一个分片,高频访问会造成负载不均衡。

可使用轮询策略:

1
SET citus.task_assignment_policy TO "round-robin"