An implementation proposal for Design: Agent Assisted Template Curation, using JSON Joy for both the client-side and server-side implementations:
Client-side - Clients will use the json-joy TypeScript client.
Server-side - All CRDTs will have a relational database representation.
Note: Due to the way logical clocks work, both the client and server must enforce a single writer per replica (multiple readers are allowed).
We will use the following example to demonstrate both the client-side and server-side implementations. The grid will start off empty. We will then add three rows and three columns to our example grid in the following order:
1. Create a column named “type”.
2. Add a row with type=dog.
3. Add a column named “age”.
4. Set age=9 for the first row.
5. Add a column after “type” named “name” (column order should be type, name, age).
6. Set name=max for the first row.
7. Add a new row after the first with: type=cat, name=paws, age=15.
8. Add a new row after the first with: type=rat, name=whiskers, age=2.
The final grid should be:
type | name | age |
---|---|---|
dog | max | 9 |
rat | whiskers | 2 |
cat | paws | 15 |
Notice that the name column was inserted between type and age, and that the whiskers row was inserted between the max and paws rows.
Both the client and server must share the grid document schema. Specifically, this schema represents a single replica’s grid document. We will adopt the json-joy node type definitions for our schema. For the first version, we will only need the following subset of types:
con - This represents an immutable constant.
vec - An LWW (last-writer-wins), grow-only, append-only vector of index-value pairs that reference other nodes. It has a max length of 256.
arr - An RGA (Replicated Growable Array), which is a mutable, ordered list of nodes.
The document schema is defined as follows:
```typescript
import {s} from 'json-joy/lib/json-crdt-patch';

// Schema of a single grid row.
const gridRowSchema = s.obj({
  data: s.vec(s.con('')),
  metadata: s.obj({
    synapseRow: s.obj({
      rowId: s.con(0),
      versionNumber: s.con(0),
      etag: s.con(''),
    }),
    rowValidation: s.con(0),
  }),
});

// Schema of a replica's grid document.
const gridSchema = s.obj({
  doc_version: s.con('0.1.0'),
  columnNames: s.vec(s.con('')),
  columnOrder: s.arr([s.con(0)]),
  rows: s.arr([gridRowSchema]),
});
```
name | description |
---|---|
`doc_version` | The semantic version of the document schema. This value will change any time the schema changes. |
`columnNames` | This vector captures the names of the columns in their natural order (the order in which they were added). Vectors are append-only (up to 256 values) and allow LWW changes to each index. |
`columnOrder` | This array defines the order in which the columns should be displayed. Each value is an index into the `columnNames` vector. |
`rows` | This is an `arr` (RGA) of grid rows. |
`data` | The row data. |
`metadata` | The row metadata. This data should be treated as read-only for all replicas other than the hub. |
`synapseRow` | The Synapse `rowId`, `versionNumber`, and `etag` of the row. |
`rowValidation` | The rowValidation value is a reference to a constant containing the JSON serialization of a ValidationResult. Note: The ValidationResult is identical to the results provided for Entity Validation. |
Note: Each row's vec is append-only (it cannot be reordered), with each index matching an index in columnNames. When a new column name is appended to the columnNames vector, each existing row’s vector will be shorter by one, indicating that the cells of the newly added column are null. In such cases, a row's vector only needs to be extended when a cell value is set for the new column.
The following script uses the json-joy TypeScript library to perform the eight example steps:
```typescript
import {Model, VecApi} from 'json-joy/lib/json-crdt';
import {konst, s} from 'json-joy/lib/json-crdt-patch';
import {encode} from 'json-joy/lib/json-crdt-patch/codec/compact';

const schema = s.obj({
  doc_version: s.con('0.0.2'),
  columnNames: s.vec(s.con('')),
  columnOrder: s.arr([s.con(0)]),
  rows: s.arr([s.vec(s.con(''))]),
});

const replicaId = 0x10000;

// Create a new JSON CRDT document.
const model = Model.create(schema, replicaId);
console.log('Initial Model:', model + '');

// The first patch is from the initialization of an empty model.
const p0 = model.api.flush();
console.log('Encoded patch zero:');
console.log(JSON.stringify(encode(p0), null, 0));

const cn = model.api.vec(['columnNames']);
const co = model.api.arr(['columnOrder']);
const rows = model.api.arr(['rows']);
const r0 = rows.get(0) as VecApi;

// 1. Set the name of the first column.
cn.set([[0, konst('type')]]);
// 2. Set type=dog for the first cell of the first row.
r0.set([[0, konst('dog')]]);
// 3. Add the 'age' column...
cn.set([[1, konst('age')]]);
// ...and set the column order with age at display index 1.
co.ins(1, [konst(1)]);
// 4. Set age=9 for the first row.
r0.set([[1, konst(9)]]);
// 5. Add the 'name' column.
cn.set([[2, konst('name')]]);
// Set the order of the name column to be at display index 1.
co.ins(1, [konst(2)]);
// 6. Set name=max for the first row.
r0.set([[2, konst('max')]]);
// 7. Add a new row after the first: type=cat, name=paws, age=15.
rows.ins(1, [model.api.builder.vec()]);
const r1 = rows.get(1) as VecApi;
r1.set([[0, konst('cat')], [1, konst(15)], [2, konst('paws')]]);
// 8. Add a new row after the first: type=rat, name=whiskers, age=2.
rows.ins(1, [model.api.builder.vec()]);
const r2 = rows.get(1) as VecApi;
r2.set([[0, konst('rat')], [1, konst(2)], [2, konst('whiskers')]]);

console.log('Updated Model:', model + '');
const p1 = model.api.flush();
console.log('Encoded patch one:');
console.log(JSON.stringify(encode(p1), null, 0));
```
Initializing an empty document with a schema starts the document with default values. It is important to capture (flush) a patch immediately after initialization, because the internal replica ID 2 is used for initialization. If we fail to capture a patch here, the next patch will erroneously contain a combination of the changes from replica 2 and replica 65536 (0x10000). The following is the output from the script:
```
Initial Model: model
├─ root 0.0
│  └─ obj 2.1
│     ├─ "doc_version"
│     │  └─ con 2.2 { "0.0.2" }
│     ├─ "columnNames"
│     │  └─ vec 2.3
│     │     └─ 0: con 2.4 { "" }
│     ├─ "columnOrder"
│     │  └─ arr 2.6
│     │     └─ chunk 2.8:1 .1.
│     │        └─ [0]: con 2.7 { 0 }
│     └─ "rows"
│        └─ arr 2.9
│           └─ chunk 2.13:1 .1.
│              └─ [0]: vec 2.10
│                 └─ 0: con 2.11 { "" }
│
├─ index (9 nodes)
│  ├─ obj 2.1
│  ├─ con 2.2
│  ├─ vec 2.3
│  ├─ con 2.4
│  ├─ arr 2.6
│  ├─ con 2.7
│  ├─ arr 2.9
│  ├─ vec 2.10
│  └─ con 2.11
│
├─ view
│  └─ {
│       "doc_version": "0.0.2",
│       "columnNames": [
│         ""
│       ],
│       "columnOrder": [
│         0
│       ],
│       "rows": [
│         [
│           ""
│         ]
│       ]
│     }
│
└─ clock 65536.16
   └─ 2.15
Encoded patch zero:
[[[2,1]],[2],[0,"0.0.2"],[3],[0,""],[11,3,[[0,4]]],[6],[0,0],[14,6,6,[7]],[6],[3],[0,""],[11,10,[[0,11]]],[14,9,9,[10]],[10,1,[["doc_version",2],["columnNames",3],["columnOrder",6],["rows",9]]],[9,[0,0],1]]
Updated Model: model
├─ root 0.0
│  └─ obj 2.1
│     ├─ "doc_version"
│     │  └─ con 2.2 { "0.0.2" }
│     ├─ "columnNames"
│     │  └─ vec 2.3
│     │     ├─ 0: con ..5536.16 { "type" }
│     │     ├─ 1: con ..5536.20 { "age" }
│     │     └─ 2: con ..5536.26 { "name" }
│     ├─ "columnOrder"
│     │  └─ arr 2.6
│     │     └─ chunk ..5536.29:1 .3.
│     │        └─ [1]: con ..5536.28 { 2 }
│     │        ← chunk 2.8:1 .1.
│     │          └─ [0]: con 2.7 { 0 }
│     │        → chunk ..5536.23:1 .1.
│     │          └─ [2]: con ..5536.22 { 1 }
│     └─ "rows"
│        └─ arr 2.9
│           └─ chunk ..5536.39:1 .3.
│              └─ [1]: vec ..5536.38
│                 ├─ 0: con ..5536.40 { "rat" }
│                 ├─ 1: con ..5536.41 { 2 }
│                 └─ 2: con ..5536.42 { "whiskers" }
│              ← chunk 2.13:1 .1.
│                └─ [0]: vec 2.10
│                   ├─ 0: con ..5536.18 { "dog" }
│                   ├─ 1: con ..5536.24 { 9 }
│                   └─ 2: con ..5536.30 { "max" }
│              → chunk ..5536.33:1 .1.
│                └─ [2]: vec ..5536.32
│                   ├─ 0: con ..5536.34 { "cat" }
│                   ├─ 1: con ..5536.35 { 15 }
│                   └─ 2: con ..5536.36 { "paws" }
│
├─ index (23 nodes)
│  ├─ obj 2.1
│  ├─ con 2.2
│  ├─ vec 2.3
│  ├─ arr 2.6
│  ├─ con 2.7
│  ├─ arr 2.9
│  ├─ vec 2.10
│  ├─ con ..5536.16
│  ├─ con ..5536.18
│  ├─ con ..5536.20
│  ├─ con ..5536.22
│  ├─ con ..5536.24
│  ├─ con ..5536.26
│  ├─ con ..5536.28
│  ├─ con ..5536.30
│  ├─ vec ..5536.32
│  ├─ con ..5536.34
│  ├─ con ..5536.35
│  ├─ con ..5536.36
│  ├─ vec ..5536.38
│  ├─ con ..5536.40
│  ├─ con ..5536.41
│  └─ con ..5536.42
│
├─ view
│  └─ {
│       "doc_version": "0.0.2",
│       "columnNames": [
│         "type",
│         "age",
│         "name"
│       ],
│       "columnOrder": [
│         0,
│         2,
│         1
│       ],
│       "rows": [
│         [
│           "dog",
│           9,
│           "max"
│         ],
│         [
│           "rat",
│           2,
│           "whiskers"
│         ],
│         [
│           "cat",
│           15,
│           "paws"
│         ]
│       ]
│     }
│
└─ clock 65536.44
   └─ 2.15
Encoded patch one:
[[[65536,16]],[0,"type"],[11,[2,3],[[0,16]]],[0,"dog"],[11,[2,10],[[0,18]]],[0,"age"],[11,[2,3],[[1,20]]],[0,1],[14,[2,6],[2,8],[22]],[0,9],[11,[2,10],[[1,24]]],[0,"name"],[11,[2,3],[[2,26]]],[0,2],[14,[2,6],[2,8],[28]],[0,"max"],[11,[2,10],[[2,30]]],[3],[14,[2,9],[2,13],[32]],[0,"cat"],[0,15],[0,"paws"],[11,32,[[0,34],[1,35],[2,36]]],[3],[14,[2,9],[2,13],[38]],[0,"rat"],[0,2],[0,"whiskers"],[11,38,[[0,40],[1,41],[2,42]]]]
```
Afterwards, the client can send both encoded patches to the hub to share the changes with the other replicas.
Clients work with the grid by maintaining an in-memory representation of the grid document (see above). The memory needed to load a large-scale model with 10M would exceed the limits of a typical server-side worker, and server-side workers do not typically load large datasets into memory. Instead, a worker manages large-scale datasets using one of the following options:
Streaming - Read/write of large files is done by streaming over data rather than loading the data into memory.
Database - Large datasets are read from the database one page at a time, and written to the database one block at a time.
For example, when a user makes a request to export a CSV from a Synapse table/view, a worker will process the request as follows:
The query results are read one page at a time.
Each row from a page is transformed into a CSV row.
Each CSV row is streamed to a file or to S3.
This means the worker can process a table with an unbounded size using a small, fixed amount of memory.
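The CSV export flow above can be sketched in TypeScript. The sketch assumes a hypothetical synchronous `fetchPage` callback that returns one page of already-stringified rows plus an optional `nextPageToken`. A real worker would read pages asynchronously from the database and write each line to a file or S3 upload stream rather than to the `write` callback used here.

```typescript
// One page of query results and the token for the next page (absent on the last page).
interface Page {
  rows: string[][];
  nextPageToken?: string;
}

// Hypothetical page source, e.g. a wrapper around a paginated table query.
type FetchPage = (pageToken?: string) => Page;

// Quotes a field if it contains a comma, double quote, or line break (RFC 4180 style).
function csvField(value: string): string {
  return /[",\r\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;
}

// Streams query results to `write` one CSV line at a time. Only the current page is
// held in memory, so memory use does not grow with the size of the table.
function exportCsv(fetchPage: FetchPage, write: (line: string) => void): number {
  let token: string | undefined = undefined;
  let count = 0;
  do {
    const page = fetchPage(token);
    for (const row of page.rows) {
      write(row.map(csvField).join(',') + '\n');
      count++;
    }
    token = page.nextPageToken;
  } while (token !== undefined);
  return count;
}

// Usage with an in-memory stand-in for two pages of query results.
const pages: Page[] = [
  {rows: [['type', 'name', 'age'], ['dog', 'max', '9']], nextPageToken: '1'},
  {rows: [['rat', 'whiskers', '2'], ['cat', 'paws, jr.', '15']]},
];
const lines: string[] = [];
exportCsv((token) => pages[token === undefined ? 0 : Number(token)], (line) => lines.push(line));
console.log(lines.join(''));
```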
The server-side implementation of a grid replica will need to read/write the grid document to a database. Consider an example where the worker is tasked with applying a patch from a client to the database. Ideally, the patch would be applied directly to the database without loading the entire document into memory.
A server-side replica is maintained for each grid session by writing patches into the following database tables: Grid Schemas. A “view” of the grid document is generated using a specialized query that joins the grid tables using the following template: grid-index-view-template.sql.
The following diagram provides a rough outline of the communication flow of the current grid implementation:
At the center is the “hub”. The hub is a cluster of workers that handle all requests from both server-side (internal) and client-side (web) replicas. The main job of the hub is to be the central CRDT patch database.
Both internal and web requests are pushed to the hub’s SQS queue. Responses from the hub are then dispatched according to the replica’s connection type: responses for websocket connections are forwarded to the websocket API gateway, while responses for internal connections are pushed to the internal replica queue.
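The routing rule can be sketched as follows. `ReplicaConnection` is a hypothetical per-replica record, and `Outbound` is a hypothetical interface standing in for the API gateway and SQS clients. No AWS SDK calls are shown.

```typescript
// Hypothetical record of how a replica is connected; the two kinds mirror the text above.
type ReplicaConnection =
  | {type: 'websocket'; connectionId: string}
  | {type: 'internal'; replicaQueue: string};

// Hypothetical outbound channels (e.g. thin wrappers around the API gateway and SQS).
interface Outbound {
  postToWebsocket(connectionId: string, frame: string): void;
  sendToReplicaQueue(queue: string, frame: string): void;
}

// Routes one hub response message to a replica according to its connection type.
function dispatchResponse(conn: ReplicaConnection, message: unknown[], out: Outbound): void {
  const frame = JSON.stringify(message);
  switch (conn.type) {
    case 'websocket':
      out.postToWebsocket(conn.connectionId, frame);
      break;
    case 'internal':
      out.sendToReplicaQueue(conn.replicaQueue, frame);
      break;
  }
}
```

Using a discriminated union makes the TypeScript compiler check that every connection type has a route.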
The paginated “view” is a specialized query that transforms the CRDT data nodes in the database into a tabular, paginated “grid”. This grid view is read-only. There are three consumers of the grid view:
Agent MCP Service - This is a set of services that are available to an agent assisting with the grid. The two main services are:
Grid Query - SQL-like queries of the grid view.
Grid Update - SQL-like updates from the agent. These updates are translated into patches (via the patch builder), which feed back to the hub like all other patches.
JSON Schema Validation Worker - This worker listens for grid changes and applies validation state changes in the form of patches (via the patch builder).
Exporter - Used to export data from the grid back to the original grid source.
Row validation occurs in the grid when a JSON schema is bound to the grid session using one of the following techniques:
Schema Bound to Files of a View - If a JSON schema is bound to the files in a view, the schema is detected when a grid is created using a query against that view.
More options to be added.
Row validation is automatic. Any time a cell is changed, a change event is sent to the validation worker via the Replica Change SQS queue (see the communication diagram). The validation worker re-validates the entire row and produces a ValidationResult. If the new result differs from the old one, the worker triggers the creation of a patch that updates the validation state of that row.
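The worker's "only patch on change" check can be sketched as follows. The `ValidationResult` fields shown are assumptions for illustration. Comparing serializations built with a fixed key order means an unchanged result never produces a patch. Fields that change on every run, such as a timestamp, would need to be left out of the comparison.

```typescript
// Assumed subset of a ValidationResult; field names are illustrative.
interface ValidationResult {
  isValid: boolean;
  validationErrorMessage?: string;
  allValidationMessages?: string[];
}

// Serializes with a fixed key order so equal results always produce equal strings.
function serializeResult(result: ValidationResult): string {
  return JSON.stringify({
    isValid: result.isValid,
    validationErrorMessage: result.validationErrorMessage ?? null,
    allValidationMessages: result.allValidationMessages ?? [],
  });
}

// Returns the new serialized value if the row's validation state changed
// (a patch should be created), or undefined if no patch is needed.
function validationChange(previous: string | undefined, next: ValidationResult): string | undefined {
  const serialized = serializeResult(next);
  return serialized === previous ? undefined : serialized;
}
```

The returned string is what would be stored in the constant that `rowValidation` references.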
One cell’s validation result might depend on the state of another cell within the same row. For example, consider the following JSON Schema, where column “b” is required if column “a” has a value greater than 100:
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Conditional Requirement Schema",
  "type": "object",
  "properties": {
    "a": {
      "type": "integer",
      "description": "An integer value."
    },
    "b": {
      "type": "string",
      "description": "A string value, required if 'a' > 100."
    }
  },
  "if": {
    "properties": {
      "a": { "exclusiveMinimum": 100 }
    },
    "required": ["a"]
  },
  "then": {
    "required": ["b"]
  }
}
```
Given this example schema, the following table shows the validation results for rows with various values:
a | b | ValidationResults |
---|---|---|
200 | "aString" | |
99 | "aString" | |
99 | null | |
101 | null | |
89 | true | |
true | "aString" | |
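To make the cross-cell dependency concrete, the following hand-written TypeScript check mirrors only this example schema and evaluates the rows from the table. It is not a general JSON Schema validator. It assumes an empty cell (null) is omitted from the row's JSON object rather than validated as a JSON null, and it uses the "a greater than 100" rule stated above. The error strings are illustrative, not the ValidationResult format.

```typescript
// A cell value; null means the cell is empty (assumed to be omitted from the row object).
type Cell = string | number | boolean | null;

interface Row {
  a: Cell;
  b: Cell;
}

// Hand-written check that mirrors only the example schema.
function validateRow(row: Row): string[] {
  const errors: string[] = [];
  const hasA = row.a !== null;
  const hasB = row.b !== null;
  // "properties": per-cell type checks.
  if (hasA && !(typeof row.a === 'number' && row.a % 1 === 0)) {
    errors.push('a: must be an integer');
  }
  if (hasB && typeof row.b !== 'string') {
    errors.push('b: must be a string');
  }
  // "if": a is present and greater than 100. As with JSON Schema numeric keywords,
  // the bound is ignored for non-numbers, so a present non-numeric "a" also satisfies it.
  const conditionHolds = hasA && (typeof row.a !== 'number' || row.a > 100);
  // "then": the state of cell "a" makes cell "b" required.
  if (conditionHolds && !hasB) {
    errors.push('b: required when a > 100');
  }
  return errors;
}

// The rows from the table above.
const rows: Row[] = [
  {a: 200, b: 'aString'},
  {a: 99, b: 'aString'},
  {a: 99, b: null},
  {a: 101, b: null},
  {a: 89, b: true},
  {a: true, b: 'aString'},
];
for (const row of rows) {
  console.log(JSON.stringify(row), validateRow(row));
}
```

Because the outcome for "b" depends on "a", the worker has to re-validate the whole row whenever any single cell changes.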
The following steps show how a client will connect to a Synapse grid:
1. Start a new grid session by calling POST /grid/session/async/start. The caller must be authenticated to start a new grid session. This method returns a job ID that will be used in the next step.
To initialize the grid with data from a table/view, set “initialQuery” in the CreateGridRequest. The following example shows how to add all data from table syn9602690:
```json
{
  "concreteType": "org.sagebionetworks.repo.model.grid.CreateGridRequest",
  "initialQuery": {"sql": "select * from syn9602690"}
}
```
When the grid is initialized from a table/view query, the columns of the grid will match the select statement and there will be one row for each row returned by the query. This data will be provided as a set of one or more patches during synchronization.
2. Get the results of the create-grid job from step 1 by calling GET /grid/session/async/get/{asyncToken}. This method returns a 202 while the job is still running, so you will need to keep calling it until you get a 200 with a CreateGridResponse.
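Step 2 amounts to a polling loop. The following is a minimal sketch that assumes injected `get` and `sleep` helpers, which are hypothetical and keep the loop independent of any particular HTTP client, along with an arbitrary cap on the number of attempts:

```typescript
// Minimal HTTP result: the status code and, for a 200, the parsed JSON body.
interface HttpResult {
  status: number;
  body?: unknown;
}

// Polls the async-get endpoint until the job completes. `get` performs one
// GET /grid/session/async/get/{asyncToken} call and `sleep` waits between attempts;
// both are injected, hypothetical helpers.
async function pollAsyncJob(
  get: () => Promise<HttpResult>,
  sleep: (ms: number) => Promise<void>,
  maxAttempts = 60,
  delayMs = 1000,
): Promise<unknown> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await get();
    if (result.status === 200) {
      return result.body; // the CreateGridResponse
    }
    if (result.status !== 202) {
      throw new Error(`unexpected status ${result.status}`);
    }
    await sleep(delayMs); // 202: the job is still running
  }
  throw new Error(`job not complete after ${maxAttempts} attempts`);
}
```

In practice, `sleep` could be `(ms) => new Promise((r) => setTimeout(r, ms))`, and `get` could be a `fetch` call that carries the user's credentials.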
3. Using the “sessionId” from the CreateGridResponse in step 2, create a new replica for the grid by calling POST /grid/{sessionId}/replica. You will need to create a replica for each unique client that you wish to connect to the grid.
4. Get a presigned URL that can be used to establish a secure websocket connection to a grid replica by calling POST /grid/{sessionId}/presigned/url. You will need to provide both the sessionId and the replicaId from step 3.
We can test the resulting presigned URL using wscat:
```shell
wscat -c 'wss://bsz1rd4f32.execute-api.us-east-1.amazonaws.com/dev/?gridSessionId=NjU1NjE&replicaId=66537&userId=1&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20250530T180841Z&X-Amz-SignedHeaders=host&X-Amz-Credential=AKIAWRJDYHEDNASK5BJ5%2F20250530%2Fus-east-1%2Fexecute-api%2Faws4_request&X-Amz-Expires=900&X-Amz-Signature=123'
```
With the connection open, we can then type the following to send a ping to the grid:
```
[8,"ping"]
```
You should see the following response:
```
[8,"pong"]
```
If you receive a “pong”, then you have successfully established a secure websocket connection with the grid session. Notes:
Each presigned URL establishes a connection with a specific session/replica, authenticated as the user who created the URL.
Each presigned URL expires 15 minutes after it is issued.
The presigned URL is only needed to establish a connection. Once connected, the connection will remain open as long as it is active.
You will need a new URL for each replica to connect to the grid session.
After a patch is created in a replica, the patch should be sent to the hub. The replica must keep a copy of the patch until the hub has confirmed receipt.
A patch is sent to the hub as a Request-Data-Message with the following format:
```
[1, <sequence_number>, "patch", <payload>]
```
The first element must be 1, which identifies the message as a Request-Data-Message. The next element must be a numeric sequence number issued by the client. The hub’s response will include this sequence number to help the client identify the message chain. The next element is the method name, 'patch', which identifies this as a patch message. Finally, the last element is the patch body, which must be encoded in the compact format.
The following is an example of a patch message from replica 123, with patch ID 123.24, that creates a new constant with the value 'foo', using a client-selected sequence number of 99:
```
[1, 99, "patch", [[[123,24]],[0,"foo"]]]
```
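Building the frame amounts to wrapping the compact patch in the four-element array. The following minimal sketch reproduces the example above. The `patchRequest` helper is hypothetical.

```typescript
// A compact-format patch: a header holding the patch ID [sessionId, time], then operations.
type CompactPatch = unknown[];

// Builds the text frame for a "patch" Request-Data-Message:
// [1, <sequence_number>, "patch", <payload>].
function patchRequest(sequenceNumber: number, patch: CompactPatch): string {
  if (sequenceNumber < 0 || sequenceNumber % 1 !== 0) {
    throw new Error('sequence number must be a non-negative integer');
  }
  return JSON.stringify([1, sequenceNumber, 'patch', patch]);
}

// The example above: patch 123.24 creating the constant 'foo', sequence number 99.
const frame = patchRequest(99, [[[123, 24]], [0, 'foo']]);
console.log(frame); // [1,99,"patch",[[[123,24]],[0,"foo"]]]
```

With json-joy, the `patch` argument would be the output of the compact codec's `encode` function, as used in the example script earlier in this section.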
When the hub successfully processes a patch from a replica, it will respond with a message in the following format:
```
[5, <sequence_number>]
```
The first element, 5, indicates that the hub’s response is a Response-Complete-Message. The next element is the sequence number issued by the client in its original Request-Data-Message.
For example, if the hub successfully processes the example patch above, it will respond with:
```
[5,99]
```
Again, the replica must keep a copy of each patch until the hub sends it a matching Response-Complete-Message.
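One way to meet this requirement is a small outbox keyed by sequence number. In this sketch, `PatchOutbox` is hypothetical and sequence numbers are assigned locally starting at 1. A patch is forgotten only when a `[5, <sequence_number>]` message with its number arrives. Any remaining entries are the patches the replica must still hold, for example after a lost connection.

```typescript
// A compact-format patch, treated as opaque JSON.
type CompactPatch = unknown[];

// Holds every sent patch until the hub confirms it with [5, <sequence_number>].
class PatchOutbox {
  private nextSequenceNumber = 1;
  private readonly pending = new Map<number, CompactPatch>();

  // Records the patch and returns the Request-Data-Message frame to send.
  send(patch: CompactPatch): string {
    const seq = this.nextSequenceNumber++;
    this.pending.set(seq, patch);
    return PatchOutbox.frame(seq, patch);
  }

  // Handles a hub message; returns true if it confirmed a pending patch.
  acknowledge(message: unknown): boolean {
    if (!Array.isArray(message) || message[0] !== 5 || typeof message[1] !== 'number') {
      return false;
    }
    return this.pending.delete(message[1]);
  }

  // Frames for every unconfirmed patch, in the order they were first sent.
  unconfirmedFrames(): string[] {
    const frames: string[] = [];
    this.pending.forEach((patch, seq) => frames.push(PatchOutbox.frame(seq, patch)));
    return frames;
  }

  get size(): number {
    return this.pending.size;
  }

  private static frame(seq: number, patch: CompactPatch): string {
    return JSON.stringify([1, seq, 'patch', patch]);
  }
}
```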
The hub will notify each connected replica when a new patch is available by sending the following Notification-Message:
```
[8,"new-patch"]
```
When a replica receives a new patch notification, it should start the clock synchronization process.
The clock synchronization process involves a replica sharing its current clock (version vector) with the hub. The hub uses the provided clock to determine which patches in the patch database the replica is missing, and then sends those patches to the replica in the correct order.
Note: It is critical that all patches from a single replica are applied to other replicas in the same order in which they occurred at the source.
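The following is a minimal sketch of how the hub might choose the next patch to send. It rests on two assumptions that this proposal does not spell out. First, the hub keeps an append-only log of patches in the order it accepted them. Second, a clock entry [sid, time] means the replica has already seen every patch from sid that starts at or before that time. This is consistent with the [123,24] example later in this section, where applying patch 123.24 adds [123,24] to the clock. Returning the earliest uncovered patch from an arrival-ordered log keeps each source replica's patches in their original order, as the note above requires. `StoredPatch` and `nextMissingPatch` are hypothetical names.

```typescript
// A stored patch: its ID [sessionId, time] and its compact-format body.
interface StoredPatch {
  id: [number, number];
  compact: unknown[];
}

// Returns the earliest patch in the hub's log that the given clock does not cover,
// or undefined when the replica is up to date (the hub would then reply [5, seq]).
function nextMissingPatch(log: StoredPatch[], clock: [number, number][]): StoredPatch | undefined {
  // Latest known time per session ID, taken from the replica's clock.
  const seen = new Map<number, number>();
  for (const [sid, time] of clock) seen.set(sid, time);
  for (const patch of log) {
    const [sid, time] = patch.id;
    const latest = seen.get(sid);
    // Missing if the replica has never seen this session, or only earlier times from it.
    if (latest === undefined || time > latest) return patch;
  }
  return undefined;
}

// Patch bodies are omitted for brevity.
const log: StoredPatch[] = [
  {id: [2, 1], compact: []},
  {id: [123, 24], compact: [[[123, 24]], [0, 'foo']]},
];
console.log(nextMissingPatch(log, [])?.id); // a brand-new replica starts from the first patch
```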
A replica should synchronize its clock with the hub under the following conditions:
Newly started replica.
The replica received a “new-patch” notification message
Anytime a connection to the grid is lost and then restored.
If it is not possible to detect disconnect/connect events, then a periodic synchronization might be needed.
A replica initiates the clock synchronization process by sending the following Request-Data-Message to the hub via websocket:
```
[1, <sequence_number>, "synchronize-clock", <clock>]
```
The first element, 1, indicates that it is a Request-Data-Message. The second element is the sequence number issued by the client. The hub’s response will include this sequence number to help the client identify the message chain. The third element is the method name, "synchronize-clock". Finally, the last element is the replica’s current clock value. A clock value is represented as an array of logical timestamps. For example, consider the following replica clock:
```
clock 66000.102
└─ 65999.101
```
This clock would be encoded as:
```
[[66000,102],[65999,101]]
```
Given this example clock and a client-side sequence number of 25, the request would be:
```
[1, 25, "synchronize-clock", [[66000,102],[65999,101]] ]
```
For a new replica that does not yet have a clock, an empty array can be provided as the clock:
```
[1, 25, "synchronize-clock", [] ]
```
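The clock encoding and the request can be sketched as a small TypeScript helper. The sketch assumes the replica's own timestamp is listed first and its peers after it, mirroring the example clock above. `synchronizeClockRequest` is a hypothetical name.

```typescript
// One logical timestamp: [sessionId, time].
type Timestamp = [number, number];

// Builds a synchronize-clock Request-Data-Message. A new replica passes no local
// timestamp and therefore sends an empty clock.
function synchronizeClockRequest(
  sequenceNumber: number,
  local?: Timestamp,
  peers: Timestamp[] = [],
): string {
  const clock: Timestamp[] = local ? [local, ...peers] : [];
  return JSON.stringify([1, sequenceNumber, 'synchronize-clock', clock]);
}

// The example clock 66000.102 with peer 65999.101.
console.log(synchronizeClockRequest(25, [66000, 102], [[65999, 101]]));
// [1,25,"synchronize-clock",[[66000,102],[65999,101]]]
console.log(synchronizeClockRequest(25));
// [1,25,"synchronize-clock",[]]
```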
The hub’s response to a “synchronize-clock” request depends on the calling replica’s clock. If the hub determines that the calling replica is missing one or more patches, it will respond with a Response-Data-Message:
```
[4, <sequence_number>, <payload>]
```
The first element, 4, indicates that it is a Response-Data-Message. The <sequence_number> will match the client-provided sequence number from the "synchronize-clock" request. Finally, the last element, <payload>, will be the compact representation of the next patch that the replica needs to apply. For example, if the hub determined that the replica is missing the following patch:
```
[[[123,24]],[0,"foo"]]
```
then the full response to our example request above would be:
```
[4, 25, [[[123,24]],[0,"foo"]]]
```
Upon receiving such a response, the replica is expected to apply the provided patch, which will advance the replica’s clock accordingly. The client should then initiate a follow-up synchronize-clock call providing its updated clock. For example, after applying the above patch, a replica would send the following:
```
[1, 25, "synchronize-clock", [[66000,102],[65999,101],[123,24]] ]
```
Note: the [123,24] added to the replica’s clock indicates that the patch was applied.
If the hub determines from a replica’s clock that the replica is up-to-date, the hub will respond to the synchronize-clock message with a Response-Complete-Message:
```
[5, <sequence_number>]
```
Again, the first element, 5, indicates a Response-Complete-Message. The last element, <sequence_number>, will match the sequence number provided by the client in its request. Extending the example above with a sequence number of 25, an example response would be:
```
[5,25]
```
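The hub messages described in this section can be told apart by their first element. The following is a minimal parser sketch. It assumes each websocket text frame carries exactly one JSON message. `HubMessage` and `parseHubMessage` are hypothetical names.

```typescript
type HubMessage =
  | {kind: 'response-data'; sequenceNumber: number; payload: unknown}
  | {kind: 'response-complete'; sequenceNumber: number}
  | {kind: 'notification'; name: string};

// Parses one websocket text frame from the hub. Type codes:
// 4 = Response-Data-Message, 5 = Response-Complete-Message, 8 = Notification-Message.
function parseHubMessage(frame: string): HubMessage {
  const message: unknown = JSON.parse(frame);
  if (!Array.isArray(message)) {
    throw new Error('hub message must be a JSON array');
  }
  const [type, second, payload] = message;
  switch (type) {
    case 4:
      if (typeof second !== 'number') throw new Error('missing sequence number');
      return {kind: 'response-data', sequenceNumber: second, payload};
    case 5:
      if (typeof second !== 'number') throw new Error('missing sequence number');
      return {kind: 'response-complete', sequenceNumber: second};
    case 8:
      if (typeof second !== 'string') throw new Error('missing notification name');
      return {kind: 'notification', name: second};
    default:
      throw new Error(`unsupported message type: ${String(type)}`);
  }
}
```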
In conclusion, a replica synchronizes with the hub by sending its clock via “synchronize-clock” in a loop. Each time the hub returns a patch, the replica applies the patch, which updates its clock, and the loop continues. The loop terminates when the hub sends a Response-Complete-Message indicating that the replica is up-to-date.
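The loop can be sketched in TypeScript as follows. The `Replica` interface and `SendSync` transport are hypothetical stand-ins. A real replica would decode and apply the compact patch with json-joy, and the transport would write the synchronize-clock message to the websocket and wait for the reply with the same sequence number. The sketch also assumes a fresh sequence number for each round and caps the number of rounds as a safeguard.

```typescript
// One logical timestamp: [sessionId, time].
type Timestamp = [number, number];

// Parsed hub reply to a synchronize-clock request.
type SyncReply =
  | {kind: 'response-data'; payload: unknown[]}
  | {kind: 'response-complete'};

// Hypothetical replica: exposes its clock and applies compact patches (advancing the clock).
interface Replica {
  clock(): Timestamp[];
  applyPatch(compact: unknown[]): void;
}

// Hypothetical transport: sends [1, seq, "synchronize-clock", clock] and resolves with the reply.
type SendSync = (sequenceNumber: number, clock: Timestamp[]) => Promise<SyncReply>;

// Runs the synchronize-clock loop until the hub reports that the replica is up to date.
// Resolves with the number of patches applied.
async function synchronize(
  replica: Replica,
  send: SendSync,
  firstSequenceNumber: number,
  maxRounds = 10000,
): Promise<number> {
  let applied = 0;
  for (let round = 0; round < maxRounds; round++) {
    const reply = await send(firstSequenceNumber + round, replica.clock());
    if (reply.kind === 'response-complete') {
      return applied; // [5, seq]: the replica is up to date
    }
    replica.applyPatch(reply.payload); // [4, seq, payload]: apply, then loop with the new clock
    applied++;
  }
  throw new Error('synchronization did not complete');
}
```

Because the replica sends its full clock on every round, the hub can determine what is missing from the clock alone. The same loop therefore serves a newly started replica, a "new-patch" notification, and a restored connection.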