Leaf 分布式ID生成系统试用及原理阅读,资料来源于:
全局唯一ID
UUID
示例:6ba7b810-9dad-11d1-80b4-00c04fd430c8 优点:本地、性能高 缺点:长度过长、信息泄露(Mac地址泄露)、不适合作为MySQL主键
snowflake
把64-bit分别划分成多段,分开来标示机器、时间等,比如在snowflake中的64-bit分别表示如下图(图片来自网络)所示:
划分:
- 41-bit:时间
- 10-bit:机器
- 10-bit:序列号
优点:时钟在高位,趋势递增、独立部署 缺点:强依赖时钟
数据库
利用 AUTO_INCREMENT 来达到主键自增
数据库:
CREATE TABLE `Tickets64` (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`stub` CHAR(1) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE INDEX `stub` (`stub`)
)
COLLATE='utf8_general_ci'
通过执行SQL获取ID
begin;
REPLACE INTO Tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();
commit;
优点:简单 缺点:依赖DB、性能依赖单台MySQL的读写性能
分布式方案下的全局唯一ID
Leaf-segment 数据库方案
方案思路:
- 多 proxy server 部署策略
- proxy server 批量获取号段
- 号段用完后,更新字段
更新号段SQL方式
Begin
UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit
改进策略:利用线程定期预加载 segment
优点:可扩展、方便使用 缺点:ID 自增,能够暴露ID自增数量
Leaf-snowflake 数据库方案
在 snowflake 方式上进行改进。引入 zookeeper ,workerID 存储到 ZK 上,避免 ZK 单点,服务器文件存储文件
解决时钟问题:
- 上报当前时间到 ZK 上;
- 校验其他服务器时间,做平均值校验;
- 关闭时钟同步服务;
Leaf 使用
号段模式
创建数据库
CREATE DATABASE leaf
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '',
`max_id` bigint(20) NOT NULL DEFAULT '1',
`step` int(11) NOT NULL,
`description` varchar(256) DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;
insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')
配置 leaf.properties 属性 leaf.jdbc.url, leaf.jdbc.username, leaf.jdbc.password
运行,通过接口获取ID:
curl --location --request GET 'http://localhost:8080/api/segment/get/leaf-segment-test'
数据库字段变化
Leaf 源码
Leaf-segment 数据库方案
SegmentBuffer 结构:
public class SegmentBuffer {
private String key;
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
……
}
Segment 结构:
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;
……
}
IDGen 接口:
public interface IDGen {
Result get(String key);
boolean init();
}
get 实现过程
@Override
public Result get(final String key) {
if (!initOK) {
return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
}
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
if (!buffer.isInitOk()) {
synchronized (buffer) {
// 从 DB 加载
……
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
其中 getIdFromSegmentBuffer 获取ID过程
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
// 由于有线程异步更新,所以本处为线程是否执行的判断
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
// 如果未执行,超出阈值,加载
service.execute(new Runnable() {
@Override
public void run() {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
}
});
}
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock();
}
// 处理当前线程正在执行,获取ID最大值还超过当前号段最大值情况
waitAndSleep(buffer);
buffer.wLock().lock();
try {
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
if (buffer.isNextReady()) {
buffer.switchPos();
buffer.setNextReady(false);
} else {
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
buffer.wLock().unlock();
}
}
}
性能结果
QPS压测结果近5w/s,TP999 1ms
TP指标: 指在一个时间段内,统计该方法每次调用所消耗的时间,并将这些时间按从小到大的顺序进行排序, 并取出结果为 : 总次数 * 指标数 = 对应TP指标的序号 , 再根据序号取出对应排序好的时间,即为TP指标。
参考: