CRDT,实现一个共享画布。

2024/10/22 7:8:56
CRDT ,分布式存储 ,canvas

运用CRDT,实现一个共享画布。

实现画布

先给上次实现的LWW Map再做一层薄包装。

// Pixel.ts
import LWWMap from "./LWWRegisterMap";
export type RGB = [number, number, number];

export class PixelData {
  readonly #id: string;
  #data: LWWMap<RGB>;

  constructor(id: string) {
    this.#id = id;
    this.#data = new LWWMap(this.#id, {});
  }

  /**
   * Returns a stringified version of the given coordinates.
   * @param x X coordinate.
   * @param y Y coordinate.
   * @returns Stringified version of the coordinates.
   */
  static key(x: number, y: number) {
    return `${x},${y}`;
  }

  get id() {
    return this.#id;
  }

  get value() {
    return this.#data.value;
  }

  get state() {
    return this.#data.state;
  }

  set(x: number, y: number, value: RGB) {
    const key = PixelData.key(x, y);
    this.#data.set(key, value);
  }

  get(x: number, y: number): RGB {
    const key = PixelData.key(x, y);

    const register = this.#data.get(key);
    return register ?? [255, 255, 255];
  }

  delete(x: number, y: number) {
    const key = PixelData.key(x, y);
    this.#data.delete(key);
  }

  merge(state: PixelData["state"], remotePeer) {
    this.#data.merge(state, remotePeer);
  }
}

基本功能还是 value\state\merge ,画布需要的是存储canvas中坐标点和对应颜色,所以还添加了部分对坐标处理的函数。

然后实现canvas操作功能:

// PixelEditor

import { RGB } from "./Pixel";
import { PixelData } from "./Pixel";

export class PixelEditor {
  id = Math.random().toString(36).substring(2, 9);

  /** The underlying <canvas> element */
  #el: HTMLCanvasElement;

  /** The 2D canvas rendering context */
  #ctx: CanvasRenderingContext2D;

  /** The artboard size, in drawable pxiels */
  #artboard: { w: number; h: number };

  /** The underlying pixel data */
  data: PixelData;

  /** The selected color */
  #color: RGB = [0, 0, 0];

  /** Listeners for change events */
  #listeners: Array<(state: PixelData["state"]) => void> = [];

  /** The previous position of the mouse cursor */
  #prev: [x: number, y: number] | undefined;

  /** The set of pixel keys that have been painted during the current drag operation */
  #painted = new Set<string>();

  constructor(
    el: HTMLCanvasElement,
    artboard: { w: number; h: number },
    id: string
  ) {
    this.#el = el;
    this.id = id ? `${id} - ${Math.random()}` : this.id;
    this.data = new PixelData(this.id);
    const ctx = el.getContext("2d");
    if (!ctx) throw new Error("Couldn't get rendering context");
    this.#ctx = ctx;

    this.#artboard = artboard;

    // listen for pointer events
    this.#el.addEventListener("pointerdown", this);
    this.#el.addEventListener("pointermove", this);
    this.#el.addEventListener("pointerup", this);

    // resize the canvas
    this.#el.width = this.#el.clientWidth * devicePixelRatio;
    this.#el.height = this.#el.clientHeight * devicePixelRatio;
    this.#ctx.scale(devicePixelRatio, devicePixelRatio);
    this.#ctx.imageSmoothingEnabled = false;
  }

  /**
   * Appends a listener to be called when the state changes.
   * @param listener */
  set onchange(listener: (state: PixelData["state"]) => void) {
    this.#listeners.push(listener);
  }

