开源软件:Leaf

分布式ID生成系统试用及原理阅读

Posted by Shoukai Huang on August 26, 2020

Leaf 分布式ID生成系统试用及原理阅读,资料来源于:

全局唯一ID

UUID

示例:6ba7b810-9dad-11d1-80b4-00c04fd430c8 优点:本地、性能高 缺点:长度过长、信息泄露(Mac地址泄露)、不适合作为MySQL主键

snowflake

把64-bit分别划分成多段,分开来标示机器、时间等,比如在snowflake中的64-bit分别表示如下图(图片来自网络)所示:

划分:

  • 41-bit:时间
  • 10-bit:机器
  • 10-bit:序列号

优点:时钟在高位,趋势递增、独立部署 缺点:强依赖时钟

数据库

利用 AUTO_INCREMENT 来达到主键自增

数据库:

CREATE TABLE `Tickets64` (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`stub` CHAR(1) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE INDEX `stub` (`stub`)
)
COLLATE='utf8_general_ci'

通过执行SQL获取ID

begin;
REPLACE INTO Tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();
commit;

优点:简单 缺点:依赖DB、性能依赖单台MySQL的读写性能

分布式方案下的全局唯一ID

Leaf-segment 数据库方案

方案思路:

  • 多 proxy server 部署策略
  • proxy server 批量获取号段
  • 号段用完后,更新字段

更新号段SQL方式

Begin
UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit

改进策略:利用线程定期预加载 segment

优点:可扩展、方便使用 缺点:ID 自增,能够暴露ID自增数量

Leaf-snowflake 数据库方案

在 snowflake 方式上进行改进。引入 zookeeper ,workerID 存储到 ZK 上,避免 ZK 单点,服务器文件存储文件

解决时钟问题:

  • 上报当前时间到 ZK 上;
  • 校验其他服务器时间,做平均值校验;
  • 关闭时钟同步服务;

Leaf 使用

号段模式

创建数据库

CREATE DATABASE leaf
CREATE TABLE `leaf_alloc` (
  `biz_tag` varchar(128)  NOT NULL DEFAULT '',
  `max_id` bigint(20) NOT NULL DEFAULT '1',
  `step` int(11) NOT NULL,
  `description` varchar(256)  DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;

insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')

配置 leaf.properties 属性 leaf.jdbc.url, leaf.jdbc.username, leaf.jdbc.password

运行,通过接口获取ID:

curl --location --request GET 'http://localhost:8080/api/segment/get/leaf-segment-test'

数据库字段变化

Leaf 源码

Leaf-segment 数据库方案

SegmentBuffer 结构:

public class SegmentBuffer {
    private String key;
    private Segment[] segments; //双buffer
    private volatile int currentPos; //当前的使用的segment的index
    private volatile boolean nextReady; //下一个segment是否处于可切换状态
    private volatile boolean initOk; //是否初始化完成
    private final AtomicBoolean threadRunning; //线程是否在运行中
    private final ReadWriteLock lock;

    private volatile int step;
    private volatile int minStep;
    private volatile long updateTimestamp;
    
    ……
}

Segment 结构:

public class Segment {
    private AtomicLong value = new AtomicLong(0);
    private volatile long max;
    private volatile int step;
    private SegmentBuffer buffer;
    ……
}

IDGen 接口:

public interface IDGen {
    Result get(String key);
    boolean init();
}

get 实现过程

@Override
public Result get(final String key) {
    if (!initOK) {
        return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
    }
    if (cache.containsKey(key)) {
        SegmentBuffer buffer = cache.get(key);
        if (!buffer.isInitOk()) {
            synchronized (buffer) {
                // 从 DB 加载
                ……
            }
        }
        return getIdFromSegmentBuffer(cache.get(key));
    }
    return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}

其中 getIdFromSegmentBuffer 获取ID过程

public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
    while (true) {
        buffer.rLock().lock();
        try {
            final Segment segment = buffer.getCurrent();
            // 由于有线程异步更新,所以本处为线程是否执行的判断
            if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                // 如果未执行,超出阈值,加载
                service.execute(new Runnable() {
                    @Override
                    public void run() {
                        Segment next = buffer.getSegments()[buffer.nextPos()];
                        boolean updateOk = false;
                        try {
                            updateSegmentFromDb(buffer.getKey(), next);
                            updateOk = true;
                            logger.info("update segment {} from db {}", buffer.getKey(), next);
                        } catch (Exception e) {
                            logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                        } finally {
                            if (updateOk) {
                                buffer.wLock().lock();
                                buffer.setNextReady(true);
                                buffer.getThreadRunning().set(false);
                                buffer.wLock().unlock();
                            } else {
                                buffer.getThreadRunning().set(false);
                            }
                        }
                    }
                });
            }
            long value = segment.getValue().getAndIncrement();
            if (value < segment.getMax()) {
                return new Result(value, Status.SUCCESS);
            }
        } finally {
            buffer.rLock().unlock();
        }
        // 处理当前线程正在执行,获取ID最大值还超过当前号段最大值情况
        waitAndSleep(buffer);
        buffer.wLock().lock();
        try {
            final Segment segment = buffer.getCurrent();
            long value = segment.getValue().getAndIncrement();
            if (value < segment.getMax()) {
                return new Result(value, Status.SUCCESS);
            }
            if (buffer.isNextReady()) {
                buffer.switchPos();
                buffer.setNextReady(false);
            } else {
                logger.error("Both two segments in {} are not ready!", buffer);
                return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
            }
        } finally {
            buffer.wLock().unlock();
        }
    }
}

性能结果

QPS压测结果近5w/s,TP999 1ms

TP指标: 指在一个时间段内,统计该方法每次调用所消耗的时间,并将这些时间按从小到大的顺序进行排序, 并取出结果为 : 总次数 * 指标数 = 对应TP指标的序号 , 再根据序号取出对应排序好的时间,即为TP指标。

参考: