Skip to content

协同算法

OT(Operational Transformation)

基本概念

Operational Transformation(OT)是一个应用于协同编辑领域的并发控制和冲突解决系统,要解决的是“多个用户对同一个文本域的同一个版本进行编辑时如何处理”的问题。下文中简称冲突问题。

DANGER

冲突问题是:“多个用户对同一个文本域的同一个版本进行编辑时如何处理”的问题

INFO

先解释一下这里的关键词,正确理解冲突问题非常重要:

  1. 同一个文本域:对于不同的底层数据模型来说,“同一个文本域”的概念是不同的,对于<textarea></textarea>来说文本域的概念一定是这个文本框中的全部文本。但对于像Notion、Roam Research或者飞书这样的应用,它们的每一行都是一个Block,每个Block可以看做单独的textarea这时候“同一个文本域”这个概念也许就是文档中的每个Block,原子操作类型设计也会更加复杂(比如:Block的insert、move、delete,Block内嵌组件时,组件内如何协作),具体的定义还需要看应用的设计。但无论怎样,只有对同一个文本域进行的修改才是我们这里说的冲突。
  2. 版本:采用一个全局唯一且自增的数字标识,在协同编辑领域称为revision。生成新的TextOperation时,当前Client的revision加1。版本号一方面表示了文本所处的状态,另一方面也决定了操作是否可以应用于当前的文本上。(PS:之所以文本域的版本不采用字符串比较,避免ABA问题应该也是决定因素之一)。

WARNING

ABA问题:一个线程把数据A变为了B,然后又重新变成了A。此时另外一个线程读取的时候,发现A没有变化,就误以为是原来的那个A。这就是有名的ABA问题。

如果有些业务不需要关心中间过程,仅关注共享变量的起始值和结束值,只要前后值一样就行,过程中共享变量是否被其他线程动过它不关心,这当然无所谓,但有些业务可能但是有些业务需求要求变量在中间过程不能被修改。

TIP

整体来看,OT解决并发编辑冲突问题的思路有以下几步:

  1. 定义原子操作类型:将用户在UI上触发的基于Event的操作抽象成由可枚举的N个原子操作类型组成的操作序列,这样一来复杂的UI界面操作的冲突就转换成了原子操作之间的冲突,同时也为多端数据同步定义了基本数据结构。
  2. 设计冲突解决算法:在冲突问题发生时,通过冲突解决算法解决冲突。冲突解决算法要完成以下任务:
  • 通过算法能将冲突的2个操作序列转换成2个可以和原来冲突的操作序列串行执行的新操作序列。
  • 可串行的两组操作序列作用在文本后,产生的影响相同。
  • 串行后的操作需要保持原有语义
  1. 多端冲突解决:通过向各端同步新的可串行操作序列完成冲突解决

定义原子操作类型

先看看没有原子操作的世界:

INFO

对有状态的UI Component来说其实并没有操作类型的概念,每次UI操作只会生成newValue在浏览器端通过e.target.value获取。如果进行粗略的oldValue和newValue的diff,在浏览器文本框中,用户可以进行以下UI操作:

  • 头部插入:"a" => "ba"
  • 尾部插入:"a" => "ab"
  • 中间插入:"ab" => "acb"
  • 删除并插入:"abbbbb" => "ac"(选中bbbbb,输入c)
  • 头部删除、尾部删除、中间删除、多个字符删除例子省略...... 可以看到用户可在UI上操作种类太多、缺乏定义而且和UI组件绑定没有通用性。因此我们需要定义一些更简洁,更具有原子性的操作。

DANGER

定义某个OT算法的操作为:

  • retain(Integer n): 保留n个字符,可以看作将字符串数组的指针移动到下标为n的位置
  • insert(String str):在当前位置插入str
  • delete(String str):删除当前位置后面的str

定义了原子操作之后,就可以将UI操作表示为由原子操作组成的操作序列了:

javascript
// 头部插入:"a" => "ba"
insert("b");
// 尾部插入:"a" => "ab"
retain(1).insert("b")
// 中间插入:`"ab" => "acb"`
retain(1).insert("c")
// 删除并插入:`"abbbbb" => "ac"`(选中bbbbb,输入c)
retain(1).delete("bbbbb").insert("c")