  /** Sets the drawing color. */
  set color(color: RGB) {
    this.#color = color;
  }
  /**
   * Handles events on the canvas.
   * @param e Pointer event from the canvas element.
   */
  handleEvent(e: PointerEvent) {
    switch (e.type) {
      case "pointerdown": {
        this.#el.setPointerCapture(e.pointerId);
        // fallthrough
      }

      case "pointermove": {
        if (!this.#el.hasPointerCapture(e.pointerId)) return;

        // convert canvas pixels to artboard pixels
        const x = Math.floor(
            (this.#artboard.w * e.offsetX) / this.#el.clientWidth
          ),
          y = Math.floor(
            (this.#artboard.h * e.offsetY) / this.#el.clientHeight
          );

        this.#paint(x, y);
        this.#prev = [x, y];
        break;
      }

      case "pointerup": {
        this.#el.releasePointerCapture(e.pointerId);
        this.#painted.clear();
        this.#prev = undefined;
        break;
      }
    }
  }

  /**
   * Sets pixel under the mouse cursor with the current color.
   * @param x X coordinate of the destination pixel.
   * @param y Y coordinate of the destination pixel.
   */

  #paint(x: number, y: number) {
    if (x < 0 || this.#artboard.w <= x) return;
    if (y < 0 || this.#artboard.h <= y) return;

    if (!this.#checkPainted(x, y)) this.data.set(x, y, this.#color);

    let [x0, y0] = this.#prev || [x, y];

    const dx = x - x0,
      dy = y - y0;

    const steps = Math.max(Math.abs(dx), Math.abs(dy));
    const xinc = dx / steps,
      yinc = dy / steps;

    for (let i = 0; i < steps; i++) {
      x0 += xinc;
      y0 += yinc;
      const x1 = Math.round(x0);
      const y1 = Math.round(y0);

      if (!this.#checkPainted(x1, y1)) this.data.set(x1, y1, this.#color);
    }

    this.#draw();
    this.#notify();
  }

  /** Draw each pixel on the canvas. */
  async #draw() {
    /** Number of channels per pixel; R, G, B, A */
    const chans = 4;

    /** A buffer to hold the raw pixel data.
     * Each pixel corresponds to four bytes in the buffer,
     * so the full size is the number of pixels times the number of channels per pixel. */
    const buffer = new Uint8ClampedArray(
      this.#artboard.w * this.#artboard.h * chans
    );

    /** The number of bytes in the buffer representing a single artboard row. */
    const rowsize = this.#artboard.w * chans;

    for (let row = 0; row < this.#artboard.h; row++) {
      // calculate the byte offset of the start of the row relative to the start of the buffer
      const offsetY = row * rowsize;

      for (let col = 0; col < this.#artboard.w; col++) {
        // calculate the byte offset of the pixel relative to the start of the row
        const offsetX = col * chans;

        // calculate the byte offset of the pixel relative to the start of the buffer
        const offset = offsetY + offsetX;
        const [r, g, b] = this.data.get(col, row);
        buffer[offset] = r;
        buffer[offset + 1] = g;
        buffer[offset + 2] = b;
        buffer[offset + 3] = 255;
      }
    }

    const data = new ImageData(buffer, this.#artboard.w, this.#artboard.h);
    const bitmap = await createImageBitmap(data);
    this.#ctx.drawImage(
      bitmap,
      0,
      0,
      this.#el.clientWidth,
      this.#el.clientHeight
    );
  }

  /** Notify all listeners that the state has changed. */
  #notify() {
    const state = this.data.state;
    for (const listener of this.#listeners) listener(state);
  }

  /**
   * Merge remote state with the current state and redraw the canvas.
   * @param state State to merge into the current state.
   * @param remotePeer The peer that sent the state.
   * */
  receive(state: PixelData["state"], remotePeer: string) {
    // console.log(this.id, ': ',state,remotePeer);
    this.data.merge(state, remotePeer);
    this.#draw();
  }

  /**
   * Check whether a pixel has been painted during the current drag operation
   * @param x X coordinate of the target pixel.
   * @param y Y coordinate of the target pixel.
   */
  #checkPainted(x: number, y: number) {
    const key = PixelData.key(x, y);

    const painted = this.#painted.has(key);
    this.#painted.add(key);

    return painted;
  }

  // 卸载操作
  unload() {
    this.#el.removeEventListener("pointerdown", this);
    this.#el.removeEventListener("pointermove", this);
    this.#el.removeEventListener("pointerup", this);
  }
}

大致功能即:data存储 Pixel实例,实现canvas的绘图,实现对canvas的pointerdown,pointermove,pointerup事件监听以及相关卸载方法,绘制时向data中set数据,添加事件监听列表。具体实现和功能可以参看Building a Collaborative Pixel Art Editor with CRDTs

然后在React中实现一下页面:

// page.tsx
'use client'
import styles from './page.module.css'
import { useRef, useEffect, useState } from 'react'
import { PixelEditor } from '@/lib/PixelEditor'
import { RGB } from '@/lib/Pixel'

const DrawBordPage = () => {
    const aliceRef = useRef(null)
    const bobRef = useRef(null)
    const [color, setColor] = useState('#000000')
    const artboardSize = { w: 100, h: 100 };
    const alice = useRef<PixelEditor | null>(null)
    const bob = useRef<PixelEditor | null>(null)
    
    useEffect(() => {
        if (aliceRef.current) alice.current = new PixelEditor(aliceRef.current, artboardSize, 'alice')
        if (bobRef.current) bob.current = new PixelEditor(bobRef.current, artboardSize, 'bob')
        initReceive()
        return () => {
            alice.current?.unload()
            bob.current?.unload()
        }
    }, [])

    const initReceive = () => {
        alice.current.onchange = state => {
            bob.current.receive(state)
        };
        bob.current.onchange = state => {
            alice.current.receive(state)
        };
    }
    const colorHandle = e => {
        setColor(e.target.value)
        const hex = e.target.value.substring(1).match(/[\da-f]{2}/g) || [];
        const rgb = hex.map((byte: string) => parseInt(byte, 16));
        if (rgb.length === 3) {
            alice.current.color = rgb as RGB;
            bob.current.color = rgb as RGB;
        }
    }

    return (
        <>
            <div className={styles.wrapper}>
                <div className={styles.canvases}>
                    <canvas ref={aliceRef} className={styles.canvas} id="alice"></canvas>
                    <canvas ref={bobRef} className={styles.canvas} id="bob"></canvas>
                </div>
                <input onChange={colorHandle} className={styles.color} type="color" value={color} />
            </div>
        </>
    )
}

export default DrawBordPage

样式内容自己稍微补充一下,然后打开页面,大概就是两个画布+一个颜色选择器,当你在任意画布绘制时,另一个会同步展示出来,如果想要实现延迟效果,在initReceive中的change事件里,加上setTimeout来模拟通讯延迟。

// page.tsx
    const initReceive = () => {
        alice.current.onchange = state => {
            setTimeout(() => {
                bob.current.receive(state)
            }, 500)
        };
        bob.current.onchange = state => {
            setTimeout(() => {
                alice.current.receive(state)
            }, 500)
        };
    }

好了,这就是一个基于LWW Rigister的CRDT 实现的共享画布了。

实现联网+本地存储的共享画布

客户端改造

但是,目前只在本地的两个canvas里相互通讯,刷新页面也会清空画布,并没有那种“感觉”。所以我们现在继续修改page.tsx,以及使用express+socket简单实现一个服务器进行通讯。

首先,先去掉bob相关的内容… 然后先实现本地存储:

// ...
  useEffect(() => {
     // ...   
    loadLocalData()
    //...
  }, [])

// ...

  const initReceive = () => {
    alice.current.onchange = state => {
      socket.current.emit('message', {state, id:alice.current.id})
      saveLocalData()
    };
  }

// ...
  const saveLocalData = () => {
    const id = alice.current.id
    const state = alice.current.data.state
    localStorage.setItem('drawBord_id', id)
    localStorage.setItem('drawBord_state', JSON.stringify(state))
  }

  const loadLocalData = () => {
    const id = localStorage.getItem('drawBord_id')
    const state = JSON.parse(localStorage.getItem('drawBord_state') || '{}')
    alice.current?.receive(state, id)
  }

在进入页面时,读取本地数据,在绘制导致数据更新时,同步保存到本地。initReceive中的 socket是用于发送socket消息的,在下面socket部分会实现。

然后接入socket.io, npm i socket.io-client

// ...
import io from 'socket.io-client'
// ...
  const socket = useRef(null)
  useEffect(() => {
     // ...   
    loadLocalData()
    initSocket()
    //...
  }, [])
  const initSocket = () => {
    socket.current = io('http://localhost:5001', {
      path: '/ws'
    })
    socket.current.on('connect', (msg) => {
      console.log('connect: ', msg)
    })
    socket.current.on('message', (msg) => {
      alice.current.receive(msg.state, msg.id)
      saveLocalData()
    })
    socket.current.on('error', err => {
      console.log('err: ', err)
    })
  }

服务端实现

服务端使用nodejs + express + socket.io 实现。为了确保所有节点只要联网就收敛一致,我们在服务端也要实例化一个CRDT,但是不需要画布操作,所以只需要实例化PixelData


import { PixelData } from "./lib/Pixel";
import express from "express";
import { createServer } from "node:http";
import { Server } from "socket.io";

const port = 5001;
const CenterPixelData = new PixelData("server-side");

const app = express();
app.use((req, res, next) => {
  res.header("Access-Control-Allow-Origin", "*");
  res.header(
    "Access-Control-Allow-Headers",
    "Authorization,X-API-KEY, Origin, X-Requested-With, Content-Type, Accept, Access-Control-Request-Method"
  );
  res.header(
    "Access-Control-Allow-Methods",
    "GET, POST, OPTIONS, PATCH, PUT, DELETE"
  );
  res.header("Allow", "GET, POST, PATCH, OPTIONS, PUT, DELETE");
  next();
});

const server = createServer(app);
const io = new Server(server, {
  path: "/ws",
  cors: {
    origin: "*",
    METHODS: ["GET", "POST"],
  },
});

// 当客户端连接时触发
io.on("connection", (socket) => {
  console.log("A user connected: " + socket.id);
  // 向当前客户端发送当前状态
  socket.emit("message", {
    state: CenterPixelData.state,
    id: CenterPixelData.id,
  });

  // 处理来自客户端的消息
  socket.on("message", (msg) => {
    const { state, id } = msg;
    CenterPixelData.merge(state, id);
    // 向所有客户端广播消息
    io.emit("message", msg);
  });

  // 处理断开连接
  socket.on("disconnect", () => {
    console.log("User disconnected: " + socket.id);
  });
});

server.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`);
});

每当节点连接上socket,则将服务端的数据发送给节点,节点在本地再做合并操作,每当节点修改内容时,修改的内容都会实时保存到本地,并且发送到服务器,然后服务器对所有链接的节点发出更新通知。即便离线或者服务器关停、数据丢失,本地依旧可以修改、保存,等待下次连线后再推拉更新状态。

结尾

上述的代码比较粗糙,只是最低限度的实现功能。要说优化的话,还有挺多地方可以操作。例如事件发送的地方添加节流,在服务端的通知也添加节流。或者是对传输的数据结构做修改,现在只是一个100 * 100 的画布,传输的state就已经快200kb了,首先是颜色储存可以改成十六进制的,且十六进制颜色在同一个颜色下的两个字符相同的情况下可以省略一个字符,例如#ffffff#fff 是完全等价的。还可以使用messagepack 之类的将数据二进制化。都可以一定程度的减少数据传输量。

从了解CRDT, 到实践CRDT。也是一次完整的感受到分布式存储的魅力,即便服务器挂了,只要还有一个曾经有连结过它的节点还有数据,就能东山再起。