CRDT 全称 convergent or commutative replicated data type ,CRDT(无冲突可复制数据类型)是一种可以存储在不同计算机(节点-Peers)上的数据结构。每个节点都可以立刻修改更新自己的状态而无需网络请求与其他节点进行检查。每个对等的节点可能在不同时间点具有不同的状态,但是最终会收敛到同一个商定的状态。这种特性让其十分适合构建协作应用程序,同时无需中心化服务器。
CRDT主要分两类:
Operation-based CRDT 在数据传输数量方面有一定优势,但是对每次操作的顺序、网络传输、操作的因果关系等都有较高的要求。而State-based CRDT更倾向于定期将整个状态进行合并得到最新状态,避免操作或者数据传输导致的状态错误等问题。
后文的所有CRDT都将只关注State-based CRDT,不再特别说明。
CRDT的基本数据结构:
interface CRDT<T, S> {
value: T;
state: S;
merge(state: S): void;
}
value:T , 是我们的最终目的,需要维护的状态。 state: S , 用于保证数据一致的元数据。 merge function , 用来合并数据的方法。
合并函数需要达到以下目标:
寄存器(Register)是保存单个值得CRDT,Last Write Wins Register(LWW Register)是最简单的寄存器。
顾名思义,最后写入的值为最终值。使用timestamp(逻辑时钟,而非实际时间)来判断是否是新值,每当本地值更新时,该值递增。以下是合并逻辑:
以下是LWW Register的代码
type LWWRegisterState<T> = [timestamp: number, value: T];
class LWWRegister<T> {
readonly id: string;
state: LWWRegisterState<T>;
get value() {
return this.state[1];
}
constructor(id: string, state: [number, T]) {
this.id = id;
this.state = state;
}
set(value: T) {
this.state = [this.state[0] + 1, value];
}
merge(state: LWWRegisterState<T>, remotePeerId: string) {
const [remoteTimestamp] = state;
const [localTimestamp] = this.state;
if (localTimestamp > remoteTimestamp) return;
if (localTimestamp === remoteTimestamp && this.id > remotePeerId) return;
this.state = state;
}
}
大多数程序都涉及多个值,这意味着我们需要比LWW寄存器更复杂的CRDT,将更多的LWW Rigister组合起来形成一个 LWW Map。
LWW Map的值是由字符串到T值得映射:
type Value<T> = {
[key: string]: T;
}
LWW Map 的state 即为 LWW Rigisters 的集合。
type State<T> = {
[key: string]: LWWRegister<T | null>["state"];
};
现在,来逐步创建一个LWW Map:
class LWWMap<T> {
readonly id: string = "";
#data = new Map<string, LWWRegister<T | null>>();
constructor(id: string, state: State<T>) {
this.id = id;
for (const [key, register] of Object.entries(state)) {
this.#data.set(key, new LWWRegister(this.id, register));
}
}
id LWW Map 的id,也是用于合并的最后判断。 #data 是一个包含指向 LWW Rigister 实例的密钥(key)映射。
然后是CRDT 的三个重要属性: value
、state
、merge
:
get value() {
const value: Value<T> = {};
for (const [key, register] of this.#data.entries()) {
if (register.value !== null) value[key] = register.value;
}
return value;
}
get state() {
const state: State<T> = {};
for (const [key, register] of this.#data.entries()) {
if (register) state[key] = register.state;
}
return state;
}
merge(state: State<T>, remotePeer) {
// 合并,遍历需要合并进来的state
for (const [key, remote] of Object.entries(state)) {
// 通过key获取到本地的register
const local = this.#data.get(key);
// 如果本地已有内容,则调用本地的merge方法进行合并
if (local) local.merge(remote, remotePeer);
else this.#data.set(key, new LWWRegister(this.id, remote));
}
}
与寄存器类似,但应为有 Map 的额外一层包裹,需要在循环后获取对应的值或者做出merge操作。
随后补充 get
、 set
、 has
、delete
:
set(key: string, value: T) {
const register = this.#data.get(key);
if (register) register.set(value);
else this.#data.set(key, new LWWRegister(this.id, [1, value]));
}
get(key: string) {
return this.#data.get(key)?.value ?? undefined;
}
delete(key: string) {
// CRDT 永远不会删除,而只会设置为空,
this.#data.get(key)?.set(null);
}
has(key: string) {
this.#data.get(key)?.value !== null;
}
对于delete 的置空操作,可能会有疑问,我们举个例子:A节点在某日删除了某键。
// id: A
{
1999: [8,'hello'],
// 2000: [11, 'world'],
2001: [12, 'hello world']
}
另一日,一古早节点Z连上了网络开始同步。
// id: Z
{
1999: [3,'hel'],
2000: [5, 'worl'],
2001: [1, '']
}
这时A节点因为没有2000的任何记录,在它合并时,会认为2000是一个新节点而新实例化一个寄存器存储2000的值。但实际情况是A 在第11次操作时,删除了2000,而Z的2000的记录还在版本5,预期得到的结果应该是Z的2000的值被删除(置空)。若A节点保留了2000的记录,则可以正确更新状态和值。
一下是完整的 LWW Map。
type Value<T> = {
[key: string]: T;
};
type State<T> = {
[key: string]: LWWRegister<T | null>["state"];
};
class LWWMap<T> {
readonly id: string = "";
#data = new Map<string, LWWRegister<T | null>>();
constructor(id: string, state: State<T>) {
this.id = id;
for (const [key, register] of Object.entries(state)) {
this.#data.set(key, new LWWRegister(this.id, register));
}
}
get value() {
const value: Value<T> = {};
for (const [key, register] of this.#data.entries()) {
if (register.value !== null) value[key] = register.value;
}
return value;
}
get state() {
const state: State<T> = {};
for (const [key, register] of this.#data.entries()) {
if (register) state[key] = register.state;
}
return state;
}
merge(state: State<T>, remotePeerId:string) {
// 合并,遍历需要合并进来的state
for (const [key, remote] of Object.entries(state)) {
// 通过key获取到本地的register
const local = this.#data.get(key);
// 如果本地已有内容,则调用本地的merge方法进行合并
if (local) local.merge(remote, remotePeerId);
else this.#data.set(key, new LWWRegister(this.id, remote));
}
}
set(key: string, value: T) {
const register = this.#data.get(key);
if (register) register.set(value);
else this.#data.set(key, new LWWRegister(this.id, [1, value]));
}
get(key: string) {
return this.#data.get(key)?.value ?? undefined;
}
delete(key: string) {
// CRDT 永远不会删除,而只会设置为空,
this.#data.get(key)?.set(null);
}
has(key: string) {
this.#data.get(key)?.value !== null;
}
}