调用上面的原子操作后,在当前OT算法中操作序列会维护在一个称为TextOperation的类中:

javascript
// 删除并插入:`"abbbbb" => "ac"`(选中bbbbb,输入c)
TextOperation to = new TextOperation().retain(1).delete("bbbbb").insert("c");
// to.ops = [Retain(1), Insert("bbb"), Delete("c")]

而TextOperation就是进行多端同步的最小单位,同时也实现了操作的可序列化

设计冲突解决算法

那么此时OT算法就是有两个作用在相同revision的文本域上的TextOperation,每个TextOperation中都带着一组原子操作类型序列。那么我们应该如何处理这两个TextOperation才能让冲突自动解决呢?

TIP

  • 解决冲突方法是:当原子操作作用在文本中的同一个位置时,我们通过增加新原子操作拆分当前原子操作的方式让它可串行化。
  • 自动解决的方法是:将定义的可枚举的原子操作类型retain();insert();delete()排列组合,组成9种搭配。就相当于枚举了全部的冲突情况,针对每种情况设计当冲突发生时算法解决方案后就可以自动解决冲突。

接下来用具体的例子解释通过增加操作解决冲突方法:

WARNING

  • 两个操作都对0这个位置进行操作,通过冲突解决算法operationA和operationB被串行化。
  • operationA中的insert操作先执行,operationB通过新增一个Retain()操作从而保留A操作的影响。
  • 生成了两个新操作,新操作中newOperationA和原来一样,newOperationB多了一个原子操作。
javascript
// Text: ""
// opeationA: "" => "a"
operationA.ops = [Insert("a")];

// opeationB: "" => "b"
opeationB.ops = [Insert("b")];

// 冲突解决
transform(operationA, operationB)
// {newOperationA: [Insert("a")], newOperationB: [Retain(1), Insert("b")]}

假如我们只针对这一种情况编写transform方法的处理逻辑:

javascript
function transform(operationA, operationB) {
  // 等待构建和返回的新操作
  newOperationA = new TextOperation();
  newOperationB = new TextOperation();

  // operationA和operationB的原子操作序列
  opsA = operationA.ops;
  opsB = operationB.ops;

  // 遍历opA和opB的指针
  indexA = 0;
  indexB = 0;

  while (indexA < opsA.length && indexB < opsB.length) {
    // 如果是两个插入操作的冲突
    if (opsA[indexA] instanceof Insert && opsB[indexB] instanceof Insert ) {
      // A直接执行插入操作
      op = opsA[indexA ++];
      newA.ops.push(op);
      // op.value = "a"

      // B先进行移动操作,再进行插入操作
      newB.ops.push(new Retain(op.value.length));
      newB.ops.push(opsB[indexB ++]);
    }
  }
  return {newOperationA, newOperationB};
}

另外,假如将操作应用到文本的方法我们定义为String apply(String str, TextOperation op),经过冲突解决后生成的新操作可以满足交换律: apply(apply(str, operationA), newOperationB) === apply(apply(str, operationB), newOperationA)

多端冲突解决

目前为止我们理解了什么是冲突、如何解决冲突。那么在具体的应用中,什么时候会发生冲突呢? 这个问题其实和应用前后端数据流设计有关,在此我举出一个具体的例子进行讨论。 假如发生上面操作时,Client和Server的数据流设计方案如下所示:

Client和Server的数据流设计方案

在图中所有调用transform方法的地方都是冲突发生的地方。也就是说在实际的协同编辑应用中,冲突会在两种场景下发生:

1)Server接收到Client上传的操作。

2)Client接收到Server广播的操作。

TIP

在Client/Server协作时,还有几个事实我们需要意识到:

  • Client和Server都需要进行冲突解决,那么两端需要配置同构的transform算法
  • 由于冲突解决算法的串行化逻辑一定是有倾向的:在上文的实现对同一位置的相同操作,operationA要优先于operationB)。那么为了最终一致性,Client和Server端都必须偏向Server端。所以Client同步下来的操作必须放在operationA的位置,Server获取到的新操作必须放在operationB的位置

可视化演示

总结

DANGER

整体来看,OT是一种基于阻塞同步的多版本并发控制方案。

  • 阻塞同步:每次冲突解决时,只同步处理两个operation
  • 多版本:Client/Server端同时存在从同一个版本衍生出的的修改,最终通过某种方式再次合并成一致的版本

常见OT库

shareDBot.js

demo

shareDB使用demo

参考文档

CRDT(Conflict-free Replicated Data Type)

CRDT 的定义为:满足「强最终一致性」的数据类型。

基本概念

  • 冲突自由(无冲突)-复制-数据-类型(CRDT)
  • 一种分布式数据结构(在网络中的多台计算机上复制的数据结构)
  • 副本可以独立和并行地更新,不需要在副本之间进行协调,并保证不会有冲突发生。

异步网络系统

INFO

系统中有多个副本(replica),每个副本代表一个进程/一个计算机,副本可能会崩溃,我们用正确的副本指未崩溃的副本。 一个副本可能会崩溃之后永不恢复,或者保持内存完整性的情况下地恢复。 副本之间可能会被分区。

最终一致性 - Eventual Consistency

INFO

Eventual Delivery: 发布到一个正确的副本上的更新列表,最终将被传达至所有正确的副本。 收敛 Convergence: 收到了一样的更新列表的正确副本们,最终状态将一致。 终止 Termination: 所有执行的方法都会终止(即保证算法能在有限时间内完成计算,而不是要进行 2^128 次计算来遍历整个状态空间的这种算法)。

强最终一致性 - Strong Eventual Consistency (SEC)

INFO

强最终一致性定义为:满足「最终一致性 Eventual Consistency」且具有「强收敛性 Strong Convergence」。 强收敛 Strong Convergence: 收到了一样的更新列表的正确副本们的状态一致。 「强最终一致性 SEC」和「最终一致性 EC」的区别在于: EC 有可能要求用户解决冲突,而 SEC 是不会发生冲突的。

偏序集 - Partial Order

INFO

给定集合S,“≤”是S上的二元关系,若“≤”满足:

  • 自反性:∀a∈S,有a≤a;
  • 反对称性:∀a,b∈S,a≤b且b≤a,则a=b;
  • 传递性:∀a,b,c∈S,a≤b且b≤c,则a≤c; 则称集合 S 为偏序集

A -> B 的箭头表示 A ≤ B。 而 {x} 和 {y} 二者之间是不可比较的。

上确界 Least upper bound (LUB)

INFO

每个非空集合都有上确界或者下确界。 对于有限的点集一定有一个最大值最小值,此时最大值和上确界相同。 但对于某些无限点集来说可能不存在最大值或最小值,如{1-exp(-n)}没有最大值,但有上确界为1,1不在这个集合中。

考虑一个实数集合M. 如果有一个实数S,使得M中任何数都不超过S,那么就称S是M的一个上界。 在所有那些上界中如果有一个最小的上界,就称为M的上确界。 一个有界数集有无数个上界和下界,但是上确界却只有一个。

考虑一个实数集合M. 如果有一个实数S,使得M中任何数都大于等于S,那么就称S是M的一个下界。 在所有那些下界中如果有一个最大的下界,就称为M的下确界。 一个有界数集有无数个上界和下界,但是下确界却只有一个。

半格 - Semilatice

INFO

半格是一个偏序集。 设x,y属于一个偏序集,若对于任意的{x,y}都有最小上界(并),或者对于任意的{x,y}都有最大下界(交),则称该偏序集构成一个半格。

单调半格对象 - Monotonic Semilattice Object

INFO

  • 该对象集合是一个以 ≤ 为顺序的半格
  • 合并「本地状态 s」和「远端状态 s’」的方式为计算二者的上确界(LUB),即 merge(s, s’) = s⊔s′
  • 在所有的更新中,s 都是单调递增的,即 s ≤ update(s, u)

使用场景

TIP

CRDT 常被用在协作软件上,例如:

  • 多个用户需要共同编辑/读取共享的文档、数据库或状态的场景。
  • 在数据库软件,文本编辑软件,聊天软件等都可以用到它。

以多用户在线同时编辑同一篇文档的场景为例: 这个场景要求每个用户看到的内容都是一样的,即使在用户出现冲突编辑后(例如两个用户同时修改标题,两个请求同时到达服务器)也不会产生两个版本,这被称为一致性。(准确地说 CRDT 满足的是最终一致性)。

CRDT 让用户即使离线也可使用,并在恢复网络后能继续和所有人同步至一致的状态。

也可以和其他用户通过 P2P 的方式一起协同编辑。这被称为分区容错性。

这让 CRDT 可以很好地支持去中心化的应用:即使没有中心化服务器各端之间也能完成同步。

背景

WARNING

根据 CAP 定理,对于一个分布式计算系统来说,不可能同时完美地满足以下三点:

  • 一致性(Consistency): 每一次读都会收到最近的写的结果或报错;表现起来像是在访问同一份数据
  • 可用性(Availability): 每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据
  • 分区容错性(Partition tolerance): 以实际效果而言,分区相当于对通信的时限要求 系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择,所以「完美的一致性」与「完美的可用性」是冲突的。 “CAP 特性三选二” 的描述其实具有误导性,实际上 CAP 只禁止了设计空间的很小一部分即存在分区时的完美可用性和一致性; 而实际上在 C 和 A 之间的权衡的设计非常灵活,CRDT 就是一个很好的例子。

CAP

CRDT 不提供「完美的一致性」,它提供了 强最终一致性 Strong Eventual Consistency (SEC) 。 这代表进程 A 可能无法立即反映进程 B 上发生的状态改动,但是当 A B 同步消息之后它们二者就可以恢复一致性,并且不需要解决潜在冲突(CRDT 在数学上就不让冲突发生)。 而「强最终一致性」是不与「可用性」和「分区容错性」冲突的,所以 CRDT 同时提供了这三者,提供了很好的 CAP 上的权衡。

CRDT应用案例

INFO

下述两种方法都是 CRDT:

  • 他们都可以被独立并发地更新,而不需要副本之间进行协调(加锁)
  • 多个更新之间不可能发生冲突
  • 总是可以保证最终一致性

1.在分布式系统中不加锁地统计一件事情的发生次数(Grow-only Counter)

INFO

  • 让每个副本只能递增自己的计数器 => 不用加锁同步 & 不会发生冲突
  • 每个副本上同时保存着所有其他副本的计数值
  • 发生次数 = 所有副本计数值之和
  • 因为每个副本都只会更新自己的计数值,不会与其他计数器产生冲突,所以该类型在消息同步后便满足一致性 不加锁地统计

2.在分布式系统中不加锁的得到最终一致的内容(Grow-only Set)

INFO

  • Grow-only Set 当中的元素是只能增加不能减少的
  • 将两个这样的状态合并就只需要做并集
  • 因为元素只增不减,不存在冲突操作,所以该类型在消息同步后便满足一致性

最终一致性

CRDT原理

CRDT 有两种类型:Op-based CRDT 和 State-based CRDT

State-based CRDT

设计一个基于状态的 CRDT 中需要先定义一个 State-based Object。 当它满足一定的性质时就能成为 State-based CRDT 👏。

INFO

