An implementation proposal for Design: Agent Assisted Template Curation using JSON Joy for both the client-side and server-side implementation:
Client-side - Clients will use the json-joy TypeScript client.
Server-side - All CRDTs will have a relational database representation.
Note: Due to the way logical clocks work, both the client and server must enforce a single writer per replica (multiple readers are allowed).
We will use the following example to demonstrate both the client-side and server-side implementation. The grid will start off empty. We will then add three rows and three columns to our examples grid in the following order:
Create a column named “type”.
Add a row with the with type=dog
Add a column named “age”
Set age=9 for the first row.
Add a column after “type” named “name” (column order should be type,name,age).
Set name=max for the first row.
Add a new row after the first with: type=cat, name=paws, age=15.
Add a new row after the first with: type=rat, name=whiskers, age=2.
The final grid should be:
type | name | age |
---|---|---|
dog | max | 9 |
rat | whiskers | 2 |
cat | paws | 15 |
Notice that the name column was inserted between type and age, and that whiskers was inserted between max and paws.
Both the client and sever must share the grid document schema. Specifically, this schema will represent a single replica’s grid document. We will adopt the json-joy node type definitions for our schema. For the first version, we will only need the following sub-set of types:
con - This represents an immutable constant.
vec - A LLW, grow-only, append-only, index-value-pair of references to other nodes . Has a max length of 256.
arr - RGA (Replicated Growable Array) is mutable ordered tree of nodes.
The document schema is defined as follows:
const gridRowSchema = s.obj({ data: s.vec(s.con('')), metadata: s.obj({ synapseRow: s.obj({ rowId: s.con(0), versionNumber: s.con(0), etag: s.con(''), }), }), }); const gridSchema = s.obj({ doc_version: s.con('0.1.0'), columnNames: s.vec(s.con('')), columnOrder: s.arr([s.con(0)]), rows: s.arr([gridRowSchema]), }); |
name | description |
---|---|
| The semantic version of the document schema. This value will change anytime we change this schema. |
| This vector captures the name of the columns in their natural order (order they were added). Vectors are append-only (up to 256 values). Vectors allow LWW changes to each index. |
| This array defines the order that the columns should be displayed. Each value is an index into the |
| This is an |
| The row data |
| The row This data should be treated as read-only for all replicas other than the hub. |
| The |
Note: Each vec
of the rows
is append-only (cannot be re-order) with each index matching an index from columnNames
. When a new column name is appended to columnNames
vector, each row’s vector will be smaller (by one), indicating that the cells of newly added columns are null. For such cases, row vectors only need to be extended when setting a cell value for the new column.
The following uses the json-joy Typescript library to follow the eight example steps:
import {ConApi, Model, VecApi} from 'json-joy/lib/json-crdt'; import {ClockVector, konst, Konst, s} from 'json-joy/lib/json-crdt-patch'; import {encode, decode} from 'json-joy/lib/json-crdt-patch/codec/compact'; const schema = s.obj({ doc_version: s.con('0.0.2'), columnNames: s.vec(s.con('')), columnOrder: s.arr([s.con(0)]), rows: s.arr([s.vec(s.con(''))]) }); const replicaId = 0x10000; // Create a new JSON CRDT document. const model = Model.create(schema, replicaId); console.log('Initial Model:', model + ''); // The first patch is from the initalization of an emtpty model const p0 = model.api.flush(); console.log('Encoded patch zero:'); console.log(JSON.stringify(encode(p0), null, 0)); const cn = model.api.vec(['columnNames']); const co = model.api.arr(['columnOrder']); const rows = model.api.arr(['rows']); const r0 = (rows.get(0) as VecApi); // 1. set the name of the first column cn.set([[0, konst('type')]]); // 2. Set type=dog for the first cell of the first row r0.set([[0,konst('dog')]]); // 3. Add age; cn.set([[1,konst('age')]]); // set the column order with age as index 1 co.ins(1, [konst(1)]); // 4. Set age=9 r0.set([[1, konst(9)]]) // 5. Add 'name' column. cn.set([[2,konst('name')]]); // set the order or the name column to be at index 1 co.ins(1, [konst(2)]); // 6. Set name=max for the first row r0.set([[2, konst('max')]]) // 7. Add new row type=cat name=paws, age=15. rows.ins(1, [model.api.builder.vec()]); const r1 = (rows.get(1) as VecApi); r1.set([[0, konst('cat')],[1, konst(15)],[2, konst('paws')]]) // 8. add a new row after first with type=rat, name=whiskers rows.ins(1, [model.api.builder.vec()]); const r2 = (rows.get(1) as VecApi); r2.set([[0, konst('rat')],[1, konst(2)],[2, konst('whiskers')]]) console.log('Updated Model:', model + ''); const p1 = model.api.flush(); console.log('Encoded patch one:'); console.log(JSON.stringify(encode(p1), null, 0)); |
Initializing an empty document with a schema will start the document with default values. It is important to capture a patch after initialization as the internal replica ID of 2 is used for initialization. If we fail to capture a patch here, the next patch will erroneously contain a combination of the changes from replica 2 and replica 65536. The follow is the output from the script:
Initial Model: model ├─ root 0.0 │ └─ obj 2.1 │ ├─ "doc_version" │ │ └─ con 2.2 { "0.0.2" } │ ├─ "columnNames" │ │ └─ vec 2.3 │ │ └─ 0: con 2.4 { "" } │ ├─ "columnOrder" │ │ └─ arr 2.6 │ │ └─ chunk 2.8:1 .1. │ │ └─ [0]: con 2.7 { 0 } │ └─ "rows" │ └─ arr 2.9 │ └─ chunk 2.13:1 .1. │ └─ [0]: vec 2.10 │ └─ 0: con 2.11 { "" } │ ├─ index (9 nodes) │ ├─ obj 2.1 │ ├─ con 2.2 │ ├─ vec 2.3 │ ├─ con 2.4 │ ├─ arr 2.6 │ ├─ con 2.7 │ ├─ arr 2.9 │ ├─ vec 2.10 │ └─ con 2.11 │ ├─ view │ └─ { │ "doc_version": "0.0.2", │ "columnNames": [ │ "" │ ], │ "columnOrder": [ │ 0 │ ], │ "rows": [ │ [ │ "" │ ] │ ] │ } │ └─ clock 65536.16 └─ 2.15 Encoded patch zero: [[[2,1]],[2],[0,"0.0.2"],[3],[0,""],[11,3,[[0,4]]],[6],[0,0],[14,6,6,[7]],[6],[3],[0,""],[11,10,[[0,11]]],[14,9,9,[10]],[10,1,[["doc_version",2],["columnNames",3],["columnOrder",6],["rows",9]]],[9,[0,0],1]] Updated Model: model ├─ root 0.0 │ └─ obj 2.1 │ ├─ "doc_version" │ │ └─ con 2.2 { "0.0.2" } │ ├─ "columnNames" │ │ └─ vec 2.3 │ │ ├─ 0: con ..5536.16 { "type" } │ │ ├─ 1: con ..5536.20 { "age" } │ │ └─ 2: con ..5536.26 { "name" } │ ├─ "columnOrder" │ │ └─ arr 2.6 │ │ └─ chunk ..5536.29:1 .3. │ │ └─ [1]: con ..5536.28 { 2 } │ │ ← chunk 2.8:1 .1. │ │ └─ [0]: con 2.7 { 0 } │ │ → chunk ..5536.23:1 .1. │ │ └─ [2]: con ..5536.22 { 1 } │ └─ "rows" │ └─ arr 2.9 │ └─ chunk ..5536.39:1 .3. │ └─ [1]: vec ..5536.38 │ ├─ 0: con ..5536.40 { "rat" } │ ├─ 1: con ..5536.41 { 2 } │ └─ 2: con ..5536.42 { "whiskers" } │ ← chunk 2.13:1 .1. │ └─ [0]: vec 2.10 │ ├─ 0: con ..5536.18 { "dog" } │ ├─ 1: con ..5536.24 { 9 } │ └─ 2: con ..5536.30 { "max" } │ → chunk ..5536.33:1 .1. │ └─ [2]: vec ..5536.32 │ ├─ 0: con ..5536.34 { "cat" } │ ├─ 1: con ..5536.35 { 15 } │ └─ 2: con ..5536.36 { "paws" } │ ├─ index (23 nodes) │ ├─ obj 2.1 │ ├─ con 2.2 │ ├─ vec 2.3 │ ├─ arr 2.6 │ ├─ con 2.7 │ ├─ arr 2.9 │ ├─ vec 2.10 │ ├─ con ..5536.16 │ ├─ con ..5536.18 │ ├─ con ..5536.20 │ ├─ con ..5536.22 │ ├─ con ..5536.24 │ ├─ con ..5536.26 │ ├─ con ..5536.28 │ ├─ con ..5536.30 │ ├─ vec ..5536.32 │ ├─ con ..5536.34 │ ├─ con ..5536.35 │ ├─ con ..5536.36 │ ├─ vec ..5536.38 │ ├─ con ..5536.40 │ ├─ con ..5536.41 │ └─ con ..5536.42 │ ├─ view │ └─ { │ "doc_version": "0.0.2", │ "columnNames": [ │ "type", │ "age", │ "name" │ ], │ "columnOrder": [ │ 0, │ 2, │ 1 │ ], │ "rows": [ │ [ │ "dog", │ 9, │ "max" │ ], │ [ │ "rat", │ 2, │ "whiskers" │ ], │ [ │ "cat", │ 15, │ "paws" │ ] │ ] │ } │ └─ clock 65536.44 └─ 2.15 Encoded patch one: [[[65536,16]],[0,"type"],[11,[2,3],[[0,16]]],[0,"dog"],[11,[2,10],[[0,18]]],[0,"age"],[11,[2,3],[[1,20]]],[0,1],[14,[2,6],[2,8],[22]],[0,9],[11,[2,10],[[1,24]]],[0,"name"],[11,[2,3],[[2,26]]],[0,2],[14,[2,6],[2,8],[28]],[0,"max"],[11,[2,10],[[2,30]]],[3],[14,[2,9],[2,13],[32]],[0,"cat"],[0,15],[0,"paws"],[11,32,[[0,34],[1,35],[2,36]]],[3],[14,[2,9],[2,13],[38]],[0,"rat"],[0,2],[0,"whiskers"],[11,38,[[0,40],[1,41],[2,42]]]] |
After the client can send both encoded patches to the hub to share the changes with other replicas.
Clients work with the grid by maintaining an in-memory representation of the grid document (see above). The memory needed to load a large scale model with 10M would exceed the limits of a typical server-side worker. Server-side workers do not typically load large datasets into memory. Instead, the worker will manage large scale datasets using one of the following options:
Streaming - Read/write of large files is done by streaming over data rather than loading the data into memory.
Database - Large datasets are read from the database one page at time, and written to the database one block at a time.
For example, when a user makes a request to export a CSV from a Synapse table/view, a worker will process the request as follows:
The query results are read one page at a time.
Each row from a page is transformed into CSV row.
Each CSV row is streamed to file or S3.
This means the worker can process a table with an unbounded size using a small, fixed amount of memory.
The server-side implementation of a grid replica will need to read/write the grid document to a database. Consider an example where the worker is tasked with applying a patch from a client to the database. Ideally, the patch would be applied directly to database without loading the entire document into memory.
The following is a working prototype of SQL that achieves the goal of applying a patch by writing to a SQL database:
drop database grid1; create database grid1; USE grid1; SET autocommit = 1; -- Patches are not transactions! # We need to support tables of up to 100K rows. SET SESSION cte_max_recursion_depth = 1000000; -- The index of all nodes in the document. CREATE TABLE R1_INDEX ( NODE_REP BIGINT NOT NULL, NODE_SEQ BIGINT NOT NULL, KIND ENUM ('con','obj','vec','arr'), PRIMARY KEY (NODE_REP, NODE_SEQ) ); -- This table is a simple linked list that stores the Replicated Growable Array (RGA). -- Data is inserted into the table using 'call rga_insert(....)' CREATE TABLE R1_ARR ( CHUNK_REP BIGINT NOT NULL, CHUNK_SEQ BIGINT NOT NULL, NODE_REP BIGINT NOT NULL, NODE_SEQ BIGINT NOT NULL, PARENT_REP BIGINT, PARENT_SEQ BIGINT, TOMBSTONE BOOLEAN DEFAULT FALSE, -- on delete, set to true, indicating that the node was deleted. PRIMARY KEY (CHUNK_REP, CHUNK_SEQ), INDEX (PARENT_REP, PARENT_SEQ), CONSTRAINT `R1_ARR_PARENT_FK` FOREIGN KEY (PARENT_REP, PARENT_SEQ) REFERENCES R1_ARR(CHUNK_REP, CHUNK_SEQ) ON DELETE RESTRICT, CONSTRAINT `R1_ARR_NODE_INDEX_FK` FOREIGN KEY (NODE_REP, NODE_SEQ) REFERENCES R1_INDEX(NODE_REP, NODE_SEQ) ON DELETE RESTRICT ); -- Each replica will have its own vec table. -- Each row in this table is a single vector (vec) identify by its ID (VEC_SEQ,VEC_REP). -- The vector is grown by adding columns. The name of the columns is derived from the vector index ('_C' + <index> + '_') -- Each index column is of type JSON and contains both the ID and value of a constant. -- Updates to exsiting cells will use LWW check (update if the new ID is greater than the current ID). CREATE TABLE R1_VEC ( VEC_REP BIGINT NOT NULL, VEC_SEQ BIGINT NOT NULL, _C0_ JSON, PRIMARY KEY (VEC_REP, VEC_SEQ), CONSTRAINT `R1_VEC_INDEX_FK` FOREIGN KEY (VEC_REP, VEC_SEQ) REFERENCES R1_INDEX(NODE_REP, NODE_SEQ) ON DELETE RESTRICT ); -- Store of all objects (obj) in the document. CREATE TABLE R1_OBJ ( OBJ_REP BIGINT NOT NULL, OBJ_SEQ BIGINT NOT NULL, _KEY VARCHAR(100) NOT NULL, VAL_REP BIGINT NOT NULL, VAL_SEQ BIGINT NOT NULL, PRIMARY KEY (OBJ_REP, OBJ_SEQ, _KEY), CONSTRAINT `R1_OBJ_INDEX_FK` FOREIGN KEY (OBJ_REP, OBJ_SEQ) REFERENCES R1_INDEX(NODE_REP, NODE_SEQ) ON DELETE RESTRICT ); -- Store of all constants (con) in the document. CREATE TABLE R1_CON ( CON_REP BIGINT NOT NULL, CON_SEQ BIGINT NOT NULL, VAL JSON NOT NULL, PRIMARY KEY (CON_SEQ, CON_REP), CONSTRAINT `R1_CON_INDEX_FK` FOREIGN KEY (CON_REP, CON_SEQ) REFERENCES R1_INDEX(NODE_REP, NODE_SEQ) ON DELETE RESTRICT ); DELIMITER // -- Function to compare two logical timestamps. CREATE FUNCTION compare_logical_timestamps ( left_rep BIGINT, left_seq BIGINT, right_rep BIGINT, right_seq BIGINT ) RETURNS INTEGER DETERMINISTIC BEGIN IF left_seq IS NULL THEN SET left_seq = -1; END IF; IF left_rep IS NULL THEN SET left_rep = -1; END IF; IF right_seq IS NULL THEN SET right_seq = -1; END IF; IF right_rep IS NULL THEN SET right_rep = -1; END IF; IF left_seq > right_seq THEN RETURN 1; ELSEIF left_seq < right_seq THEN RETURN -1; ELSE -- Sequence counters are equal, compare replica IDs IF left_rep > right_rep THEN RETURN 1; ELSEIF left_rep < right_rep THEN RETURN -1; ELSE RETURN 0; END IF; END IF; END// -- Create a new object with the provied id. CREATE PROCEDURE new_obj ( IN OBJ_REP BIGINT, IN OBJ_SEQ BIGINT ) BEGIN INSERT IGNORE INTO R1_INDEX (NODE_REP, NODE_SEQ, KIND) VALUE (OBJ_REP, OBJ_SEQ, 'obj'); END// CREATE PROCEDURE ins_obj ( IN P_OBJ_REP BIGINT, IN P_OBJ_SEQ BIGINT, IN P_KEY VARCHAR(100), IN P_VAL_REP BIGINT, IN P_VAL_SEQ BIGINT ) BEGIN DECLARE v_cur_rep BIGINT; DECLARE v_cur_seq BIGINT; SELECT VAL_REP, VAL_SEQ INTO v_cur_rep, v_cur_seq FROM R1_OBJ WHERE (OBJ_REP,OBJ_SEQ) = (P_OBJ_REP,P_OBJ_SEQ) AND _KEY = P_KEY; -- Last-Writer-Wins (LWW) IF compare_logical_timestamps(P_VAL_REP, P_VAL_SEQ, v_cur_rep,v_cur_seq) = 1 THEN INSERT INTO R1_OBJ (OBJ_REP, OBJ_SEQ, _KEY, VAL_REP, VAL_SEQ) VALUE (P_OBJ_REP, P_OBJ_SEQ, P_KEY,P_VAL_REP,P_VAL_SEQ) ON DUPLICATE KEY UPDATE VAL_REP = P_VAL_REP, VAL_SEQ = P_VAL_SEQ; END IF; END// -- Create a new constant. CREATE PROCEDURE new_con ( IN CON_REP BIGINT, IN CON_SEQ BIGINT, IN _VAL JSON ) BEGIN INSERT IGNORE INTO R1_INDEX (NODE_REP, NODE_SEQ, KIND) VALUE (CON_REP, CON_SEQ, 'con'); INSERT IGNORE INTO R1_CON (CON_REP, CON_SEQ, VAL) VALUE (CON_REP, CON_SEQ, _VAL); END// -- Create a new vec. CREATE PROCEDURE new_vec ( IN VEC_REP BIGINT, IN VEC_SEQ BIGINT ) BEGIN INSERT IGNORE INTO R1_INDEX (NODE_REP, NODE_SEQ, KIND) VALUE (VEC_REP, VEC_SEQ, 'vec'); INSERT IGNORE INTO R1_VEC (VEC_REP, VEC_SEQ) VALUE (VEC_REP, VEC_SEQ); END// -- Create a new arr. CREATE PROCEDURE new_arr ( IN ARR_REP BIGINT, IN ARR_SEQ BIGINT ) BEGIN INSERT IGNORE INTO R1_INDEX (NODE_REP, NODE_SEQ, KIND) VALUE (ARR_REP, ARR_SEQ, 'arr'); call ins_arr(ARR_REP,ARR_SEQ, ARR_REP,ARR_SEQ, NULL,NULL, ARR_REP,ARR_SEQ); END// CREATE PROCEDURE ins_vec ( IN P_ID_REP BIGINT, IN P_ID_SEQ BIGINT, IN P_VEC_REP BIGINT, IN P_VEC_SEQ BIGINT, IN P_INDEX INT, IN P_VAL_REP BIGINT, IN P_VAL_SEQ BIGINT ) BEGIN DECLARE v_colum_name CHAR(100); DECLARE v_existing_name CHAR(100); DECLARE v_con_json JSON; DECLARE v_new_json JSON; DECLARE v_cur_rep BIGINT; DECLARE v_cur_seq BIGINT; SET v_colum_name = CONCAT('_C',P_INDEX,'_'); SELECT `COLUMN_NAME` into v_existing_name FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = 'grid1' AND TABLE_NAME = 'r1_vec' AND COLUMN_NAME = v_colum_name; IF v_existing_name IS NULL THEN -- Construct the ALTER TABLE statement dynamically SET @alterTable = CONCAT('ALTER TABLE R1_VEC ADD COLUMN `', v_colum_name, '` JSON'); -- Prepare and execute the dynamic SQL PREPARE stmt FROM @alterTable; EXECUTE stmt; DEALLOCATE PREPARE stmt; END IF; SET @p_rep = P_VEC_REP; SET @p_seq = P_VEC_SEQ; -- Fetch existing rep and seq if the column exists SET @select_stmt = CONCAT('SELECT JSON_EXTRACT(`', v_colum_name, '`, \'$.rep\'), JSON_EXTRACT(`', v_colum_name, '`, \'$.seq\') INTO @v_cur_rep, @v_cur_seq FROM R1_VEC WHERE (VEC_REP, VEC_SEQ) = (?, ?)'); PREPARE select_stmt FROM @select_stmt; EXECUTE select_stmt USING @p_rep, @p_seq; DEALLOCATE PREPARE select_stmt; SET v_cur_rep = @v_cur_rep; SET v_cur_seq = @v_cur_seq; -- Last-Writer-Wins (LLW) check. IF compare_logical_timestamps(P_VAL_REP, P_VAL_SEQ, v_cur_rep,v_cur_seq) = 1 THEN SELECT VAL INTO v_con_json FROM R1_CON WHERE CON_REP = P_VAL_REP AND CON_SEQ = P_VAL_SEQ; SET @v_new_json = JSON_OBJECT('rep', P_VAL_REP, 'seq', P_VAL_SEQ, 'con', JSON_EXTRACT(v_con_json, '$.con')); -- Use the dynamic column name in the UPDATE statement, quoting it SET @update_statement = CONCAT('UPDATE R1_VEC SET `', v_colum_name, '` = ? WHERE (VEC_REP, VEC_SEQ) = (?, ?)'); PREPARE update_stmt FROM @update_statement; EXECUTE update_stmt USING @v_new_json, @p_rep, @p_seq; DEALLOCATE PREPARE update_stmt; END IF; END// -- Iplementaion of the https://jsonjoy.com/specs/json-crdt/model-document/crdt-algorithms#RGA-Insertion-Routine CREATE PROCEDURE ins_arr ( IN P_ID_REP BIGINT, IN P_ID_SEQ BIGINT, IN P_NODE_REP BIGINT, IN P_NODE_SEQ BIGINT, IN P_REF_REP BIGINT, IN P_REF_SEQ BIGINT, IN P_ELE_REP BIGINT, IN P_ELE_SEQ BIGINT ) BEGIN DECLARE v_cursor_seq BIGINT; DECLARE v_cursor_rep BIGINT; DECLARE v_after_cursor_seq BIGINT; DECLARE v_after_cursor_rep BIGINT; DECLARE v_after_chunk_seq BIGINT; DECLARE v_after_chunk_rep BIGINT; -- 1. Insertion cursor is set to the position before all elements in the RGA node. SET v_cursor_seq = P_NODE_SEQ; SET v_cursor_rep = P_NODE_REP; -- 2. If the rga.ref is not equal to the rga.node, then the cursor is moved to the position right after the element with the rga.ref ID IF compare_logical_timestamps(P_NODE_REP, P_NODE_SEQ, P_REF_REP, P_REF_SEQ) <> 0 THEN SET v_cursor_seq = P_REF_SEQ; SET v_cursor_rep = P_REF_REP; END IF; -- 3. If the ID of the element after the cursor is greater than the ID of elem, then the cursor is moved one position forward and step 3 is repeated. Otherwise, continue to step 4. label1: LOOP SELECT NODE_SEQ, NODE_REP, CHUNK_SEQ, CHUNK_REP INTO v_after_cursor_seq, v_after_cursor_rep, v_after_chunk_seq, v_after_chunk_rep FROM R1_ARR WHERE (PARENT_SEQ, PARENT_REP) = (v_cursor_seq, v_cursor_rep); IF compare_logical_timestamps(v_after_cursor_rep, v_after_cursor_seq, P_ELE_REP, P_ELE_SEQ) = 1 THEN SET v_cursor_seq = v_after_cursor_seq; SET v_cursor_rep = v_after_cursor_rep; ELSE LEAVE label1; END IF; END LOOP label1; -- 4. If the ID of the element after the cursor is equal to the ID of elem, then the insertion stops. The elements have already been inserted by a previous application of this algorithm. Otherwise, continue to step 5. IF compare_logical_timestamps(v_after_cursor_rep, v_after_cursor_seq, v_cursor_rep, v_cursor_seq) <> 0 OR (v_cursor_seq IS NULL AND v_cursor_rep IS NULL) THEN -- 5. Insert the elem at the cursor position. INSERT INTO R1_ARR (CHUNK_REP, CHUNK_SEQ, NODE_REP, NODE_SEQ, PARENT_REP, PARENT_SEQ) VALUE (P_ID_REP, P_ID_SEQ, P_ELE_REP, P_ELE_SEQ, v_cursor_rep, v_cursor_seq); -- the after cursor needs to be moved if it exists IF v_after_chunk_seq IS NOT NULL AND v_after_chunk_rep IS NOT NULL THEN UPDATE R1_ARR SET PARENT_REP = P_ID_REP, PARENT_SEQ = P_ID_SEQ WHERE (CHUNK_REP,CHUNK_SEQ) = (v_after_chunk_rep,v_after_chunk_seq); END IF; END IF; END // -- calls rga_insert TO_ADD times to populate a large grid. CREATE PROCEDURE at_scale ( IN TO_ADD BIGINT -- The number of nodes to add to the array ) BEGIN DECLARE v_seq BIGINT; SET v_seq = 1; call rga_insert(1,1, NULL,NULL, v_seq,1); label1: LOOP SET v_seq = v_seq + 1; IF v_seq > TO_ADD THEN LEAVE label1; END IF; call rga_insert(1,1, (v_seq-1),1, v_seq,1); END LOOP label1; END // DELIMITER ; |
Using the above tables/procedures we can apply the patch that was generated by the client:
-- ├─ new_con 2.2 { "0.0.2" } CALL new_con(2,2, '{ "con":"0.0.2" }'); -- ├─ new_vec 2.3 CALL new_vec(2,3); -- ├─ new_con 2.4 { "" } CALL new_con(2,4, '{ "con":"" }'); -- ├─ ins_vec 2.5!1, obj = 2.3 -- │ └─ 0: 2.4 CALL ins_vec(2,5, 2,3, 0, 2,4); -- ├─ new_arr 2.6 CALL new_arr(2,6); -- ├─ new_con 2.7 { 0 } CALL new_con(2,7, '{ "con": 0 }'); -- ├─ ins_arr 2.8!1, obj = 2.6 { 2.6 ← 2.7 } CALL ins_arr(2,8, 2,6, 2,6, 2,7); -- ├─ new_arr 2.9 CALL new_arr(2,9); -- ├─ new_vec 2.10 CALL new_vec(2,10); -- ├─ new_con 2.11 { "" } CALL new_con(2,11, '{ "con":"" }'); -- ├─ ins_vec 2.12!1, obj = 2.10 -- │ └─ 0: 2.11 CALL ins_vec(2,12, 2,10, 0, 2,11); -- ├─ ins_arr 2.13!1, obj = 2.9 { 2.9 ← 2.10 } CALL ins_arr(2,13, 2,9, 2,9, 2,10); -- ├─ ins_obj 2.14!1, obj = 2.1 -- │ ├─ "doc_version": 2.2 -- │ ├─ "columnNames": 2.3 -- │ ├─ "columnOrder": 2.6 -- │ └─ "rows": 2.9 CALL ins_obj(2,1, 'doc_version', 2,2); CALL ins_obj(2,1, 'columnNames', 2,3); CALL ins_obj(2,1, 'columnOrder', 2,6); CALL ins_obj(2,1, 'rows', 2,9); -- ├─ ins_val 2.15!1, obj = 0.0, val = 2.1 # Not sure what to do with this. -- ├─ new_con ..5536.16 { "type" } CALL new_con(65536,16, '{ "con":"type" }'); -- ├─ ins_vec ..5536.17!1, obj = 2.3 -- │ └─ 0: ..5536.16 CALL ins_vec(65536,17, 2,3, 0, 65536,16); -- ├─ new_con ..5536.18 { "dog" } CALL new_con(65536,18, '{ "con":"dog" }'); -- ├─ ins_vec ..5536.19!1, obj = 2.10 -- │ └─ 0: ..5536.18 CALL ins_vec(65536,19, 2,10, 0, 65536,18); -- ├─ new_con ..5536.20 { "age" } CALL new_con(65536,20, '{ "con":"age" }'); -- ├─ ins_vec ..5536.21!1, obj = 2.3 -- │ └─ 1: ..5536.20 CALL ins_vec(65536,21, 2,3, 1, 65536,20); -- ├─ new_con ..5536.22 { 1 } CALL new_con(65536,22, '{ "con": 1 }'); -- ├─ ins_arr ..5536.23!1, obj = 2.6 { 2.8 ← ..5536.22 } CALL ins_arr(65536,23, 2,6, 2,8, 65536,22); -- ├─ new_con ..5536.24 { 9 } CALL new_con(65536,24, '{ "con": 9 }'); -- ├─ ins_vec ..5536.25!1, obj = 2.10 -- │ └─ 1: ..5536.24 CALL ins_vec(65536,25, 2,10, 1, 65536,24); -- ├─ new_con ..5536.26 { "name" } CALL new_con(65536,26, '{ "con": "name" }'); -- ├─ ins_vec ..5536.27!1, obj = 2.3 -- │ └─ 2: ..5536.26 CALL ins_vec(65536,27, 2,3, 2, 65536,26); -- ├─ new_con ..5536.28 { 2 } CALL new_con(65536,28, '{ "con": 2 }'); -- ├─ ins_arr ..5536.29!1, obj = 2.6 { 2.8 ← ..5536.28 } CALL ins_arr(65536,29, 2,6, 2,8, 65536,28); -- ├─ new_con ..5536.30 { "max" } CALL new_con(65536,30, '{ "con": "max" }'); -- ├─ ins_vec ..5536.31!1, obj = 2.10 -- │ └─ 2: ..5536.30 CALL ins_vec(65536,31, 2,10, 2, 65536,30); -- ├─ new_vec ..5536.32 CALL new_vec(65536,32); -- ├─ ins_arr ..5536.33!1, obj = 2.9 { 2.13 ← ..5536.32 } CALL ins_arr(65536,33, 2,9, 2,13, 65536,32); -- ├─ new_con ..5536.34 { "cat" } CALL new_con(65536,34, '{ "con": "cat" }'); -- ├─ new_con ..5536.35 { 15 } CALL new_con(65536,35, '{ "con": 15 }'); -- ├─ new_con ..5536.36 { "paws" } CALL new_con(65536,36, '{ "con": "paws" }'); -- ├─ ins_vec ..5536.37!1, obj = ..5536.32 -- │ ├─ 0: ..5536.34 -- │ ├─ 1: ..5536.35 -- │ └─ 2: ..5536.36 CALL ins_vec(65536,37, 65536,32, 0, 65536,34); CALL ins_vec(65536,37, 65536,32, 1, 65536,35); CALL ins_vec(65536,37, 65536,32, 2, 65536,36); -- ├─ new_vec ..5536.38 CALL new_vec(65536,38); -- ├─ ins_arr ..5536.39!1, obj = 2.9 { 2.13 ← ..5536.38 } CALL ins_arr(65536,39, 2,9, 2,13, 65536,38); -- ├─ new_con ..5536.40 { "rat" } CALL new_con(65536,40, '{ "con": "rat" }'); -- ├─ new_con ..5536.41 { 2 } CALL new_con(65536,41, '{ "con": 2 }'); -- ├─ new_con ..5536.42 { "whiskers" } CALL new_con(65536,42, '{ "con": "whiskers" }'); -- └─ ins_vec ..5536.43!1, obj = ..5536.38 -- ├─ 0: ..5536.40 -- ├─ 1: ..5536.41 -- └─ 2: ..5536.42 CALL ins_vec(65536,43, 65536,38, 0, 65536,40); CALL ins_vec(65536,43, 65536,38, 1, 65536,41); CALL ins_vec(65536,43, 65536,38, 2, 65536,42); |
After applying this patch to the database the following SQL can be used to show the “view” of the grid document:
WITH RECURSIVE D AS ( SELECT CHUNK_REP,CHUNK_SEQ, NODE_REP, NODE_SEQ, PARENT_REP, PARENT_SEQ, 1 AS level FROM R1_ARR WHERE (CHUNK_REP,CHUNK_SEQ) = (2,9) UNION ALL SELECT D2.CHUNK_REP, D2.CHUNK_SEQ, D2.NODE_REP, D2.NODE_SEQ, D2.PARENT_REP, D2.PARENT_SEQ, D.level + 1 FROM D INNER JOIN R1_ARR D2 ON (D.CHUNK_REP, D.CHUNK_SEQ) = (D2.PARENT_REP, D2.PARENT_SEQ) ) SELECT concat(D.NODE_REP, '.', D.NODE_SEQ) id, JSON_EXTRACT(V._C0_, '$.con' ) "type", JSON_EXTRACT(V._C2_, '$.con' ) "name", JSON_EXTRACT(V._C1_, '$.con' ) "age" FROM D JOIN R1_VEC V ON (D.NODE_REP,D.NODE_SEQ) = (V.VEC_REP,V.VEC_SEQ) ORDER BY level ASC; |
Results:
id | type | name | age |
---|---|---|---|
2.10 | "dog" | "max" | 9 |
65536.38 | "rat" | "whiskers" | 2 |
65536.32 | "cat" | "paws" | 15 |
This table matches our expected “view” of the example grid.
The following diagram provides a rough outline of the communication flow of the current grid implementation:
At the center is the “hub”. The hub is a cluster of workers that handle all requests from both server-side (internal) and client-side (web) replicas. The main job of the hub is to be the central CRDT patch database.
Both internal and web requests are pushed to the hub’s SQS queue. Responses from the hub are then dispatch according to the replica’s connection type. Websocket connections are forwarded to the websocket API gateway, while internal connections are pushed to internal replica queue.
The Paginated “view” is a specialized query that transforms the CRDT data nodes in the database into a tabular, paginated “grid”. This grid view is read-only. There are three consumers of the grid view:
Agent MCP Service - This is a set of services that are available to an agent assisting with the grid. The two main services are:
Grid Query - SQL like query of the grid view.
Grid Update - SQL like updates from the agent. These updates are translated into patches (patch builder), that feed back to the hub like all other patches.
JSON Schema validation Worker - This worker listens to grid changes and applies validation state changes in the form of patches (via patch builder).
Exporter - Used to export data from the grid back to the original grid source.
The following steps show how a client will connect to a Synapse grid:
Start a new grid session by calling POST /grid/session/async/start. The caller must be authenticated to start a new grid session. This method will return a job Id that will be used for the next step.
To initialize the grid with data from a table/view set the “initialQuery“ in the CreateGridReqequest. The follow example shows how to added all data from table: syn9602690:
{ "concreteType": "org.sagebionetworks.repo.model.grid.CreateGridRequest", "initialQuery": {"sql": "select * from syn9602690"} } |
When initialized from a table/view SQL the columns of the grid will match the select statement and there will be one row added for each row returned by the query. This data will be provided as a set of one or more patches during synchronization.
Get the results for the of the create grid job from step 1. by calling: GET /grid/session/async/get/{asyncToken}. This method will return a 202 while the job is still running, so you will need to keep calling until it until you get a 200 with a CreateGridResponse.
Using the the “sessionId” from the CreateGridResponse from step 2, create a new replica for the grid by calling: POST /grid/{sessionId}/replica. You will need to create a replica for each unique client that you wish to connect to the gird.
Get a presigend URL that can be used to establish a secure websocket connection to a grid replica by calling: POST /grid/{sessionId}/presigned/url. You will need to provide both the sessionId and replciaId from step 3.
We can test the resulting presigned URL using wscat:
wscat -c 'wss://bsz1rd4f32.execute-api.us-east-1.amazonaws.com/dev/?gridSessionId=NjU1NjE&replicaId=66537&userId=1&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20250530T180841Z&X-Amz-SignedHeaders=host&X-Amz-Credential=AKIAWRJDYHEDNASK5BJ5%2F20250530%2Fus-east-1%2Fexecute-api%2Faws4_request&X-Amz-Expires=900&X-Amz-Signature=123' |
With the connection open, we can then type the following to send a ping to the grid:
[8,"ping"] |
You should see the following results:
[8,"pong"] |
If you receive a “pong” then you have successful established a secure websocket connection with the grid session. Notes:
Each presigned URL establishes a connection with a specific session/relica authenticated as the user that created the URL.
Each presigned URL will expire 15 minutes after issued.
The presigned URL is only needed to establish a connection. Once connected, the connection will remain open as long as it is active.
You will need a new URL for each replica to connect to the grid session.
After a patch is created in a replica, the patch should be sent to the hub. The replica must keep a copy of the patch until the hub has confirmed receipt.
A patch is sent to the hub as a Request-Data-Message with the following format:
[1,<sequence_number>, "patch", <payload>] |
The first element must be: ‘1', which identifies it as a Request Data Message. The next element must be a numeric sequence number that issued by the client. The hub’s response will include this sequence number to aid the client in identifying the message chain. The next element is the method name: 'patch' that identifies this as a patch message. Finally, the last element is the patch body. The patch body must be encoded as compact-format.
The following is an example of patch message from replica 123, with a patchId 123.24, that creates a new constant with the value 'foo' and a client selected sequence number of 99:
[1,99, "patch", [[[123,24]],[0, "foo"] ] |
When the hub successfully process a patch from a replica it will respond with a message with the following format:
[5,<sequence_number>] |
The first element of 5, indicates that the hub’s response is a Response-Complete-Message, the next element is the sequence number issued by the client in their original request data message.
For example, if the hub successfully process the above example patch, the hub will respond with:
[5,99] |
Again, the replica must maintain a copy of each patch until a matching response complete message is sent from the hub to the replica.
The hub will notify each connected replica when a new patch is available by sending the following Notification-Message:
[8,"new-patch"] |
When a replica receives a new patch notification, it should start the clock synchronization process.
The clock synchronization process involves a replica sharing its current clock (version vector) with the hub. The hub will use the provided clock to determine which patches from the patch database, the replica is missing. The hub will then send the patches to the replica in the correct order.
Note: It is critical that all patches from a singe replica are applied to other replicas in the same order that they occurred at the source.
A replica should synchronize its clock with the hub under the following conditions:
Newly started replica.
The replica received a “new-patch” notification message
Anytime a connection to the grid is lost and then restored.
If it is not possible to detect disconnect/connect events, then a periodic synchronization might be needed.
A replica will initiate the clock synchronization process by sending the following Data Request Message to the hub via websocket:
[1, <sequence_number>, "synchronize-clock", <clock>] |
The first element 1 indicates that it is a Request-Data-Message. The second element is the sequence number that issued by the client. The hub’s response will include this sequence number to aid the client in identifying the message chain. The third element is the method name: "synchronize-clock". Finally, the last element is the replica’s current clock value. A clock value is represented as an array of logical timestamps. For example, consider the following replica clock:
clock 66000.102 └─ 65999.101 |
This clock would be encoded as:
[[66000,102],[65999,101]] |
Given this example clock and a client-side sequence number of 25 the request would be:
[1, 25, "synchronize-clock", [[66000,102],[65999,101]] ] |
For a new replica that does not yet have a clock, an empty array can be provided as the clock:
[1, 25, "synchronize-clock", [] ] |
The hub’s response to a “synchronize-clock” request will depend on the calling replica’s clock. If the hub determines that the calling replica is missing one or more patches, it will respond with a Response Data message :
[4, <sequence_number>, <payload>] |
The first element of 4, indicates that it is a Response Data Message. The <sequence_number> will match the client provided sequence number from the "synchronize-clock" request. Finally, the last element: <payload> will be the compact representation of the next patch that the replica needs to apply. For example, if the hub determined that the replica is missing the following patch:
[[[123,24]],[0, "foo"] |
then the full response of our above example request would be:
[4, 25, [[[123,24]],[0, "foo"]] |
Upon receiving such a response, the replica is expected to apply the provided patch, which will increment the replica’s clock accordingly. The client should then initiate a followup synchronize-clock call providing its updated clock. For example after apply the above patch a replica would send the following:
[1, 25, "synchronize-clock", [[66000,102],[65999,101],[123,24]] ] |
Note: the [123,24] added to the replica’s clock indicates that the patch was applied.
If the hub determines that a replica’s clock indicates that it is up-to-date, then the hub will respond to a synchronize-clock message with Response-Complete-Message:
[5,<sequence_number>] |
Again the first element of 5, indicates a Response Complete Message. The last element: <sequence_number> will match the sequence number provided by the client in its request. Extending the example from above with a sequence number of 25 an example response would be:
[5,25] |
In conclusion, a replica can synchronize with the hub by sending its clock in via “synchronize-clock” in a loop. Each time the hub returns a patch, the replica applies the patch, which updates it clock, and the loop continues. Finally, the loop terminates when the hub sends a Response Complete Message indicating that the replica is up-to-date.