State-based Object 的定义包括:

  • State 副本的内部状态的类型。
  • state_zero 内部状态的初始值。
  • ≤ 定义了 state 之间的顺序。
  • update(s, u) 定义 state 的更新方式。
  • merge(s, s') 函数可以用于合并两个状态得到新状态。
  • 副本之间通过传递自己的 state,并进行 merge 操作来达到一致性。

TIP

假设有 Eventual delivery 和 Termination 的属性,那么任意单调半格的 State-based 对象都是有强最终一致性(Strong Eventual Consistency)的。 观察到最小上确界操作 ⊔ 符合以下性质:

  • 结合律: (a⊔b)⊔c = a⊔(b⊔c)
  • 交换律: a⊔b = b⊔a 从而收到同样的更新列表 u = {s0,s1,...sn},但更新顺序不同的副本,他们的最终状态因为有交换律和结合律都等于 s0⊔s1⊔s2...⊔sn,所以此时所有副本的状态都一致,符合 SEC。

如何设计 State-based CRDT:

INFO

  • 保证系统满足 Eventual delivery 和 Termination
  • state 在 ≤ 上满足偏序集的要求
  • 对任意的 s, u 满足 s ≤ update(s, u)
  • merge(s, s') 得到的是两个状态的上确界

Op-based CRDT

TIP

Op-based CRDT 的思路为:

  • 如果两个用户的操作序列是完全一致的,那么最终文档的状态也一定是一致的。
  • 所以索性各个用户保存对数据的所有操作(Operations),用户之间通过同步 Operations 来达到最终一致状态。
  • 但我们怎么保证 Op 的顺序是一致的呢,如果有并行的修改操作应该谁先谁后?
  • 所以 Op-based CRDT 要求可能并行的 Op 都是可交换的,由此就可以满足最终一致性的要求。

设计一个基于操作的 CRDT 中需要先定义一个 Op-based Object。 当它满足一定的性质时就能成为 Op-based CRDT。

INFO

Op-based Object 的定义包括:

  • state 是每个副本的内部状态
  • state_zero 内部状态的初始值
  • op 是每个原子操作的类型,副本直接通过传递 op 来达到同步
  • apply_op(state, op) 是在一个 state 上应用 op 的函数,返回新的状态;op 的交换律指的就是 apply_op(apply_op(state, opA), opB) == apply_op(apply_op(state, opB), opA)
  • check_state(state, op) 确认一个状态是否满足应用 op 的前置条件
  • Op-based CRDT 的每一个操作 op 都有对应的前置状态检查函数 check_state,在应用 op 之前需要检查 check_state 函数是否满足,如果不满足就将阻塞延迟
  • check_state 函数的目的是为了保证 op 所依赖的因果顺序成立,例如删除 x 节点的操作依赖于 x 节点被创建的操作,否则就无法应用该操作。这也意味着 Op-based CRDT 的使用者有责任证明每个操作的前置状态都是能够得到满足的
  • 副本之间通过传递彼此缺失的 op,并进行 apply_op 来达到最终一致性
并行的操作 - Concurrent Operation

INFO

根据每个 op 在副本上的执行顺序我们可以定义 op 之间的偏序关系:

  • 如果一个 op A 在某个副本上先于 op B 执行,那么 A < B
  • 如果既没有 A < B 也没有 B < A,那么我们就称 A 和 B 是并行的操作。

INFO

构建一个 Op-based CRDT 的充分条件为:

  • 所有的并行的操作都满足交换律
  • 每个操作的前置条件都能通过按因果顺序应用得到满足
  • 假设所有操作都满足 Eventual Deliver (按照随机顺序 deliver),且所有更新函数都会终止(满足 Termination)。那么所有满足以下条件的 op-based object 就具有强最终一致性(SEC)

即如果有可能冲突的 Op 都是可以交换的,那么它们在一个副本上的应用顺序就对 Object 的最终状态没有影响,那么任意两个应用了同样的 op 集合的 Op-based CRDT 就都具有一致的状态,从而满足 SEC。

如何设计 Op-based CRDT:

TIP

  • 保证系统满足 Eventual delivery 和 Termination
  • 保证可能出现并行的 Operation 都满足交换律(不管先应用哪个 Op,最终状态都一致)
  • 在应用 Op 时需要保证该 Op 所依赖的前置状态得到满足

CmRDT 和 CvRDT 是等价的

TIP

在形式上,Op-based CRDT 和 State-based 是可以互相转换的。思路为:

  • 通过 Op-based CRDT 构建 State-based CRDT 的方式为:

    1. 将新的 State-based Object 的 state 定义为一个二元组(s, M),s 和 Op-based CRDT 的内部状态一致,M 是 Op-based CRDT 的内部 Op 的集合。
    2. 将新的 State-based Object 的 merge 操作定义为 merge((s, M), (s', M')) = apply_ops(s, M' - M)
  • 通过 State-based CRDT 构建 Op-based CRDT 的方式为:

    1. 将新的 Op-based object 的 Op 定义为 State-based CRDT 的 State
    2. 将 apply_op 的操作定义为 apply_op(state, op) = merge(state, op),而 merge 是服从对称性的操作,从而我们能够满足 SEC 得到一个 Op-based CRDT

设计一个可增加删除元素的 Set CRDT

基于上面的 CRDT 框架我们已经有很好的理论支持我们设计一个 Last-write-wins Set,即出现同时删除和添加同一个元素时后写入的操作将覆盖先写入的操作。

但是在设计之前需要先了解在分布式系统中我们该如何判断事件的“先后”。

Lamport Timestamp

在中心化的系统中我们可以通过事件到达中心服务器的时间作为时间发生的时间戳来判断先后。但是在分布式环境中这种方式就不能使用了。此时我们可以使用 Lamport Timestamp。

INFO

Lamport Timestamp 的算法很简单:

  • 每个进程维护一个 counter
  • 本地每发生一个事件就将 counter + 1,并将事件的时间戳设置为 counter 值
  • 每当进程发送一个消息,就将本地 counter + 1,并将最新的 counter 值附带在消息上
  • 当进程收到消息后,让自己的 counter = max(counter, message.counter) + 1

从而每一个消息都有一个明确的时间戳,根据时间戳我们就能够得到消息间的全序关系。 但为了能够处理消息的 counter 相同的情况,我们还需要在消息中带上进程自己的 id,从而能够将全序关系定义为: a.counter == b.counter ? (a.pid < b.pid) : (a.counter < b.counter)

Lamport Timestamp

实现

Details

以 Op-based CRDT 的思路设计 Last-write-wins Set(LWWSet):

  • 通过 Lamport timestamp 定义 Op 之间的全序关系
  • 存在共同修改同一个元素的操作时,让 timestamp 更大的 Op 获胜,如果 Timestamp 一致,则让 pid 更大的 Op 获胜
typescript
interface Op {
  time: number;
  pid: number;
  type: "add" | "remove";
  value: string;
}

type State = Map<string, Op>;
type ProcessState = { counter: number; state: State };
const state_zero: State = new Map();
const process_state_zero: ProcessState = {counter: 0, state: state_zero};

function apply(state:ProcessState, op: Op): ProcessState {
  const new_state: ProcessState = {
    counter: Math.max(state.counter, op.time) + 1,
    state: new Map(state.state),
  };
  if (!new_state.state.has(op.value)) {
    new_state.state.set(op.value, op);
  } else {
    const old_op = state.state.get(op.value);
    if (
      old_op.time < op.time ||
      (old_op.time === op.time && old_op.pid < op.pid)
    ) {
      new_state.state.set(op.value, op);
    }
  }

  return new_state;
}

function check_state(state: State, op: Op) {
  return true;
}

// 判断 LWWSet 是否含有某个值
function has(state: State, v: string): boolean {
  return state.get(v)?.type === "add";
}

// 插入 v 到 LWWSet
function add(state: ProcessState, v: string): ProcessState {
  return apply(state, {
    time: state.counter,
    pid: getPid(),
    type: "add",
    value: v,
  });
}

// 删除 LWWSet 中的 v
function remove(state: ProcessState, v: string): ProcessState {
  return apply(state, {
    time: state.counter,
    pid: getPid(),
    type: "remove",
    value: v,
  });
}

CRDT的应用

你不用自己从头开始设计和实现 CRDTs 算法(CRDT 很容易被实现得很糟糕)。 你可以直接基于开源 CRDTs 项目来搭建你的应用例如 Automerge, Yjs 以及 Loro。

Automerge, Yjs 以及 Loro 性能对比

参考文档

二者比较

CRDT 和 Operation Transformation(OT) 都可以被用于在线协作型的应用中,二者的差别如下:

OTCRDT
OT 依赖中心化服务器完成协作; 如果想要让它在分布式环境中工作就异常困难CRDT 算法可以通过 P2P 的方式完成数据的同步
为保证一致性,OT 的算法设计时的复杂度更高为保证一致性,CRDT 算法设计更简单
OT 的设计更容易保留用户意图设计一个保留用户意图的 CRDT 算法更困难
OT 不影响文档体积CRDT 文档比原文档数据更大
OT 最早的论文于 1989 年提出CRDT 最早的论文出现于 2006 年

WARNING

为什么目前在协作软件中大多数看到的还是应用 OT 算法而不是 CRDT 呢? 首先因为 CRDT 这类方法相比 OT 还比较年轻,而且有些难点近几年才被比较好地解决,例如:

TIP

而目前在以下方面的研究还有待展开:

  • CRDT 中常常存在难以回收的墓碑数据,如何才能更好地回收 CRDT 的墓碑?
  • 如何降低更新 CRDT 文档的开销?