Compare commits

...

3 Commits

Author SHA1 Message Date
DarkSky
6557e5d01d feat: init disk remote source 2026-02-27 02:39:53 +08:00
DarkSky
895e774569 fix: static file handle & ws connect 2026-02-25 11:41:42 +08:00
DarkSky
79460072bb fix: old client compatibility 2026-02-24 23:58:10 +08:00
57 changed files with 6954 additions and 176 deletions

View File

@@ -96,12 +96,20 @@ spec:
httpGet:
path: /info
port: http
initialDelaySeconds: {{ .Values.probe.initialDelaySeconds }}
initialDelaySeconds: {{ default .Values.probe.initialDelaySeconds .Values.probe.liveness.initialDelaySeconds }}
timeoutSeconds: {{ default .Values.probe.timeoutSeconds .Values.probe.liveness.timeoutSeconds }}
periodSeconds: {{ default .Values.probe.periodSeconds .Values.probe.liveness.periodSeconds }}
failureThreshold: {{ default .Values.probe.failureThreshold .Values.probe.liveness.failureThreshold }}
successThreshold: {{ default .Values.probe.successThreshold .Values.probe.liveness.successThreshold }}
readinessProbe:
httpGet:
path: /info
port: http
initialDelaySeconds: {{ .Values.probe.initialDelaySeconds }}
initialDelaySeconds: {{ default .Values.probe.initialDelaySeconds .Values.probe.readiness.initialDelaySeconds }}
timeoutSeconds: {{ default .Values.probe.timeoutSeconds .Values.probe.readiness.timeoutSeconds }}
periodSeconds: {{ default .Values.probe.periodSeconds .Values.probe.readiness.periodSeconds }}
failureThreshold: {{ default .Values.probe.failureThreshold .Values.probe.readiness.failureThreshold }}
successThreshold: {{ default .Values.probe.successThreshold .Values.probe.readiness.successThreshold }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}

View File

@@ -31,13 +31,21 @@ podSecurityContext:
resources:
limits:
cpu: '1'
memory: 2Gi
memory: 4Gi
requests:
cpu: '1'
memory: 2Gi
probe:
initialDelaySeconds: 20
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 6
successThreshold: 1
liveness:
initialDelaySeconds: 60
failureThreshold: 12
readiness: {}
services:
sync:

2
Cargo.lock generated
View File

@@ -135,10 +135,12 @@ dependencies = [
"napi-derive",
"once_cell",
"serde_json",
"sha3",
"sqlx",
"thiserror 2.0.17",
"tokio",
"uuid",
"y-octo",
]
[[package]]

View File

@@ -7,7 +7,12 @@ const MOBILE_CLIENT_ORIGINS = new Set([
'capacitor://localhost',
'ionic://localhost',
]);
const DESKTOP_CLIENT_ORIGINS = new Set(['assets://.', 'assets://another-host']);
const DESKTOP_CLIENT_ORIGINS = new Set([
'assets://.',
'assets://another-host',
// for old versions of client, which use file:// as origin
'file://',
]);
export const CORS_ALLOWED_METHODS = [
'GET',
@@ -55,6 +60,19 @@ function isDevLoopbackOrigin(origin: string) {
}
}
/**
 * Parse an Origin header value and return its canonical origin string,
 * mapping websocket schemes onto their HTTP equivalents (ws -> http,
 * wss -> https) so they match the configured allow-list entries.
 * Returns null when the value is not a parseable URL.
 */
function normalizeCorsOrigin(origin: string) {
  let parsed: URL;
  try {
    parsed = new URL(origin);
  } catch {
    // Not a URL at all (e.g. the literal string "null"); caller treats it as unmatched.
    return null;
  }
  // Some websocket clients send ws:// or wss:// as Origin.
  if (parsed.protocol === 'wss:') {
    parsed.protocol = 'https:';
  } else if (parsed.protocol === 'ws:') {
    parsed.protocol = 'http:';
  }
  return parsed.origin;
}
export function buildCorsAllowedOrigins(url: URLHelper) {
return new Set<string>([
...url.allowedOrigins,
@@ -75,6 +93,11 @@ export function isCorsOriginAllowed(
return true;
}
const normalizedOrigin = normalizeCorsOrigin(origin);
if (normalizedOrigin && allowedOrigins.has(normalizedOrigin)) {
return true;
}
if ((env.dev || env.testing) && isDevLoopbackOrigin(origin)) {
return true;
}

View File

@@ -1,6 +1,7 @@
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import { buildCorsAllowedOrigins, isCorsOriginAllowed } from '../../cors';
import { ActionForbidden } from '../../error';
import { URLHelper } from '../url';
@@ -193,3 +194,19 @@ test('can get request base url with multiple hosts', t => {
t.is(url.requestOrigin, 'https://app.affine.local2');
t.is(url.requestBaseUrl, 'https://app.affine.local2');
});
// wss:// origins are normalized onto https:// before the allow-list lookup,
// so a websocket client that reuses the configured https origin is accepted.
test('should allow websocket secure origin by normalizing wss to https', t => {
const allowedOrigins = buildCorsAllowedOrigins({
allowedOrigins: ['https://app.affine.pro'],
} as any);
t.true(isCorsOriginAllowed('wss://app.affine.pro', allowedOrigins));
});
// file:// is accepted for old desktop clients even with an empty allow-list
// (it is part of the built-in desktop client origins).
test('should allow desktop file origin', t => {
const allowedOrigins = buildCorsAllowedOrigins({
allowedOrigins: [],
} as any);
t.true(isCorsOriginAllowed('file://', allowedOrigins));
});

View File

@@ -10,6 +10,16 @@ import { isMobileRequest } from '../utils/user-agent';
const staticPathRegex = /^\/(_plugin|assets|imgs|js|plugins|static)\//;
/**
 * True when the error indicates a static asset that simply does not exist:
 * a filesystem ENOENT, or an error object carrying a 404 status/statusCode.
 */
function isMissingStaticAssetError(error: unknown) {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const candidate = error as { code?: string; status?: number; statusCode?: number };
  if (candidate.code === 'ENOENT') {
    return true;
  }
  return candidate.status === 404 || candidate.statusCode === 404;
}
@Injectable()
export class StaticFilesResolver implements OnModuleInit {
constructor(
@@ -86,7 +96,18 @@ export class StaticFilesResolver implements OnModuleInit {
next();
return;
}
routeByUA(req, res, next, true);
routeByUA(
req,
res,
error => {
if (isMissingStaticAssetError(error)) {
res.status(404).end();
return;
}
next(error);
},
true
);
});
// /

View File

@@ -210,6 +210,9 @@ export class SpaceSyncGateway
private readonly server!: Server;
private connectionCount = 0;
private readonly socketUsers = new Map<string, string>();
private readonly localUserConnectionCounts = new Map<string, number>();
private unresolvedPresenceSockets = 0;
private flushTimer?: NodeJS.Timeout;
constructor(
@@ -224,7 +227,9 @@ export class SpaceSyncGateway
onModuleInit() {
this.flushTimer = setInterval(() => {
this.flushActiveUsersMinute().catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}, 60_000);
this.flushTimer.unref?.();
@@ -278,8 +283,7 @@ export class SpaceSyncGateway
};
} catch (error) {
this.logger.warn(
'Failed to merge updates for broadcast, falling back to batch',
error as Error
`Failed to merge updates for broadcast, falling back to batch: ${this.formatError(error)}`
);
return {
spaceType,
@@ -302,14 +306,20 @@ export class SpaceSyncGateway
this.connectionCount++;
this.logger.debug(`New connection, total: ${this.connectionCount}`);
metrics.socketio.gauge('connections').record(this.connectionCount);
this.attachPresenceUserId(client);
this.flushActiveUsersMinute().catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
const userId = this.attachPresenceUserId(client);
this.trackConnectedSocket(client.id, userId);
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}
handleDisconnect(_client: Socket) {
handleDisconnect(client: Socket) {
this.connectionCount = Math.max(0, this.connectionCount - 1);
this.trackDisconnectedSocket(client.id);
this.logger.debug(
`Connection disconnected, total: ${this.connectionCount}`
);
@@ -317,21 +327,24 @@ export class SpaceSyncGateway
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}
private attachPresenceUserId(client: Socket) {
private attachPresenceUserId(client: Socket): string | null {
const request = client.request as Request;
const userId = request.session?.user.id ?? request.token?.user.id;
if (typeof userId !== 'string' || !userId) {
this.logger.warn(
`Unable to resolve authenticated user id for socket ${client.id}`
);
return;
return null;
}
client.data[SOCKET_PRESENCE_USER_ID_KEY] = userId;
return userId;
}
private resolvePresenceUserId(socket: { data?: unknown }) {
@@ -345,6 +358,60 @@ export class SpaceSyncGateway
return typeof userId === 'string' && userId ? userId : null;
}
/**
 * Record a newly connected socket for local presence accounting.
 *
 * Sockets whose user id could not be resolved are counted separately so
 * that resolveLocalActiveUsers() can fall back to the raw connection count
 * instead of under-reporting active users.
 */
private trackConnectedSocket(socketId: string, userId: string | null) {
  if (!userId) {
    // Unattributed socket: remember it, but don't touch per-user counts.
    this.unresolvedPresenceSockets++;
    return;
  }
  this.socketUsers.set(socketId, userId);
  // Increment this user's local connection count (0 when first seen).
  const prev = this.localUserConnectionCounts.get(userId) ?? 0;
  this.localUserConnectionCounts.set(userId, prev + 1);
}
/**
 * Remove a disconnected socket from local presence accounting.
 *
 * Sockets that were never attributed to a user decrement the unresolved
 * counter instead (clamped at zero so a double disconnect cannot go negative).
 */
private trackDisconnectedSocket(socketId: string) {
  const userId = this.socketUsers.get(socketId);
  if (!userId) {
    this.unresolvedPresenceSockets = Math.max(
      0,
      this.unresolvedPresenceSockets - 1
    );
    return;
  }
  this.socketUsers.delete(socketId);
  // Decrement the per-user count; drop the entry entirely when it reaches zero
  // so localUserConnectionCounts.size stays an accurate distinct-user count.
  const next = (this.localUserConnectionCounts.get(userId) ?? 1) - 1;
  if (next <= 0) {
    this.localUserConnectionCounts.delete(userId);
  } else {
    this.localUserConnectionCounts.set(userId, next);
  }
}
/**
 * Best-effort count of distinct active users connected to this node.
 *
 * While any connected socket lacks a resolved user id, fall back to the raw
 * connection count so presence is never under-reported.
 */
private resolveLocalActiveUsers() {
  const allSocketsAttributed = this.unresolvedPresenceSockets === 0;
  return allSocketsAttributed
    ? this.localUserConnectionCounts.size
    : Math.max(0, this.connectionCount);
}
/**
 * Render an unknown error value as a single loggable string.
 *
 * Error instances contribute their stack (or message), strings pass through
 * unchanged, and anything else is JSON-serialized with a String() fallback
 * for values JSON.stringify cannot handle.
 */
private formatError(error: unknown) {
  if (error instanceof Error) {
    return error.stack ?? error.message;
  }
  if (typeof error !== 'string') {
    try {
      return JSON.stringify(error);
    } catch {
      return String(error);
    }
  }
  return error;
}
private async flushActiveUsersMinute(options?: {
aggregateAcrossCluster?: boolean;
}) {
@@ -352,7 +419,7 @@ export class SpaceSyncGateway
minute.setSeconds(0, 0);
const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true;
let activeUsers = Math.max(0, this.connectionCount);
let activeUsers = this.resolveLocalActiveUsers();
if (aggregateAcrossCluster) {
try {
const sockets = await this.server.fetchSockets();
@@ -377,8 +444,7 @@ export class SpaceSyncGateway
}
} catch (error) {
this.logger.warn(
'Failed to aggregate active users from sockets, using local value',
error as Error
`Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}`
);
}
}

View File

@@ -430,7 +430,9 @@ fn parse_markdown_inner(markdown: &str) -> Result<MarkdownDocument, ParseError>
table_handled = true;
}
Event::Html(html) | Event::InlineHtml(html) => {
if let Some(text) = extract_wrapped_html_text(html) {
if is_html_comment(html) || is_iframe_end_tag(html) {
// Ignore HTML comments and iframe end tags inside table cells.
} else if let Some(text) = extract_wrapped_html_text(html) {
state.push_text(&text);
} else if is_html_line_break(html) {
state.push_text("\n");
@@ -621,6 +623,9 @@ fn parse_markdown_inner(markdown: &str) -> Result<MarkdownDocument, ParseError>
}
}
Event::Html(html) | Event::InlineHtml(html) => {
if is_html_comment(&html) || is_iframe_end_tag(&html) {
continue;
}
if is_ai_editable_comment(&html) {
continue;
}
@@ -773,6 +778,9 @@ fn validate_markdown_inner(markdown: &str) -> Result<(), ParseError> {
match event {
Event::Start(tag) => ensure_supported_tag(&tag)?,
Event::Html(html) | Event::InlineHtml(html) => {
if is_html_comment(&html) || is_iframe_end_tag(&html) {
continue;
}
if is_ai_editable_comment(&html) {
continue;
}
@@ -936,6 +944,15 @@ fn is_ai_editable_comment(html: &str) -> bool {
body.contains("block_id=") && body.contains("flavour=")
}
/// Returns true when `html`, ignoring surrounding whitespace, is a complete
/// HTML comment (i.e. it begins with `<!--` and ends with `-->`).
fn is_html_comment(html: &str) -> bool {
    let candidate = html.trim();
    candidate.starts_with("<!--") && candidate.ends_with("-->")
}
/// True when `html` parses (via `parse_html_tag`) as a closing `</iframe>` tag.
fn is_iframe_end_tag(html: &str) -> bool {
    parse_html_tag(html).is_some_and(|tag| tag.closing && tag.name == "iframe")
}
fn is_html_line_break(html: &str) -> bool {
let trimmed = html.trim();
if !trimmed.starts_with('<') || !trimmed.ends_with('>') {
@@ -1716,6 +1733,13 @@ mod tests {
assert!(result.is_ok());
}
#[test]
// HTML comments (e.g. "<!-- omit from toc -->") must pass validation rather
// than being rejected as unsupported raw HTML.
fn test_validate_markdown_allows_html_comment() {
    let markdown = "# Title\n\n<!-- omit from toc -->\n\nContent.";
    let result = validate_markdown(markdown);
    assert!(result.is_ok());
}
#[test]
fn test_validate_markdown_rejects_html() {
let markdown = "# Title\n\n<div>HTML</div>";

View File

@@ -282,6 +282,9 @@ pub fn parse_doc_to_markdown(
0
};
let ai_block = ai_editable && block_level == 2;
let ai_preserve_block = ai_block
&& (matches!(flavour.as_str(), "affine:database" | "affine:callout")
|| BlockFlavour::from_str(flavour.as_str()).is_none());
let mut block_markdown = String::new();
@@ -308,7 +311,9 @@ pub fn parse_doc_to_markdown(
};
renderer.write_block(&mut block_markdown, &spec, list_depth);
} else {
return Err(ParseError::ParserError(format!("unsupported_block_flavour:{flavour}")));
block_markdown.push_str(&format!(
"<!-- unsupported_block_flavour:{flavour} block_id={block_id} -->\n\n"
));
}
}
}
@@ -317,6 +322,9 @@ pub fn parse_doc_to_markdown(
markdown.push_str(&format!("<!-- block_id={block_id} flavour={flavour} -->\n"));
}
markdown.push_str(&block_markdown);
if ai_preserve_block {
markdown.push_str(&format!("<!-- block_id={block_id} flavour={flavour} end -->\n"));
}
}
Ok(MarkdownResult {
@@ -792,4 +800,59 @@ mod tests {
assert!(md.contains("|A|B|"));
assert!(md.contains("|---|---|"));
}
#[test]
// An unsupported flavour (affine:latex) must be exported as an HTML comment
// placeholder instead of aborting the whole conversion, and blocks that follow
// it must still be rendered.
fn test_parse_doc_to_markdown_skips_unsupported_block_flavour() {
    // Build a minimal doc: page -> note -> [latex (unsupported), paragraph].
    let doc_id = "unsupported-doc".to_string();
    let doc = DocOptions::new().with_guid(doc_id.clone()).build();
    let mut blocks = doc.get_or_create_map("blocks").unwrap();
    let mut page = doc.create_map().unwrap();
    page.insert("sys:id".into(), "page").unwrap();
    page.insert("sys:flavour".into(), "affine:page").unwrap();
    let mut page_children = doc.create_array().unwrap();
    page_children.push("note").unwrap();
    page.insert("sys:children".into(), Value::Array(page_children)).unwrap();
    let mut page_title = doc.create_text().unwrap();
    page_title.insert(0, "Page").unwrap();
    page.insert("prop:title".into(), Value::Text(page_title)).unwrap();
    blocks.insert("page".into(), Value::Map(page)).unwrap();
    let mut note = doc.create_map().unwrap();
    note.insert("sys:id".into(), "note").unwrap();
    note.insert("sys:flavour".into(), "affine:note").unwrap();
    let mut note_children = doc.create_array().unwrap();
    note_children.push("latex").unwrap();
    note_children.push("paragraph").unwrap();
    note.insert("sys:children".into(), Value::Array(note_children)).unwrap();
    note.insert("prop:displayMode".into(), "page").unwrap();
    blocks.insert("note".into(), Value::Map(note)).unwrap();
    // The unsupported block under test.
    let mut unsupported = doc.create_map().unwrap();
    unsupported.insert("sys:id".into(), "latex").unwrap();
    unsupported.insert("sys:flavour".into(), "affine:latex").unwrap();
    unsupported
        .insert("sys:children".into(), Value::Array(doc.create_array().unwrap()))
        .unwrap();
    blocks.insert("latex".into(), Value::Map(unsupported)).unwrap();
    // A normal paragraph after the unsupported block.
    let mut paragraph = doc.create_map().unwrap();
    paragraph.insert("sys:id".into(), "paragraph").unwrap();
    paragraph.insert("sys:flavour".into(), "affine:paragraph").unwrap();
    paragraph
        .insert("sys:children".into(), Value::Array(doc.create_array().unwrap()))
        .unwrap();
    let mut paragraph_text = doc.create_text().unwrap();
    paragraph_text.insert(0, "After unsupported block").unwrap();
    paragraph
        .insert("prop:text".into(), Value::Text(paragraph_text))
        .unwrap();
    blocks.insert("paragraph".into(), Value::Map(paragraph)).unwrap();
    let doc_bin = doc.encode_update_v1().unwrap();
    let result = parse_doc_to_markdown(doc_bin, doc_id, false, None).expect("parse doc");
    assert!(result.markdown.contains("unsupported_block_flavour:affine:latex"));
    assert!(result.markdown.contains("After unsupported block"));
}
}

View File

@@ -3,7 +3,7 @@
//! Converts markdown content into AFFiNE-compatible y-octo document binary
//! format.
use y_octo::DocOptions;
use y_octo::{DocOptions, StateVector};
use super::{
super::{
@@ -73,7 +73,7 @@ fn build_doc_update(doc_id: &str, title: &str, blocks: &[BlockNode]) -> Result<V
note_map.insert(PROP_HIDDEN.to_string(), Any::False)?;
note_map.insert(PROP_DISPLAY_MODE.to_string(), Any::String("both".to_string()))?;
Ok(doc.encode_update_v1()?)
Ok(doc.encode_state_as_update_v1(&StateVector::default())?)
}
fn insert_block_trees(doc: &Doc, blocks_map: &mut Map, blocks: &[BlockNode]) -> Result<Vec<String>, ParseError> {

View File

@@ -8,19 +8,37 @@ use std::collections::HashMap;
use super::{
super::{
block_spec::{TreeNode, count_tree_nodes, text_delta_eq},
blocksuite::{collect_child_ids, find_child_id_by_flavour},
blocksuite::{collect_child_ids, find_child_id_by_flavour, get_string},
markdown::{MAX_BLOCKS, parse_markdown_blocks},
schema::{PROP_BACKGROUND, PROP_DISPLAY_MODE, PROP_ELEMENTS, PROP_HIDDEN, PROP_INDEX, PROP_XYWH, SURFACE_FLAVOUR},
},
builder::{
ApplyBlockOptions, BOXED_NATIVE_TYPE, NOTE_BG_DARK, NOTE_BG_LIGHT, apply_block_spec, boxed_empty_map,
insert_block_map, insert_block_tree, insert_children, insert_sys_fields, insert_text, note_background_map,
text_ops_from_plain,
},
builder::{ApplyBlockOptions, apply_block_spec, insert_block_tree, insert_children},
*,
};
const MAX_LCS_CELLS: usize = 2_000_000;
#[derive(Debug, Clone)]
enum NodeSpec {
Supported(BlockSpec),
/// A block flavour we don't support for markdown diffing/updating (e.g.
/// `affine:database`).
///
/// These nodes are treated as opaque: we preserve them and never modify their
/// properties/children.
Opaque {
flavour: String,
},
}
#[derive(Debug, Clone)]
struct StoredNode {
id: String,
spec: BlockSpec,
spec: NodeSpec,
children: Vec<StoredNode>,
}
@@ -30,6 +48,20 @@ impl TreeNode for StoredNode {
}
}
#[derive(Debug, Clone)]
struct TargetNode {
/// Optional block id marker from exported markdown (AI-editable markers).
id_hint: Option<String>,
spec: NodeSpec,
children: Vec<TargetNode>,
}
impl TreeNode for TargetNode {
fn children(&self) -> &[TargetNode] {
&self.children
}
}
struct DocState {
doc: Doc,
note_id: String,
@@ -59,8 +91,24 @@ enum PatchOp {
/// # Returns
/// A binary vector representing only the delta (changes) to apply
pub fn update_doc(existing_binary: &[u8], new_markdown: &str, doc_id: &str) -> Result<Vec<u8>, ParseError> {
let mut new_nodes = parse_markdown_blocks(new_markdown)?;
let state = load_doc_state(existing_binary, doc_id)?;
let state = match load_doc_state(existing_binary, doc_id) {
Ok(state) => state,
Err(ParseError::ParserError(msg))
if matches!(
msg.as_str(),
"blocks map is empty" | "page block not found" | "note block not found"
) =>
{
// The existing doc may be a stub/partial document (e.g. created by references)
// and doesn't contain the canonical page/note structure yet. In that
// case, initialize the doc from the markdown instead of failing hard.
let new_nodes = parse_markdown_blocks(new_markdown)?;
return init_doc_from_markdown(existing_binary, new_markdown, doc_id, &new_nodes);
}
Err(err) => return Err(err),
};
let mut new_nodes = parse_markdown_targets(new_markdown)?;
check_limits(&state.blocks, &new_nodes)?;
@@ -74,6 +122,315 @@ pub fn update_doc(existing_binary: &[u8], new_markdown: &str, doc_id: &str) -> R
Ok(state.doc.encode_state_as_update_v1(&state_before)?)
}
#[derive(Debug, Clone)]
struct BlockMarker {
    id: String,
    flavour: String,
    end: bool,
}

/// Parse one markdown line as a `<!-- block_id=... flavour=... [end] -->`
/// marker comment. Returns `None` for anything else, including comments that
/// lack either the `block_id=` or `flavour=` token or carry empty values.
fn parse_block_marker_line(line: &str) -> Option<BlockMarker> {
    let candidate = line.trim();
    if !(candidate.starts_with("<!--") && candidate.ends_with("-->")) {
        return None;
    }
    let inner = candidate
        .trim_start_matches("<!--")
        .trim_end_matches("-->")
        .trim();
    // Cheap prefilter before tokenizing.
    if !(inner.contains("block_id=") && inner.contains("flavour=")) {
        return None;
    }
    let mut block_id: Option<String> = None;
    let mut block_flavour: Option<String> = None;
    let mut is_end = false;
    for part in inner.split_whitespace() {
        match part {
            // Accept the few spellings an end marker may use.
            "end" | "type=end" | "end=true" => is_end = true,
            _ => {
                if let Some(value) = part.strip_prefix("block_id=") {
                    if !value.is_empty() {
                        block_id = Some(value.to_string());
                    }
                } else if let Some(value) = part.strip_prefix("flavour=") {
                    if !value.is_empty() {
                        block_flavour = Some(value.to_string());
                    }
                }
            }
        }
    }
    Some(BlockMarker {
        id: block_id?,
        flavour: block_flavour?,
        end: is_end,
    })
}
/// Flavours whose stored blocks are preserved verbatim during markdown sync
/// because their content cannot be round-tripped through markdown.
fn should_preserve_marker_flavour(flavour: &str) -> bool {
    flavour == "affine:database" || flavour == "affine:callout"
}
/// Split AI-edited markdown into `TargetNode`s, honoring the
/// `<!-- block_id=... flavour=... -->` marker comments emitted on export.
///
/// Segments whose marker names a preserved flavour (database/callout, or any
/// flavour we cannot parse) become opaque nodes; their exported body text is
/// intentionally dropped since the stored block is kept untouched. All other
/// segments are parsed as regular markdown, with the marker id attached to
/// the first resulting block as an identity hint.
fn parse_markdown_targets(markdown: &str) -> Result<Vec<TargetNode>, ParseError> {
    // Fast path: no markers, behave like the original implementation.
    if !markdown.contains("block_id=") || !markdown.contains("flavour=") {
        let blocks = parse_markdown_blocks(markdown)?;
        return Ok(blocks.into_iter().map(|b| target_from_block_node(b, None)).collect());
    }
    // Split the markdown by marker comments. For most blocks, a marker indicates
    // the start of a block. For preserved blocks (e.g. database), an optional end
    // marker can be emitted so users can append new content after the preserved
    // section without needing to add markers manually.
    let mut segments: Vec<(Option<BlockMarker>, String)> = Vec::new();
    let mut current_marker: Option<BlockMarker> = None;
    let mut current_body = String::new();
    let mut saw_marker = false;
    for line in markdown.lines() {
        if let Some(marker) = parse_block_marker_line(line) {
            saw_marker = true;
            if marker.end {
                // Flush the segment the end marker closes.
                if current_marker.is_some() || !current_body.is_empty() {
                    segments.push((current_marker.take(), std::mem::take(&mut current_body)));
                }
                // Close the marker scope; subsequent lines belong to an unmarked segment.
                current_marker = None;
                continue;
            }
            // A start marker also terminates the previous segment.
            if current_marker.is_some() || !current_body.is_empty() {
                segments.push((current_marker.take(), std::mem::take(&mut current_body)));
            }
            current_marker = Some(marker);
            continue;
        }
        current_body.push_str(line);
        current_body.push('\n');
    }
    // Flush the trailing segment, if any.
    if current_marker.is_some() || !current_body.is_empty() {
        segments.push((current_marker.take(), current_body));
    }
    // The contains() prefilter can false-positive (e.g. the tokens appear in
    // plain text); fall back to plain parsing when no real marker was seen.
    if !saw_marker {
        let blocks = parse_markdown_blocks(markdown)?;
        return Ok(blocks.into_iter().map(|b| target_from_block_node(b, None)).collect());
    }
    let mut out: Vec<TargetNode> = Vec::new();
    for (marker, body) in segments {
        if let Some(marker) = marker {
            let preserve =
                should_preserve_marker_flavour(&marker.flavour) || BlockFlavour::from_str(&marker.flavour).is_none();
            if preserve {
                // Opaque node: identity only, no parsed content or children.
                out.push(TargetNode {
                    id_hint: Some(marker.id),
                    spec: NodeSpec::Opaque {
                        flavour: marker.flavour,
                    },
                    children: Vec::new(),
                });
                continue;
            }
            let blocks = parse_markdown_blocks(&body)?;
            for (idx, block) in blocks.into_iter().enumerate() {
                // Only the first parsed block inherits the marker's id.
                let id_hint = if idx == 0 { Some(marker.id.clone()) } else { None };
                out.push(target_from_block_node(block, id_hint));
            }
            continue;
        }
        // Unmarked segment: skip if blank, otherwise parse as plain markdown.
        let trimmed = body.trim();
        if trimmed.is_empty() {
            continue;
        }
        let blocks = parse_markdown_blocks(&body)?;
        for block in blocks {
            out.push(target_from_block_node(block, None));
        }
    }
    Ok(out)
}
/// Convert a parsed markdown block (and its subtree) into a supported
/// `TargetNode`. Only the root receives the optional `id_hint`; descendants
/// never carry one.
fn target_from_block_node(node: BlockNode, id_hint: Option<String>) -> TargetNode {
    let children: Vec<TargetNode> = node
        .children
        .into_iter()
        .map(|child| target_from_block_node(child, None))
        .collect();
    TargetNode {
        id_hint,
        spec: NodeSpec::Supported(node.spec),
        children,
    }
}
/// Convert a `TargetNode` subtree back into a `BlockNode` for insertion.
/// Fails when the subtree contains any opaque node, since opaque blocks have
/// no markdown-derived content to insert.
fn target_node_to_block_node(node: &TargetNode) -> Result<BlockNode, ParseError> {
    match &node.spec {
        NodeSpec::Supported(spec) => {
            let children = node
                .children
                .iter()
                .map(target_node_to_block_node)
                .collect::<Result<Vec<_>, _>>()?;
            Ok(BlockNode {
                spec: spec.clone(),
                children,
            })
        }
        NodeSpec::Opaque { .. } => Err(ParseError::ParserError("cannot_insert_opaque_block".into())),
    }
}
/// Recover from a stub/partial doc that lacks the canonical page/note
/// structure: build it from the parsed markdown and return an update binary
/// containing only the changes made here.
fn init_doc_from_markdown(
    existing_binary: &[u8],
    new_markdown: &str,
    doc_id: &str,
    blocks: &[BlockNode],
) -> Result<Vec<u8>, ParseError> {
    let doc = load_doc(existing_binary, Some(doc_id))?;
    // Snapshot the state vector first so the returned update is a delta
    // relative to the existing binary.
    let state_before = doc.get_state_vector();
    let mut blocks_map = doc.get_or_create_map("blocks")?;
    let title = derive_title_from_markdown(new_markdown).unwrap_or_else(|| "Untitled".to_string());
    // Prefer reusing an existing page block if the doc already has one (but is
    // missing surface/note). This avoids creating multiple page roots when
    // recovering from partial documents.
    if !blocks_map.is_empty() {
        let index = build_block_index(&blocks_map);
        if let Some(page_id) = find_block_id_by_flavour(&index.block_pool, PAGE_FLAVOUR) {
            insert_page_children(&doc, &mut blocks_map, &page_id, &title, blocks)?;
            return Ok(doc.encode_state_as_update_v1(&state_before)?);
        }
    }
    // No page root at all: create page/surface/note from scratch.
    insert_page_doc(&doc, &mut blocks_map, &title, blocks)?;
    Ok(doc.encode_state_as_update_v1(&state_before)?)
}
/// Derive a document title from markdown: the text of the first `# ` heading
/// line whose content is non-empty, or `None` when no such heading exists.
fn derive_title_from_markdown(markdown: &str) -> Option<String> {
    markdown
        .lines()
        .filter_map(|line| line.trim().strip_prefix("# "))
        .map(str::trim)
        .find(|title| !title.is_empty())
        .map(str::to_string)
}
/// Create a fresh page/surface/note root structure in `blocks_map` and attach
/// the given content block trees under the note.
fn insert_page_doc(doc: &Doc, blocks_map: &mut Map, title: &str, blocks: &[BlockNode]) -> Result<(), ParseError> {
    let page_id = nanoid::nanoid!();
    let surface_id = nanoid::nanoid!();
    let note_id = nanoid::nanoid!();
    // Insert root blocks first to establish stable IDs.
    let mut page_map = insert_block_map(doc, blocks_map, &page_id)?;
    let mut surface_map = insert_block_map(doc, blocks_map, &surface_id)?;
    let mut note_map = insert_block_map(doc, blocks_map, &note_id)?;
    // Create content blocks under note.
    let content_ids = insert_block_trees(doc, blocks_map, blocks)?;
    // Page block.
    insert_sys_fields(&mut page_map, &page_id, PAGE_FLAVOUR)?;
    insert_children(doc, &mut page_map, &[surface_id.clone(), note_id.clone()])?;
    insert_text(doc, &mut page_map, PROP_TITLE, &text_ops_from_plain(title))?;
    // Surface block.
    insert_sys_fields(&mut surface_map, &surface_id, SURFACE_FLAVOUR)?;
    insert_children(doc, &mut surface_map, &[])?;
    // Surface elements live in a boxed map: { type: <native type>, value: {} }.
    let mut boxed = boxed_empty_map(doc)?;
    surface_map.insert(PROP_ELEMENTS.to_string(), Value::Map(boxed.clone()))?;
    boxed.insert("type".to_string(), Any::String(BOXED_NATIVE_TYPE.to_string()))?;
    let value = doc.create_map()?;
    boxed.insert("value".to_string(), Value::Map(value))?;
    // Note block.
    insert_sys_fields(&mut note_map, &note_id, NOTE_FLAVOUR)?;
    insert_children(doc, &mut note_map, &content_ids)?;
    let mut background = note_background_map(doc)?;
    note_map.insert(PROP_BACKGROUND.to_string(), Value::Map(background.clone()))?;
    background.insert("light".to_string(), Any::String(NOTE_BG_LIGHT.to_string()))?;
    background.insert("dark".to_string(), Any::String(NOTE_BG_DARK.to_string()))?;
    // Default note geometry/ordering/visibility properties.
    note_map.insert(PROP_XYWH.to_string(), Any::String("[0,0,800,95]".to_string()))?;
    note_map.insert(PROP_INDEX.to_string(), Any::String("a0".to_string()))?;
    note_map.insert(PROP_HIDDEN.to_string(), Any::False)?;
    note_map.insert(PROP_DISPLAY_MODE.to_string(), Any::String("both".to_string()))?;
    Ok(())
}
/// Attach surface/note roots (and the given content block trees) to an
/// existing page block. A title is only written when the page has none yet.
fn insert_page_children(
    doc: &Doc,
    blocks_map: &mut Map,
    page_id: &str,
    title: &str,
    blocks: &[BlockNode],
) -> Result<(), ParseError> {
    let surface_id = nanoid::nanoid!();
    let note_id = nanoid::nanoid!();
    // Insert root blocks first to establish stable IDs.
    let mut surface_map = insert_block_map(doc, blocks_map, &surface_id)?;
    let mut note_map = insert_block_map(doc, blocks_map, &note_id)?;
    // Create content blocks under note.
    let content_ids = insert_block_trees(doc, blocks_map, blocks)?;
    let Some(mut page_map) = blocks_map.get(page_id).and_then(|v| v.to_map()) else {
        return Err(ParseError::ParserError("page block not found".into()));
    };
    // Page block.
    insert_sys_fields(&mut page_map, page_id, PAGE_FLAVOUR)?;
    insert_children(doc, &mut page_map, &[surface_id.clone(), note_id.clone()])?;
    // Preserve an existing title; only fill in the derived one when absent.
    if page_map.get(PROP_TITLE).is_none() {
        insert_text(doc, &mut page_map, PROP_TITLE, &text_ops_from_plain(title))?;
    }
    // Surface block.
    insert_sys_fields(&mut surface_map, &surface_id, SURFACE_FLAVOUR)?;
    insert_children(doc, &mut surface_map, &[])?;
    // Surface elements live in a boxed map: { type: <native type>, value: {} }.
    let mut boxed = boxed_empty_map(doc)?;
    surface_map.insert(PROP_ELEMENTS.to_string(), Value::Map(boxed.clone()))?;
    boxed.insert("type".to_string(), Any::String(BOXED_NATIVE_TYPE.to_string()))?;
    let value = doc.create_map()?;
    boxed.insert("value".to_string(), Value::Map(value))?;
    // Note block.
    insert_sys_fields(&mut note_map, &note_id, NOTE_FLAVOUR)?;
    insert_children(doc, &mut note_map, &content_ids)?;
    let mut background = note_background_map(doc)?;
    note_map.insert(PROP_BACKGROUND.to_string(), Value::Map(background.clone()))?;
    background.insert("light".to_string(), Any::String(NOTE_BG_LIGHT.to_string()))?;
    background.insert("dark".to_string(), Any::String(NOTE_BG_DARK.to_string()))?;
    // Default note geometry/ordering/visibility properties.
    note_map.insert(PROP_XYWH.to_string(), Any::String("[0,0,800,95]".to_string()))?;
    note_map.insert(PROP_INDEX.to_string(), Any::String("a0".to_string()))?;
    note_map.insert(PROP_HIDDEN.to_string(), Any::False)?;
    note_map.insert(PROP_DISPLAY_MODE.to_string(), Any::String("both".to_string()))?;
    Ok(())
}
/// Insert each top-level block tree into `blocks_map`, returning the new
/// block ids in order. Stops at the first insertion error.
fn insert_block_trees(doc: &Doc, blocks_map: &mut Map, blocks: &[BlockNode]) -> Result<Vec<String>, ParseError> {
    blocks
        .iter()
        .map(|block| insert_block_tree(doc, blocks_map, block))
        .collect()
}
fn load_doc_state(binary: &[u8], doc_id: &str) -> Result<DocState, ParseError> {
let doc = load_doc(binary, Some(doc_id))?;
@@ -110,14 +467,31 @@ fn load_doc_state(binary: &[u8], doc_id: &str) -> Result<DocState, ParseError> {
}
fn build_stored_tree(block_id: &str, block: &Map, pool: &HashMap<String, Map>) -> Result<StoredNode, ParseError> {
let spec = BlockSpec::from_block_map(block)?;
let child_ids = collect_child_ids(block);
let flavour = get_string(block, "sys:flavour").unwrap_or_default();
let spec = match BlockSpec::from_block_map(block) {
Ok(spec) => spec,
Err(ParseError::ParserError(msg)) if msg.starts_with("unsupported block flavour:") => {
return Ok(StoredNode {
id: block_id.to_string(),
spec: NodeSpec::Opaque { flavour },
children: Vec::new(),
});
}
Err(err) => return Err(err),
};
// Only list/callout are supported as containers for markdown diffing.
// For any other block with children, treat as opaque so we never corrupt it.
if !child_ids.is_empty() && !matches!(spec.flavour, BlockFlavour::List | BlockFlavour::Callout) {
return Err(ParseError::ParserError(format!(
"unsupported children on block: {block_id}"
)));
return Ok(StoredNode {
id: block_id.to_string(),
spec: NodeSpec::Opaque { flavour },
children: Vec::new(),
});
}
let mut children = Vec::new();
for child_id in child_ids {
let child_block = pool
@@ -128,7 +502,7 @@ fn build_stored_tree(block_id: &str, block: &Map, pool: &HashMap<String, Map>) -
Ok(StoredNode {
id: block_id.to_string(),
spec,
spec: NodeSpec::Supported(spec),
children,
})
}
@@ -137,7 +511,7 @@ fn sync_nodes(
doc: &Doc,
blocks_map: &mut Map,
current: &[StoredNode],
target: &mut [BlockNode],
target: &mut [TargetNode],
) -> Result<Vec<String>, ParseError> {
let ops = diff_blocks(current, target);
let mut new_children = Vec::new();
@@ -148,29 +522,47 @@ fn sync_nodes(
PatchOp::Keep(old_idx, new_idx) => {
let old_node = &current[old_idx];
let new_node = &target[new_idx];
update_block_props(doc, blocks_map, old_node, &new_node.spec, true)?;
let child_ids = sync_nodes(doc, blocks_map, &old_node.children, &mut new_node.children.clone())?;
sync_children(doc, blocks_map, &old_node.id, &child_ids)?;
if let (NodeSpec::Supported(old_spec), NodeSpec::Supported(new_spec)) = (&old_node.spec, &new_node.spec) {
update_block_props(doc, blocks_map, &old_node.id, old_spec, new_spec, true)?;
let child_ids = sync_nodes(doc, blocks_map, &old_node.children, &mut new_node.children.clone())?;
sync_children(doc, blocks_map, &old_node.id, &child_ids)?;
} else {
// Preserve opaque blocks (and any mismatched marker blocks) as-is.
// Don't touch their properties or children ordering.
}
new_children.push(old_node.id.clone());
}
PatchOp::Update(old_idx, new_idx) => {
let old_node = &current[old_idx];
let new_node = &target[new_idx];
update_block_props(doc, blocks_map, old_node, &new_node.spec, false)?;
let child_ids = sync_nodes(doc, blocks_map, &old_node.children, &mut new_node.children.clone())?;
sync_children(doc, blocks_map, &old_node.id, &child_ids)?;
if let (NodeSpec::Supported(old_spec), NodeSpec::Supported(new_spec)) = (&old_node.spec, &new_node.spec) {
update_block_props(doc, blocks_map, &old_node.id, old_spec, new_spec, false)?;
let child_ids = sync_nodes(doc, blocks_map, &old_node.children, &mut new_node.children.clone())?;
sync_children(doc, blocks_map, &old_node.id, &child_ids)?;
} else {
// Opaque blocks are never updated from markdown.
}
new_children.push(old_node.id.clone());
}
PatchOp::Insert(new_idx) => {
let new_id = insert_block_tree(doc, blocks_map, &target[new_idx])?;
new_children.push(new_id);
if let Ok(node) = target_node_to_block_node(&target[new_idx]) {
let new_id = insert_block_tree(doc, blocks_map, &node)?;
new_children.push(new_id);
}
}
PatchOp::Delete(old_idx) => {
let node = &current[old_idx];
if node.spec.flavour == BlockFlavour::Callout {
new_children.push(node.id.clone());
} else {
collect_tree_ids(node, &mut to_remove);
match &node.spec {
NodeSpec::Opaque { .. } => {
// Never delete opaque blocks when syncing from markdown. They might contain
// rich data that can't be represented in markdown, so keeping them
// avoids data loss.
new_children.push(node.id.clone());
}
NodeSpec::Supported(spec) if spec.flavour == BlockFlavour::Callout => {
new_children.push(node.id.clone());
}
NodeSpec::Supported(_) => collect_tree_ids(node, &mut to_remove),
}
}
}
@@ -183,7 +575,7 @@ fn sync_nodes(
Ok(new_children)
}
fn diff_blocks(current: &[StoredNode], target: &[BlockNode]) -> Vec<PatchOp> {
fn diff_blocks(current: &[StoredNode], target: &[TargetNode]) -> Vec<PatchOp> {
let old_len = current.len();
let new_len = target.len();
@@ -198,10 +590,10 @@ fn diff_blocks(current: &[StoredNode], target: &[BlockNode]) -> Vec<PatchOp> {
for i in 1..=old_len {
for j in 1..=new_len {
let old_spec = &current[i - 1].spec;
let new_spec = &target[j - 1].spec;
let old_node = &current[i - 1];
let new_node = &target[j - 1];
if old_spec.is_exact(new_spec) {
if nodes_align(old_node, new_node) {
lcs[i][j] = lcs[i - 1][j - 1] + 1;
} else {
lcs[i][j] = std::cmp::max(lcs[i - 1][j], lcs[i][j - 1]);
@@ -215,14 +607,18 @@ fn diff_blocks(current: &[StoredNode], target: &[BlockNode]) -> Vec<PatchOp> {
while i > 0 || j > 0 {
if i > 0 && j > 0 {
let old_spec = &current[i - 1].spec;
let new_spec = &target[j - 1].spec;
let old_node = &current[i - 1];
let new_node = &target[j - 1];
if old_spec.is_exact(new_spec) {
ops.push(PatchOp::Keep(i - 1, j - 1));
if nodes_align(old_node, new_node) {
if nodes_should_update(old_node, new_node) {
ops.push(PatchOp::Update(i - 1, j - 1));
} else {
ops.push(PatchOp::Keep(i - 1, j - 1));
}
i -= 1;
j -= 1;
} else if old_spec.is_similar(new_spec)
} else if nodes_similar(old_node, new_node)
&& lcs[i - 1][j - 1] >= lcs[i - 1][j]
&& lcs[i - 1][j - 1] >= lcs[i][j - 1]
{
@@ -249,15 +645,60 @@ fn diff_blocks(current: &[StoredNode], target: &[BlockNode]) -> Vec<PatchOp> {
ops
}
/// Returns whether `old_node` and `new_node` refer to the same logical block:
/// either an explicit id marker matches, or both are supported blocks whose
/// specs compare exactly equal.
fn nodes_align(old_node: &StoredNode, new_node: &TargetNode) -> bool {
    marker_matches(old_node, new_node)
        || matches!(
            (&old_node.spec, &new_node.spec),
            (NodeSpec::Supported(old_spec), NodeSpec::Supported(new_spec))
                if old_spec.is_exact(new_spec)
        )
}
/// A marker-matched pair needs an in-place update only when both sides are
/// supported blocks whose specs differ; anything else (no marker, or an
/// opaque side) is never updated from markdown.
fn nodes_should_update(old_node: &StoredNode, new_node: &TargetNode) -> bool {
    if !marker_matches(old_node, new_node) {
        return false;
    }
    match (&old_node.spec, &new_node.spec) {
        (NodeSpec::Supported(old_spec), NodeSpec::Supported(new_spec)) => {
            !old_spec.is_exact(new_spec)
        }
        _ => false,
    }
}
/// Loose alignment check used as an LCS tie-breaker: true only when both
/// nodes are supported blocks whose specs are "similar" (same-ish shape,
/// differing content).
fn nodes_similar(old_node: &StoredNode, new_node: &TargetNode) -> bool {
    matches!(
        (&old_node.spec, &new_node.spec),
        (NodeSpec::Supported(old_spec), NodeSpec::Supported(new_spec))
            if old_spec.is_similar(new_spec)
    )
}
/// True when the target node carries an id hint that names this exact stored
/// block AND both sides have the same flavour string. A flavour mismatch
/// means the marker is stale and must not force alignment.
fn marker_matches(old_node: &StoredNode, new_node: &TargetNode) -> bool {
    match new_node.id_hint.as_deref() {
        Some(id) if id == old_node.id.as_str() => {
            node_flavour_str(&old_node.spec) == node_flavour_str(&new_node.spec)
        }
        _ => false,
    }
}
/// Flavour string for any node spec, supported or opaque.
fn node_flavour_str(spec: &NodeSpec) -> &str {
    match spec {
        NodeSpec::Opaque { flavour } => flavour.as_str(),
        NodeSpec::Supported(block) => block.flavour.as_str(),
    }
}
fn update_block_props(
doc: &Doc,
blocks_map: &mut Map,
node: &StoredNode,
node_id: &str,
current: &BlockSpec,
target: &BlockSpec,
preserve_text: bool,
) -> Result<(), ParseError> {
let Some(mut block) = blocks_map.get(&node.id).and_then(|v| v.to_map()) else {
return Err(ParseError::ParserError(format!("Block {} not found", node.id)));
let Some(mut block) = blocks_map.get(node_id).and_then(|v| v.to_map()) else {
return Err(ParseError::ParserError(format!("Block {} not found", node_id)));
};
let preserve = match target.flavour {
@@ -266,7 +707,7 @@ fn update_block_props(
| BlockFlavour::Bookmark
| BlockFlavour::EmbedYoutube
| BlockFlavour::EmbedIframe => preserve_text,
_ => preserve_text || text_delta_eq(&node.spec.text, &target.text),
_ => preserve_text || text_delta_eq(&current.text, &target.text),
};
apply_block_spec(
@@ -302,7 +743,7 @@ fn collect_tree_ids(node: &StoredNode, output: &mut Vec<String>) {
}
}
fn check_limits(current: &[StoredNode], target: &[BlockNode]) -> Result<(), ParseError> {
fn check_limits(current: &[StoredNode], target: &[TargetNode]) -> Result<(), ParseError> {
let current_count = count_tree_nodes(current);
let target_count = count_tree_nodes(target);
@@ -319,7 +760,7 @@ fn check_limits(current: &[StoredNode], target: &[BlockNode]) -> Result<(), Pars
#[cfg(test)]
mod tests {
use y_octo::{Any, DocOptions, TextDeltaOp, TextInsert};
use y_octo::{Any, DocOptions, StateVector, TextDeltaOp, TextInsert};
use super::{super::builder::text_ops_from_plain, *};
use crate::doc_parser::{
@@ -647,6 +1088,233 @@ mod tests {
assert!(result.is_err());
}
    /// Fallback path: the stored doc has a valid but EMPTY `blocks` map, so
    /// `update_doc` must rebuild the whole page/note tree from the markdown.
    /// Verifies the page title comes from the H1 and the note has content.
    #[test]
    fn test_update_ydoc_fallback_when_blocks_empty() {
        let doc_id = "stub-empty-blocks";
        let markdown = "# From Markdown\n\nHello from markdown.";
        // Build a valid ydoc update that results in an empty `blocks` map.
        // NOTE: yjs/y-octo may encode a completely empty doc as `[0,0]`, which we treat
        // as empty/invalid. We intentionally insert + remove a temp key so the
        // update is non-empty while the final map is empty.
        let doc = DocOptions::new().with_guid(doc_id.to_string()).build();
        let mut blocks = doc.get_or_create_map("blocks").expect("create blocks map");
        blocks
            .insert("tmp".to_string(), Any::String("1".to_string()))
            .expect("insert temp");
        blocks.remove("tmp");
        let stub_bin = doc
            .encode_state_as_update_v1(&StateVector::default())
            .expect("encode stub update");
        // Guard against the degenerate `[0,0]` encoding described above.
        assert!(
            !stub_bin.is_empty() && stub_bin.as_slice() != [0, 0],
            "stub update should not be empty update"
        );
        let delta = update_doc(&stub_bin, markdown, doc_id).expect("fallback delta");
        assert!(!delta.is_empty(), "delta should contain changes");
        // Re-apply stub + delta onto a fresh doc to inspect the merged result.
        let mut updated = DocOptions::new().with_guid(doc_id.to_string()).build();
        updated
            .apply_update_from_binary_v1(&stub_bin)
            .expect("apply stub update");
        updated
            .apply_update_from_binary_v1(&delta)
            .expect("apply fallback delta");
        let blocks_map = updated.get_map("blocks").expect("blocks map exists");
        // Locate the freshly created page block by its sys:flavour field.
        let mut page: Option<Map> = None;
        for (_, value) in blocks_map.iter() {
            if let Some(block_map) = value.to_map()
                && get_string(&block_map, "sys:flavour").as_deref() == Some(PAGE_FLAVOUR)
            {
                page = Some(block_map);
                break;
            }
        }
        let page = page.expect("page block created");
        assert_eq!(
            get_string(&page, "prop:title").as_deref(),
            Some("From Markdown"),
            "page title should be derived from markdown H1"
        );
        let index = build_block_index(&blocks_map);
        let note_id = find_child_id_by_flavour(&page, &index.block_pool, NOTE_FLAVOUR).expect("note child exists");
        let note = index.block_pool.get(&note_id).expect("note block exists").clone();
        assert!(
            !collect_child_ids(&note).is_empty(),
            "note should contain imported content blocks"
        );
        // Round-trip back to markdown to confirm the content survived.
        let full_bin = updated
            .encode_state_as_update_v1(&StateVector::default())
            .expect("encode full doc");
        let md = parse_doc_to_markdown(full_bin, doc_id.to_string(), false, None).expect("render markdown");
        assert!(md.markdown.contains("Hello from markdown."));
    }
#[test]
fn test_update_ydoc_fallback_when_page_missing() {
let doc_id = "stub-page-missing";
let markdown = "# Title\n\nUpdated content.";
// Build a stub doc that has some blocks, but no `affine:page` root.
let doc = DocOptions::new().with_guid(doc_id.to_string()).build();
let mut blocks_map = doc.get_or_create_map("blocks").expect("create blocks map");
let para_id = "para-1";
let mut para = insert_block_map(&doc, &mut blocks_map, para_id).expect("insert para");
insert_sys_fields(&mut para, para_id, "affine:paragraph").expect("sys fields");
insert_children(&doc, &mut para, &[]).expect("children");
let stub_bin = doc
.encode_state_as_update_v1(&StateVector::default())
.expect("encode stub update");
assert!(!stub_bin.is_empty(), "stub update should not be empty");
let delta = update_doc(&stub_bin, markdown, doc_id).expect("fallback delta");
assert!(!delta.is_empty(), "delta should contain changes");
let mut updated = DocOptions::new().with_guid(doc_id.to_string()).build();
updated
.apply_update_from_binary_v1(&stub_bin)
.expect("apply stub update");
updated
.apply_update_from_binary_v1(&delta)
.expect("apply fallback delta");
let blocks_map = updated.get_map("blocks").expect("blocks map exists");
let index = build_block_index(&blocks_map);
let page_id = find_block_id_by_flavour(&index.block_pool, PAGE_FLAVOUR).expect("page block exists");
let page = index.block_pool.get(&page_id).expect("page map exists").clone();
let note_id = find_child_id_by_flavour(&page, &index.block_pool, NOTE_FLAVOUR).expect("note child exists");
let note = index.block_pool.get(&note_id).expect("note block exists").clone();
assert!(
!collect_child_ids(&note).is_empty(),
"note should contain imported content blocks"
);
}
#[test]
fn test_update_ydoc_fallback_when_note_missing() {
let doc_id = "stub-note-missing";
let markdown = "# Title\n\nUpdated content.";
// Build a stub doc that has an `affine:page` block but doesn't contain a note
// child.
let doc = DocOptions::new().with_guid(doc_id.to_string()).build();
let mut blocks_map = doc.get_or_create_map("blocks").expect("create blocks map");
let page_id = "page-1";
let mut page = insert_block_map(&doc, &mut blocks_map, page_id).expect("insert page");
insert_sys_fields(&mut page, page_id, PAGE_FLAVOUR).expect("sys fields");
insert_children(&doc, &mut page, &[]).expect("children");
let stub_bin = doc
.encode_state_as_update_v1(&StateVector::default())
.expect("encode stub update");
assert!(!stub_bin.is_empty(), "stub update should not be empty");
let delta = update_doc(&stub_bin, markdown, doc_id).expect("fallback delta");
assert!(!delta.is_empty(), "delta should contain changes");
let mut updated = DocOptions::new().with_guid(doc_id.to_string()).build();
updated
.apply_update_from_binary_v1(&stub_bin)
.expect("apply stub update");
updated
.apply_update_from_binary_v1(&delta)
.expect("apply fallback delta");
let blocks_map = updated.get_map("blocks").expect("blocks map exists");
let index = build_block_index(&blocks_map);
let page_id = find_block_id_by_flavour(&index.block_pool, PAGE_FLAVOUR).expect("page block exists");
let page = index.block_pool.get(&page_id).expect("page map exists").clone();
let note_id = find_child_id_by_flavour(&page, &index.block_pool, NOTE_FLAVOUR).expect("note child exists");
let note = index.block_pool.get(&note_id).expect("note block exists").clone();
assert!(
!collect_child_ids(&note).is_empty(),
"note should contain imported content blocks"
);
}
#[test]
fn test_update_ydoc_preserves_opaque_blocks_when_unsupported_block_flavour() {
let doc_id = "unsupported-flavour-replace";
// Build a doc with canonical page/note structure, but add an unsupported block
// flavour under note. This simulates real-world docs that contain blocks we
// don't support for structural diffing.
let doc = DocOptions::new().with_guid(doc_id.to_string()).build();
let mut blocks_map = doc.get_or_create_map("blocks").expect("create blocks map");
let page_id = "page-1";
let surface_id = "surface-1";
let note_id = "note-1";
let db_id = "db-1";
let mut page = insert_block_map(&doc, &mut blocks_map, page_id).expect("insert page");
let mut surface = insert_block_map(&doc, &mut blocks_map, surface_id).expect("insert surface");
let mut note = insert_block_map(&doc, &mut blocks_map, note_id).expect("insert note");
let mut db = insert_block_map(&doc, &mut blocks_map, db_id).expect("insert db");
insert_sys_fields(&mut page, page_id, PAGE_FLAVOUR).expect("page sys fields");
insert_children(&doc, &mut page, &[surface_id.to_string(), note_id.to_string()]).expect("page children");
insert_text(&doc, &mut page, PROP_TITLE, &text_ops_from_plain("Title")).expect("page title");
insert_sys_fields(&mut surface, surface_id, SURFACE_FLAVOUR).expect("surface sys fields");
insert_children(&doc, &mut surface, &[]).expect("surface children");
let mut boxed = boxed_empty_map(&doc).expect("boxed map");
surface
.insert(PROP_ELEMENTS.to_string(), Value::Map(boxed.clone()))
.expect("surface elements");
boxed
.insert("type".to_string(), Any::String(BOXED_NATIVE_TYPE.to_string()))
.expect("boxed type");
let value = doc.create_map().expect("boxed value map");
boxed
.insert("value".to_string(), Value::Map(value))
.expect("boxed value");
insert_sys_fields(&mut note, note_id, NOTE_FLAVOUR).expect("note sys fields");
insert_children(&doc, &mut note, &[db_id.to_string()]).expect("note children");
// Unsupported flavour.
insert_sys_fields(&mut db, db_id, "affine:database").expect("db sys fields");
insert_children(&doc, &mut db, &[]).expect("db children");
let initial_bin = doc
.encode_state_as_update_v1(&StateVector::default())
.expect("encode initial");
// Updating should succeed and preserve the opaque block rather than deleting
// it.
let updated_md = "# New Title\n\nHello.";
let delta = update_doc(&initial_bin, updated_md, doc_id).expect("delta");
assert!(!delta.is_empty(), "delta should contain changes");
let mut updated_doc = DocOptions::new().with_guid(doc_id.to_string()).build();
updated_doc
.apply_update_from_binary_v1(&initial_bin)
.expect("apply initial");
updated_doc.apply_update_from_binary_v1(&delta).expect("apply delta");
let blocks_map = updated_doc.get_map("blocks").expect("blocks map");
assert!(
blocks_map.get(db_id).is_some(),
"opaque block should be preserved when syncing from markdown"
);
let md = parse_doc_to_markdown(updated_doc.encode_update_v1().unwrap(), doc_id.to_string(), false, None)
.expect("render markdown")
.markdown;
assert!(md.contains("Hello."));
}
#[test]
fn test_update_ydoc_markdown_too_large() {
let initial_md = "# Title\n\nContent.";

View File

@@ -12,6 +12,7 @@
"./broadcast-channel": "./src/impls/broadcast-channel/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts",
"./cloud": "./src/impls/cloud/index.ts",
"./disk": "./src/impls/disk/index.ts",
"./sqlite": "./src/impls/sqlite/index.ts",
"./sqlite/v1": "./src/impls/sqlite/v1/index.ts",
"./sync": "./src/sync/index.ts",

View File

@@ -0,0 +1,127 @@
import { AutoReconnectConnection } from '../../connection';
import type { DocClock, DocUpdate } from '../../storage';
import { type SpaceType, universalId } from '../../utils/universal-id';
/** Options handed to the native layer when a disk-sync session starts. */
export interface DiskSessionOptions {
  workspaceId: string;
  syncFolder: string;
}

/**
 * Events emitted by the native disk-sync session:
 * - `ready`       session is connected and watching the sync folder
 * - `doc-update`  a doc changed on disk (or echoes a local write; `origin`
 *                 identifies the source so subscribers can filter)
 * - `doc-delete`  a doc was removed on disk
 * - `error`       non-fatal session error, reported as a message
 */
export type DiskSyncEvent =
  | { type: 'ready' }
  | {
      type: 'doc-update';
      update: {
        docId: string;
        bin: Uint8Array;
        timestamp: Date;
        editor?: string;
      };
      origin?: string;
    }
  | { type: 'doc-delete'; docId: string; timestamp: Date }
  | { type: 'error'; message: string };

/**
 * Raw native APIs. Every call takes an explicit session id as its first
 * argument; `DiskSyncConnection.wrapApis` pre-binds it.
 */
export interface DiskSyncApis {
  startSession: (
    sessionId: string,
    options: DiskSessionOptions
  ) => Promise<void>;
  stopSession: (sessionId: string) => Promise<void>;
  applyLocalUpdate: (
    sessionId: string,
    update: DocUpdate,
    origin?: string
  ) => Promise<DocClock>;
  subscribeEvents: (
    sessionId: string,
    callback: (event: DiskSyncEvent) => void
  ) => () => void;
}

/** Identity of the space a connection belongs to, plus its sync folder. */
interface DiskSyncOptions {
  readonly flavour: string;
  readonly type: SpaceType;
  readonly id: string;
  readonly syncFolder: string;
}

/** Same surface as `DiskSyncApis`, but with the session id already bound. */
interface DiskSyncApisWrapper {
  startSession: (options: DiskSessionOptions) => Promise<void>;
  stopSession: () => Promise<void>;
  applyLocalUpdate: (update: DocUpdate, origin?: string) => Promise<DocClock>;
  subscribeEvents: (callback: (event: DiskSyncEvent) => void) => () => void;
}
// Module-level native binding; must be populated before any
// DiskSyncConnection is constructed (its constructor throws otherwise).
let apis: DiskSyncApis | null = null;

/** Inject the native disk-sync APIs (called once by the host environment). */
export function bindDiskSyncApis(a: DiskSyncApis) {
  apis = a;
}
/**
 * Auto-reconnecting handle to a native disk-sync session.
 *
 * On connect it starts the native session for this workspace's sync folder
 * and subscribes to its events; on disconnect it unsubscribes and stops the
 * session (best-effort). All native calls go through `apis`, a proxy that
 * pre-binds this connection's session id.
 *
 * Requires `bindDiskSyncApis` to have been called first; throws otherwise.
 */
export class DiskSyncConnection extends AutoReconnectConnection<{
  unsubscribe: () => void;
}> {
  readonly apis: DiskSyncApisWrapper;
  readonly sessionId: string;
  readonly flavour = this.options.flavour;
  readonly type = this.options.type;
  readonly id = this.options.id;

  constructor(
    private readonly options: DiskSyncOptions,
    private readonly onEvent: (event: DiskSyncEvent) => void
  ) {
    super();
    if (!apis) {
      throw new Error('Not in native context.');
    }
    this.sessionId = universalId({
      peer: this.flavour,
      type: this.type,
      id: this.id,
    });
    this.apis = this.wrapApis(apis);
  }

  // Include the sync folder so connections to the same space but different
  // folders are not pooled together.
  override get shareId(): string {
    return `disk:${this.sessionId}:${this.options.syncFolder}`;
  }

  /**
   * Wrap the raw native APIs so every method call implicitly passes this
   * connection's session id as the first argument.
   */
  private wrapApis(originalApis: DiskSyncApis): DiskSyncApisWrapper {
    const sessionId = this.sessionId;
    return new Proxy(
      {},
      {
        get: (_target, key: keyof DiskSyncApisWrapper) => {
          const method = originalApis[key];
          // Fix: the proxy may be probed for members that are not API
          // methods (e.g. `then` when the wrapper ends up in an await
          // position, or symbol keys). Previously a call wrapper was
          // returned unconditionally, so invoking such a member threw
          // "method is not a function". Return undefined instead.
          if (typeof method !== 'function') {
            return undefined;
          }
          return (...args: unknown[]) => {
            // oxlint-disable-next-line @typescript-eslint/no-explicit-any
            return (method as any)(sessionId, ...args);
          };
        },
      }
    ) as DiskSyncApisWrapper;
  }

  override async doConnect() {
    // Start the native session first, then attach the event stream.
    await this.apis.startSession({
      workspaceId: this.id,
      syncFolder: this.options.syncFolder,
    });
    const unsubscribe = this.apis.subscribeEvents(this.onEvent);
    return { unsubscribe };
  }

  override doDisconnect(conn: { unsubscribe: () => void }) {
    // Both teardown steps are best-effort: a failed unsubscribe must not
    // prevent stopSession, and a failed stopSession only logs.
    try {
      conn.unsubscribe();
    } catch (error) {
      console.error('DiskSyncConnection unsubscribe failed', error);
    }
    this.apis.stopSession().catch(error => {
      console.error('DiskSyncConnection stopSession failed', error);
    });
  }
}

View File

@@ -0,0 +1,45 @@
import fs from 'node:fs';
import path from 'node:path';
import { describe, expect, it } from 'vitest';
// Resolve the monorepo root relative to this spec file's location.
const PROJECT_ROOT = path.resolve(__dirname, '../../../../../../');

// The two JS-side files that sit on the JS/native boundary for disk sync.
// They must stay free of markdown/frontmatter parsing (that lives natively).
const JS_BOUNDARY_FILES = [
  path.join(PROJECT_ROOT, 'packages/common/nbstore/src/impls/disk/doc.ts'),
  path.join(
    PROJECT_ROOT,
    'packages/frontend/apps/electron/src/helper/disk-sync/handlers.ts'
  ),
];

// Markers of markdown/frontmatter handling that must not leak into the
// JS adapter layer.
const FORBIDDEN_PATTERNS = [
  /frontmatter/i,
  /gray-matter/i,
  /MarkdownAdapter/,
  /markdownToSnapshot/,
  /fromMarkdown/,
  /toMarkdown/,
];

describe('disk boundary', () => {
  it('keeps markdown/frontmatter parsing out of JS adapter layer', () => {
    for (const file of JS_BOUNDARY_FILES) {
      const content = fs.readFileSync(file, 'utf-8');
      for (const pattern of FORBIDDEN_PATTERNS) {
        expect(content).not.toMatch(pattern);
      }
    }
  });
  it('keeps JS layer focused on session orchestration APIs', () => {
    // Positive checks: the boundary files should expose only the session
    // orchestration surface (start/stop/apply).
    const adapter = fs.readFileSync(JS_BOUNDARY_FILES[0], 'utf-8');
    expect(adapter).toMatch(/applyLocalUpdate/);
    const helper = fs.readFileSync(JS_BOUNDARY_FILES[1], 'utf-8');
    expect(helper).toMatch(/startSession/);
    expect(helper).toMatch(/stopSession/);
    expect(helper).toMatch(/applyLocalUpdate/);
  });
});

View File

@@ -0,0 +1,451 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
applyUpdate,
Array as YArray,
Doc as YDoc,
encodeStateAsUpdate,
Map as YMap,
} from 'yjs';
import { universalId } from '../../utils/universal-id';
import { bindDiskSyncApis, type DiskSyncApis, type DiskSyncEvent } from './api';
import { DiskDocStorage } from './doc';
function createUpdate(text: string): Uint8Array {
const doc = new YDoc();
doc.getText('content').insert(0, text);
return encodeStateAsUpdate(doc);
}
function createMapUpdate(entries: Record<string, string>): Uint8Array {
const doc = new YDoc();
const map = doc.getMap('test');
for (const [key, value] of Object.entries(entries)) {
map.set(key, value);
}
return encodeStateAsUpdate(doc);
}
function createRootMetaUpdate(docIds: string[]): Uint8Array {
const doc = new YDoc();
const meta = doc.getMap('meta');
const pages = new YArray<YMap<unknown>>();
for (const docId of docIds) {
const page = new YMap<unknown>();
page.set('id', docId);
pages.push([page]);
}
meta.set('pages', pages);
return encodeStateAsUpdate(doc);
}
describe('DiskDocStorage', () => {
  // Session id the storage under test derives from its options; the mocked
  // native APIs key their event listeners by this id.
  const sessionId = universalId({
    peer: 'local',
    type: 'workspace',
    id: 'workspace-test',
  });
  // sessionId -> set of subscribed event callbacks (simulates native events).
  const listeners = new Map<string, Set<(event: DiskSyncEvent) => void>>();
  const startSession = vi.fn(
    async (_sessionId: string, _options: { workspaceId: string }) => {}
  );
  const stopSession = vi.fn(async (_sessionId: string) => {});
  // Mocked native apply: echoes the doc id with a fixed timestamp.
  const applyLocalUpdate = vi.fn(
    async (_sessionId: string, update: { docId: string }) => {
      return {
        docId: update.docId,
        timestamp: new Date('2026-01-02T00:00:00.000Z'),
      };
    }
  );
  const subscribeEvents = vi.fn(
    (currentSessionId: string, callback: (event: DiskSyncEvent) => void) => {
      let set = listeners.get(currentSessionId);
      if (!set) {
        set = new Set();
        listeners.set(currentSessionId, set);
      }
      set.add(callback);
      // Returned disposer mirrors the real API's unsubscribe contract.
      return () => {
        set?.delete(callback);
      };
    }
  );
  const apis: DiskSyncApis = {
    startSession,
    stopSession,
    applyLocalUpdate,
    subscribeEvents,
  };
  // Push a fake native event to every subscriber of the test session.
  function emit(event: DiskSyncEvent) {
    const callbacks = listeners.get(sessionId);
    for (const callback of callbacks ?? []) {
      callback(event);
    }
  }
  function createStorage() {
    return new DiskDocStorage({
      flavour: 'local',
      type: 'workspace',
      id: 'workspace-test',
      syncFolder: '/tmp/sync',
    });
  }
  beforeEach(() => {
    // Re-bind mocks and reset state so each test starts clean.
    bindDiskSyncApis(apis);
    listeners.clear();
    vi.clearAllMocks();
  });
  afterEach(() => {
    listeners.clear();
  });
it('starts and stops disk session with connection lifecycle', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
expect(startSession).toHaveBeenCalledWith(sessionId, {
workspaceId: 'workspace-test',
syncFolder: '/tmp/sync',
});
storage.connection.disconnect();
await vi.waitFor(() => {
expect(stopSession).toHaveBeenCalledWith(sessionId);
});
});
it('forwards local updates and emits doc update events', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
const seen: Array<{ docId: string; origin?: string }> = [];
const unsubscribe = storage.subscribeDocUpdate((update, origin) => {
seen.push({ docId: update.docId, origin });
});
const bin = createUpdate('local');
await storage.pushDocUpdate({ docId: 'doc-local', bin }, 'origin:local');
expect(applyLocalUpdate).toHaveBeenCalledWith(
sessionId,
expect.objectContaining({
docId: 'doc-local',
}),
'origin:local'
);
expect(seen).toEqual([{ docId: 'doc-local', origin: 'origin:local' }]);
const snapshot = await storage.getDoc('doc-local');
expect(snapshot?.docId).toBe('doc-local');
expect(snapshot?.timestamp.toISOString()).toBe('2026-01-02T00:00:00.000Z');
unsubscribe();
storage.connection.disconnect();
});
it('applies remote events into local snapshots and handles delete events', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
emit({
type: 'doc-update',
update: {
docId: 'doc-remote',
bin: createUpdate('remote'),
timestamp: new Date('2026-01-03T00:00:00.000Z'),
},
});
await vi.waitFor(async () => {
const snapshot = await storage.getDoc('doc-remote');
expect(snapshot?.docId).toBe('doc-remote');
});
const timestamps = await storage.getDocTimestamps();
expect(timestamps['doc-remote']?.toISOString()).toBe(
'2026-01-03T00:00:00.000Z'
);
emit({
type: 'doc-delete',
docId: 'doc-remote',
timestamp: new Date('2026-01-03T00:00:01.000Z'),
});
await vi.waitFor(async () => {
expect(await storage.getDoc('doc-remote')).toBeNull();
});
storage.connection.disconnect();
});
it('serializes concurrent remote doc-update merges for the same doc', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
const originalMergeUpdates = (
storage as unknown as {
mergeUpdates: (updates: Uint8Array[]) => Promise<Uint8Array>;
}
).mergeUpdates.bind(storage);
let mergeCall = 0;
vi.spyOn(
storage as unknown as {
mergeUpdates: (updates: Uint8Array[]) => Promise<Uint8Array>;
},
'mergeUpdates'
).mockImplementation(async updates => {
mergeCall += 1;
// Force two in-flight merge operations to overlap and complete out-of-order.
if (mergeCall === 1) {
await new Promise(resolve => setTimeout(resolve, 20));
}
return originalMergeUpdates(updates);
});
emit({
type: 'doc-update',
update: {
docId: 'doc-race',
bin: createMapUpdate({ first: '1' }),
timestamp: new Date('2026-01-03T00:00:00.000Z'),
},
});
emit({
type: 'doc-update',
update: {
docId: 'doc-race',
bin: createMapUpdate({ second: '2' }),
timestamp: new Date('2026-01-03T00:00:00.001Z'),
},
});
await vi.waitFor(async () => {
const snapshot = await storage.getDoc('doc-race');
expect(snapshot).not.toBeNull();
expect(snapshot!.timestamp.toISOString()).toBe(
'2026-01-03T00:00:00.001Z'
);
const doc = new YDoc();
applyUpdate(doc, snapshot!.bin);
expect(doc.getMap('test').toJSON()).toEqual({
first: '1',
second: '2',
});
});
storage.connection.disconnect();
});
it('does not block follow-up updates when snapshot merge fails once', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
const originalMergeUpdates = (
storage as unknown as {
mergeUpdates: (updates: Uint8Array[]) => Promise<Uint8Array>;
}
).mergeUpdates.bind(storage);
let mergeCall = 0;
vi.spyOn(
storage as unknown as {
mergeUpdates: (updates: Uint8Array[]) => Promise<Uint8Array>;
},
'mergeUpdates'
).mockImplementation(async updates => {
mergeCall += 1;
if (mergeCall === 1) {
throw new Error('merge failed once');
}
return originalMergeUpdates(updates);
});
await expect(
storage.pushDocUpdate({
docId: 'doc-merge-fallback',
bin: createMapUpdate({ a: '1' }),
})
).resolves.toEqual({
docId: 'doc-merge-fallback',
timestamp: new Date('2026-01-02T00:00:00.000Z'),
});
// This update triggers the mocked merge failure, but should still resolve.
await expect(
storage.pushDocUpdate({
docId: 'doc-merge-fallback',
bin: createMapUpdate({ b: '2' }),
})
).resolves.toEqual({
docId: 'doc-merge-fallback',
timestamp: new Date('2026-01-02T00:00:00.000Z'),
});
// Follow-up update should continue to work without requiring reconnect/reload.
await expect(
storage.pushDocUpdate({
docId: 'doc-merge-fallback',
bin: createMapUpdate({ c: '3' }),
})
).resolves.toEqual({
docId: 'doc-merge-fallback',
timestamp: new Date('2026-01-02T00:00:00.000Z'),
});
const snapshot = await storage.getDoc('doc-merge-fallback');
expect(snapshot).not.toBeNull();
const doc = new YDoc();
applyUpdate(doc, snapshot!.bin);
const data = doc.getMap('test').toJSON();
expect(data).toMatchObject({
b: '2',
c: '3',
});
storage.connection.disconnect();
});
it('accepts remote doc-update bins as number[] (from native binding)', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
const original = createUpdate('remote-array');
const bin = Array.from(original) as unknown as Uint8Array;
emit({
type: 'doc-update',
update: {
docId: 'doc-remote-array',
bin,
timestamp: new Date('2026-01-03T00:00:00.000Z'),
},
});
await vi.waitFor(async () => {
const snapshot = await storage.getDoc('doc-remote-array');
expect(snapshot).not.toBeNull();
const doc = new YDoc();
applyUpdate(doc, snapshot!.bin);
expect(doc.getText('content').toString()).toBe('remote-array');
});
storage.connection.disconnect();
});
it('throws when applyLocalUpdate returns invalid timestamp', async () => {
applyLocalUpdate.mockResolvedValueOnce({
docId: 'doc-invalid-clock',
timestamp: new Date('invalid'),
});
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
await expect(
storage.pushDocUpdate({
docId: 'doc-invalid-clock',
bin: createUpdate('invalid'),
})
).rejects.toThrow('[disk] invalid timestamp');
storage.connection.disconnect();
});
it('skips remote doc-update with invalid timestamp', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
emit({
type: 'doc-update',
update: {
docId: 'doc-invalid-remote-clock',
bin: createUpdate('remote-invalid'),
timestamp: new Date('invalid') as unknown as Date,
},
});
await vi.waitFor(async () => {
expect(await storage.getDoc('doc-invalid-remote-clock')).toBeNull();
});
storage.connection.disconnect();
});
it('discovers doc ids from root meta and emits connect-driving updates once', async () => {
const storage = createStorage();
storage.connection.connect();
await storage.connection.waitForConnected();
const seen: Array<{ docId: string; origin?: string; size: number }> = [];
const unsubscribe = storage.subscribeDocUpdate((update, origin) => {
seen.push({
docId: update.docId,
origin,
size: update.bin.byteLength,
});
});
const rootUpdate = createRootMetaUpdate(['doc-a', 'doc-b']);
await storage.pushDocUpdate(
{
docId: 'workspace-test',
bin: rootUpdate,
},
'origin:root'
);
await vi.waitFor(() => {
const discovered = seen.filter(
item => item.origin === 'disk:root-meta-discovery'
);
expect(discovered).toHaveLength(2);
});
const discoveredDocIds = seen
.filter(item => item.origin === 'disk:root-meta-discovery')
.map(item => item.docId)
.sort();
expect(discoveredDocIds).toEqual(['doc-a', 'doc-b']);
expect(
seen
.filter(item => item.origin === 'disk:root-meta-discovery')
.every(item => item.size === 0)
).toBe(true);
await storage.pushDocUpdate(
{
docId: 'workspace-test',
bin: rootUpdate,
},
'origin:root'
);
const discoveryCountAfterSecondPush = seen.filter(
item => item.origin === 'disk:root-meta-discovery'
).length;
expect(discoveryCountAfterSecondPush).toBe(2);
unsubscribe();
storage.connection.disconnect();
});
});

View File

@@ -0,0 +1,291 @@
import { applyUpdate, Doc as YDoc } from 'yjs';
import {
type DocClock,
type DocClocks,
type DocRecord,
DocStorageBase,
type DocUpdate,
} from '../../storage';
import { type SpaceType } from '../../utils/universal-id';
import { DiskSyncConnection, type DiskSyncEvent } from './api';
/** Identity of the space this storage serves, plus its on-disk sync folder. */
export interface DiskDocStorageOptions {
  readonly flavour: string;
  readonly type: SpaceType;
  readonly id: string;
  readonly syncFolder: string;
}

/**
 * Doc storage backed by a native disk-sync session.
 *
 * Local writes are forwarded to the native layer via the connection's APIs;
 * remote changes arrive as DiskSyncEvents and are folded into an in-memory
 * snapshot cache (`snapshots`). Nothing here parses markdown — the native
 * side owns the on-disk format.
 */
export class DiskDocStorage extends DocStorageBase<DiskDocStorageOptions> {
  static readonly identifier = 'DiskDocStorage';
  readonly connection: DiskSyncConnection;
  // docId -> latest merged snapshot (in-memory, best-effort cache).
  private readonly snapshots = new Map<string, DocRecord>();
  // docId -> updates not yet merged into a snapshot.
  private readonly pendingUpdates = new Map<string, DocRecord[]>();
  // Doc ids already announced via root-meta discovery (announce once each).
  private readonly discoveredRootDocs = new Set<string>();

  constructor(options: DiskDocStorageOptions) {
    super(options);
    this.connection = new DiskSyncConnection(options, this.handleDiskEvent);
  }
override async pushDocUpdate(update: DocUpdate, origin?: string) {
const { timestamp } = await this.connection.apis.applyLocalUpdate(
update,
origin
);
const clock = normalizeDate(timestamp);
const next: DocRecord = {
docId: update.docId,
bin: update.bin,
timestamp: clock,
editor: update.editor,
};
await this.applySnapshotUpdate(next, origin);
return { docId: update.docId, timestamp: clock };
}
override async getDocTimestamp(docId: string): Promise<DocClock | null> {
const snapshot = this.snapshots.get(docId);
if (!snapshot) {
return null;
}
return {
docId,
timestamp: snapshot.timestamp,
};
}
override async getDocTimestamps(after?: Date): Promise<DocClocks> {
const timestamps: DocClocks = {};
for (const [docId, snapshot] of this.snapshots.entries()) {
if (after && snapshot.timestamp.getTime() <= after.getTime()) {
continue;
}
timestamps[docId] = snapshot.timestamp;
}
return timestamps;
}
override async deleteDoc(docId: string): Promise<void> {
this.snapshots.delete(docId);
this.pendingUpdates.delete(docId);
}
protected override async getDocSnapshot(docId: string) {
return this.snapshots.get(docId) ?? null;
}
protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
const existing = this.snapshots.get(snapshot.docId);
if (
existing &&
existing.timestamp.getTime() > snapshot.timestamp.getTime()
) {
return false;
}
this.snapshots.set(snapshot.docId, snapshot);
return true;
}
protected override async getDocUpdates(docId: string): Promise<DocRecord[]> {
return this.pendingUpdates.get(docId) ?? [];
}
  /**
   * Drop the pending-update queue for `docId` after `updates` have been
   * merged; returns how many updates were merged.
   *
   * NOTE(review): this clears the WHOLE queue, not just the given `updates`.
   * Presumably the per-doc lock taken in applySnapshotUpdate prevents new
   * updates from being enqueued in between — confirm against DocStorageBase.
   */
  protected override async markUpdatesMerged(
    docId: string,
    updates: DocRecord[]
  ): Promise<number> {
    if (updates.length) {
      this.pendingUpdates.delete(docId);
    }
    return updates.length;
  }
  /**
   * Dispatch a native disk-sync event into the local caches.
   *
   * - `doc-update`: validate timestamp and bin, then merge asynchronously
   *   into the snapshot cache. Invalid or failing events are logged and
   *   skipped — remote events must never crash the storage.
   * - `doc-delete`: drop all local state for the doc.
   * - `error`: log only.
   * Defined as an arrow-function field so it can be passed to the connection
   * without explicit binding.
   */
  private readonly handleDiskEvent = (event: DiskSyncEvent) => {
    switch (event.type) {
      case 'doc-update': {
        let timestamp: Date;
        try {
          timestamp = normalizeDate(event.update.timestamp);
        } catch (error) {
          console.warn(
            '[disk] invalid doc-update timestamp, skip event',
            error
          );
          return;
        }
        let bin: Uint8Array;
        try {
          // Native bindings may deliver the payload as number[]; normalize.
          bin = normalizeBin(event.update.bin);
        } catch (error) {
          console.warn('[disk] invalid doc-update bin, skip event', error);
          return;
        }
        const update: DocRecord = {
          docId: event.update.docId,
          bin,
          timestamp,
          editor: event.update.editor,
        };
        // Fire-and-forget: merge failures are logged, never propagated.
        void this.applySnapshotUpdate(update, event.origin).catch(error => {
          console.warn(
            '[disk] failed to apply remote doc-update, skip event',
            error
          );
        });
        return;
      }
      case 'doc-delete': {
        this.snapshots.delete(event.docId);
        this.pendingUpdates.delete(event.docId);
        return;
      }
      case 'error': {
        console.warn('[disk] session error', event.message);
        return;
      }
      default: {
        // Unknown event types (e.g. 'ready') are ignored.
        return;
      }
    }
  };
/**
 * Merge an update into the in-memory snapshot cache under a per-doc lock,
 * then notify listeners. Merge failures never propagate: the cache is
 * reset to the incoming update instead.
 */
private async applySnapshotUpdate(update: DocRecord, origin?: string) {
  // Serialize concurrent updates per doc; the lock is released when this
  // scope exits (explicit resource management).
  await using _lock = await this.lockDocForUpdate(update.docId);
  try {
    await this.mergeIntoSnapshot(update);
  } catch (error) {
    // Snapshot cache is best-effort. A merge failure must not block upstream sync
    // forever (it can otherwise require a full app reload to recover).
    console.warn(
      '[disk] snapshot merge failed, reset in-memory snapshot cache',
      error
    );
    this.snapshots.set(update.docId, update);
  }
  this.emit('update', update, origin);
  // Root (workspace) doc updates may introduce new page doc ids.
  if (update.docId === this.spaceId) {
    this.emitRootMetaDiscoveryUpdates();
  }
}
/**
 * Fold an update into the cached snapshot for its doc. The first update
 * becomes the snapshot as-is; later ones are merged, keeping the newest
 * timestamp and preferring the incoming editor when present.
 */
private async mergeIntoSnapshot(update: DocRecord) {
  const existing = this.snapshots.get(update.docId);
  if (existing === undefined) {
    this.snapshots.set(update.docId, update);
    return;
  }
  const mergedBin = await this.mergeUpdates([existing.bin, update.bin]);
  const newerTimestamp =
    existing.timestamp.getTime() > update.timestamp.getTime()
      ? existing.timestamp
      : update.timestamp;
  this.snapshots.set(update.docId, {
    ...update,
    bin: mergedBin,
    timestamp: newerTimestamp,
    editor: update.editor ?? existing.editor,
  });
}
/**
 * Scan the root workspace doc for page doc ids and emit one empty
 * "discovery" update per newly seen id, so the sync peer learns these docs
 * exist and can connect/pull/push them.
 */
private emitRootMetaDiscoveryUpdates() {
  const rootSnapshot = this.snapshots.get(this.spaceId);
  if (!rootSnapshot) {
    return;
  }
  const docIds = extractRootMetaDocIds(rootSnapshot.bin);
  // These discovery events are only meant to "introduce" doc ids to the sync
  // peer, so it can connect/pull/push them. They should NOT be treated as a
  // remote clock; otherwise switching sync folders (remote empty) can be
  // incorrectly seen as "remote newer than local" and skip the initial push.
  const discoveryTimestamp = new Date(0);
  for (const docId of docIds) {
    // Emit each id at most once per storage lifetime; skip the root itself.
    if (docId === this.spaceId || this.discoveredRootDocs.has(docId)) {
      continue;
    }
    this.discoveredRootDocs.add(docId);
    this.emit(
      'update',
      {
        docId,
        bin: new Uint8Array(),
        timestamp: discoveryTimestamp,
      },
      'disk:root-meta-discovery'
    );
  }
}
}
/**
 * Coerce a timestamp that may arrive as a Date, ISO string, or epoch
 * milliseconds into a valid Date. Throws on unparseable input.
 */
function normalizeDate(date: Date | string | number): Date {
  const parsed = date instanceof Date ? date : new Date(date);
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`[disk] invalid timestamp: ${String(date)}`);
  }
  return parsed;
}
/**
 * Best-effort extraction of page doc ids from a root workspace doc binary.
 * Returns [] when the update cannot be applied or the meta shape is not
 * the expected `meta.pages: [{ id: string }]` layout.
 */
function extractRootMetaDocIds(rootBin: Uint8Array): string[] {
  const doc = new YDoc();
  try {
    applyUpdate(doc, rootBin);
  } catch {
    // Corrupt or partial update: nothing discoverable.
    return [];
  }
  const pages = doc.getMap<unknown>('meta').get('pages');
  // Y types expose toJSON(); plain arrays may also appear here.
  const pagesJson =
    typeof pages === 'object' &&
    pages !== null &&
    'toJSON' in pages &&
    typeof pages.toJSON === 'function'
      ? pages.toJSON()
      : pages;
  if (!Array.isArray(pagesJson)) {
    return [];
  }
  return pagesJson
    .map(entry =>
      entry && typeof entry === 'object'
        ? (entry as { id?: unknown }).id
        : undefined
    )
    .filter((id): id is string => typeof id === 'string' && id.length > 0);
}
/**
 * Coerce the various wire encodings of a binary update into a Uint8Array.
 *
 * The native NAPI binding may send `number[]` for `Vec<u8>` fields, and some
 * transports serialize Buffer as `{ type: 'Buffer', data: number[] }`. Raw
 * `ArrayBuffer`s and non-Uint8Array typed-array views are also accepted for
 * robustness across structured-clone/IPC boundaries.
 *
 * @throws when the value matches none of the supported shapes.
 */
function normalizeBin(bin: unknown): Uint8Array {
  if (bin instanceof Uint8Array) {
    return bin;
  }
  if (bin instanceof ArrayBuffer) {
    return new Uint8Array(bin);
  }
  if (ArrayBuffer.isView(bin)) {
    // View over the same bytes, respecting the view's offset and length.
    return new Uint8Array(bin.buffer, bin.byteOffset, bin.byteLength);
  }
  if (Array.isArray(bin)) {
    return Uint8Array.from(bin);
  }
  // Some transports may serialize Buffer as `{ type: 'Buffer', data: number[] }`.
  if (
    bin &&
    typeof bin === 'object' &&
    'data' in bin &&
    Array.isArray((bin as { data?: unknown }).data)
  ) {
    return Uint8Array.from((bin as { data: number[] }).data);
  }
  throw new Error(
    `[disk] invalid update bin type: ${Object.prototype.toString.call(bin)}`
  );
}

View File

@@ -0,0 +1,7 @@
import type { StorageConstructor } from '..';
import { DiskDocStorage } from './doc';
export * from './api';
export * from './doc';
// Storage implementations contributed by the disk (folder sync) backend.
export const diskStorages = [DiskDocStorage] satisfies StorageConstructor[];

View File

@@ -0,0 +1,378 @@
import 'fake-indexeddb/auto';
import { expect, test, vi } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { expectYjsEqual } from '../../__tests__/utils';
import { SpaceStorage } from '../../storage';
import { Sync } from '../../sync';
import { universalId } from '../../utils/universal-id';
import { IndexedDBDocStorage, IndexedDBDocSyncStorage } from '../idb';
import { bindDiskSyncApis, type DiskSyncApis, type DiskSyncEvent } from './api';
import { DiskDocStorage } from './doc';
// End-to-end: a local IndexedDB storage and a mocked disk remote exchange
// updates in both directions through the Sync engine.
test('sync local <-> disk remote updates through DocSyncPeer', async () => {
  const workspaceId = 'ws-disk-integration';
  const sessionId = universalId({
    peer: 'local',
    type: 'workspace',
    id: workspaceId,
  });
  // Mock disk API surface: per-session listener sets plus a "remote" doc map.
  const listeners = new Map<string, Set<(event: DiskSyncEvent) => void>>();
  const remoteDocs = new Map<string, { timestamp: Date; bin: Uint8Array }>();
  const apis: DiskSyncApis = {
    startSession: async currentSessionId => {
      if (!listeners.has(currentSessionId)) {
        listeners.set(currentSessionId, new Set());
      }
    },
    stopSession: async currentSessionId => {
      listeners.delete(currentSessionId);
    },
    applyLocalUpdate: async (currentSessionId, update) => {
      const timestamp = new Date();
      remoteDocs.set(update.docId, { timestamp, bin: update.bin });
      // Echo the update back like the real watcher would.
      for (const callback of listeners.get(currentSessionId) ?? []) {
        callback({
          type: 'doc-update',
          update: {
            docId: update.docId,
            bin: update.bin,
            timestamp,
          },
          origin: 'sync:disk-mock',
        });
      }
      return {
        docId: update.docId,
        timestamp,
      };
    },
    subscribeEvents: (currentSessionId, callback) => {
      let set = listeners.get(currentSessionId);
      if (!set) {
        set = new Set();
        listeners.set(currentSessionId, set);
      }
      set.add(callback);
      return () => {
        set?.delete(callback);
      };
    },
  };
  bindDiskSyncApis(apis);
  const localDoc = new IndexedDBDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const localDocSync = new IndexedDBDocSyncStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const remoteDoc = new DiskDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
    syncFolder: '/tmp/disk-sync',
  });
  const local = new SpaceStorage({
    doc: localDoc,
    docSync: localDocSync,
  });
  const remote = new SpaceStorage({
    doc: remoteDoc,
  });
  local.connect();
  remote.connect();
  await local.waitForConnected();
  await remote.waitForConnected();
  const sync = new Sync({
    local,
    remotes: {
      disk: remote,
    },
  });
  sync.start();
  // Direction 1: a local edit should reach the disk remote.
  const localSource = new YDoc();
  localSource.getMap('test').set('origin', 'local');
  await localDoc.pushDocUpdate({
    docId: 'doc-local',
    bin: encodeStateAsUpdate(localSource),
  });
  await vi.waitFor(() => {
    expect(remoteDocs.has('doc-local')).toBe(true);
  });
  // Direction 2: a change appearing on disk should reach local storage.
  const remoteSource = new YDoc();
  remoteSource.getMap('test').set('origin', 'remote');
  remoteSource.getMap('test').set('synced', 'yes');
  const remoteUpdate = encodeStateAsUpdate(remoteSource);
  const remoteTimestamp = new Date('2026-01-05T00:00:00.000Z');
  for (const callback of listeners.get(sessionId) ?? []) {
    callback({
      type: 'doc-update',
      update: {
        docId: 'doc-remote',
        bin: remoteUpdate,
        timestamp: remoteTimestamp,
      },
    });
  }
  await vi.waitFor(async () => {
    const doc = await localDoc.getDoc('doc-remote');
    expect(doc).not.toBeNull();
    expectYjsEqual(doc!.bin, {
      test: {
        origin: 'remote',
        synced: 'yes',
      },
    });
  });
  sync.stop();
  // Intentionally keep IndexedDB connections open in tests. Disconnecting can
  // abort in-flight IDB transactions in fake-indexeddb and surface as unhandled
  // rejections, which makes Vitest fail the run.
  remote.disconnect();
});
// Regression: pushed clocks left over from a previous (different) disk folder
// must not suppress the initial push into an empty remote.
test('forces initial push when disk has stale pushed clocks but remote is empty', async () => {
  const workspaceId = 'ws-disk-stale-push';
  const listeners = new Map<string, Set<(event: DiskSyncEvent) => void>>();
  const remoteDocs = new Map<string, { timestamp: Date; bin: Uint8Array }>();
  const apis: DiskSyncApis = {
    startSession: async currentSessionId => {
      if (!listeners.has(currentSessionId)) {
        listeners.set(currentSessionId, new Set());
      }
    },
    stopSession: async currentSessionId => {
      listeners.delete(currentSessionId);
    },
    applyLocalUpdate: async (currentSessionId, update) => {
      const timestamp = new Date();
      remoteDocs.set(update.docId, { timestamp, bin: update.bin });
      for (const callback of listeners.get(currentSessionId) ?? []) {
        callback({
          type: 'doc-update',
          update: {
            docId: update.docId,
            bin: update.bin,
            timestamp,
          },
          origin: 'sync:disk-mock',
        });
      }
      return {
        docId: update.docId,
        timestamp,
      };
    },
    subscribeEvents: (currentSessionId, callback) => {
      let set = listeners.get(currentSessionId);
      if (!set) {
        set = new Set();
        listeners.set(currentSessionId, set);
      }
      set.add(callback);
      return () => {
        set?.delete(callback);
      };
    },
  };
  bindDiskSyncApis(apis);
  const localDoc = new IndexedDBDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const localDocSync = new IndexedDBDocSyncStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const remoteDoc = new DiskDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
    syncFolder: '/tmp/disk-sync-stale',
  });
  const local = new SpaceStorage({
    doc: localDoc,
    docSync: localDocSync,
  });
  const remote = new SpaceStorage({
    doc: remoteDoc,
  });
  local.connect();
  remote.connect();
  await local.waitForConnected();
  await remote.waitForConnected();
  const source = new YDoc();
  source.getMap('test').set('value', 'local');
  await localDoc.pushDocUpdate({
    docId: 'doc-local-stale',
    bin: encodeStateAsUpdate(source),
  });
  // Pretend a previous folder was already pushed at a far-future clock.
  await localDocSync.setPeerPushedClock('disk', {
    docId: 'doc-local-stale',
    timestamp: new Date('2099-01-01T00:00:00.000Z'),
  });
  const sync = new Sync({
    local,
    remotes: {
      disk: remote,
    },
  });
  sync.start();
  // Despite the stale clock, the doc must still be pushed to the empty remote.
  await vi.waitFor(() => {
    expect(remoteDocs.has('doc-local-stale')).toBe(true);
  });
  sync.stop();
  remote.disconnect();
});
// Regression: the epoch-timestamped discovery events emitted from root meta
// must not be treated as remote clocks that suppress pushing page docs.
test('root-meta discovery must not block pushing page docs when switching disk folders', async () => {
  const workspaceId = 'ws-disk-discovery-nonblocking';
  const pageDocId = 'page-doc-1';
  const listeners = new Map<string, Set<(event: DiskSyncEvent) => void>>();
  const remoteDocs = new Map<string, { timestamp: Date; bin: Uint8Array }>();
  const apis: DiskSyncApis = {
    startSession: async currentSessionId => {
      if (!listeners.has(currentSessionId)) {
        listeners.set(currentSessionId, new Set());
      }
    },
    stopSession: async currentSessionId => {
      listeners.delete(currentSessionId);
    },
    applyLocalUpdate: async (currentSessionId, update) => {
      const timestamp = new Date();
      remoteDocs.set(update.docId, { timestamp, bin: update.bin });
      for (const callback of listeners.get(currentSessionId) ?? []) {
        callback({
          type: 'doc-update',
          update: {
            docId: update.docId,
            bin: update.bin,
            timestamp,
          },
          origin: 'sync:disk-mock',
        });
      }
      return {
        docId: update.docId,
        timestamp,
      };
    },
    subscribeEvents: (currentSessionId, callback) => {
      let set = listeners.get(currentSessionId);
      if (!set) {
        set = new Set();
        listeners.set(currentSessionId, set);
      }
      set.add(callback);
      return () => {
        set?.delete(callback);
      };
    },
  };
  bindDiskSyncApis(apis);
  const localDoc = new IndexedDBDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const localDocSync = new IndexedDBDocSyncStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
  });
  const remoteDoc = new DiskDocStorage({
    id: workspaceId,
    flavour: 'local',
    type: 'workspace',
    syncFolder: '/tmp/disk-sync-discovery',
  });
  const local = new SpaceStorage({
    doc: localDoc,
    docSync: localDocSync,
  });
  const remote = new SpaceStorage({
    doc: remoteDoc,
  });
  local.connect();
  remote.connect();
  await local.waitForConnected();
  await remote.waitForConnected();
  // Seed local root meta so disk can discover the page doc id from it.
  const root = new YDoc();
  const meta = root.getMap('meta');
  meta.set('pages', [{ id: pageDocId }]);
  await localDoc.pushDocUpdate({
    docId: workspaceId,
    bin: encodeStateAsUpdate(root),
  });
  // Seed the page doc itself.
  const page = new YDoc();
  page.getMap('test').set('value', 'local');
  const { timestamp: pageClock } = await localDoc.pushDocUpdate({
    docId: pageDocId,
    bin: encodeStateAsUpdate(page),
  });
  // Simulate "already pushed" clocks from a previous disk folder.
  await localDocSync.setPeerPushedClock('disk', {
    docId: pageDocId,
    timestamp: pageClock,
  });
  const sync = new Sync({
    local,
    remotes: {
      disk: remote,
    },
  });
  // Match workspace engine behavior: sync root doc first.
  sync.doc.addPriority(workspaceId, 100);
  sync.start();
  await vi.waitFor(() => {
    expect(remoteDocs.has(pageDocId)).toBe(true);
  });
  sync.stop();
  remote.disconnect();
});

View File

@@ -1,6 +1,7 @@
import type { Storage } from '../storage';
import type { broadcastChannelStorages } from './broadcast-channel';
import type { cloudStorages } from './cloud';
import type { diskStorages } from './disk';
import type { idbStorages } from './idb';
import type { idbV1Storages } from './idb/v1';
import type { sqliteStorages } from './sqlite';
@@ -15,6 +16,7 @@ type Storages =
| typeof cloudStorages
| typeof idbV1Storages
| typeof idbStorages
| typeof diskStorages
| typeof sqliteStorages
| typeof sqliteV1Storages
| typeof broadcastChannelStorages;

View File

@@ -241,13 +241,16 @@ export class DocSyncPeer {
(await this.syncMetadata.getPeerPushedClock(this.peerId, docId))
?.timestamp ?? null;
const clock = await this.local.getDocTimestamp(docId);
const remoteClock = this.status.remoteClocks.get(docId) ?? null;
throwIfAborted(signal);
if (
!this.remote.isReadonly &&
clock &&
(pushedClock === null ||
pushedClock.getTime() < clock.timestamp.getTime())
pushedClock.getTime() < clock.timestamp.getTime() ||
remoteClock === null ||
remoteClock.getTime() < clock.timestamp.getTime())
) {
await this.jobs.pullAndPush(docId, signal);
} else {
@@ -255,7 +258,6 @@ export class DocSyncPeer {
const pulled =
(await this.syncMetadata.getPeerPulledRemoteClock(this.peerId, docId))
?.timestamp ?? null;
const remoteClock = this.status.remoteClocks.get(docId);
if (
remoteClock &&
(pulled === null || pulled.getTime() < remoteClock.getTime())
@@ -676,10 +678,12 @@ export class DocSyncPeer {
this.actions.addDoc(docId);
}
const forceFullRemoteClockRefresh = this.peerId === 'disk';
// get cached clocks from metadata
const cachedClocks = await this.syncMetadata.getPeerRemoteClocks(
this.peerId
);
const cachedClocks = forceFullRemoteClockRefresh
? {}
: await this.syncMetadata.getPeerRemoteClocks(this.peerId);
this.status.remoteClocks.clear();
throwIfAborted(signal);
for (const [id, v] of Object.entries(cachedClocks)) {
@@ -687,9 +691,14 @@ export class DocSyncPeer {
}
this.statusUpdatedSubject$.next(true);
// get new clocks from server
const maxClockValue = this.status.remoteClocks.max;
// get clocks from server
const maxClockValue = forceFullRemoteClockRefresh
? undefined
: this.status.remoteClocks.max;
const newClocks = await this.remote.getDocTimestamps(maxClockValue);
if (forceFullRemoteClockRefresh) {
this.status.remoteClocks.clear();
}
for (const [id, v] of Object.entries(newClocks)) {
this.status.remoteClocks.set(id, v);
}

View File

@@ -1,4 +1,5 @@
import { OpConsumer } from '@toeverything/infra/op';
import { isEqual } from 'lodash-es';
import { Observable } from 'rxjs';
import { type StorageConstructor } from '../impls';
@@ -13,8 +14,9 @@ import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { WorkerManagerOps };
class StoreConsumer {
private readonly storages: PeerStorageOptions<SpaceStorage>;
private readonly sync: Sync;
private storages: PeerStorageOptions<SpaceStorage> | null = null;
private sync: Sync | null = null;
private initOptions: StoreInitOptions;
get ensureLocal() {
if (!this.storages) {
@@ -70,20 +72,29 @@ class StoreConsumer {
private readonly availableStorageImplementations: StorageConstructor[],
init: StoreInitOptions
) {
this.initOptions = init;
this.initWithOptions(init);
}
// Instantiate a storage implementation from a serialized option record
// ({ name, opts }); `undefined` passes through for disabled storage types.
private createStorage(opt: any): any {
  if (opt === undefined) {
    return undefined;
  }
  const Storage = this.availableStorageImplementations.find(
    impl => impl.identifier === opt.name
  );
  if (!Storage) {
    throw new Error(`Storage implementation ${opt.name} not found`);
  }
  return new Storage(opt.opts as any);
}
private initWithOptions(init: StoreInitOptions) {
this.storages = {
local: new SpaceStorage(
Object.fromEntries(
Object.entries(init.local).map(([type, opt]) => {
if (opt === undefined) {
return [type, undefined];
}
const Storage = this.availableStorageImplementations.find(
impl => impl.identifier === opt.name
);
if (!Storage) {
throw new Error(`Storage implementation ${opt.name} not found`);
}
return [type, new Storage(opt.opts as any)];
return [type, this.createStorage(opt)];
})
)
),
@@ -94,18 +105,7 @@ class StoreConsumer {
new SpaceStorage(
Object.fromEntries(
Object.entries(opts).map(([type, opt]) => {
if (opt === undefined) {
return [type, undefined];
}
const Storage = this.availableStorageImplementations.find(
impl => impl.identifier === opt.name
);
if (!Storage) {
throw new Error(
`Storage implementation ${opt.name} not found`
);
}
return [type, new Storage(opt.opts as any)];
return [type, this.createStorage(opt)];
})
)
),
@@ -125,6 +125,69 @@ class StoreConsumer {
this.registerHandlers(consumer);
}
/**
 * Apply a new store configuration to a live consumer. Local-storage changes
 * force a full teardown/rebuild; remote-only changes swap remote peers in
 * place so existing OpConsumers keep working.
 */
async reconfigure(init: StoreInitOptions) {
  if (isEqual(this.initOptions, init)) {
    return;
  }
  // If local storage config changes, fall back to full teardown/rebuild.
  // (Remote-only changes are expected, like enabling folder sync.)
  if (
    !this.storages ||
    !this.sync ||
    !isEqual(this.initOptions.local, init.local)
  ) {
    await this.destroy();
    this.initOptions = init;
    this.initWithOptions(init);
    return;
  }
  // Remote-only change: rebuild sync graph and remote storages in-place so
  // existing OpConsumers keep working.
  const prevInit = this.initOptions;
  const storages = this.storages;
  // Stop syncing before mutating the peer set.
  this.sync.stop();
  // Destroy removed or changed remote peers.
  for (const [peerId, prevPeerOpts] of Object.entries(prevInit.remotes)) {
    const nextPeerOpts = init.remotes[peerId];
    const changed = !nextPeerOpts || !isEqual(prevPeerOpts, nextPeerOpts);
    if (!changed) {
      continue;
    }
    const remote = storages.remotes[peerId];
    if (remote) {
      delete storages.remotes[peerId];
      remote.disconnect();
      await remote.destroy();
    }
  }
  // Create added or changed remote peers.
  for (const [peerId, nextPeerOpts] of Object.entries(init.remotes)) {
    const prevPeerOpts = prevInit.remotes[peerId];
    const changed = !prevPeerOpts || !isEqual(prevPeerOpts, nextPeerOpts);
    if (!changed) {
      continue;
    }
    const remote = new SpaceStorage(
      Object.fromEntries(
        Object.entries(nextPeerOpts).map(([type, opt]) => {
          return [type, this.createStorage(opt)];
        })
      )
    );
    storages.remotes[peerId] = remote;
    remote.connect();
  }
  // Rebuild the sync graph over the updated peer set and restart.
  this.sync = new Sync(storages);
  this.sync.start();
  this.initOptions = init;
}
async destroy() {
this.sync?.stop();
this.storages?.local.disconnect();
@@ -133,6 +196,9 @@ class StoreConsumer {
remote.disconnect();
await remote.destroy();
}
this.sync = null;
this.storages = null;
}
private readonly ENABLE_BATTERY_SAVE_MODE_DELAY = 1000;
@@ -337,7 +403,12 @@ export class StoreManagerConsumer {
private readonly storeDisposers = new Map<string, () => void>();
private readonly storePool = new Map<
string,
{ store: StoreConsumer; refCount: number }
{
store: StoreConsumer;
refCount: number;
options: StoreInitOptions;
reconfiguring?: Promise<void>;
}
>();
private readonly telemetry = new TelemetryManager();
@@ -360,7 +431,22 @@ export class StoreManagerConsumer {
this.availableStorageImplementations,
options
);
storeRef = { store, refCount: 0 };
storeRef = { store, refCount: 0, options };
} else if (!isEqual(storeRef.options, options)) {
const currentStoreRef = storeRef;
// Options can change across renderer reloads (or when features like
// folder sync are enabled). Reconfigure the shared store in-place
// so existing consumers keep working with the latest remotes.
currentStoreRef.reconfiguring = (
currentStoreRef.reconfiguring ?? Promise.resolve()
)
.then(async () => {
await currentStoreRef.store.reconfigure(options);
currentStoreRef.options = options;
})
.catch(error => {
console.error('failed to reconfigure store', key, error);
});
}
storeRef.refCount++;

View File

@@ -34,6 +34,7 @@
"@types/react": "^19.0.1",
"@types/react-dom": "^19.0.2",
"cross-env": "^10.1.0",
"typescript": "^5.9.3"
"typescript": "^5.9.3",
"vitest": "^3.2.4"
}
}

View File

@@ -60,6 +60,20 @@ export function setupStoreManager(framework: Framework) {
framework.impl(NbstoreProvider, {
openStore(key, options) {
try {
// E2E/debug only: capture init options passed to the nbstore worker.
(globalThis as any).__e2eNbstoreOpenStoreLogs =
(globalThis as any).__e2eNbstoreOpenStoreLogs ?? [];
(globalThis as any).__e2eNbstoreOpenStoreLogs.push({
key,
remotes: Object.keys(options?.remotes ?? {}),
diskSyncFolder:
(options as any)?.remotes?.['disk']?.doc?.opts?.syncFolder ?? null,
});
} catch {
// ignore
}
const { store, dispose } = storeManagerClient.open(key, options);
return {

View File

@@ -0,0 +1,79 @@
import type { DiskSyncEvent } from '@affine/nbstore/disk';
import { describe, expect, it, vi } from 'vitest';
import { createDiskSyncApis } from './disk-sync-bridge';
describe('createDiskSyncApis', () => {
  // The bridge should pass handler calls through verbatim and only deliver
  // events whose sessionId matches the subscriber's session.
  it('forwards handler calls and filters events by session id', async () => {
    const startSession = vi.fn(async () => {});
    const stopSession = vi.fn(async () => {});
    const applyLocalUpdate = vi.fn(async () => ({
      docId: 'doc-1',
      timestamp: new Date('2026-01-04T00:00:00.000Z'),
    }));
    const listeners = new Set<
      (payload: { sessionId: string; event: DiskSyncEvent }) => void
    >();
    const onEvent = vi.fn(
      (
        callback: (payload: { sessionId: string; event: DiskSyncEvent }) => void
      ) => {
        listeners.add(callback);
        return () => {
          listeners.delete(callback);
        };
      }
    );
    const apis = createDiskSyncApis(
      { startSession, stopSession, applyLocalUpdate },
      { onEvent }
    );
    await apis.startSession('session-a', {
      workspaceId: 'workspace-a',
      syncFolder: '/tmp/sync-a',
    });
    await apis.stopSession('session-a');
    await apis.applyLocalUpdate('session-a', {
      docId: 'doc-1',
      bin: new Uint8Array([1, 2, 3]),
    });
    expect(startSession).toHaveBeenCalledWith('session-a', {
      workspaceId: 'workspace-a',
      syncFolder: '/tmp/sync-a',
    });
    expect(stopSession).toHaveBeenCalledWith('session-a');
    expect(applyLocalUpdate).toHaveBeenCalledWith(
      'session-a',
      expect.objectContaining({ docId: 'doc-1' }),
      undefined
    );
    const callback = vi.fn();
    const unsubscribe = apis.subscribeEvents('session-a', callback);
    expect(onEvent).toHaveBeenCalledTimes(1);
    const docUpdate: DiskSyncEvent = {
      type: 'doc-update',
      update: {
        docId: 'doc-2',
        bin: new Uint8Array([4, 5, 6]),
        timestamp: new Date('2026-01-04T00:00:01.000Z'),
      },
    };
    // Deliver one event for a foreign session and one for ours: only the
    // matching event should reach the callback.
    for (const listener of listeners) {
      listener({ sessionId: 'session-b', event: { type: 'ready' } });
      listener({ sessionId: 'session-a', event: docUpdate });
    }
    expect(callback).toHaveBeenCalledTimes(1);
    expect(callback).toHaveBeenCalledWith(docUpdate);
    unsubscribe();
  });
});

View File

@@ -0,0 +1,52 @@
import type { DocClock, DocUpdate } from '@affine/nbstore';
import type {
DiskSessionOptions,
DiskSyncApis,
DiskSyncEvent,
} from '@affine/nbstore/disk';
/** Event envelope forwarded from the main process, tagged by session id. */
type DiskSyncEventPayload = {
  sessionId: string;
  event: DiskSyncEvent;
};
/** Invokable handlers backing the disk sync APIs. */
interface DiskSyncHandlers {
  startSession: (
    sessionId: string,
    options: DiskSessionOptions
  ) => Promise<void>;
  stopSession: (sessionId: string) => Promise<void>;
  applyLocalUpdate: (
    sessionId: string,
    update: DocUpdate,
    origin?: string
  ) => Promise<DocClock>;
}
/** Event subscription surface; all sessions share one channel. */
interface DiskSyncEvents {
  onEvent: (callback: (payload: DiskSyncEventPayload) => void) => () => void;
}
/**
 * Adapt the invoke handlers plus shared event stream into the
 * `DiskSyncApis` shape expected by the nbstore disk storage. Events are
 * multiplexed over one channel, so each subscription filters by session id.
 */
export function createDiskSyncApis(
  handlers: DiskSyncHandlers,
  events: DiskSyncEvents
): DiskSyncApis {
  return {
    startSession: (sessionId, options) =>
      handlers.startSession(sessionId, options),
    stopSession: sessionId => handlers.stopSession(sessionId),
    applyLocalUpdate: (sessionId, update, origin) =>
      handlers.applyLocalUpdate(sessionId, update, origin),
    subscribeEvents: (sessionId, callback) =>
      events.onEvent(payload => {
        // Only deliver events belonging to this subscriber's session.
        if (payload.sessionId !== sessionId) {
          return;
        }
        callback(payload.event);
      }),
  };
}

View File

@@ -1,8 +1,9 @@
import '@affine/core/bootstrap/electron';
import { apis } from '@affine/electron-api';
import { apis, events } from '@affine/electron-api';
import { broadcastChannelStorages } from '@affine/nbstore/broadcast-channel';
import { cloudStorages } from '@affine/nbstore/cloud';
import { bindDiskSyncApis, diskStorages } from '@affine/nbstore/disk';
import { bindNativeDBApis, sqliteStorages } from '@affine/nbstore/sqlite';
import {
bindNativeDBV1Apis,
@@ -14,14 +15,19 @@ import {
} from '@affine/nbstore/worker/consumer';
import { OpConsumer } from '@toeverything/infra/op';
import { createDiskSyncApis } from './disk-sync-bridge';
// oxlint-disable-next-line no-non-null-assertion
bindNativeDBApis(apis!.nbstore);
// oxlint-disable-next-line no-non-null-assertion
bindNativeDBV1Apis(apis!.db);
// oxlint-disable-next-line no-non-null-assertion
bindDiskSyncApis(createDiskSyncApis(apis!.diskSync, events!.diskSync));
const storeManager = new StoreManagerConsumer([
...sqliteStorages,
...sqliteV1Storages,
...diskStorages,
...broadcastChannelStorages,
...cloudStorages,
]);

View File

@@ -0,0 +1,169 @@
import fs from 'node:fs';
import path from 'node:path';
import type { DiskSyncEvent as NativeDiskSyncEvent } from '@affine/native';
import { DiskSync } from '@affine/native';
import type { DocClock, DocUpdate } from '@affine/nbstore';
import type { DiskSessionOptions, DiskSyncEvent } from '@affine/nbstore/disk';
import { diskSyncSubjects } from './subjects';
/** Handle returned by the native event subscription. */
interface DiskSyncSubscriber {
  unsubscribe(): Promise<void | Error> | void | Error;
}
// NAPI calls here can resolve to an Error *value* instead of rejecting.
type NapiMaybe<T> = T | Error;
/**
 * Unwrap a NAPI result that signals failure by resolving to an `Error`
 * value, converting it into a thrown exception with action context.
 */
function unwrapNapiResult<T>(result: NapiMaybe<T>, action: string): T {
  if (!(result instanceof Error)) {
    return result;
  }
  throw new Error(`[disk] ${action} failed: ${result.message}`);
}
/**
 * Normalize a native event timestamp into a valid `Date`.
 *
 * Accepts `Date` instances, date strings, and epoch milliseconds. Returns
 * `null` for unparseable values and for any other type — including `null`,
 * which `new Date(null)` would otherwise silently coerce to the 1970 epoch.
 * (The previous `timestamp as string` cast also lied about numbers.)
 */
function normalizeTimestamp(timestamp: unknown): Date | null {
  let normalized: Date;
  if (timestamp instanceof Date) {
    normalized = timestamp;
  } else if (typeof timestamp === 'string' || typeof timestamp === 'number') {
    normalized = new Date(timestamp);
  } else {
    return null;
  }
  return Number.isNaN(normalized.getTime()) ? null : normalized;
}
/**
 * Validate and convert a raw native event into the shared `DiskSyncEvent`
 * shape. Returns `null` for malformed payloads so callers can drop them.
 */
function normalizeDiskSyncEvent(
  event: NativeDiskSyncEvent
): DiskSyncEvent | null {
  switch (event.type) {
    case 'ready':
      return { type: 'ready' };
    case 'doc-update': {
      // Require a real Uint8Array bin; other encodings are rejected here.
      if (!event.update || !(event.update.bin instanceof Uint8Array)) {
        return null;
      }
      const timestamp = normalizeTimestamp(event.update.timestamp);
      if (!timestamp) {
        return null;
      }
      return {
        type: 'doc-update',
        update: {
          docId: event.update.docId,
          bin: event.update.bin,
          timestamp,
          editor: event.update.editor,
        },
        origin: event.origin,
      };
    }
    case 'doc-delete': {
      if (typeof event.docId !== 'string') {
        return null;
      }
      const timestamp = normalizeTimestamp(event.timestamp);
      if (!timestamp) {
        return null;
      }
      return {
        type: 'doc-delete',
        docId: event.docId,
        timestamp,
      };
    }
    case 'error': {
      if (typeof event.message !== 'string') {
        return null;
      }
      return {
        type: 'error',
        message: event.message,
      };
    }
    default:
      // Unknown event types are dropped.
      return null;
  }
}
// Widen the native instance type with the session-scoped API surface —
// presumably the generated NAPI typings don't describe these methods yet
// (TODO confirm once typings are regenerated). NAPI surfaces failures as
// Error values (see `NapiMaybe`) rather than rejected promises.
type DiskSyncRuntime = InstanceType<typeof DiskSync> & {
  startSession(
    sessionId: string,
    options: DiskSessionOptions
  ): Promise<NapiMaybe<void>>;
  stopSession(sessionId: string): Promise<NapiMaybe<void>>;
  applyLocalUpdate(
    sessionId: string,
    update: DocUpdate,
    origin?: string
  ): Promise<NapiMaybe<DocClock>>;
  subscribeEvents(
    sessionId: string,
    callback: (err: Error | null, event: NativeDiskSyncEvent) => void
  ): Promise<NapiMaybe<DiskSyncSubscriber>>;
};
// Single shared native runtime for all sessions in this process.
const diskSync = new DiskSync() as DiskSyncRuntime;
// Per-session event unsubscribe callbacks, keyed by session id.
const subscriptions = new Map<string, () => Promise<void>>();
/**
 * Append a line to a per-folder debug log, but only during E2E runs
 * (`AFFINE_E2E=1`). Failures are swallowed: logging must never break sync.
 */
function e2eLog(options: DiskSessionOptions, line: string) {
  if (process.env.AFFINE_E2E !== '1') {
    return;
  }
  try {
    const logFile = path.join(options.syncFolder, '.disk-e2e.log');
    const entry = `${new Date().toISOString()}\t${line}\n`;
    fs.appendFileSync(logFile, entry, 'utf8');
  } catch {
    // Best-effort only.
  }
}
/**
 * Start (or restart) a native disk sync session and ensure exactly one
 * event subscription forwards native events into `diskSyncSubjects`.
 */
export async function startSession(
  sessionId: string,
  options: DiskSessionOptions
): Promise<void> {
  e2eLog(
    options,
    `startSession\t${sessionId}\tworkspaceId=${options.workspaceId}\tsyncFolder=${options.syncFolder}`
  );
  unwrapNapiResult(
    await diskSync.startSession(sessionId, options),
    'startSession'
  );
  // Re-starting an existing session keeps its subscription.
  // NOTE(review): concurrent startSession calls for the same id could both
  // pass this check and double-subscribe — confirm callers serialize.
  if (subscriptions.has(sessionId)) {
    return;
  }
  const subscriber = unwrapNapiResult(
    await diskSync.subscribeEvents(sessionId, (err, event) => {
      if (err) {
        // Callback-level errors are dropped; nothing to forward.
        return;
      }
      const normalizedEvent = normalizeDiskSyncEvent(event);
      if (!normalizedEvent) {
        return;
      }
      diskSyncSubjects.event$.next({ sessionId, event: normalizedEvent });
    }),
    'subscribeEvents'
  );
  subscriptions.set(sessionId, async () => {
    unwrapNapiResult(await subscriber.unsubscribe(), 'unsubscribe');
  });
}
/**
 * Stop a disk sync session: tear down the event subscription, then stop the
 * native session.
 *
 * The subscription entry is removed up front so a throwing `unsubscribe`
 * cannot leave a stale entry behind, and the native session is stopped even
 * when unsubscribing fails (previously a failed unsubscribe skipped both,
 * leaking the native session).
 */
export async function stopSession(sessionId: string): Promise<void> {
  const unsubscribe = subscriptions.get(sessionId);
  subscriptions.delete(sessionId);
  try {
    await unsubscribe?.();
  } finally {
    unwrapNapiResult(await diskSync.stopSession(sessionId), 'stopSession');
  }
}
/**
 * Forward a local doc update into the native disk sync session and return
 * the clock the native side assigned to it.
 * (No e2eLog here: syncFolder isn't available per-call, only at start.)
 */
export async function applyLocalUpdate(
  sessionId: string,
  update: DocUpdate,
  origin?: string
): Promise<DocClock> {
  const result = await diskSync.applyLocalUpdate(sessionId, update, origin);
  return unwrapNapiResult(result, 'applyLocalUpdate');
}

View File

@@ -0,0 +1,20 @@
import type { MainEventRegister } from '../type';
import { applyLocalUpdate, startSession, stopSession } from './handlers';
import { diskSyncSubjects } from './subjects';
// Invokable handlers for the `diskSync` IPC namespace.
export const diskSyncHandlers = {
  startSession,
  stopSession,
  applyLocalUpdate,
};
// Push events for the `diskSync` IPC namespace: forwards every
// session-tagged event from the shared subject to the registered callback.
export const diskSyncEvents = {
  onEvent: ((callback: (...args: any[]) => void) => {
    const subscription = diskSyncSubjects.event$.subscribe(payload => {
      callback(payload);
    });
    return () => {
      subscription.unsubscribe();
    };
  }) satisfies MainEventRegister,
};

View File

@@ -0,0 +1,11 @@
import type { DiskSyncEvent } from '@affine/nbstore/disk';
import { Subject } from 'rxjs';
/** A disk sync event paired with the session it belongs to. */
export interface DiskSyncSessionEvent {
  sessionId: string;
  event: DiskSyncEvent;
}
// Process-wide stream bridging native disk events to the IPC event layer.
export const diskSyncSubjects = {
  event$: new Subject<DiskSyncSessionEvent>(),
};

View File

@@ -1,4 +1,5 @@
import { dialogHandlers } from './dialog';
import { diskSyncEvents, diskSyncHandlers } from './disk-sync';
import { dbEventsV1, dbHandlersV1, nbstoreHandlers } from './nbstore';
import { provideExposed } from './provide';
import { workspaceEvents, workspaceHandlers } from './workspace';
@@ -6,12 +7,14 @@ import { workspaceEvents, workspaceHandlers } from './workspace';
// All invokable IPC handlers exposed by this process, by namespace.
export const handlers = {
  db: dbHandlersV1,
  nbstore: nbstoreHandlers,
  diskSync: diskSyncHandlers,
  workspace: workspaceHandlers,
  dialog: dialogHandlers,
};
// Event (push) channels exposed alongside the invoke handlers.
export const events = {
  db: dbEventsV1,
  diskSync: diskSyncEvents,
  workspace: workspaceEvents,
};

View File

@@ -82,30 +82,33 @@ function createSharedStorageApi(
}
});
const initPromise = (async () => {
try {
memory.setAll(init);
const latest = await ipcRenderer.invoke(
AFFINE_API_CHANNEL_NAME,
event === 'onGlobalStateChanged'
? 'sharedStorage:getAllGlobalState'
: 'sharedStorage:getAllGlobalCache'
);
if (latest && typeof latest === 'object') {
memory.setAll(latest);
}
} catch (err) {
console.error('Failed to load initial shared storage', err);
} finally {
loaded = true;
while (updateQueue.length) {
const updates = updateQueue.shift();
if (updates) {
applyUpdates(updates);
}
// Load initial state synchronously so consumers can read values during early
// bootstrap without awaiting `ready`. This prevents feature config races
// (e.g. folder sync remote options) on first app load.
try {
memory.setAll(init);
const latest = ipcRenderer.sendSync(
AFFINE_API_CHANNEL_NAME,
event === 'onGlobalStateChanged'
? 'sharedStorage:getAllGlobalState'
: 'sharedStorage:getAllGlobalCache'
);
if (latest && typeof latest === 'object') {
memory.setAll(latest);
}
} catch (err) {
console.error('Failed to load initial shared storage (sync)', err);
} finally {
loaded = true;
while (updateQueue.length) {
const updates = updateQueue.shift();
if (updates) {
applyUpdates(updates);
}
}
})();
}
const initPromise = Promise.resolve();
return {
ready: initPromise,

View File

@@ -0,0 +1,118 @@
import type { DiskSyncEvent } from '@affine/nbstore/disk';
import { beforeEach, describe, expect, it, vi } from 'vitest';
// Mock implementations for the native DiskSync class, created inside
// `vi.hoisted` so they exist before the hoisted `vi.mock` factory below runs.
// Default behaviors mirror the happy path; individual tests override them.
const diskSyncMocks = vi.hoisted(() => {
  return {
    // Resolves immediately — session lifecycle succeeds by default.
    startSession: vi.fn(async () => {}),
    stopSession: vi.fn(async () => {}),
    // Returns a fixed doc clock so callers can assert on the forwarded value.
    applyLocalUpdate: vi.fn(async () => ({
      docId: 'doc-1',
      timestamp: new Date('2026-01-06T00:00:00.000Z'),
    })),
    // Default subscription emits nothing and exposes a no-op unsubscribe.
    subscribeEvents: vi.fn(
      (
        _sessionId: string,
        _callback: (err: Error | null, event: DiskSyncEvent) => void
      ) => {
        return Promise.resolve({
          unsubscribe: () => {},
        });
      }
    ),
  };
});
// Replace the real native binding with a class that forwards every call to
// the hoisted `diskSyncMocks` spies. The factory is hoisted by vitest above
// all imports, which is why the spies must come from `vi.hoisted`.
vi.mock('@affine/native', () => {
  class DiskSyncMock {
    subscribeEvents(
      sessionId: string,
      callback: (err: Error | null, event: DiskSyncEvent) => void
    ) {
      return diskSyncMocks.subscribeEvents(sessionId, callback);
    }
    startSession(
      sessionId: string,
      options: { workspaceId: string; syncFolder: string }
    ) {
      return diskSyncMocks.startSession(sessionId, options);
    }
    stopSession(sessionId: string) {
      return diskSyncMocks.stopSession(sessionId);
    }
    applyLocalUpdate(
      sessionId: string,
      update: { docId: string; bin: Uint8Array },
      origin?: string
    ) {
      return diskSyncMocks.applyLocalUpdate(sessionId, update, origin);
    }
  }
  // Only `DiskSync` is stubbed; other exports of '@affine/native' are absent
  // in this mock, so these tests must not touch them.
  return { DiskSync: DiskSyncMock };
});
import {
applyLocalUpdate,
startSession,
stopSession,
} from '../../src/helper/disk-sync/handlers';
import { diskSyncSubjects } from '../../src/helper/disk-sync/subjects';
describe('disk helper handlers', () => {
  beforeEach(() => {
    // Reset call records between tests; mock implementations are preserved.
    vi.clearAllMocks();
  });

  it('forwards subscribeEvents payload and unsubscribes on stop', async () => {
    const unsubscribe = vi.fn();
    // Emit a synthetic 'ready' event synchronously on subscription so the
    // subject pipeline can be observed without waiting for the native layer.
    diskSyncMocks.subscribeEvents.mockImplementation(
      (
        _sessionId: string,
        callback: (err: Error | null, event: DiskSyncEvent) => void
      ) => {
        callback(null, {
          type: 'ready',
        } as DiskSyncEvent);
        return Promise.resolve({
          unsubscribe,
        });
      }
    );
    // Collect event types flowing through the shared subject.
    const seen: string[] = [];
    const subscription = diskSyncSubjects.event$.subscribe(payload => {
      seen.push(payload.event.type);
    });
    await startSession('session-subscribe', {
      workspaceId: 'workspace-subscribe',
      syncFolder: '/tmp/disk-sync',
    });
    expect(seen).toContain('ready');
    expect(diskSyncMocks.subscribeEvents).toHaveBeenCalledWith(
      'session-subscribe',
      expect.any(Function)
    );
    // Stopping the session must tear down the native event subscription.
    await stopSession('session-subscribe');
    expect(unsubscribe).toHaveBeenCalledTimes(1);
    subscription.unsubscribe();
  });

  it('throws when native applyLocalUpdate returns Error payload', async () => {
    // The native layer resolves with an Error *value* (not a rejection);
    // the handler is expected to detect it and throw a wrapped error.
    diskSyncMocks.applyLocalUpdate.mockResolvedValueOnce(
      new Error('invalid_binary')
    );
    await expect(
      applyLocalUpdate('session-subscribe', {
        docId: 'doc-failed',
        bin: new Uint8Array([1, 2, 3]),
      })
    ).rejects.toThrow('[disk] applyLocalUpdate failed: invalid_binary');
  });
});

View File

@@ -0,0 +1,7 @@
/**
 * Decide whether the app must reload to restart the disk-sync session.
 *
 * A reload is needed only while the feature is enabled AND the configured
 * sync folder actually changed — including being cleared to `null`.
 */
export function shouldReloadDiskSyncSession(
  enabled: boolean,
  previousFolder: string | null,
  nextFolder: string | null
) {
  if (!enabled) {
    return false;
  }
  return previousFolder !== nextFolder;
}

View File

@@ -0,0 +1,27 @@
import { describe, expect, it } from 'vitest';
import { shouldReloadDiskSyncSession } from './disk-sync-session';
describe('shouldReloadDiskSyncSession', () => {
it('does not reload when feature is disabled', () => {
expect(
shouldReloadDiskSyncSession(false, '/tmp/folder-a', '/tmp/folder-b')
).toBe(false);
});
it('does not reload when folder is unchanged', () => {
expect(
shouldReloadDiskSyncSession(true, '/tmp/folder-a', '/tmp/folder-a')
).toBe(false);
});
it('reloads when folder changes while enabled', () => {
expect(
shouldReloadDiskSyncSession(true, '/tmp/folder-a', '/tmp/folder-b')
).toBe(true);
});
it('reloads when clearing folder while enabled', () => {
expect(shouldReloadDiskSyncSession(true, '/tmp/folder-a', null)).toBe(true);
});
});

View File

@@ -0,0 +1,116 @@
import { notify, Switch } from '@affine/component';
import { SettingRow } from '@affine/component/setting-components';
import { Button } from '@affine/component/ui/button';
import { useAsyncCallback } from '@affine/core/components/hooks/affine-async-hooks';
import { DesktopApiService } from '@affine/core/modules/desktop-api';
import { FeatureFlagService } from '@affine/core/modules/feature-flag';
import {
DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY,
getDiskSyncFolderPath,
setDiskSyncFolderPath,
} from '@affine/core/modules/workspace-engine/impls/disk-config';
import { useLiveData, useService } from '@toeverything/infra';
import { useCallback, useEffect, useState } from 'react';
import { shouldReloadDiskSyncSession } from './disk-sync-session';
// Settings panel for the experimental Markdown folder sync (Electron only).
// Lets the user toggle the feature flag and pick/clear the per-workspace
// sync folder. Folder changes while the feature is active trigger a full
// page reload so the workspace engine can rebuild its remote storage.
export const DiskSyncPanel = ({ workspaceId }: { workspaceId: string }) => {
  const desktopApi = useService(DesktopApiService);
  const featureFlagService = useService(FeatureFlagService);
  const enabled = useLiveData(featureFlagService.flags.enable_disk_sync.$);
  // Seed from persisted config so the folder shows on first render.
  const [folder, setFolder] = useState<string | null>(() =>
    getDiskSyncFolderPath(workspaceId)
  );
  useEffect(() => {
    // Re-read on workspace switch, then keep in sync with global-state
    // changes (e.g. edits made from another window).
    setFolder(getDiskSyncFolderPath(workspaceId));
    const unwatch = desktopApi.sharedStorage.globalState.watch<
      Record<string, string>
    >(DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY, folders => {
      const next = folders?.[workspaceId];
      setFolder(typeof next === 'string' && next.length > 0 ? next : null);
    });
    return () => {
      unwatch();
    };
  }, [desktopApi.sharedStorage.globalState, workspaceId]);
  const onToggle = useCallback(
    (checked: boolean) => {
      // The FeatureFlagService is expected to reload the app on change;
      // no reload is issued here.
      featureFlagService.flags.enable_disk_sync.set(checked);
    },
    [featureFlagService]
  );
  const onChooseFolder = useAsyncCallback(async () => {
    // NOTE(review): reuses the DB-file location picker as a generic
    // directory picker — confirm it returns a folder path here.
    const result = await desktopApi.handler.dialog.selectDBFileLocation();
    if (result?.canceled || !result?.filePath) {
      return;
    }
    if (result.filePath === folder) {
      return; // no-op when the same folder is re-selected
    }
    setDiskSyncFolderPath(workspaceId, result.filePath);
    setFolder(result.filePath);
    if (shouldReloadDiskSyncSession(enabled, folder, result.filePath)) {
      // Reload restarts the sync session; the success toast is skipped
      // because the page is about to go away.
      window.location.reload();
      return;
    }
    notify.success({
      title: 'Disk sync folder updated',
    });
  }, [desktopApi.handler.dialog, enabled, folder, workspaceId]);
  const onClearFolder = useCallback(() => {
    if (!folder) {
      return;
    }
    setDiskSyncFolderPath(workspaceId, null);
    setFolder(null);
    if (shouldReloadDiskSyncSession(enabled, folder, null)) {
      window.location.reload();
    }
  }, [enabled, folder, workspaceId]);
  return (
    <>
      <SettingRow
        name={'Markdown Folder Sync (Experimental)'}
        desc={
          'Enable local-folder Markdown sync through native pseudo remote (Electron only).'
        }
      >
        <Switch
          aria-label="Disk Markdown Sync"
          data-testid="disk-sync-toggle"
          checked={!!enabled}
          onChange={onToggle}
        />
      </SettingRow>
      <SettingRow
        name={'Sync Folder'}
        desc={folder ?? 'No folder selected'}
        spreadCol={false}
      >
        <div style={{ display: 'flex', gap: 8, marginTop: 8 }}>
          <Button
            data-testid="disk-sync-choose-folder"
            disabled={!enabled}
            onClick={onChooseFolder}
          >
            Choose Folder
          </Button>
          {folder ? (
            <Button
              data-testid="disk-sync-clear-folder"
              disabled={!enabled}
              onClick={onClearFolder}
            >
              Clear
            </Button>
          ) : null}
        </div>
      </SettingRow>
    </>
  );
};

View File

@@ -9,6 +9,7 @@ import { useLiveData, useService } from '@toeverything/infra';
import { EnableCloudPanel } from '../preference/enable-cloud';
import { BlobManagementPanel } from './blob-management';
import { DiskSyncPanel } from './disk-sync';
import { DesktopExportPanel } from './export';
import { WorkspaceQuotaPanel } from './workspace-quota';
@@ -35,6 +36,11 @@ export const WorkspaceSettingStorage = ({
{workspace.flavour === 'local' ? (
<>
<EnableCloudPanel onCloseSetting={onCloseSetting} />{' '}
{BUILD_CONFIG.isElectron && (
<SettingWrapper>
<DiskSyncPanel workspaceId={workspace.id} />
</SettingWrapper>
)}
{BUILD_CONFIG.isElectron && (
<SettingWrapper>
<DesktopExportPanel workspace={workspace} />

View File

@@ -287,6 +287,14 @@ export const AFFINE_FLAGS = {
configurable: true,
defaultState: isMobile,
},
enable_disk_sync: {
category: 'affine',
displayName: 'Enable Disk Markdown Sync',
description:
'Enable experimental local-folder Markdown bidirectional sync on Electron desktop. WARNING: We are not responsible for any data loss without thorough testing.',
configurable: BUILD_CONFIG.isElectron && isCanaryBuild,
defaultState: false,
},
enable_mobile_database_editing: {
category: 'blocksuite',
bsFlag: 'enable_mobile_database_editing',

View File

@@ -0,0 +1,39 @@
import { BehaviorSubject } from 'rxjs';
import { describe, expect, it, vi } from 'vitest';
import { bindReloadOnFlagChange } from './feature-flag';
describe('bindReloadOnFlagChange', () => {
  it('reloads only when flag value changes after initialization', () => {
    const flag$ = new BehaviorSubject(false);
    const reload = vi.fn();
    const subscription = bindReloadOnFlagChange(flag$, reload);
    // The initial BehaviorSubject value must not trigger a reload.
    expect(reload).not.toHaveBeenCalled();
    // Re-emitting the same value is deduplicated.
    flag$.next(false);
    expect(reload).not.toHaveBeenCalled();
    // A genuine change triggers exactly one reload.
    flag$.next(true);
    expect(reload).toHaveBeenCalledTimes(1);
    // Repeats are ignored again.
    flag$.next(true);
    expect(reload).toHaveBeenCalledTimes(1);
    // Toggling back counts as another change.
    flag$.next(false);
    expect(reload).toHaveBeenCalledTimes(2);
    subscription.unsubscribe();
  });
  it('stops reloading after unsubscribe', () => {
    const flag$ = new BehaviorSubject(false);
    const reload = vi.fn();
    const subscription = bindReloadOnFlagChange(flag$, reload);
    subscription.unsubscribe();
    // Changes after teardown must be ignored.
    flag$.next(true);
    expect(reload).not.toHaveBeenCalled();
  });
});

View File

@@ -1,19 +1,36 @@
import { OnEvent, Service } from '@toeverything/infra';
import type { Observable, Subscription } from 'rxjs';
import { distinctUntilChanged, skip } from 'rxjs';
import { ApplicationStarted } from '../../lifecycle';
import { Flags, type FlagsExt } from '../entities/flags';
/**
 * Subscribe to a boolean feature flag and invoke `reload` whenever its value
 * changes after the initial emission.
 *
 * `distinctUntilChanged` drops repeated values and `skip(1)` ignores the
 * current (startup) value, so only genuine toggles trigger a reload.
 *
 * @returns the rxjs subscription; callers must unsubscribe on teardown.
 */
export function bindReloadOnFlagChange(
  flag$: Observable<boolean>,
  reload: () => void
): Subscription {
  const changed$ = flag$.pipe(distinctUntilChanged(), skip(1));
  return changed$.subscribe(() => {
    reload();
  });
}
@OnEvent(ApplicationStarted, e => e.setupRestartListener)
export class FeatureFlagService extends Service {
flags = this.framework.createEntity(Flags) as FlagsExt;
setupRestartListener() {
this.flags.enable_ai.$.pipe(distinctUntilChanged(), skip(1)).subscribe(
() => {
// when enable_ai flag changes, reload the page.
window.location.reload();
}
const reload = () => window.location.reload();
const enableAiReload = bindReloadOnFlagChange(
this.flags.enable_ai.$,
reload
);
const diskReload = bindReloadOnFlagChange(
this.flags.enable_disk_sync.$,
reload
);
this.disposables.push(
() => enableAiReload.unsubscribe(),
() => diskReload.unsubscribe()
);
}
}

View File

@@ -0,0 +1,78 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import {
DISK_SYNC_FEATURE_FLAG_KEY,
DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY,
getDiskSyncEnabled,
getDiskSyncFolderPath,
getDiskSyncRemoteOptions,
setDiskSyncEnabled,
setDiskSyncFolderPath,
} from './disk-config';
describe('disk-config', () => {
  // disk-config reads two ambient globals; snapshot them so the suite can
  // restore the environment after each test.
  const originalBuildConfig = globalThis.BUILD_CONFIG;
  const originalSharedStorage = (globalThis as any).__sharedStorage;
  // Backing store for the fake shared global-state bridge.
  const state = new Map<string, unknown>();
  beforeEach(() => {
    state.clear();
    // Minimal stand-in for the Electron shared storage bridge.
    (globalThis as any).__sharedStorage = {
      globalState: {
        get<T>(key: string): T | undefined {
          return state.get(key) as T | undefined;
        },
        set<T>(key: string, value: T): void {
          state.set(key, value);
        },
      },
    };
    // Pretend we are in the Electron renderer unless a test says otherwise.
    globalThis.BUILD_CONFIG = {
      ...originalBuildConfig,
      isElectron: true,
    };
  });
  afterEach(() => {
    globalThis.BUILD_CONFIG = originalBuildConfig;
    (globalThis as any).__sharedStorage = originalSharedStorage;
  });
  it('reads and writes feature flag from electron global state', () => {
    expect(getDiskSyncEnabled()).toBe(false);
    setDiskSyncEnabled(true);
    expect(getDiskSyncEnabled()).toBe(true);
    // The flag is persisted under the exported storage key.
    expect(state.get(DISK_SYNC_FEATURE_FLAG_KEY)).toBe(true);
  });
  it('stores folder path per workspace and resolves remote options only when enabled', () => {
    setDiskSyncFolderPath('workspace-a', '/tmp/a');
    expect(getDiskSyncFolderPath('workspace-a')).toBe('/tmp/a');
    expect(state.get(DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY)).toEqual({
      'workspace-a': '/tmp/a',
    });
    // Folder alone is not enough — the feature flag gates remote options.
    expect(getDiskSyncRemoteOptions('workspace-a')).toBeNull();
    setDiskSyncEnabled(true);
    expect(getDiskSyncRemoteOptions('workspace-a')).toEqual({
      syncFolder: '/tmp/a',
    });
  });
  it('ignores config when not running in electron', () => {
    globalThis.BUILD_CONFIG = {
      ...globalThis.BUILD_CONFIG,
      isElectron: false,
    };
    // Even with persisted values present, everything reads as disabled/empty.
    state.set(DISK_SYNC_FEATURE_FLAG_KEY, true);
    state.set(DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY, {
      'workspace-b': '/tmp/b',
    });
    expect(getDiskSyncEnabled()).toBe(false);
    expect(getDiskSyncFolderPath('workspace-b')).toBeNull();
    expect(getDiskSyncRemoteOptions('workspace-b')).toBeNull();
  });
});

View File

@@ -0,0 +1,99 @@
// Global-state keys: the disk-sync feature flag and the per-workspace
// folder map (versioned so the schema can evolve without migrations).
const DISK_SYNC_FLAG_STORAGE_KEY = 'affine-flag:enable_disk_sync';
const DISK_SYNC_FOLDERS_STORAGE_KEY = 'workspace-engine:disk-sync-folders:v1';

// Minimal read/write surface of the Electron shared global-state bridge.
type GlobalStateStorageLike = {
  get<T>(key: string): T | undefined;
  set<T>(key: string, value: T): void;
};
/**
 * Resolve the Electron shared global-state bridge, or `null` when running
 * outside Electron or before the bridge is injected onto `globalThis`.
 */
function getElectronGlobalStateStorage(): GlobalStateStorageLike | null {
  // The shared storage bridge only exists in the Electron renderer.
  if (!BUILD_CONFIG.isElectron) {
    return null;
  }
  const holder = globalThis as {
    __sharedStorage?: { globalState?: GlobalStateStorageLike };
  };
  const globalState = holder.__sharedStorage?.globalState;
  return globalState ?? null;
}
/**
 * Coerce an untrusted stored value into a clean `workspaceId -> folder` map.
 * Any entry that is not a non-empty string key mapped to a non-empty string
 * folder is dropped; non-object inputs yield an empty map.
 */
function normalizeFolderMap(value: unknown): Record<string, string> {
  if (!value || typeof value !== 'object') {
    return {};
  }
  const result: Record<string, string> = {};
  for (const [workspaceId, folder] of Object.entries(value)) {
    const validKey =
      typeof workspaceId === 'string' && workspaceId.length > 0;
    const validFolder = typeof folder === 'string' && folder.length > 0;
    if (validKey && validFolder) {
      result[workspaceId] = folder;
    }
  }
  return result;
}
/**
 * Read the persisted workspace→folder map, sanitized through
 * `normalizeFolderMap`. Returns an empty map outside Electron.
 */
function readFolderMap(): Record<string, string> {
  const storage = getElectronGlobalStateStorage();
  if (storage === null) {
    return {};
  }
  const raw = storage.get<Record<string, string>>(
    DISK_SYNC_FOLDERS_STORAGE_KEY
  );
  return normalizeFolderMap(raw);
}
/**
 * Whether the disk-sync feature flag is on. Always `false` outside Electron
 * or when the flag has never been persisted.
 */
export function getDiskSyncEnabled(): boolean {
  const storage = getElectronGlobalStateStorage();
  if (storage === null) {
    return false;
  }
  const flag = storage.get<boolean>(DISK_SYNC_FLAG_STORAGE_KEY);
  return flag ?? false;
}
/**
 * Persist the disk-sync feature flag. A no-op outside Electron, where no
 * global-state bridge exists.
 */
export function setDiskSyncEnabled(enabled: boolean): void {
  getElectronGlobalStateStorage()?.set(DISK_SYNC_FLAG_STORAGE_KEY, enabled);
}
/** Configured sync folder for a workspace, or `null` when none is set. */
export function getDiskSyncFolderPath(workspaceId: string): string | null {
  const folders = readFolderMap();
  const folder = folders[workspaceId];
  return folder === undefined ? null : folder;
}
/**
 * Persist (or clear, when `folder` is null/empty) the sync folder for one
 * workspace. A no-op outside Electron.
 */
export function setDiskSyncFolderPath(
  workspaceId: string,
  folder: string | null
): void {
  const storage = getElectronGlobalStateStorage();
  if (storage === null) {
    return;
  }
  const folders = readFolderMap();
  if (folder) {
    folders[workspaceId] = folder;
  } else {
    // Clearing removes the workspace entry entirely instead of storing null.
    delete folders[workspaceId];
  }
  storage.set(DISK_SYNC_FOLDERS_STORAGE_KEY, folders);
}
/**
 * Resolve the disk remote-storage options for a workspace. Returns `null`
 * unless the feature flag is enabled AND a folder is configured — both
 * conditions gate session creation.
 */
export function getDiskSyncRemoteOptions(workspaceId: string): {
  syncFolder: string;
} | null {
  const folder = getDiskSyncEnabled()
    ? getDiskSyncFolderPath(workspaceId)
    : null;
  return folder ? { syncFolder: folder } : null;
}
// Public aliases of the storage keys so tests and UI code can read/watch
// exactly the same global-state entries this module writes.
export const DISK_SYNC_FEATURE_FLAG_KEY = DISK_SYNC_FLAG_STORAGE_KEY;
export const DISK_SYNC_FOLDERS_GLOBAL_STATE_KEY = DISK_SYNC_FOLDERS_STORAGE_KEY;

View File

@@ -6,6 +6,7 @@ import {
type ListedBlobRecord,
universalId,
} from '@affine/nbstore';
import { DiskDocStorage } from '@affine/nbstore/disk';
import {
IndexedDBBlobStorage,
IndexedDBBlobSyncStorage,
@@ -46,6 +47,7 @@ import type {
WorkspaceProfileInfo,
} from '../../workspace';
import { WorkspaceImpl } from '../../workspace/impls/workspace';
import { getDiskSyncRemoteOptions } from './disk-config';
import { getWorkspaceProfileWorker } from './out-worker';
import {
dedupeWorkspaceIds,
@@ -430,6 +432,7 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
}
getEngineWorkerInitOptions(workspaceId: string): WorkerInitOptions {
const disk = getDiskSyncRemoteOptions(workspaceId);
return {
local: {
doc: {
@@ -488,6 +491,21 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
},
},
remotes: {
...(disk
? {
disk: {
doc: {
name: DiskDocStorage.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',
id: workspaceId,
syncFolder: disk.syncFolder,
},
},
},
}
: {}),
v1: {
doc: this.DocStorageV1Type
? {

View File

@@ -11,9 +11,11 @@ affine_common = { workspace = true, features = ["hashcash"] }
affine_media_capture = { path = "./media_capture" }
affine_nbstore = { workspace = true, features = ["napi"] }
affine_sqlite_v1 = { path = "./sqlite_v1" }
chrono = { workspace = true }
napi = { workspace = true }
napi-derive = { workspace = true }
once_cell = { workspace = true }
sha3 = { workspace = true }
sqlx = { workspace = true, default-features = false, features = [
"chrono",
"macros",
@@ -24,6 +26,7 @@ sqlx = { workspace = true, default-features = false, features = [
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
y-octo = { workspace = true }
[target.'cfg(not(target_os = "linux"))'.dependencies]
mimalloc = { workspace = true }
@@ -32,7 +35,6 @@ mimalloc = { workspace = true }
mimalloc = { workspace = true, features = ["local_dynamic_tls"] }
[dev-dependencies]
chrono = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }

View File

@@ -40,6 +40,51 @@ export declare function decodeAudio(buf: Uint8Array, destSampleRate?: number | u
/** Decode audio file into a Float32Array */
export declare function decodeAudioSync(buf: Uint8Array, destSampleRate?: number | undefined | null, filename?: string | undefined | null): Float32Array
/**
 * Native disk-sync bridge exposed over napi. One instance manages multiple
 * sessions keyed by `sessionId`.
 *
 * NOTE(review): this declaration file looks napi-rs generated — hand-written
 * comments here may be overwritten on regeneration.
 */
export declare class DiskSync {
  constructor()
  /** Start syncing `options.syncFolder` for the given workspace under `sessionId`. */
  startSession(sessionId: string, options: DiskSessionOptions): Promise<NapiResult<undefined>>
  /** Stop the session and release its native resources. */
  stopSession(sessionId: string): Promise<NapiResult<undefined>>
  /** Apply a local doc update to the session; resolves with the resulting doc clock. */
  applyLocalUpdate(sessionId: string, update: DiskDocUpdateInput, origin?: string | undefined | null): Promise<NapiResult<DiskDocClock>>
  /** Drain pending events for the session (pull-based alternative to subscribeEvents). */
  pullEvents(sessionId: string): Promise<NapiResult<Array<DiskSyncEvent>>>
  /** Push-based event delivery; resolves with a handle used to cancel the subscription. */
  subscribeEvents(sessionId: string, callback: ((err: Error | null, arg: DiskSyncEvent) => void)): Promise<NapiResult<DiskSyncSubscriber>>
}
/** Handle returned by `DiskSync.subscribeEvents`; cancels event delivery. */
export declare class DiskSyncSubscriber {
  unsubscribe(): Promise<NapiResult<undefined>>
}
/** Per-doc sync clock: the doc id plus its last-updated timestamp. */
export interface DiskDocClock {
  docId: string
  timestamp: Date
}
/** Input for `applyLocalUpdate`: encoded doc update bytes plus optional editor id. */
export interface DiskDocUpdateInput {
  docId: string
  bin: Uint8Array
  editor?: string
}
/** Session configuration: the workspace to sync and its on-disk folder. */
export interface DiskSessionOptions {
  workspaceId: string
  syncFolder: string
}
/** A doc update observed on disk, delivered through DiskSyncEvent. */
export interface DiskSyncDocUpdateEvent {
  docId: string
  bin: Uint8Array
  timestamp: Date
  editor?: string
}
/**
 * Event envelope emitted by a session. `type` discriminates which of the
 * optional fields are populated (e.g. an update event carries `update`).
 * NOTE(review): the set of `type` values is not visible here — see the
 * native implementation for the full list.
 */
export interface DiskSyncEvent {
  type: string
  update?: DiskSyncDocUpdateEvent
  docId?: string
  timestamp?: Date
  origin?: string
  message?: string
}
export declare function mintChallengeResponse(resource: string, bits?: number | undefined | null): Promise<string>
export declare function verifyChallengeResponse(response: string, bits: number, resource: string): Promise<boolean>

View File

@@ -77,8 +77,8 @@ function requireNative() {
try {
const binding = require('@affine/native-android-arm64')
const bindingPackageVersion = require('@affine/native-android-arm64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -93,8 +93,8 @@ function requireNative() {
try {
const binding = require('@affine/native-android-arm-eabi')
const bindingPackageVersion = require('@affine/native-android-arm-eabi/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -114,8 +114,8 @@ function requireNative() {
try {
const binding = require('@affine/native-win32-x64-gnu')
const bindingPackageVersion = require('@affine/native-win32-x64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -130,8 +130,8 @@ function requireNative() {
try {
const binding = require('@affine/native-win32-x64-msvc')
const bindingPackageVersion = require('@affine/native-win32-x64-msvc/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -147,8 +147,8 @@ function requireNative() {
try {
const binding = require('@affine/native-win32-ia32-msvc')
const bindingPackageVersion = require('@affine/native-win32-ia32-msvc/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -163,8 +163,8 @@ function requireNative() {
try {
const binding = require('@affine/native-win32-arm64-msvc')
const bindingPackageVersion = require('@affine/native-win32-arm64-msvc/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -182,8 +182,8 @@ function requireNative() {
try {
const binding = require('@affine/native-darwin-universal')
const bindingPackageVersion = require('@affine/native-darwin-universal/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -198,8 +198,8 @@ function requireNative() {
try {
const binding = require('@affine/native-darwin-x64')
const bindingPackageVersion = require('@affine/native-darwin-x64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -214,8 +214,8 @@ function requireNative() {
try {
const binding = require('@affine/native-darwin-arm64')
const bindingPackageVersion = require('@affine/native-darwin-arm64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -234,8 +234,8 @@ function requireNative() {
try {
const binding = require('@affine/native-freebsd-x64')
const bindingPackageVersion = require('@affine/native-freebsd-x64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -250,8 +250,8 @@ function requireNative() {
try {
const binding = require('@affine/native-freebsd-arm64')
const bindingPackageVersion = require('@affine/native-freebsd-arm64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -271,8 +271,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-x64-musl')
const bindingPackageVersion = require('@affine/native-linux-x64-musl/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -287,8 +287,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-x64-gnu')
const bindingPackageVersion = require('@affine/native-linux-x64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -305,8 +305,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-arm64-musl')
const bindingPackageVersion = require('@affine/native-linux-arm64-musl/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -321,8 +321,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-arm64-gnu')
const bindingPackageVersion = require('@affine/native-linux-arm64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -339,8 +339,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-arm-musleabihf')
const bindingPackageVersion = require('@affine/native-linux-arm-musleabihf/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -355,8 +355,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-arm-gnueabihf')
const bindingPackageVersion = require('@affine/native-linux-arm-gnueabihf/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -373,8 +373,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-loong64-musl')
const bindingPackageVersion = require('@affine/native-linux-loong64-musl/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -389,8 +389,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-loong64-gnu')
const bindingPackageVersion = require('@affine/native-linux-loong64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -407,8 +407,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-riscv64-musl')
const bindingPackageVersion = require('@affine/native-linux-riscv64-musl/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -423,8 +423,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-riscv64-gnu')
const bindingPackageVersion = require('@affine/native-linux-riscv64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -440,8 +440,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-ppc64-gnu')
const bindingPackageVersion = require('@affine/native-linux-ppc64-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -456,8 +456,8 @@ function requireNative() {
try {
const binding = require('@affine/native-linux-s390x-gnu')
const bindingPackageVersion = require('@affine/native-linux-s390x-gnu/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -476,8 +476,8 @@ function requireNative() {
try {
const binding = require('@affine/native-openharmony-arm64')
const bindingPackageVersion = require('@affine/native-openharmony-arm64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -492,8 +492,8 @@ function requireNative() {
try {
const binding = require('@affine/native-openharmony-x64')
const bindingPackageVersion = require('@affine/native-openharmony-x64/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -508,8 +508,8 @@ function requireNative() {
try {
const binding = require('@affine/native-openharmony-arm')
const bindingPackageVersion = require('@affine/native-openharmony-arm/package.json').version
if (bindingPackageVersion !== '0.26.0' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.0 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
if (bindingPackageVersion !== '0.26.3' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
throw new Error(`Native binding package version mismatch, expected 0.26.3 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
}
return binding
} catch (e) {
@@ -579,6 +579,8 @@ module.exports.AudioCaptureSession = nativeBinding.AudioCaptureSession
module.exports.ShareableContent = nativeBinding.ShareableContent
module.exports.decodeAudio = nativeBinding.decodeAudio
module.exports.decodeAudioSync = nativeBinding.decodeAudioSync
module.exports.DiskSync = nativeBinding.DiskSync
module.exports.DiskSyncSubscriber = nativeBinding.DiskSyncSubscriber
module.exports.mintChallengeResponse = nativeBinding.mintChallengeResponse
module.exports.verifyChallengeResponse = nativeBinding.verifyChallengeResponse
module.exports.DocStorage = nativeBinding.DocStorage

View File

@@ -0,0 +1,174 @@
use super::types::FrontmatterMeta;
/// Parses a leading YAML frontmatter block (`---` fenced) out of `markdown`.
///
/// Line endings are normalized to `\n` first. Returns the extracted metadata
/// plus the remaining body; when no well-formed frontmatter is present the
/// whole (normalized) input is returned as the body with default metadata.
///
/// Only the subset of YAML this module emits is understood: `id`, `title`,
/// `favorite`, `trash`, and `tags` (inline `[a, b]`, bare comma list, or
/// block `- item` form).
pub(crate) fn parse_frontmatter(markdown: &str) -> (FrontmatterMeta, String) {
    let normalized = markdown.replace("\r\n", "\n");
    if !normalized.starts_with("---\n") {
        return (FrontmatterMeta::default(), normalized);
    }
    let rest = &normalized[4..];
    // Accept both a closing fence followed by content and a closing fence at
    // end-of-input with no trailing newline (common for hand-written files;
    // the original only matched "\n---\n" and treated such files as having
    // no frontmatter at all).
    let (frontmatter_block, body) = if let Some(end) = rest.find("\n---\n") {
        (&rest[..end], rest[(end + 5)..].to_string())
    } else if let Some(stripped) = rest.strip_suffix("\n---") {
        (stripped, String::new())
    } else {
        return (FrontmatterMeta::default(), normalized);
    };
    let mut meta = FrontmatterMeta::default();
    let mut in_tags_block = false;
    for raw_line in frontmatter_block.lines() {
        let line = raw_line.trim();
        if line.is_empty() {
            continue;
        }
        // Continuation item of a block-style `tags:` list (`- value`).
        if in_tags_block && line.starts_with('-') {
            let value = normalize_scalar(line.trim_start_matches('-').trim());
            if !value.is_empty() {
                meta.tags.get_or_insert_with(Vec::new).push(value);
            }
            continue;
        }
        in_tags_block = false;
        let Some((key, value)) = line.split_once(':') else {
            continue;
        };
        let key = key.trim();
        let value = value.trim();
        match key {
            "id" => {
                let normalized = normalize_scalar(value);
                if !normalized.is_empty() {
                    meta.id = Some(normalized);
                }
            }
            "title" => {
                let normalized = normalize_scalar(value);
                // Preserve explicit empty titles (`title: ""`) so round-trip hashing can
                // distinguish them from a missing title field.
                meta.title = Some(normalized);
            }
            "favorite" => {
                meta.favorite = parse_bool(value);
            }
            "trash" => {
                meta.trash = parse_bool(value);
            }
            "tags" => {
                if value.is_empty() {
                    // Block form: items follow on subsequent `- value` lines.
                    in_tags_block = true;
                } else {
                    let tags = parse_tags(value);
                    if !tags.is_empty() {
                        meta.tags = Some(tags);
                    }
                }
            }
            _ => {}
        }
    }
    (meta, body)
}
/// Serializes `meta` back into a `---` fenced frontmatter header followed by
/// `body`. Leading newlines on `body` are dropped (the closing fence already
/// ends with one) and the output is guaranteed to end with a newline.
pub(crate) fn render_frontmatter(meta: &FrontmatterMeta, body: &str) -> String {
    let mut out = String::from("---\n");
    if let Some(id) = &meta.id {
        out.push_str(&format!("id: {}\n", quote_yaml_scalar(id)));
    }
    if let Some(title) = &meta.title {
        out.push_str(&format!("title: {}\n", quote_yaml_scalar(title)));
    }
    if let Some(tags) = normalize_tags(meta.tags.clone()) {
        if tags.is_empty() {
            // Keep an explicit empty list so the key round-trips.
            out.push_str("tags: []\n");
        } else {
            out.push_str("tags:\n");
            for tag in &tags {
                out.push_str(&format!("  - {}\n", quote_yaml_scalar(tag)));
            }
        }
    }
    if let Some(favorite) = meta.favorite {
        out.push_str(&format!("favorite: {}\n", favorite));
    }
    if let Some(trash) = meta.trash {
        out.push_str(&format!("trash: {}\n", trash));
    }
    out.push_str("---\n");
    out.push_str(body.trim_start_matches('\n'));
    if !out.ends_with('\n') {
        out.push('\n');
    }
    out
}
/// Trims whitespace and removes at most one matching pair of surrounding
/// quotes (single or double).
///
/// Fix: the original used `trim_matches('"').trim_matches('\'')`, which
/// strips entire runs and mismatched/lone quotes, corrupting values such as
/// `'don't'` (trailing apostrophe eaten) or a bare `"` (reduced to empty).
fn normalize_scalar(value: &str) -> String {
    let trimmed = value.trim();
    for quote in ['"', '\''] {
        // Quotes are ASCII, so byte-index slicing is char-boundary safe.
        if trimmed.len() >= 2 && trimmed.starts_with(quote) && trimmed.ends_with(quote) {
            return trimmed[1..trimmed.len() - 1].to_string();
        }
    }
    trimmed.to_string()
}
/// Interprets common YAML boolean spellings (case-insensitive); returns
/// `None` when the value is not a recognized boolean.
pub(crate) fn parse_bool(value: &str) -> Option<bool> {
    let lowered = value.trim().to_ascii_lowercase();
    if matches!(lowered.as_str(), "true" | "yes" | "1") {
        Some(true)
    } else if matches!(lowered.as_str(), "false" | "no" | "0") {
        Some(false)
    } else {
        None
    }
}
pub(crate) fn parse_tags(value: &str) -> Vec<String> {
let trimmed = value.trim();
if trimmed.starts_with('[') && trimmed.ends_with(']') {
let inner = &trimmed[1..trimmed.len() - 1];
return inner
.split(',')
.map(normalize_scalar)
.filter(|value| !value.is_empty())
.collect();
}
trimmed
.split(',')
.map(normalize_scalar)
.filter(|value| !value.is_empty())
.collect()
}
/// Trims every tag and discards empty results; `None` passes through.
pub(crate) fn normalize_tags(tags: Option<Vec<String>>) -> Option<Vec<String>> {
    let values = tags?;
    let cleaned = values
        .into_iter()
        .filter_map(|value| {
            let trimmed = value.trim().to_string();
            if trimmed.is_empty() { None } else { Some(trimmed) }
        })
        .collect();
    Some(cleaned)
}
/// Quotes a scalar for YAML output. Identifier-like values (ASCII
/// alphanumerics plus `-_.`) pass through bare; everything else is
/// double-quoted.
///
/// Fix: backslashes are now escaped before quotes — the original escaped
/// only `"` and emitted invalid YAML escapes for values containing `\`.
/// Empty strings are also quoted (`""`) instead of emitted bare.
fn quote_yaml_scalar(value: &str) -> String {
    let is_safe = !value.is_empty()
        && value
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.');
    if is_safe {
        return value.to_string();
    }
    // Order matters: escape existing backslashes before adding new ones.
    let escaped = value.replace('\\', "\\\\").replace('"', "\\\"");
    format!("\"{}\"", escaped)
}

View File

@@ -0,0 +1,203 @@
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use chrono::NaiveDateTime;
use napi::{
bindgen_prelude::{Error as NapiError, Result as NapiResult, Uint8Array},
threadsafe_function::ThreadsafeFunction,
};
use napi_derive::napi;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
mod frontmatter;
mod root_meta;
mod session;
mod state_db;
mod types;
mod utils;
#[cfg(test)]
mod tests;
use session::DiskSession;
static SESSIONS: Lazy<RwLock<HashMap<String, Arc<DiskSession>>>> = Lazy::new(|| RwLock::new(HashMap::new()));
static NEXT_SUBSCRIBER_ID: AtomicU64 = AtomicU64::new(1);
/// Options for starting a disk sync session.
#[napi(object)]
pub struct DiskSessionOptions {
    /// Workspace the session belongs to.
    pub workspace_id: String,
    /// Folder path the session mirrors markdown files into.
    pub sync_folder: String,
}

/// A local (in-app) doc update to be written through to disk.
#[napi(object)]
pub struct DiskDocUpdateInput {
    pub doc_id: String,
    /// Binary CRDT update payload.
    #[napi(ts_type = "Uint8Array")]
    pub bin: Uint8Array,
    /// Optional editor identity attached to the update.
    pub editor: Option<String>,
}

/// Per-doc sync clock returned after applying a local update.
#[napi(object)]
pub struct DiskDocClock {
    pub doc_id: String,
    pub timestamp: NaiveDateTime,
}

/// Payload of a `doc-update` event coming from disk.
#[napi(object)]
pub struct DiskSyncDocUpdateEvent {
    pub doc_id: String,
    pub bin: Uint8Array,
    pub timestamp: NaiveDateTime,
    pub editor: Option<String>,
}

// Manual Clone: the `bin` buffer is explicitly copied into a fresh
// `Uint8Array` rather than sharing the underlying JS-backed storage.
impl Clone for DiskSyncDocUpdateEvent {
    fn clone(&self) -> Self {
        Self {
            doc_id: self.doc_id.clone(),
            bin: Uint8Array::new(self.bin.as_ref().to_vec()),
            timestamp: self.timestamp,
            editor: self.editor.clone(),
        }
    }
}

/// Tagged event record: `type` selects which optional fields are populated
/// (the session emits "ready", "doc-update", and "error" events).
#[derive(Clone)]
#[napi(object)]
pub struct DiskSyncEvent {
    /// Event discriminator, e.g. "ready" | "doc-update" | "error".
    pub r#type: String,
    /// Populated for "doc-update" events.
    pub update: Option<DiskSyncDocUpdateEvent>,
    pub doc_id: Option<String>,
    pub timestamp: Option<NaiveDateTime>,
    pub origin: Option<String>,
    /// Populated for "error" events.
    pub message: Option<String>,
}

/// JS-facing entry point; all session state lives in the global `SESSIONS`
/// registry, so this struct carries no data.
#[napi]
pub struct DiskSync;

/// Handle returned by `DiskSync::subscribe_events`; `unsubscribe` detaches
/// the callback it represents.
#[napi]
pub struct DiskSyncSubscriber {
    session_id: String,
    subscriber_id: u64,
}
#[napi]
impl DiskSync {
#[napi(constructor)]
pub fn new() -> Self {
Self
}
#[napi]
pub async fn start_session(&self, session_id: String, options: DiskSessionOptions) -> NapiResult<()> {
{
let sessions = SESSIONS.read().await;
if sessions.contains_key(&session_id) {
return Ok(());
}
}
let session = DiskSession::new(options).await.map_err(to_napi_error)?;
session.queue_ready_event().await.map_err(to_napi_error)?;
session.scan_once().await.map_err(to_napi_error)?;
let mut sessions = SESSIONS.write().await;
sessions.insert(session_id, Arc::new(session));
Ok(())
}
#[napi]
pub async fn stop_session(&self, session_id: String) -> NapiResult<()> {
let mut sessions = SESSIONS.write().await;
if let Some(session) = sessions.remove(&session_id) {
session.close().await;
}
Ok(())
}
#[napi]
pub async fn apply_local_update(
&self,
session_id: String,
update: DiskDocUpdateInput,
origin: Option<String>,
) -> NapiResult<DiskDocClock> {
let session = {
let sessions = SESSIONS.read().await;
sessions
.get(&session_id)
.cloned()
.ok_or_else(|| to_napi_error(format!("disk session {} is not started", session_id)))?
};
session.apply_local_update(update, origin).await.map_err(to_napi_error)
}
#[napi]
pub async fn pull_events(&self, session_id: String) -> NapiResult<Vec<DiskSyncEvent>> {
let session = {
let sessions = SESSIONS.read().await;
sessions
.get(&session_id)
.cloned()
.ok_or_else(|| to_napi_error(format!("disk session {} is not started", session_id)))?
};
session.pull_events().await.map_err(to_napi_error)
}
#[napi]
pub async fn subscribe_events(
&self,
session_id: String,
callback: ThreadsafeFunction<DiskSyncEvent, ()>,
) -> NapiResult<DiskSyncSubscriber> {
let session = {
let sessions = SESSIONS.read().await;
sessions
.get(&session_id)
.cloned()
.ok_or_else(|| to_napi_error(format!("disk session {} is not started", session_id)))?
};
let subscriber_id = NEXT_SUBSCRIBER_ID.fetch_add(1, Ordering::Relaxed);
session
.add_subscriber(subscriber_id, callback)
.await
.map_err(to_napi_error)?;
Ok(DiskSyncSubscriber {
session_id,
subscriber_id,
})
}
}
#[napi]
impl DiskSyncSubscriber {
    /// Detaches this subscriber from its session. Safe to call after the
    /// session has been stopped — it simply becomes a no-op.
    #[napi]
    pub async fn unsubscribe(&self) -> NapiResult<()> {
        let maybe_session = SESSIONS.read().await.get(&self.session_id).cloned();
        if let Some(session) = maybe_session {
            session.remove_subscriber(self.subscriber_id).await;
        }
        Ok(())
    }
}
/// Wraps an internal string-style error into a napi rejection reason.
fn to_napi_error(message: impl Into<String>) -> NapiError {
    let reason: String = message.into();
    NapiError::from_reason(reason)
}

View File

@@ -0,0 +1,345 @@
use std::collections::HashMap;
use chrono::Utc;
use y_octo::{Any, Array, Doc, Map, Value};
use super::{
frontmatter::{parse_bool, parse_tags},
types::FrontmatterMeta,
utils::{is_empty_update, load_doc_or_new},
};
/// Produces a CRDT update that upserts `doc_id`'s entry in the workspace
/// root doc's `meta.pages` list, carrying over frontmatter metadata.
///
/// `existing_root` is the current root-doc binary (handled by
/// `load_doc_or_new`, which also accepts an empty/new doc); the returned
/// bytes are a delta update — the state vector is captured before mutation
/// so only the changes are encoded.
pub(crate) fn build_root_meta_update(
    existing_root: &[u8],
    workspace_id: &str,
    doc_id: &str,
    meta: &FrontmatterMeta,
) -> Result<Vec<u8>, String> {
    let doc = load_doc_or_new(existing_root, Some(workspace_id))?;
    // Snapshot before mutating so we can encode a minimal delta at the end.
    let state_before = doc.get_state_vector();
    let mut meta_map = doc
        .get_or_create_map("meta")
        .map_err(|err| format!("failed to open root meta map: {}", err))?;
    let mut pages = ensure_pages_array(&doc, &mut meta_map)?;
    // Update an existing page entry in place when one matches `doc_id`.
    let mut found = false;
    for idx in 0..pages.len() {
        let Some(mut page) = pages.get(idx).and_then(|value| value.to_map()) else {
            continue;
        };
        if get_string_from_map(&page, "id").as_deref() == Some(doc_id) {
            apply_page_meta(&doc, &mut page, doc_id, meta)?;
            found = true;
            break;
        }
    }
    if !found {
        // Append a new page map, then re-fetch it so we mutate the instance
        // actually attached to the document.
        let page_map = doc
            .create_map()
            .map_err(|err| format!("failed to create root page map: {}", err))?;
        let idx = pages.len();
        pages
            .insert(idx, Value::Map(page_map))
            .map_err(|err| format!("failed to insert root page map: {}", err))?;
        if let Some(mut page) = pages.get(idx).and_then(|value| value.to_map()) {
            apply_page_meta(&doc, &mut page, doc_id, meta)?;
        }
    }
    doc
        .encode_state_as_update_v1(&state_before)
        .map_err(|err| format!("failed to encode root meta update: {}", err))
}
/// Returns the workspace-meta entry for a single doc, if present.
///
/// Improvement: takes ownership of the entry out of the map with `remove`
/// instead of `get(...).cloned()`, avoiding a needless deep clone of the
/// metadata (behavior is otherwise identical; the map is discarded anyway).
pub(crate) fn extract_root_meta_for_doc(root_bin: &[u8], doc_id: &str) -> Result<Option<FrontmatterMeta>, String> {
    let mut metas = extract_all_root_meta(root_bin)?;
    Ok(metas.remove(doc_id))
}
/// Reads every page's metadata out of a root-doc binary, keyed by doc id.
///
/// Handles two shapes of `meta.pages`: a live CRDT array of maps (the
/// normal case) and a plain `Any::Array` of `Any::Object`s (presumably a
/// legacy/imported layout — both branches exist in the data; see the
/// migration in `ensure_pages_array`). Returns an empty map when the binary
/// is empty or has no `meta` map. Entries without an `id` are skipped.
pub(crate) fn extract_all_root_meta(root_bin: &[u8]) -> Result<HashMap<String, FrontmatterMeta>, String> {
    if is_empty_update(root_bin) {
        return Ok(HashMap::new());
    }
    let doc = load_doc_or_new(root_bin, None)?;
    let meta = match doc.get_map("meta") {
        Ok(meta) => meta,
        Err(_) => return Ok(HashMap::new()),
    };
    let pages_value = meta.get("pages");
    let mut result = HashMap::new();
    // Normal case: pages is a live CRDT array of page maps.
    if let Some(pages) = pages_value.as_ref().and_then(|value| value.to_array()) {
        for page_value in pages.iter() {
            let Some(page_map) = page_value.to_map() else {
                continue;
            };
            // Entries without an id cannot be addressed; skip them.
            let Some(doc_id) = get_string_from_map(&page_map, "id") else {
                continue;
            };
            result.insert(doc_id.clone(), extract_meta_from_page_map(&page_map, Some(doc_id)));
        }
        return Ok(result);
    }
    // Fallback: pages stored as a plain JSON-like Any array of objects.
    if let Some(Any::Array(entries)) = pages_value.and_then(|value| value.to_any()) {
        for entry in entries {
            let Any::Object(values) = entry else {
                continue;
            };
            let Some(Any::String(doc_id)) = values.get("id") else {
                continue;
            };
            let mut meta = FrontmatterMeta::default();
            meta.id = Some(doc_id.clone());
            if let Some(Any::String(title)) = values.get("title") {
                meta.title = Some(title.clone());
            }
            if let Some(tags) = values.get("tags") {
                meta.tags = extract_tags_from_any(tags);
            }
            if let Some(value) = values.get("favorite") {
                meta.favorite = any_to_bool(value);
            }
            if let Some(value) = values.get("trash") {
                meta.trash = any_to_bool(value);
            }
            result.insert(doc_id.clone(), meta);
        }
    }
    Ok(result)
}
/// Writes `meta` into one `pages` entry: `id` always; other fields only when
/// present in the frontmatter. Stamps `updatedDate` with the current time,
/// and sets `createDate` only when no numeric value already exists.
fn apply_page_meta(doc: &Doc, page: &mut Map, doc_id: &str, meta: &FrontmatterMeta) -> Result<(), String> {
    page
        .insert("id".to_string(), Any::String(doc_id.to_string()))
        .map_err(|err| format!("failed to set root meta id: {}", err))?;
    if let Some(title) = meta.title.as_ref() {
        page
            .insert("title".to_string(), Any::String(title.clone()))
            .map_err(|err| format!("failed to set root meta title: {}", err))?;
    }
    if let Some(tags) = super::frontmatter::normalize_tags(meta.tags.clone()) {
        // Tags are rebuilt wholesale as a fresh CRDT array (no element merge).
        let mut tags_array = doc
            .create_array()
            .map_err(|err| format!("failed to create tags array: {}", err))?;
        for tag in tags {
            tags_array
                .push(Any::String(tag))
                .map_err(|err| format!("failed to push tag: {}", err))?;
        }
        page
            .insert("tags".to_string(), Value::Array(tags_array))
            .map_err(|err| format!("failed to set tags array: {}", err))?;
    }
    if let Some(favorite) = meta.favorite {
        page
            .insert("favorite".to_string(), if favorite { Any::True } else { Any::False })
            .map_err(|err| format!("failed to set favorite metadata: {}", err))?;
    }
    if let Some(trash) = meta.trash {
        page
            .insert("trash".to_string(), if trash { Any::True } else { Any::False })
            .map_err(|err| format!("failed to set trash metadata: {}", err))?;
    }
    // Timestamps are stored as float milliseconds since the Unix epoch.
    let now_ms = Utc::now().timestamp_millis() as f64;
    if !has_numeric_timestamp(page, "createDate") {
        page
            .insert("createDate".to_string(), Any::Float64(now_ms.into()))
            .map_err(|err| format!("failed to set createDate metadata: {}", err))?;
    }
    page
        .insert("updatedDate".to_string(), Any::Float64(now_ms.into()))
        .map_err(|err| format!("failed to set updatedDate metadata: {}", err))?;
    Ok(())
}
/// Returns `meta.pages` as a live CRDT array, creating or migrating as
/// needed: an existing live array is returned as-is; a plain `Any::Array`
/// (legacy shape) is converted element-by-element into a fresh CRDT array;
/// otherwise an empty array is created and assigned.
fn ensure_pages_array(doc: &Doc, meta: &mut Map) -> Result<Array, String> {
    let pages_value = meta.get("pages");
    if let Some(pages) = pages_value.as_ref().and_then(|value| value.to_array()) {
        return Ok(pages);
    }
    if let Some(Any::Array(entries)) = pages_value.and_then(|value| value.to_any()) {
        // Migrate legacy plain-data pages into a live array so callers can
        // mutate entries in place.
        let mut pages = doc
            .create_array()
            .map_err(|err| format!("failed to create pages array: {}", err))?;
        for entry in entries {
            let value = any_to_value(doc, entry)?;
            pages
                .push(value)
                .map_err(|err| format!("failed to migrate page entry: {}", err))?;
        }
        meta
            .insert("pages".to_string(), Value::Array(pages.clone()))
            .map_err(|err| format!("failed to assign pages array: {}", err))?;
        return Ok(pages);
    }
    let pages = doc
        .create_array()
        .map_err(|err| format!("failed to create pages array: {}", err))?;
    meta
        .insert("pages".to_string(), Value::Array(pages.clone()))
        .map_err(|err| format!("failed to assign pages array: {}", err))?;
    Ok(pages)
}
/// Recursively converts a plain `Any` into a CRDT `Value`: arrays and
/// objects become live containers owned by `doc`; scalars pass through
/// unchanged as `Value::Any`.
fn any_to_value(doc: &Doc, any: Any) -> Result<Value, String> {
    match any {
        Any::Array(values) => {
            let mut array = doc
                .create_array()
                .map_err(|err| format!("failed to create nested array: {}", err))?;
            for value in values {
                let nested = any_to_value(doc, value)?;
                array
                    .push(nested)
                    .map_err(|err| format!("failed to push nested array value: {}", err))?;
            }
            Ok(Value::Array(array))
        }
        Any::Object(values) => {
            let mut map = doc
                .create_map()
                .map_err(|err| format!("failed to create nested map: {}", err))?;
            for (key, value) in values {
                let nested = any_to_value(doc, value)?;
                map
                    .insert(key, nested)
                    .map_err(|err| format!("failed to insert nested map value: {}", err))?;
            }
            Ok(Value::Map(map))
        }
        _ => Ok(Value::Any(any)),
    }
}
fn extract_meta_from_page_map(page_map: &Map, doc_id: Option<String>) -> FrontmatterMeta {
let mut meta = FrontmatterMeta::default();
meta.id = doc_id.or_else(|| get_string_from_map(page_map, "id"));
meta.title = get_string_from_map(page_map, "title");
if let Some(tags) = page_map.get("tags") {
meta.tags = extract_tags_from_value(&tags);
}
meta.favorite = page_map
.get("favorite")
.and_then(|value| value.to_any())
.and_then(|value| any_to_bool(&value));
meta.trash = page_map
.get("trash")
.and_then(|value| value.to_any())
.and_then(|value| any_to_bool(&value));
meta
}
/// Reads tags out of a live CRDT value (array of scalars) or, failing that,
/// out of its plain `Any` representation.
fn extract_tags_from_value(value: &Value) -> Option<Vec<String>> {
    match value.to_array() {
        Some(array) => {
            let tags = array
                .iter()
                .filter_map(|item| item.to_any())
                .filter_map(|any| any_to_string(&any))
                .collect();
            Some(tags)
        }
        None => value.to_any().and_then(|any| extract_tags_from_any(&any)),
    }
}
fn extract_tags_from_any(value: &Any) -> Option<Vec<String>> {
match value {
Any::Array(values) => {
let mut tags = Vec::new();
for value in values {
if let Some(tag) = any_to_string(value) {
tags.push(tag);
}
}
Some(tags)
}
Any::String(value) => Some(parse_tags(value)),
_ => None,
}
}
/// Coerces a plain `Any` to a boolean: numbers by non-zero-ness, strings
/// via `parse_bool`, nullish values to `None`.
fn any_to_bool(value: &Any) -> Option<bool> {
    match value {
        Any::Null | Any::Undefined => None,
        Any::True => Some(true),
        Any::False => Some(false),
        Any::Integer(n) => Some(*n != 0),
        Any::BigInt64(n) => Some(*n != 0),
        Any::Float32(n) => Some(n.0 != 0.0),
        Any::Float64(n) => Some(n.0 != 0.0),
        Any::String(text) => parse_bool(text),
        // Containers and binary blobs count as truthy presence markers.
        Any::Array(_) | Any::Object(_) | Any::Binary(_) => Some(true),
    }
}
fn any_to_string(value: &Any) -> Option<String> {
match value {
Any::String(value) => Some(value.to_string()),
Any::Integer(value) => Some(value.to_string()),
Any::BigInt64(value) => Some(value.to_string()),
Any::Float32(value) => Some(value.0.to_string()),
Any::Float64(value) => Some(value.0.to_string()),
Any::True => Some("true".to_string()),
Any::False => Some("false".to_string()),
Any::Null | Any::Undefined => None,
Any::Array(_) | Any::Object(_) | Any::Binary(_) => None,
}
}
/// Fetches `key` from a CRDT map as a string, handling both live text
/// values and plain scalar `Any` values.
fn get_string_from_map(map: &Map, key: &str) -> Option<String> {
    let value = map.get(key)?;
    if let Some(text) = value.to_text() {
        return Some(text.to_string());
    }
    let any = value.to_any()?;
    any_to_string(&any)
}
/// True when `key` already holds a usable numeric timestamp (integers
/// always; floats only when finite).
fn has_numeric_timestamp(page: &Map, key: &str) -> bool {
    match page.get(key).and_then(|value| value.to_any()) {
        Some(Any::Integer(_)) | Some(Any::BigInt64(_)) => true,
        Some(Any::Float32(n)) => n.0.is_finite(),
        Some(Any::Float64(n)) => n.0.is_finite(),
        _ => false,
    }
}

View File

@@ -0,0 +1,869 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
fs,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use affine_common::doc_parser::{build_full_doc, parse_doc_to_markdown, update_doc};
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use tokio::{sync::Mutex, task::JoinHandle};
use super::{
DiskDocClock, DiskDocUpdateInput, DiskSessionOptions, DiskSyncDocUpdateEvent, DiskSyncEvent,
frontmatter::{normalize_tags, parse_frontmatter, render_frontmatter},
root_meta::{build_root_meta_update, extract_all_root_meta, extract_root_meta_for_doc},
state_db::StateDb,
types::{Baseline, FrontmatterMeta},
utils::{
collect_markdown_files, derive_title_from_markdown, derive_title_from_path, generate_missing_doc_id, hash_meta,
hash_string, is_empty_update, merge_update_binary, now_naive, paths_equal, sanitize_file_stem, write_atomic,
},
};
/// In-memory state for one workspace's markdown <-> CRDT disk sync session.
///
/// Cheap to clone: every mutable field is behind an `Arc`, so the background
/// poll task holds its own handle to the same shared state.
#[derive(Clone)]
pub(crate) struct DiskSession {
    /// Workspace this session syncs.
    workspace_id: String,
    /// Root folder scanned for markdown files.
    sync_folder: PathBuf,
    /// Persistent sidecar DB (bindings, baselines, event log).
    state_db: StateDb,
    /// Backlog of events queued while no subscriber is attached.
    events: Arc<Mutex<VecDeque<DiskSyncEvent>>>,
    /// Per-doc binary state cache, keyed by doc id.
    docs: Arc<Mutex<HashMap<String, Vec<u8>>>>,
    /// Cached root (workspace meta) doc binary.
    root_doc: Arc<Mutex<Vec<u8>>>,
    /// doc id -> markdown file path.
    bindings: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// markdown file path -> doc id (inverse of `bindings`).
    path_bindings: Arc<Mutex<HashMap<PathBuf, String>>>,
    /// Per-doc baselines used to detect changes — presumably last-synced
    /// content hashes (see `hash_meta`/`hash_string` imports); confirm in
    /// `Baseline`'s definition.
    baselines: Arc<Mutex<HashMap<String, Baseline>>>,
    /// Paths already reported as missing, to debounce repeat events.
    missing_logged: Arc<Mutex<HashSet<PathBuf>>>,
    /// Last sync timestamp per doc id.
    last_sync: Arc<Mutex<HashMap<String, chrono::NaiveDateTime>>>,
    /// Most recent error message, if any.
    last_error: Arc<Mutex<Option<String>>>,
    /// Live JS event callbacks keyed by subscriber id.
    subscribers: Arc<Mutex<HashMap<u64, Arc<ThreadsafeFunction<DiskSyncEvent, ()>>>>>,
    /// Background filesystem poll loop; present only while subscribed.
    poll_task: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Set once on `close()`; the poll loop checks it to exit.
    closed: Arc<AtomicBool>,
    /// Serializes `scan_once` so overlapping scans cannot interleave.
    scan_guard: Arc<Mutex<()>>,
}
impl DiskSession {
    /// Creates a session rooted at `options.sync_folder` (created if
    /// absent), opening the sidecar state DB and rehydrating persisted
    /// doc<->path bindings and baselines. The inverse path->doc index is
    /// rebuilt from the doc->path map. No poll task is started here — that
    /// happens when the first subscriber attaches.
    pub(crate) async fn new(options: DiskSessionOptions) -> Result<Self, String> {
        let sync_folder = PathBuf::from(&options.sync_folder);
        fs::create_dir_all(&sync_folder)
            .map_err(|err| format!("failed to create sync folder {}: {}", sync_folder.display(), err))?;
        let state_db = StateDb::open(&sync_folder, &options.workspace_id).await?;
        let bindings = state_db.load_bindings().await?;
        let baselines = state_db.load_baselines().await?;
        // Rebuild the inverse index (path -> doc id) from persisted bindings.
        let mut path_bindings = HashMap::new();
        for (doc_id, file_path) in &bindings {
            path_bindings.insert(file_path.clone(), doc_id.clone());
        }
        Ok(Self {
            workspace_id: options.workspace_id,
            sync_folder,
            state_db,
            events: Arc::new(Mutex::new(VecDeque::new())),
            docs: Arc::new(Mutex::new(HashMap::new())),
            root_doc: Arc::new(Mutex::new(Vec::new())),
            bindings: Arc::new(Mutex::new(bindings)),
            path_bindings: Arc::new(Mutex::new(path_bindings)),
            baselines: Arc::new(Mutex::new(baselines)),
            missing_logged: Arc::new(Mutex::new(HashSet::new())),
            last_sync: Arc::new(Mutex::new(HashMap::new())),
            last_error: Arc::new(Mutex::new(None)),
            subscribers: Arc::new(Mutex::new(HashMap::new())),
            poll_task: Arc::new(Mutex::new(None)),
            closed: Arc::new(AtomicBool::new(false)),
            scan_guard: Arc::new(Mutex::new(())),
        })
    }
pub(crate) async fn close(&self) {
self.closed.store(true, Ordering::Relaxed);
self.stop_poll_task().await;
self.subscribers.lock().await.clear();
self.state_db.close().await;
}
    /// Registers a live callback, replays any backlog that was queued while
    /// no subscriber was attached, and (re)starts the poll loop.
    ///
    /// NOTE(review): the backlog is drained before the callback is inserted
    /// into `subscribers`; an event emitted between those two steps would be
    /// queued again and not delivered until the next pull/subscribe, so
    /// strict ordering across that window is not guaranteed — confirm this
    /// is acceptable for callers.
    pub(crate) async fn add_subscriber(
        &self,
        subscriber_id: u64,
        callback: ThreadsafeFunction<DiskSyncEvent, ()>,
    ) -> Result<(), String> {
        let callback = Arc::new(callback);
        let backlog = {
            let mut events = self.events.lock().await;
            events.drain(..).collect::<Vec<_>>()
        };
        {
            let mut subscribers = self.subscribers.lock().await;
            subscribers.insert(subscriber_id, callback.clone());
        }
        // Replay queued events to the new subscriber only; the call result
        // is intentionally ignored (fire-and-forget into the JS queue).
        for event in backlog {
            let _ = callback.call(Ok(event), ThreadsafeFunctionCallMode::NonBlocking);
        }
        self.ensure_poll_task().await;
        Ok(())
    }
pub(crate) async fn remove_subscriber(&self, subscriber_id: u64) {
let should_stop = {
let mut subscribers = self.subscribers.lock().await;
subscribers.remove(&subscriber_id);
subscribers.is_empty()
};
if should_stop {
self.stop_poll_task().await;
}
}
    /// Spawns the background scan loop if it is not already running, there
    /// is at least one subscriber, and the session is not closed.
    ///
    /// Poll cadence defaults to 500ms and may be overridden via the
    /// `AFFINE_DISK_POLL_INTERVAL_MS` env var (positive integer, read once
    /// at spawn time). The loop exits on close or when the last subscriber
    /// detaches.
    async fn ensure_poll_task(&self) {
        if self.closed.load(Ordering::Relaxed) {
            return;
        }
        let has_subscribers = {
            let subscribers = self.subscribers.lock().await;
            !subscribers.is_empty()
        };
        if !has_subscribers {
            return;
        }
        // Holding the poll_task lock across the spawn prevents double-spawn.
        let mut poll_task = self.poll_task.lock().await;
        if poll_task.is_some() {
            return;
        }
        let poll_interval_ms = std::env::var("AFFINE_DISK_POLL_INTERVAL_MS")
            .ok()
            .and_then(|value| value.parse::<u64>().ok())
            .filter(|value| *value > 0)
            .unwrap_or(500);
        // The session is Clone (shared state is Arc'd), so the task holds
        // its own handle to the same state.
        let session = self.clone();
        *poll_task = Some(tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(poll_interval_ms));
            loop {
                interval.tick().await;
                if session.closed.load(Ordering::Relaxed) {
                    break;
                }
                let has_subscribers = {
                    let subscribers = session.subscribers.lock().await;
                    !subscribers.is_empty()
                };
                if !has_subscribers {
                    break;
                }
                // Scan failures are surfaced as error events, not loop exits.
                if let Err(err) = session.scan_once().await {
                    session.queue_error_event(err).await;
                }
            }
        }));
    }
async fn stop_poll_task(&self) {
let mut poll_task = self.poll_task.lock().await;
if let Some(handle) = poll_task.take() {
handle.abort();
}
}
pub(crate) async fn queue_ready_event(&self) -> Result<(), String> {
self
.emit_event(DiskSyncEvent {
r#type: "ready".to_string(),
update: None,
doc_id: None,
timestamp: None,
origin: None,
message: None,
})
.await;
Ok(())
}
    /// Delivers `event` to all current subscribers, or queues it into the
    /// backlog when none are attached (backlog is replayed on subscribe or
    /// drained by `pull_events`).
    async fn emit_event(&self, event: DiskSyncEvent) {
        // Snapshot the callbacks so the lock is not held across JS dispatch.
        let subscribers = {
            let subscribers = self.subscribers.lock().await;
            subscribers.values().cloned().collect::<Vec<_>>()
        };
        if subscribers.is_empty() {
            let mut events = self.events.lock().await;
            events.push_back(event);
            return;
        }
        for callback in subscribers {
            // Fire-and-forget into the JS queue; result intentionally ignored.
            let _ = callback.call(Ok(event.clone()), ThreadsafeFunctionCallMode::NonBlocking);
        }
    }
async fn queue_error_event(&self, message: impl Into<String>) {
let message = message.into();
{
let mut last_error = self.last_error.lock().await;
*last_error = Some(message.clone());
}
self
.emit_event(DiskSyncEvent {
r#type: "error".to_string(),
update: None,
doc_id: None,
timestamp: Some(now_naive()),
origin: None,
message: Some(message),
})
.await;
}
async fn queue_doc_update_event(&self, update: DiskSyncDocUpdateEvent, origin: Option<String>) {
self
.emit_event(DiskSyncEvent {
r#type: "doc-update".to_string(),
doc_id: Some(update.doc_id.clone()),
timestamp: Some(update.timestamp),
update: Some(update),
origin,
message: None,
})
.await;
}
/// Run one scan pass, then drain and return all buffered events in FIFO
/// order.
///
/// A scan failure is converted into a buffered "error" event rather than
/// returned, so one bad file never breaks event polling for the caller.
pub(crate) async fn pull_events(&self) -> Result<Vec<DiskSyncEvent>, String> {
  if let Err(err) = self.scan_once().await {
    self.queue_error_event(err).await;
  }
  let mut events = self.events.lock().await;
  // `drain(..)` yields front-to-back, replacing the previous manual
  // `pop_front` loop with the idiomatic single-pass equivalent.
  Ok(events.drain(..).collect())
}
/// Perform one full filesystem scan of the sync folder.
///
/// Serialized via `scan_guard` so a scan never interleaves with local
/// update exports (see `apply_local_update`). Import failures for a single
/// file are logged and surfaced as error events; they do not abort the
/// rest of the scan.
pub(crate) async fn scan_once(&self) -> Result<(), String> {
  let _guard = self.scan_guard.lock().await;
  let mut markdown_files = Vec::new();
  collect_markdown_files(&self.sync_folder, &mut markdown_files)?;
  let mut seen_paths = HashSet::new();
  for file_path in markdown_files {
    seen_paths.insert(file_path.clone());
    if let Err(err) = self.import_file_if_changed(&file_path).await {
      // Best-effort persistence of the failure; `.ok()` because the event
      // log must not mask the original import error.
      self.state_db.append_event(None, "import-error", &err).await.ok();
      self.queue_error_event(err).await;
    }
  }
  // Report bound files that disappeared from disk since the last scan.
  self.handle_missing_files(&seen_paths).await?;
  Ok(())
}
/// Log a one-shot "file-missing" event for every bound file that the last
/// scan did not find on disk.
///
/// `missing_logged` de-duplicates the report: a path is logged once and
/// only re-armed when it reappears in a later scan.
async fn handle_missing_files(&self, seen_paths: &HashSet<PathBuf>) -> Result<(), String> {
  let bindings_snapshot = self.path_bindings.lock().await.clone();
  let mut already_reported = self.missing_logged.lock().await;
  for (path, doc_id) in bindings_snapshot {
    if seen_paths.contains(&path) {
      // File is back (or still present): allow future missing reports.
      already_reported.remove(&path);
    } else if already_reported.insert(path.clone()) {
      // Newly missing: persist exactly one event for this path.
      self
        .state_db
        .append_event(Some(&doc_id), "file-missing", &path.to_string_lossy())
        .await?;
    }
  }
  Ok(())
}
/// Import one markdown file into the in-memory workspace if its body or
/// frontmatter changed since the recorded baseline.
///
/// Flow: read + parse frontmatter; assign (and write back) a generated doc
/// id when the file has none; normalize metadata; compare body/meta hashes
/// and the path binding against the stored baseline; when changed, build or
/// update the CRDT doc, emit a "doc-update" event, propagate metadata to
/// the root doc, and persist binding + baseline + audit event.
async fn import_file_if_changed(&self, file_path: &Path) -> Result<(), String> {
  let raw = fs::read_to_string(file_path)
    .map_err(|err| format!("failed to read markdown file {}: {}", file_path.display(), err))?;
  let (mut meta, body) = parse_frontmatter(&raw);
  let mut doc_id = meta.id.clone();
  if doc_id.is_none() {
    // No id in frontmatter: mint one and rewrite the file atomically so
    // the id is stable across future scans.
    doc_id = Some(generate_missing_doc_id(file_path));
    meta.id = doc_id.clone();
    let rendered = render_frontmatter(&meta, &body);
    write_atomic(file_path, &rendered)?;
  }
  let doc_id = doc_id.ok_or_else(|| format!("failed to resolve doc id for markdown file {}", file_path.display()))?;
  // Title fallback chain: frontmatter -> first markdown heading -> filename.
  let title = meta
    .title
    .clone()
    .or_else(|| derive_title_from_markdown(&body))
    .unwrap_or_else(|| derive_title_from_path(file_path));
  let normalized_meta = FrontmatterMeta {
    id: Some(doc_id.clone()),
    title: Some(title),
    tags: normalize_tags(meta.tags.clone()),
    favorite: Some(meta.favorite.unwrap_or(false)),
    trash: Some(meta.trash.unwrap_or(false)),
  };
  let md_hash = hash_string(&body);
  let meta_hash = hash_meta(&normalized_meta);
  let baseline = {
    let baselines = self.baselines.lock().await;
    baselines.get(&doc_id).cloned()
  };
  let current_binding = {
    let bindings = self.bindings.lock().await;
    bindings.get(&doc_id).cloned()
  };
  // Unchanged only when BOTH hashes match the baseline AND the file is
  // still bound to the same path (a rename must re-import).
  let unchanged = baseline
    .as_ref()
    .zip(current_binding.as_ref())
    .map(|(baseline, bound_path)| {
      baseline.md_hash == md_hash && baseline.meta_hash == meta_hash && paths_equal(bound_path, file_path)
    })
    .unwrap_or(false);
  if unchanged {
    return Ok(());
  }
  let (page_update, full_doc) = {
    let docs = self.docs.lock().await;
    let maybe_existing = docs.get(&doc_id).cloned();
    match maybe_existing {
      // Existing non-empty doc: compute an incremental delta and keep a
      // merged full snapshot for the cache.
      Some(existing_bin) if !is_empty_update(&existing_bin) => {
        let delta = update_doc(&existing_bin, &body, &doc_id)
          .map_err(|err| format!("failed to update doc from markdown {}: {}", doc_id, err))?;
        let merged = merge_update_binary(Some(&existing_bin), &delta, Some(&doc_id))?;
        (delta, merged)
      }
      // Unknown or empty doc: build it from scratch; the full doc doubles
      // as the emitted update.
      _ => {
        let built = build_full_doc(normalized_meta.title.as_deref().unwrap_or("Untitled"), &body, &doc_id)
          .map_err(|err| format!("failed to build doc from markdown {}: {}", doc_id, err))?;
        (built.clone(), built)
      }
    }
  };
  {
    let mut docs = self.docs.lock().await;
    docs.insert(doc_id.clone(), full_doc);
  }
  let now = now_naive();
  // Origin marks this as a disk-originated change so clients can avoid
  // echoing it back.
  self
    .queue_doc_update_event(
      DiskSyncDocUpdateEvent {
        doc_id: doc_id.clone(),
        bin: page_update.into(),
        timestamp: now,
        editor: None,
      },
      Some("disk:file-import".to_string()),
    )
    .await;
  self.apply_root_meta_from_file(&doc_id, &normalized_meta, now).await?;
  {
    // Maintain the doc_id <-> path bijection: drop the stale reverse entry
    // when the doc was previously bound to a different file.
    let mut bindings = self.bindings.lock().await;
    let mut path_bindings = self.path_bindings.lock().await;
    if let Some(prev) = bindings.insert(doc_id.clone(), file_path.to_path_buf()) {
      path_bindings.remove(&prev);
    }
    path_bindings.insert(file_path.to_path_buf(), doc_id.clone());
  }
  self.state_db.upsert_binding(&doc_id, file_path).await.map_err(|err| {
    format!(
      "failed to persist binding for doc {} path {}: {}",
      doc_id,
      file_path.display(),
      err
    )
  })?;
  // NOTE(review): base_clock/base_vector are persisted empty here —
  // presumably reserved for future CRDT clock tracking; confirm.
  let baseline = Baseline {
    base_clock: String::new(),
    base_vector: String::new(),
    md_hash,
    meta_hash,
    synced_at: now,
  };
  {
    let mut baselines = self.baselines.lock().await;
    baselines.insert(doc_id.clone(), baseline.clone());
  }
  self
    .state_db
    .upsert_baseline(&doc_id, &baseline)
    .await
    .map_err(|err| format!("failed to persist baseline for doc {}: {}", doc_id, err))?;
  {
    let mut last_sync = self.last_sync.lock().await;
    last_sync.insert(doc_id.clone(), now);
  }
  self
    .state_db
    .append_event(Some(&doc_id), "import", &file_path.to_string_lossy())
    .await?;
  Ok(())
}
/// Propagate file-derived metadata (title/tags/flags) for `doc_id` into the
/// workspace root doc and broadcast the resulting delta.
///
/// No-ops when the metadata produces an empty update (already in sync).
async fn apply_root_meta_from_file(
  &self,
  doc_id: &str,
  meta: &FrontmatterMeta,
  timestamp: chrono::NaiveDateTime,
) -> Result<(), String> {
  let current_root = self.root_doc.lock().await.clone();
  let delta = build_root_meta_update(&current_root, &self.workspace_id, doc_id, meta)?;
  if is_empty_update(&delta) {
    return Ok(());
  }
  let merged = merge_update_binary(Some(&current_root), &delta, Some(&self.workspace_id))?;
  {
    let mut root = self.root_doc.lock().await;
    *root = merged;
  }
  // The root doc's id is the workspace id; origin marks this as a
  // disk-derived metadata change.
  self
    .queue_doc_update_event(
      DiskSyncDocUpdateEvent {
        doc_id: self.workspace_id.clone(),
        bin: delta.into(),
        timestamp,
        editor: None,
      },
      Some("disk:file-meta".to_string()),
    )
    .await;
  Ok(())
}
/// Apply a client-originated document update and export it to disk.
///
/// Root-doc updates (doc_id == workspace id) are routed to metadata
/// handling; everything else is treated as a page update. Returns the
/// clock (doc id + timestamp) assigned to the update.
pub(crate) async fn apply_local_update(
  &self,
  update: DiskDocUpdateInput,
  origin: Option<String>,
) -> Result<DiskDocClock, String> {
  // Hold the scan guard for the whole export so local updates never
  // interleave with filesystem scanning/importing. Without it, root-meta
  // and page exports can race on the same markdown file/baseline and the
  // file content flips between snapshots while the client is editing.
  let _guard = self.scan_guard.lock().await;
  let timestamp = now_naive();
  if update.doc_id == self.workspace_id {
    self
      .apply_local_root_update(update.bin.as_ref().to_vec(), timestamp, origin)
      .await?;
  } else {
    self
      .apply_local_page_update(
        update.doc_id.clone(),
        update.bin.as_ref().to_vec(),
        update.editor,
        timestamp,
      )
      .await?;
  }
  Ok(DiskDocClock {
    doc_id: update.doc_id,
    timestamp,
  })
}
/// Merge a root-doc (workspace metadata) update and re-export affected
/// markdown files with their new frontmatter.
///
/// For every doc mentioned in the merged root metadata, the body is sourced
/// from the cached CRDT doc when available, otherwise from the bound file
/// on disk. Docs with neither are skipped — metadata-only placeholders must
/// not materialize empty files.
async fn apply_local_root_update(
  &self,
  update_bin: Vec<u8>,
  timestamp: chrono::NaiveDateTime,
  _origin: Option<String>,
) -> Result<(), String> {
  let current_root = self.root_doc.lock().await.clone();
  let merged_root = merge_update_binary(Some(&current_root), &update_bin, Some(&self.workspace_id))?;
  {
    let mut root = self.root_doc.lock().await;
    *root = merged_root.clone();
  }
  let metas = extract_all_root_meta(&merged_root)?;
  for (doc_id, meta) in metas {
    let binding_path = {
      let bindings = self.bindings.lock().await;
      bindings.get(&doc_id).cloned()
    };
    // Preferred body source: the cached doc, converted to markdown.
    let doc_body = if let Some(doc_bin) = self.docs.lock().await.get(&doc_id).cloned() {
      parse_doc_to_markdown(doc_bin, doc_id.clone(), true, None)
        .ok()
        .map(|result| result.markdown)
    } else {
      None
    };
    // Fallback body source: the existing bound file's body (frontmatter
    // stripped). `body_from_doc` tracks which source was used.
    let (body, body_from_doc) = if let Some(body) = doc_body {
      (body, true)
    } else if let Some(path) = binding_path.as_ref().filter(|path| path.exists()) {
      let existing = fs::read_to_string(path).map_err(|err| {
        format!(
          "failed to read markdown for metadata update {}: {}",
          path.display(),
          err
        )
      })?;
      let (_, body) = parse_frontmatter(&existing);
      (body, false)
    } else {
      continue;
    };
    // A file-sourced empty body means there is nothing worth rewriting.
    if !body_from_doc && body.trim().is_empty() {
      continue;
    }
    let path = if let Some(path) = binding_path {
      path
    } else {
      self.resolve_file_path(&doc_id, meta.title.as_deref()).await?
    };
    // Avoid overwriting local filesystem edits that haven't been imported yet.
    if self.is_markdown_dirty(&doc_id, &path).await {
      self
        .state_db
        .append_event(Some(&doc_id), "export-root-skip-dirty", &path.to_string_lossy())
        .await
        .ok();
      continue;
    }
    let meta_with_id = meta.clone().with_id(doc_id.clone());
    let rendered = render_frontmatter(&meta_with_id, &body);
    write_atomic(&path, &rendered)?;
    // Refresh the baseline so the rewritten file is not re-imported as a
    // change on the next scan.
    let baseline = Baseline {
      base_clock: String::new(),
      base_vector: String::new(),
      md_hash: hash_string(&body),
      meta_hash: hash_meta(&meta_with_id),
      synced_at: timestamp,
    };
    {
      let mut baselines = self.baselines.lock().await;
      baselines.insert(doc_id.clone(), baseline.clone());
    }
    self.state_db.upsert_baseline(&doc_id, &baseline).await?;
  }
  self
    .state_db
    .append_event(None, "export-root-meta", "root metadata update applied")
    .await?;
  Ok(())
}
/// Merge a page-doc update into the cache and export the result as a
/// markdown file with frontmatter.
///
/// Failure policy: a malformed update, an unconvertible doc, or a dirty
/// target file never returns an error — each case is recorded in the state
/// db and skipped, so one bad document cannot stall the global sync loop.
async fn apply_local_page_update(
  &self,
  doc_id: String,
  update_bin: Vec<u8>,
  editor: Option<String>,
  timestamp: chrono::NaiveDateTime,
) -> Result<(), String> {
  // Internal docs (e.g. `db$folders`) are not page documents and are not
  // exportable to markdown. Avoid emitting noisy parser errors for them.
  if doc_id.starts_with("db$") {
    self
      .state_db
      .append_event(
        Some(&doc_id),
        "export-skip-internal",
        "internal doc skipped (not a page)",
      )
      .await
      .ok();
    return Ok(());
  }
  let current_doc = {
    let docs = self.docs.lock().await;
    docs.get(&doc_id).cloned()
  };
  let merged_doc = match merge_update_binary(current_doc.as_deref(), &update_bin, Some(&doc_id)) {
    Ok(merged_doc) => merged_doc,
    Err(err) => {
      // A single malformed document update must not break the whole sync loop.
      // Otherwise every push retries globally and delays other documents.
      if current_doc.is_some() && err.contains("failed to apply existing update") {
        // The cached doc itself may be corrupt: drop it and retry the
        // update against an empty base.
        {
          let mut docs = self.docs.lock().await;
          docs.remove(&doc_id);
        }
        self
          .state_db
          .append_event(Some(&doc_id), "export-recover-reset-doc-cache", &err)
          .await
          .ok();
        match merge_update_binary(None, &update_bin, Some(&doc_id)) {
          Ok(recovered) => recovered,
          Err(recover_err) => {
            self
              .state_db
              .append_event(
                Some(&doc_id),
                "export-skip-invalid-update",
                &format!("{err}; recover failed: {recover_err}"),
              )
              .await
              .ok();
            return Ok(());
          }
        }
      } else {
        self
          .state_db
          .append_event(Some(&doc_id), "export-skip-invalid-update", &err)
          .await
          .ok();
        return Ok(());
      }
    }
  };
  {
    let mut docs = self.docs.lock().await;
    docs.insert(doc_id.clone(), merged_doc.clone());
  }
  let markdown = match parse_doc_to_markdown(merged_doc, doc_id.clone(), true, None) {
    Ok(markdown) => markdown,
    Err(err) => {
      self
        .state_db
        .append_event(
          Some(&doc_id),
          "export-skip",
          &format!("failed to convert doc {} to markdown: {}", doc_id, err),
        )
        .await?;
      return Ok(());
    }
  };
  let meta = self.meta_for_doc(&doc_id, Some(markdown.title)).await?;
  let file_path = self.resolve_file_path(&doc_id, meta.title.as_deref()).await?;
  // Avoid overwriting local filesystem edits that haven't been imported yet.
  // This is especially important when multiple export passes happen (e.g. page
  // update + root meta update) and users edit the markdown file in between
  // them.
  if self.is_markdown_dirty(&doc_id, &file_path).await {
    self
      .state_db
      .append_event(Some(&doc_id), "export-skip-dirty", &file_path.to_string_lossy())
      .await
      .ok();
    let _ = editor;
    return Ok(());
  }
  let meta_with_id = meta.clone().with_id(doc_id.clone());
  let rendered = render_frontmatter(&meta_with_id, &markdown.markdown);
  write_atomic(&file_path, &rendered)?;
  // Refresh the baseline so this export is not re-imported as a change.
  let baseline = Baseline {
    base_clock: String::new(),
    base_vector: String::new(),
    md_hash: hash_string(&markdown.markdown),
    meta_hash: hash_meta(&meta_with_id),
    synced_at: timestamp,
  };
  {
    let mut baselines = self.baselines.lock().await;
    baselines.insert(doc_id.clone(), baseline.clone());
  }
  self.state_db.upsert_baseline(&doc_id, &baseline).await?;
  {
    let mut last_sync = self.last_sync.lock().await;
    last_sync.insert(doc_id.clone(), timestamp);
  }
  self
    .state_db
    .append_event(Some(&doc_id), "export-page", &file_path.to_string_lossy())
    .await?;
  // `editor` is currently unused by the export path; kept in the signature
  // for callers that supply it.
  let _ = editor;
  Ok(())
}
/// Return true when the on-disk markdown differs from the last synced
/// baseline, i.e. the user edited the file and an export would clobber it.
///
/// The metadata normalization here must mirror `import_file_if_changed`
/// exactly — `hash_meta` compares the normalized form, so any divergence
/// would produce false dirty/clean results.
async fn is_markdown_dirty(&self, doc_id: &str, file_path: &Path) -> bool {
  if !file_path.exists() {
    return false;
  }
  // No baseline means the file is not tracked by this session yet.
  // If it already exists, treat it as dirty and let the import path handle it.
  let baseline = {
    let baselines = self.baselines.lock().await;
    baselines.get(doc_id).cloned()
  };
  let Some(baseline) = baseline else {
    return true;
  };
  let raw = match fs::read_to_string(file_path) {
    Ok(raw) => raw,
    Err(err) => {
      // Unreadable file: err on the safe side (dirty) and log why.
      self
        .state_db
        .append_event(
          Some(doc_id),
          "export-skip-read-error",
          &format!("{}: {}", file_path.display(), err),
        )
        .await
        .ok();
      return true;
    }
  };
  let (meta, body) = parse_frontmatter(&raw);
  // Same title fallback chain as the import path: frontmatter -> first
  // heading -> filename.
  let title = meta
    .title
    .clone()
    .or_else(|| derive_title_from_markdown(&body))
    .unwrap_or_else(|| derive_title_from_path(file_path));
  let normalized_meta = FrontmatterMeta {
    id: meta.id.clone().or_else(|| Some(doc_id.to_string())),
    title: Some(title),
    tags: normalize_tags(meta.tags.clone()),
    favorite: Some(meta.favorite.unwrap_or(false)),
    trash: Some(meta.trash.unwrap_or(false)),
  };
  let md_hash = hash_string(&body);
  let meta_hash = hash_meta(&normalized_meta);
  // Dirty when either the body or the normalized metadata drifted.
  baseline.md_hash != md_hash || baseline.meta_hash != meta_hash
}
/// Build the frontmatter metadata for `doc_id` from the root doc, filling
/// unset fields with defaults (fallback title, empty tags, false flags) so
/// the rendered frontmatter is always complete.
async fn meta_for_doc(&self, doc_id: &str, fallback_title: Option<String>) -> Result<FrontmatterMeta, String> {
  let root_snapshot = self.root_doc.lock().await.clone();
  let mut meta = extract_root_meta_for_doc(&root_snapshot, doc_id)?.unwrap_or_default();
  meta.title = meta.title.take().or(fallback_title);
  meta.tags.get_or_insert_with(Vec::new);
  meta.favorite.get_or_insert(false);
  meta.trash.get_or_insert(false);
  Ok(meta)
}
/// Resolve (or allocate) the markdown file path bound to `doc_id`.
///
/// Returns the existing binding when present; otherwise derives a file
/// stem from the title hint (falling back to the doc id), probes
/// `name.md`, `name-2.md`, ... until a path not bound to a different doc
/// is found, records the new binding in memory and the state db, and
/// returns it.
async fn resolve_file_path(&self, doc_id: &str, title_hint: Option<&str>) -> Result<PathBuf, String> {
  if let Some(path) = self.bindings.lock().await.get(doc_id).cloned() {
    return Ok(path);
  }
  let base_name = sanitize_file_stem(title_hint.unwrap_or(doc_id));
  let mut index = 1usize;
  loop {
    // First candidate has no suffix; collisions get "-2", "-3", ...
    let candidate_name = if index == 1 {
      format!("{}.md", base_name)
    } else {
      format!("{}-{}.md", base_name, index)
    };
    let candidate = self.sync_folder.join(candidate_name);
    let taken = {
      let path_bindings = self.path_bindings.lock().await;
      path_bindings.get(&candidate).cloned()
    };
    if let Some(existing_doc_id) = taken {
      if existing_doc_id == doc_id {
        // Reverse binding already points at us (forward binding was
        // missing); reuse it without re-persisting.
        return Ok(candidate);
      }
      index += 1;
      continue;
    }
    // Record both directions of the binding before persisting.
    {
      let mut bindings = self.bindings.lock().await;
      bindings.insert(doc_id.to_string(), candidate.clone());
    }
    {
      let mut path_bindings = self.path_bindings.lock().await;
      path_bindings.insert(candidate.clone(), doc_id.to_string());
    }
    self.state_db.upsert_binding(doc_id, &candidate).await?;
    return Ok(candidate);
  }
}
}

View File

@@ -0,0 +1,261 @@
use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
};
use sqlx::{
Pool, Row, Sqlite,
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
};
use super::{types::Baseline, utils::now_naive};
/// Persistent per-workspace sync state, backed by a SQLite file inside the
/// sync folder (`.affine-sync/state.db`): doc<->file bindings, sync
/// baselines, and an append-only audit event log.
#[derive(Clone)]
pub(crate) struct StateDb {
  /// Workspace this state is scoped to; every query filters on it.
  workspace_id: String,
  /// Single-connection SQLite pool (WAL mode) for the state file.
  pool: Pool<Sqlite>,
}
impl StateDb {
  /// Open (creating if necessary) the state database under
  /// `<sync_folder>/.affine-sync/state.db` and ensure the schema exists.
  pub(crate) async fn open(sync_folder: &Path, workspace_id: &str) -> Result<Self, String> {
    let state_dir = sync_folder.join(".affine-sync");
    fs::create_dir_all(&state_dir)
      .map_err(|err| format!("failed to create state dir {}: {}", state_dir.display(), err))?;
    let db_path = state_dir.join("state.db");
    let connect_options = SqliteConnectOptions::new()
      .filename(&db_path)
      .create_if_missing(true)
      .journal_mode(SqliteJournalMode::Wal);
    // A single connection serializes all writers and sidesteps SQLite
    // lock contention.
    let pool = SqlitePoolOptions::new()
      .max_connections(1)
      .connect_with(connect_options)
      .await
      .map_err(|err| format!("failed to open state db {}: {}", db_path.display(), err))?;
    let db = Self {
      workspace_id: workspace_id.to_string(),
      pool,
    };
    db.init().await?;
    Ok(db)
  }

  /// Create all tables/indexes idempotently and stamp schema version 1.
  async fn init(&self) -> Result<(), String> {
    sqlx::query(
      r#"
      CREATE TABLE IF NOT EXISTS schema_version (
        version INTEGER PRIMARY KEY
      );
      "#,
    )
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to create schema_version table: {}", err))?;
    sqlx::query(
      r#"
      CREATE TABLE IF NOT EXISTS bindings (
        workspace_id TEXT NOT NULL,
        doc_id TEXT NOT NULL,
        file_path TEXT NOT NULL,
        enabled INTEGER NOT NULL DEFAULT 1,
        updated_at DATETIME NOT NULL,
        PRIMARY KEY(workspace_id, doc_id)
      );
      "#,
    )
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to create bindings table: {}", err))?;
    // One file may belong to at most one doc per workspace.
    sqlx::query(
      r#"
      CREATE UNIQUE INDEX IF NOT EXISTS idx_bindings_workspace_file
      ON bindings(workspace_id, file_path);
      "#,
    )
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to create bindings index: {}", err))?;
    sqlx::query(
      r#"
      CREATE TABLE IF NOT EXISTS baselines (
        workspace_id TEXT NOT NULL,
        doc_id TEXT NOT NULL,
        base_clock TEXT NOT NULL,
        base_vector TEXT NOT NULL,
        md_hash TEXT NOT NULL,
        meta_hash TEXT NOT NULL,
        synced_at DATETIME NOT NULL,
        PRIMARY KEY(workspace_id, doc_id)
      );
      "#,
    )
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to create baselines table: {}", err))?;
    sqlx::query(
      r#"
      CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        workspace_id TEXT NOT NULL,
        doc_id TEXT,
        kind TEXT NOT NULL,
        ts DATETIME NOT NULL,
        payload TEXT NOT NULL
      );
      "#,
    )
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to create events table: {}", err))?;
    // OR IGNORE keeps re-opens idempotent once the version row exists.
    sqlx::query("INSERT OR IGNORE INTO schema_version(version) VALUES (1);")
      .execute(&self.pool)
      .await
      .map_err(|err| format!("failed to initialize schema_version: {}", err))?;
    Ok(())
  }

  /// Load all enabled doc-id -> file-path bindings for this workspace.
  pub(crate) async fn load_bindings(&self) -> Result<HashMap<String, PathBuf>, String> {
    let rows = sqlx::query(
      r#"
      SELECT doc_id, file_path
      FROM bindings
      WHERE workspace_id = ? AND enabled = 1;
      "#,
    )
    .bind(&self.workspace_id)
    .fetch_all(&self.pool)
    .await
    .map_err(|err| format!("failed to load bindings: {}", err))?;
    let mut map = HashMap::new();
    for row in rows {
      let doc_id: String = row.get("doc_id");
      let file_path: String = row.get("file_path");
      map.insert(doc_id, PathBuf::from(file_path));
    }
    Ok(map)
  }

  /// Insert or update the binding for `doc_id`, re-enabling it and
  /// refreshing `updated_at`.
  pub(crate) async fn upsert_binding(&self, doc_id: &str, file_path: &Path) -> Result<(), String> {
    sqlx::query(
      r#"
      INSERT INTO bindings (workspace_id, doc_id, file_path, enabled, updated_at)
      VALUES (?, ?, ?, 1, ?)
      ON CONFLICT(workspace_id, doc_id)
      DO UPDATE SET
        file_path = excluded.file_path,
        enabled = 1,
        updated_at = excluded.updated_at;
      "#,
    )
    .bind(&self.workspace_id)
    .bind(doc_id)
    // Lossy conversion: non-UTF-8 path bytes are replaced before storage.
    .bind(file_path.to_string_lossy().to_string())
    .bind(now_naive())
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to upsert binding for doc {}: {}", doc_id, err))?;
    Ok(())
  }

  /// Load every stored baseline for this workspace, keyed by doc id.
  pub(crate) async fn load_baselines(&self) -> Result<HashMap<String, Baseline>, String> {
    let rows = sqlx::query(
      r#"
      SELECT doc_id, base_clock, base_vector, md_hash, meta_hash, synced_at
      FROM baselines
      WHERE workspace_id = ?;
      "#,
    )
    .bind(&self.workspace_id)
    .fetch_all(&self.pool)
    .await
    .map_err(|err| format!("failed to load baselines: {}", err))?;
    let mut map = HashMap::new();
    for row in rows {
      let doc_id: String = row.get("doc_id");
      map.insert(
        doc_id,
        Baseline {
          base_clock: row.get("base_clock"),
          base_vector: row.get("base_vector"),
          md_hash: row.get("md_hash"),
          meta_hash: row.get("meta_hash"),
          synced_at: row.get("synced_at"),
        },
      );
    }
    Ok(map)
  }

  /// Insert or fully replace the baseline row for `doc_id`.
  pub(crate) async fn upsert_baseline(&self, doc_id: &str, baseline: &Baseline) -> Result<(), String> {
    sqlx::query(
      r#"
      INSERT INTO baselines (
        workspace_id,
        doc_id,
        base_clock,
        base_vector,
        md_hash,
        meta_hash,
        synced_at
      ) VALUES (?, ?, ?, ?, ?, ?, ?)
      ON CONFLICT(workspace_id, doc_id)
      DO UPDATE SET
        base_clock = excluded.base_clock,
        base_vector = excluded.base_vector,
        md_hash = excluded.md_hash,
        meta_hash = excluded.meta_hash,
        synced_at = excluded.synced_at;
      "#,
    )
    .bind(&self.workspace_id)
    .bind(doc_id)
    .bind(&baseline.base_clock)
    .bind(&baseline.base_vector)
    .bind(&baseline.md_hash)
    .bind(&baseline.meta_hash)
    .bind(baseline.synced_at)
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to upsert baseline for doc {}: {}", doc_id, err))?;
    Ok(())
  }

  /// Append one audit event (`kind` + `payload`, optionally scoped to a
  /// doc) with the current timestamp.
  pub(crate) async fn append_event(&self, doc_id: Option<&str>, kind: &str, payload: &str) -> Result<(), String> {
    sqlx::query(
      r#"
      INSERT INTO events (workspace_id, doc_id, kind, ts, payload)
      VALUES (?, ?, ?, ?, ?);
      "#,
    )
    .bind(&self.workspace_id)
    .bind(doc_id)
    .bind(kind)
    .bind(now_naive())
    .bind(payload)
    .execute(&self.pool)
    .await
    .map_err(|err| format!("failed to append event {}: {}", kind, err))?;
    Ok(())
  }

  /// Close the underlying connection pool; further queries will fail.
  pub(crate) async fn close(&self) {
    self.pool.close().await;
  }
}

View File

@@ -0,0 +1,818 @@
use std::{
fs,
path::{Path, PathBuf},
};
use affine_common::doc_parser::{build_full_doc, update_doc};
use chrono::Utc;
use napi::bindgen_prelude::Uint8Array;
use uuid::Uuid;
use y_octo::{Any, DocOptions, Value};
use super::{
DiskDocUpdateInput, DiskSessionOptions, DiskSync, frontmatter::parse_frontmatter, root_meta::build_root_meta_update,
types::FrontmatterMeta, utils::collect_markdown_files,
};
/// Create a unique scratch directory for one test run.
///
/// Uniqueness combines the process id, a nanosecond timestamp, and a random
/// UUID so parallel test processes never collide.
fn temp_dir() -> PathBuf {
  let unique_name = format!(
    "affine-disk-sync-{}-{}-{}",
    std::process::id(),
    Utc::now().timestamp_nanos_opt().unwrap_or_default(),
    Uuid::new_v4()
  );
  let dir = std::env::temp_dir().join(unique_name);
  fs::create_dir_all(&dir).expect("create temp dir");
  dir
}
/// Build an encoded CRDT doc containing a page -> note -> `flavour` block
/// tree, where `flavour` names a block type the markdown exporter does not
/// support. Used to verify exports degrade gracefully.
fn build_doc_with_unsupported_block(doc_id: &str, title: &str, flavour: &str) -> Vec<u8> {
  let doc = DocOptions::new().with_guid(doc_id.to_string()).build();
  let mut blocks = doc.get_or_create_map("blocks").expect("create blocks map");
  // Page block: root of the tree, holds the title and a single note child.
  let mut page = doc.create_map().expect("create page block");
  page.insert("sys:id".into(), "page").expect("set page id");
  page
    .insert("sys:flavour".into(), "affine:page")
    .expect("set page flavour");
  let mut page_children = doc.create_array().expect("create page children");
  page_children.push("note").expect("append page child");
  page
    .insert("sys:children".into(), Value::Array(page_children))
    .expect("set page children");
  let mut page_title = doc.create_text().expect("create page title");
  page_title.insert(0, title).expect("set page title");
  page
    .insert("prop:title".into(), Value::Text(page_title))
    .expect("set page title prop");
  blocks
    .insert("page".into(), Value::Map(page))
    .expect("insert page block");
  // Note block: container that references the unsupported child by id.
  let mut note = doc.create_map().expect("create note block");
  note.insert("sys:id".into(), "note").expect("set note id");
  note
    .insert("sys:flavour".into(), "affine:note")
    .expect("set note flavour");
  let mut note_children = doc.create_array().expect("create note children");
  note_children.push("unsupported").expect("append unsupported child");
  note
    .insert("sys:children".into(), Value::Array(note_children))
    .expect("set note children");
  note
    .insert("prop:displayMode".into(), "page")
    .expect("set note display mode");
  blocks
    .insert("note".into(), Value::Map(note))
    .expect("insert note block");
  // The unsupported block itself: only sys fields, empty children, with
  // the caller-chosen flavour.
  let mut unsupported = doc.create_map().expect("create unsupported block");
  unsupported
    .insert("sys:id".into(), "unsupported")
    .expect("set unsupported id");
  unsupported
    .insert("sys:flavour".into(), flavour)
    .expect("set unsupported flavour");
  unsupported
    .insert(
      "sys:children".into(),
      Value::Array(doc.create_array().expect("create unsupported children")),
    )
    .expect("set unsupported children");
  blocks
    .insert("unsupported".into(), Value::Map(unsupported))
    .expect("insert unsupported block");
  doc.encode_update_v1().expect("encode unsupported doc")
}
/// Stop the session and best-effort remove the test's scratch directory.
async fn teardown(sync: &DiskSync, session_id: &str, dir: &Path) {
  sync.stop_session(session_id.to_string()).await.expect("stop session");
  // Cleanup is best-effort: a missing or locked directory is not a test
  // failure.
  let _ = fs::remove_dir_all(dir);
}
/// True when `value` is a numeric CRDT value; floats additionally must be
/// finite (NaN/inf are rejected).
fn is_numeric_any(value: &Any) -> bool {
  match value {
    Any::Float32(v) => v.0.is_finite(),
    Any::Float64(v) => v.0.is_finite(),
    other => matches!(other, Any::Integer(_) | Any::BigInt64(_)),
  }
}
/// All supported frontmatter fields parse into `FrontmatterMeta`, and the
/// body is returned with the frontmatter stripped.
#[test]
fn parse_frontmatter_supported_fields() {
  // Raw string is intentionally flush-left: YAML frontmatter delimiters
  // must start at column 0.
  let raw = r#"---
id: doc-1
title: "Demo"
tags:
  - alpha
  - beta
favorite: true
trash: false
---
# Heading
Body.
"#;
  let (meta, body) = parse_frontmatter(raw);
  assert_eq!(meta.id.as_deref(), Some("doc-1"));
  assert_eq!(meta.title.as_deref(), Some("Demo"));
  assert_eq!(meta.tags, Some(vec!["alpha".to_string(), "beta".to_string()]));
  assert_eq!(meta.favorite, Some(true));
  assert_eq!(meta.trash, Some(false));
  assert!(body.contains("# Heading"));
}
/// An explicit `title: ""` must survive parsing as `Some("")`, not be
/// collapsed to `None` (which would trigger title-fallback logic).
#[test]
fn parse_frontmatter_preserves_explicit_empty_title() {
  let raw = r#"---
id: doc-empty-title
title: ""
---
Body
"#;
  let (meta, _) = parse_frontmatter(raw);
  assert_eq!(meta.id.as_deref(), Some("doc-empty-title"));
  assert_eq!(meta.title.as_deref(), Some(""));
}
/// Starting a session over a folder with an existing markdown file yields a
/// "ready" event, doc-update events for both the page and the workspace
/// root, and creates the on-disk state database.
#[tokio::test]
async fn start_session_imports_markdown_and_creates_state_db() {
  let dir = temp_dir();
  let md_path = dir.join("doc-a.md");
  fs::write(
    &md_path,
    "---\nid: doc-a\ntitle: A\ntags: [one,two]\n---\n\n# A\n\ncontent",
  )
  .expect("write markdown");
  let sync = DiskSync::new();
  let session_id = "session-import";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-a".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  let events = sync.pull_events(session_id.to_string()).await.expect("pull events");
  assert!(events.iter().any(|event| event.r#type == "ready"));
  // Page import event for the file's doc id...
  assert!(events.iter().any(|event| {
    event.r#type == "doc-update" && event.update.as_ref().is_some_and(|update| update.doc_id == "doc-a")
  }));
  // ...and a root-meta update addressed to the workspace id.
  assert!(events.iter().any(|event| {
    event.r#type == "doc-update" && event.update.as_ref().is_some_and(|update| update.doc_id == "ws-a")
  }));
  assert!(dir.join(".affine-sync/state.db").exists());
  teardown(&sync, session_id, &dir).await;
}
/// A doc containing a block flavour the exporter does not understand still
/// exports to markdown, with the unsupported flavour noted in the output.
#[tokio::test]
async fn apply_local_update_exports_markdown_even_with_unsupported_block() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-export-unsupported";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-export-unsupported".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  // Drain startup events so later assertions see only what this test causes.
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull");
  let doc_bin = build_doc_with_unsupported_block("doc-unsupported", "Unsupported", "affine:latex");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: "doc-unsupported".to_string(),
        bin: Uint8Array::new(doc_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply local update");
  let mut exported_files = Vec::new();
  collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
  assert_eq!(exported_files.len(), 1);
  let content = fs::read_to_string(&exported_files[0]).expect("read exported markdown");
  assert!(content.contains("id: doc-unsupported"));
  // The exporter emits a placeholder naming the unknown flavour.
  assert!(content.contains("unsupported_block_flavour:affine:latex"));
  teardown(&sync, session_id, &dir).await;
}
/// A normal page update exports exactly one markdown file whose
/// frontmatter carries the doc's stable id and whose body survives intact.
#[tokio::test]
async fn apply_local_update_exports_markdown_with_stable_id() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-export";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-export".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  // Drain startup events before applying the update under test.
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull");
  let doc_bin = build_full_doc("Exported", "# Exported\n\nHello", "doc-export").expect("build doc bin");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: "doc-export".to_string(),
        bin: Uint8Array::new(doc_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply local update");
  let mut exported_files = Vec::new();
  collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
  assert_eq!(exported_files.len(), 1);
  let content = fs::read_to_string(&exported_files[0]).expect("read exported markdown");
  assert!(content.contains("id: doc-export"));
  assert!(content.contains("# Exported"));
  teardown(&sync, session_id, &dir).await;
}
/// Exporting a doc with an empty title must not be re-imported as a file
/// change on the next scan (no "disk:file-import" echo for the same doc).
#[tokio::test]
async fn empty_title_export_does_not_trigger_self_import() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-empty-title";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-empty-title".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
  let doc_id = "doc-empty-title";
  let doc_bin = build_full_doc("", "", doc_id).expect("build empty-title doc");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: doc_id.to_string(),
        bin: Uint8Array::new(doc_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply local update");
  // pull_events runs a scan; the exported file must match its baseline and
  // therefore produce no import event.
  let events = sync
    .pull_events(session_id.to_string())
    .await
    .expect("pull after export");
  assert!(!events.iter().any(|event| {
    event.r#type == "doc-update"
      && event.origin.as_deref() == Some("disk:file-import")
      && event.update.as_ref().is_some_and(|update| update.doc_id == doc_id)
  }));
  teardown(&sync, session_id, &dir).await;
}
/// A garbage update for one doc is swallowed (Ok, skipped internally) and
/// must not prevent a subsequent valid doc from exporting.
#[tokio::test]
async fn invalid_local_update_does_not_block_other_docs_exports() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-invalid-update";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-invalid-update".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
  let doc_a_id = "doc-invalid-a";
  let doc_a_bin = build_full_doc("A", "# A\n\none", doc_a_id).expect("build doc A");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: doc_a_id.to_string(),
        bin: Uint8Array::new(doc_a_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply doc A update");
  // Deliberately malformed update bytes for doc A.
  let invalid = sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: doc_a_id.to_string(),
        bin: Uint8Array::new(vec![1, 2, 3]),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await;
  // Invalid updates are logged and skipped, not surfaced as errors.
  assert!(invalid.is_ok());
  let doc_b_id = "doc-valid-b";
  let doc_b_bin = build_full_doc("B", "# B\n\ntwo", doc_b_id).expect("build doc B");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: doc_b_id.to_string(),
        bin: Uint8Array::new(doc_b_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply doc B update");
  let mut exported_files = Vec::new();
  collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
  assert!(!exported_files.is_empty());
  // Doc B must have been exported with its body intact.
  let mut found_doc_b = false;
  for file in exported_files {
    let content = fs::read_to_string(file).expect("read markdown");
    if content.contains(&format!("id: {doc_b_id}")) && content.contains("two") {
      found_doc_b = true;
      break;
    }
  }
  assert!(found_doc_b);
  teardown(&sync, session_id, &dir).await;
}
/// A root-metadata update for a doc that has neither a cached body nor a
/// bound file must not materialize an empty markdown file.
#[tokio::test]
async fn apply_local_root_update_skips_metadata_only_placeholder_without_doc_body() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-root-meta-export";
  let workspace_id = "ws-root-meta-export";
  let doc_id = "doc-root-meta";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: workspace_id.to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull");
  // Metadata-only update: no page body exists anywhere for this doc.
  let root_update = build_root_meta_update(
    &[],
    workspace_id,
    doc_id,
    &FrontmatterMeta {
      id: None,
      title: Some("Root Meta Title".to_string()),
      tags: Some(vec!["alpha".to_string()]),
      favorite: Some(true),
      trash: Some(false),
    },
  )
  .expect("build root meta update");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: workspace_id.to_string(),
        bin: Uint8Array::new(root_update),
        editor: None,
      },
      Some("origin:root-meta".to_string()),
    )
    .await
    .expect("apply root update");
  let mut exported_files = Vec::new();
  collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
  // No placeholder file may be created.
  assert_eq!(exported_files.len(), 0);
  teardown(&sync, session_id, &dir).await;
}
/// Editing an exported markdown file on disk is picked up by the next scan
/// and surfaced as a "disk:file-import" doc-update event.
#[tokio::test]
async fn file_change_after_export_is_imported_into_workspace() {
  let dir = temp_dir();
  let sync = DiskSync::new();
  let session_id = "session-export-import";
  sync
    .start_session(
      session_id.to_string(),
      DiskSessionOptions {
        workspace_id: "ws-export-import".to_string(),
        sync_folder: dir.to_string_lossy().to_string(),
      },
    )
    .await
    .expect("start session");
  let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
  let doc_id = "doc-export-import";
  let doc_bin = build_full_doc("Export", "# Export\n\none", doc_id).expect("build doc bin");
  sync
    .apply_local_update(
      session_id.to_string(),
      DiskDocUpdateInput {
        doc_id: doc_id.to_string(),
        bin: Uint8Array::new(doc_bin),
        editor: Some("test".to_string()),
      },
      Some("origin:local".to_string()),
    )
    .await
    .expect("apply local update");
  let mut exported_files = Vec::new();
  collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
  assert_eq!(exported_files.len(), 1);
  // Simulate a user editing the exported file (tags + body changed).
  let file_path = exported_files[0].clone();
  fs::write(
    &file_path,
    format!(
      "---\nid: {doc_id}\ntitle: Export\ntags: [edited]\nfavorite: false\ntrash: false\n---\n\n# Export\n\nchanged"
    ),
  )
  .expect("edit exported markdown");
  // pull_events triggers a scan, which must re-import the edited file.
  let events = sync
    .pull_events(session_id.to_string())
    .await
    .expect("pull after local file edit");
  assert!(events.iter().any(|event| {
    event.r#type == "doc-update"
      && event.update.as_ref().is_some_and(|update| update.doc_id == doc_id)
      && event.origin.as_deref() == Some("disk:file-import")
  }));
  teardown(&sync, session_id, &dir).await;
}
// Regression-style test: after a delta introduces a fenced code block, a
// further delta must still export cleanly (code fence preserved, new text
// appended) rather than breaking the markdown export pipeline.
#[tokio::test]
async fn code_block_update_keeps_markdown_exporting() {
    let dir = temp_dir();
    let sync = DiskSync::new();
    let session_id = "session-code-block-export";
    sync
        .start_session(
            session_id.to_string(),
            DiskSessionOptions {
                workspace_id: "ws-code-block-export".to_string(),
                sync_folder: dir.to_string_lossy().to_string(),
            },
        )
        .await
        .expect("start session");
    let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
    let doc_id = "doc-code-block";
    // Seed the doc with plain text content.
    let initial_doc = build_full_doc("Code", "# Code\n\nbefore", doc_id).expect("build initial doc");
    sync
        .apply_local_update(
            session_id.to_string(),
            DiskDocUpdateInput {
                doc_id: doc_id.to_string(),
                bin: Uint8Array::new(initial_doc.clone()),
                editor: Some("test".to_string()),
            },
            Some("origin:local".to_string()),
        )
        .await
        .expect("apply initial doc");
    let mut exported_files = Vec::new();
    collect_markdown_files(&dir, &mut exported_files).expect("collect markdown files");
    assert_eq!(exported_files.len(), 1);
    let file_path = exported_files[0].clone();
    // First delta: replace the body with a fenced JS code block.
    let markdown_with_code = "# Code\n\n```js\nconsole.log(1)\n```\n";
    let delta_code = update_doc(&initial_doc, markdown_with_code, doc_id).expect("build code block delta");
    sync
        .apply_local_update(
            session_id.to_string(),
            DiskDocUpdateInput {
                doc_id: doc_id.to_string(),
                bin: Uint8Array::new(delta_code.clone()),
                editor: Some("test".to_string()),
            },
            Some("origin:local".to_string()),
        )
        .await
        .expect("apply code block delta");
    // Merge initial + code delta locally so the follow-up delta is computed
    // against the same state the sync engine holds.
    let mut doc = DocOptions::new().with_guid(doc_id.to_string()).build();
    doc
        .apply_update_from_binary_v1(&initial_doc)
        .expect("apply initial update");
    doc.apply_update_from_binary_v1(&delta_code).expect("apply code update");
    let merged_after_code = doc.encode_update_v1().expect("encode merged after code");
    // Second delta: edit inside the code block and append a paragraph.
    let markdown_after_code_edit = "# Code\n\n```js\nconsole.log(2)\n```\n\nnext\n";
    let delta_after_code_edit =
        update_doc(&merged_after_code, markdown_after_code_edit, doc_id).expect("build delta after code edit");
    sync
        .apply_local_update(
            session_id.to_string(),
            DiskDocUpdateInput {
                doc_id: doc_id.to_string(),
                bin: Uint8Array::new(delta_after_code_edit),
                editor: Some("test".to_string()),
            },
            Some("origin:local".to_string()),
        )
        .await
        .expect("apply follow-up delta");
    // The exported file must reflect the final state of the code block.
    let exported = fs::read_to_string(&file_path).expect("read exported markdown");
    assert!(exported.contains("```js"));
    assert!(exported.contains("console.log(2)"));
    assert!(exported.contains("next"));
    teardown(&sync, session_id, &dir).await;
}
// A markdown file edited after session start should surface as a doc-update
// event on the next `pull_events` poll.
#[tokio::test]
async fn file_change_after_start_is_imported_via_pull_events() {
    let dir = temp_dir();
    let md_path = dir.join("doc-poll.md");
    // Pre-seed the sync folder with a doc before the session starts.
    fs::write(&md_path, "---\nid: doc-poll\ntitle: Poll\n---\n\n# Poll\n\none").expect("write initial markdown");
    let sync = DiskSync::new();
    let session_id = "session-poll";
    sync
        .start_session(
            session_id.to_string(),
            DiskSessionOptions {
                workspace_id: "ws-poll".to_string(),
                sync_folder: dir.to_string_lossy().to_string(),
            },
        )
        .await
        .expect("start session");
    // First pull imports the pre-existing file and sets the baseline.
    let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
    // Change the body ("one" -> "two") after the baseline was taken.
    fs::write(&md_path, "---\nid: doc-poll\ntitle: Poll\n---\n\n# Poll\n\ntwo").expect("write changed markdown");
    let events = sync
        .pull_events(session_id.to_string())
        .await
        .expect("pull after change");
    assert!(events.iter().any(|event| {
        event.r#type == "doc-update" && event.update.as_ref().is_some_and(|update| update.doc_id == "doc-poll")
    }));
    teardown(&sync, session_id, &dir).await;
}
// Importing a markdown file with no frontmatter should generate a doc id
// (written back into the file), and a subsequent local workspace edit must
// still export to the same file.
#[tokio::test]
async fn import_without_title_allows_followup_local_export() {
    let dir = temp_dir();
    let md_path = dir.join("doc-no-title.md");
    // No frontmatter at all — the importer must synthesize the id.
    fs::write(&md_path, "# Imported\n\none").expect("write markdown");
    let sync = DiskSync::new();
    let session_id = "session-import-no-title";
    let workspace_id = "ws-import-no-title";
    sync
        .start_session(
            session_id.to_string(),
            DiskSessionOptions {
                workspace_id: workspace_id.to_string(),
                sync_folder: dir.to_string_lossy().to_string(),
            },
        )
        .await
        .expect("start session");
    let events = sync.pull_events(session_id.to_string()).await.expect("pull first");
    // The import should have rewritten the file with generated frontmatter.
    let imported = fs::read_to_string(&md_path).expect("read imported markdown");
    let (meta, _) = parse_frontmatter(&imported);
    let doc_id = meta.id.expect("doc id should be generated");
    // Grab the imported doc's update binary so we can build a delta on top of it.
    let imported_doc_bin = events
        .iter()
        .find_map(|event| {
            event
                .update
                .as_ref()
                .filter(|update| update.doc_id == doc_id)
                .map(|update| update.bin.as_ref().to_vec())
        })
        .expect("imported page update");
    // Apply a local edit ("one" -> "two") through the sync engine.
    let delta = update_doc(&imported_doc_bin, "# Imported\n\ntwo", &doc_id).expect("build local edit delta");
    sync
        .apply_local_update(
            session_id.to_string(),
            DiskDocUpdateInput {
                doc_id: doc_id.clone(),
                bin: Uint8Array::new(delta),
                editor: Some("test".to_string()),
            },
            Some("origin:local".to_string()),
        )
        .await
        .expect("apply local update");
    // The edit must be exported back to the original file.
    let updated = fs::read_to_string(&md_path).expect("read markdown after local edit");
    assert!(updated.contains("two"));
    teardown(&sync, session_id, &dir).await;
}
// An imported markdown file must register page metadata in the workspace
// root doc with numeric `createDate` and `updatedDate` fields.
#[tokio::test]
async fn import_sets_root_meta_create_and_updated_date() {
    let dir = temp_dir();
    let md_path = dir.join("doc-dates.md");
    fs::write(&md_path, "# Dates\n\ncontent").expect("write markdown");
    let sync = DiskSync::new();
    let session_id = "session-import-dates";
    let workspace_id = "ws-import-dates";
    sync
        .start_session(
            session_id.to_string(),
            DiskSessionOptions {
                workspace_id: workspace_id.to_string(),
                sync_folder: dir.to_string_lossy().to_string(),
            },
        )
        .await
        .expect("start session");
    let events = sync.pull_events(session_id.to_string()).await.expect("pull events");
    // The import rewrites the file with generated frontmatter containing the id.
    let imported = fs::read_to_string(&md_path).expect("read imported markdown");
    let (meta, _) = parse_frontmatter(&imported);
    let doc_id = meta.id.expect("doc id should exist");
    // Locate the workspace-root update emitted alongside the page import.
    let root_update = events
        .iter()
        .find_map(|event| {
            event
                .update
                .as_ref()
                .filter(|update| update.doc_id == workspace_id)
                .map(|update| update.bin.as_ref().to_vec())
        })
        .expect("root-meta update");
    // Decode the root doc and walk meta.pages to find our imported page.
    let mut root = DocOptions::new().with_guid(workspace_id.to_string()).build();
    root
        .apply_update_from_binary_v1(&root_update)
        .expect("apply root-meta update");
    let meta_map = root.get_map("meta").expect("meta map");
    let pages = meta_map
        .get("pages")
        .and_then(|value| value.to_array())
        .expect("pages array");
    let page_map = pages
        .iter()
        .find_map(|value| {
            let page = value.to_map()?;
            let id = page
                .get("id")
                .and_then(|value| value.to_any())
                .and_then(|any| match any {
                    Any::String(value) => Some(value),
                    _ => None,
                })?;
            if id == doc_id { Some(page) } else { None }
        })
        .expect("imported page meta");
    // Both dates must be present and numeric (epoch-millis style values).
    let create_date = page_map
        .get("createDate")
        .and_then(|value| value.to_any())
        .expect("createDate should exist");
    assert!(is_numeric_any(&create_date));
    let updated_date = page_map
        .get("updatedDate")
        .and_then(|value| value.to_any())
        .expect("updatedDate should exist");
    assert!(is_numeric_any(&updated_date));
    teardown(&sync, session_id, &dir).await;
}
// Deleting a markdown file on disk must NOT propagate as a doc-delete event —
// the sync engine follows a no-delete policy for file removals.
#[tokio::test]
async fn no_delete_policy_does_not_emit_doc_delete() {
    let dir = temp_dir();
    let md_path = dir.join("doc-delete.md");
    fs::write(&md_path, "---\nid: doc-delete\ntitle: Delete\n---\n\n# Delete\n\none").expect("write markdown");
    let sync = DiskSync::new();
    let session_id = "session-no-delete";
    sync
        .start_session(
            session_id.to_string(),
            DiskSessionOptions {
                workspace_id: "ws-delete".to_string(),
                sync_folder: dir.to_string_lossy().to_string(),
            },
        )
        .await
        .expect("start session");
    // First pull imports the file and records the baseline.
    let _ = sync.pull_events(session_id.to_string()).await.expect("pull first");
    fs::remove_file(&md_path).expect("remove markdown file");
    let events = sync
        .pull_events(session_id.to_string())
        .await
        .expect("pull after delete");
    // No doc-delete events may be emitted for the removed file.
    assert!(!events.iter().any(|event| event.r#type == "doc-delete"));
    teardown(&sync, session_id, &dir).await;
}

View File

@@ -0,0 +1,26 @@
use chrono::NaiveDateTime;
/// Per-file sync baseline: the state recorded after the last successful sync,
/// used to detect which side changed since the previous pass.
#[derive(Clone)]
pub(crate) struct Baseline {
    // String-encoded doc clock captured at sync time.
    pub(crate) base_clock: String,
    // String-encoded state vector captured at sync time.
    pub(crate) base_vector: String,
    // Content hash of the markdown body as last synced (presumably via
    // `hash_string` — confirm against the sync engine).
    pub(crate) md_hash: String,
    // Content hash of the frontmatter metadata as last synced.
    pub(crate) meta_hash: String,
    // Timestamp of when this baseline was recorded (naive UTC).
    pub(crate) synced_at: NaiveDateTime,
}
/// Metadata parsed from (or written to) a markdown file's frontmatter block.
/// Every field is optional because any frontmatter key may be absent.
#[derive(Clone, Debug, Default)]
pub(crate) struct FrontmatterMeta {
    // Stable document id linking the file to a workspace doc.
    pub(crate) id: Option<String>,
    // Page title.
    pub(crate) title: Option<String>,
    // Page tags.
    pub(crate) tags: Option<Vec<String>>,
    // Whether the page is marked as favorite.
    pub(crate) favorite: Option<bool>,
    // Whether the page is in the trash.
    pub(crate) trash: Option<bool>,
}
impl FrontmatterMeta {
    /// Builder-style helper: returns this metadata with the document id set.
    pub(crate) fn with_id(self, id: String) -> Self {
        Self {
            id: Some(id),
            ..self
        }
    }
}

View File

@@ -0,0 +1,250 @@
use std::{
fs,
path::{Path, PathBuf},
};
use chrono::{DateTime, NaiveDateTime, Utc};
use sha3::{Digest, Sha3_256};
use y_octo::{Doc, DocOptions, StateVector};
use super::{frontmatter::normalize_tags, types::FrontmatterMeta};
/// Recursively collects every `*.md` file (case-insensitive extension) under
/// `root` into `output`, skipping the `.affine-sync` bookkeeping directory.
///
/// Returns a descriptive error string if a directory cannot be read.
pub(crate) fn collect_markdown_files(root: &Path, output: &mut Vec<PathBuf>) -> Result<(), String> {
    let dir = fs::read_dir(root).map_err(|err| format!("failed to read directory {}: {}", root.display(), err))?;
    for item in dir {
        let entry_path = item
            .map_err(|err| format!("failed to read directory entry: {}", err))?
            .path();
        // Never descend into (or report files from) the sync-state directory.
        let is_sync_state = entry_path
            .file_name()
            .and_then(|name| name.to_str())
            .is_some_and(|name| name == ".affine-sync");
        if is_sync_state {
            continue;
        }
        if entry_path.is_dir() {
            collect_markdown_files(&entry_path, output)?;
        } else if entry_path
            .extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| ext.eq_ignore_ascii_case("md"))
        {
            output.push(entry_path);
        }
    }
    Ok(())
}
pub(crate) fn generate_missing_doc_id(file_path: &Path) -> String {
let stem = file_path
.file_stem()
.and_then(|value| value.to_str())
.map(sanitize_file_stem)
.filter(|value| !value.is_empty())
.unwrap_or_else(|| "doc".to_string());
format!("{}-{}", stem, Utc::now().timestamp_millis())
}
/// Returns the first non-empty H1 heading (`# ...`) in the markdown, or
/// `None` if the document has no usable top-level heading.
pub(crate) fn derive_title_from_markdown(markdown: &str) -> Option<String> {
    markdown.lines().find_map(|line| {
        line.trim()
            .strip_prefix("# ")
            .map(str::trim)
            .filter(|title| !title.is_empty())
            .map(str::to_string)
    })
}
/// Derives a human-readable title from the file name (without extension),
/// falling back to "Untitled" when no usable stem exists.
pub(crate) fn derive_title_from_path(file_path: &Path) -> String {
    match file_path.file_stem().and_then(|value| value.to_str()) {
        Some(stem) if !stem.trim().is_empty() => stem.trim().to_string(),
        _ => "Untitled".to_string(),
    }
}
/// Normalizes a file stem into a slug: ASCII alphanumerics are lowercased,
/// runs of `-`/`_`/space collapse to a single `-`, everything else is
/// dropped. A stem that normalizes to nothing becomes `"doc"`.
pub(crate) fn sanitize_file_stem(input: &str) -> String {
    let mut slug = String::with_capacity(input.len());
    for ch in input.chars() {
        match ch {
            c if c.is_ascii_alphanumeric() => slug.push(c.to_ascii_lowercase()),
            // Separator: emit one dash, never two in a row.
            '-' | '_' | ' ' if !slug.ends_with('-') => slug.push('-'),
            _ => {}
        }
    }
    let trimmed = slug.trim_matches('-');
    if trimmed.is_empty() {
        "doc".to_string()
    } else {
        trimmed.to_string()
    }
}
/// Writes `content` to `path` atomically: the data is first written to a
/// uniquely named temp file in the same directory, then moved into place.
///
/// Fix: on any failure after the temp file is created, the temp file is now
/// removed (best-effort) instead of being leaked into the sync folder, where
/// stray `.affine-sync-tmp-*.md` files could confuse users and watchers.
///
/// Returns a descriptive error string if any filesystem step fails.
pub(crate) fn write_atomic(path: &Path, content: &str) -> Result<(), String> {
    let parent = path
        .parent()
        .ok_or_else(|| format!("path {} has no parent directory", path.display()))?;
    fs::create_dir_all(parent)
        .map_err(|err| format!("failed to create parent directory {}: {}", parent.display(), err))?;
    // pid + millis keeps concurrent writers from colliding on the temp name.
    let temp_name = format!(
        ".affine-sync-tmp-{}-{}.md",
        std::process::id(),
        Utc::now().timestamp_millis()
    );
    let temp_path = parent.join(temp_name);
    fs::write(&temp_path, content)
        .map_err(|err| format!("failed to write temp file {}: {}", temp_path.display(), err))?;
    // On Unix, `rename` replaces the destination atomically. Avoiding an explicit
    // delete reduces "delete + create" file events, which can confuse file
    // watchers/editors and cause apparent content flapping.
    //
    // On Windows, `rename` fails if destination exists, so we remove first.
    #[cfg(windows)]
    {
        if path.exists() {
            if let Err(err) = fs::remove_file(path) {
                // Don't leave the orphaned temp file behind on failure.
                let _ = fs::remove_file(&temp_path);
                return Err(format!("failed to replace file {}: {}", path.display(), err));
            }
        }
    }
    fs::rename(&temp_path, path).map_err(|err| {
        // Best-effort cleanup so failed writes do not leak temp files.
        let _ = fs::remove_file(&temp_path);
        format!(
            "failed to move temp file {} to {}: {}",
            temp_path.display(),
            path.display(),
            err
        )
    })?;
    Ok(())
}
/// Returns the lowercase hexadecimal SHA3-256 digest of `value`.
pub(crate) fn hash_string(value: &str) -> String {
    let mut hasher = Sha3_256::new();
    hasher.update(value.as_bytes());
    hasher
        .finalize()
        .iter()
        // High nibble first, then low nibble — standard big-endian hex.
        .flat_map(|byte| [hex_char(byte >> 4), hex_char(byte & 0x0f)])
        .collect()
}
/// Fingerprints the frontmatter fields so metadata-only changes can be
/// detected. Fields are serialized in a fixed order with explicit defaults
/// (empty string / `false`) for absent values; tags are normalized and
/// joined with the unit-separator control character to avoid collisions.
pub(crate) fn hash_meta(meta: &FrontmatterMeta) -> String {
    let flag = |value: Option<bool>| if value.unwrap_or(false) { "true" } else { "false" };
    let tags = normalize_tags(meta.tags.clone())
        .map(|tags| tags.join("\u{1f}"))
        .unwrap_or_default();
    let canonical = format!(
        "id={}|title={}|tags={}|favorite={}|trash={}",
        meta.id.as_deref().unwrap_or_default(),
        meta.title.as_deref().unwrap_or_default(),
        tags,
        flag(meta.favorite),
        flag(meta.trash),
    );
    hash_string(&canonical)
}
/// Maps a nibble (0..=15) to its lowercase hex character; out-of-range
/// values defensively map to '0', matching the original match arms.
fn hex_char(value: u8) -> char {
    char::from_digit(u32::from(value), 16).unwrap_or('0')
}
/// Current UTC time as a `NaiveDateTime`, truncated to millisecond
/// precision by round-tripping through `timestamp_millis`. If that
/// round-trip ever fails, the untruncated current time is used instead.
pub(crate) fn now_naive() -> NaiveDateTime {
    let millis = Utc::now().timestamp_millis();
    DateTime::from_timestamp_millis(millis)
        .unwrap_or_else(Utc::now)
        .naive_utc()
}
/// True when the update binary carries no content: either zero bytes or the
/// two-byte `[0, 0]` payload (presumably the encoding of an empty v1 update
/// — confirm against the update codec).
pub(crate) fn is_empty_update(value: &[u8]) -> bool {
    matches!(value, [] | [0, 0])
}
/// Merges `update` into the optional `existing` update and re-encodes the
/// full document state as a single v1 update binary. Empty inputs (see
/// `is_empty_update`) are skipped rather than applied.
pub(crate) fn merge_update_binary(
    existing: Option<&[u8]>,
    update: &[u8],
    doc_id: Option<&str>,
) -> Result<Vec<u8>, String> {
    let mut doc = build_doc(doc_id);
    if let Some(existing) = existing {
        if !is_empty_update(existing) {
            doc.apply_update_from_binary_v1(existing)
                .map_err(|err| format!("failed to apply existing update: {}", err))?;
        }
    }
    if !is_empty_update(update) {
        doc.apply_update_from_binary_v1(update)
            .map_err(|err| format!("failed to merge update: {}", err))?;
    }
    // Encode against an empty state vector so the result is the full state.
    doc.encode_state_as_update_v1(&StateVector::default())
        .map_err(|err| format!("failed to encode merged update: {}", err))
}
/// Creates an empty `Doc`, pinned to `doc_id` as its guid when one is given.
pub(crate) fn build_doc(doc_id: Option<&str>) -> Doc {
    match doc_id {
        Some(guid) => DocOptions::new().with_guid(guid.to_string()).build(),
        None => DocOptions::new().build(),
    }
}
/// Decodes `binary` into a `Doc` with the given guid; an empty update binary
/// yields a fresh, empty document instead of a decode error.
pub(crate) fn load_doc_or_new(binary: &[u8], doc_id: Option<&str>) -> Result<Doc, String> {
    let mut doc = build_doc(doc_id);
    if !is_empty_update(binary) {
        doc.apply_update_from_binary_v1(binary)
            .map_err(|err| format!("failed to decode doc binary: {}", err))?;
    }
    Ok(doc)
}
/// Compares two paths, first literally and then by canonical form so that
/// symlinks / relative segments still compare equal. Paths that cannot be
/// canonicalized (e.g. nonexistent) only match on literal equality.
pub(crate) fn paths_equal(lhs: &Path, rhs: &Path) -> bool {
    lhs == rhs
        || matches!(
            (lhs.canonicalize(), rhs.canonicalize()),
            (Ok(left), Ok(right)) if left == right
        )
}

View File

@@ -1,3 +1,4 @@
pub mod disk_sync;
pub mod hashcash;
#[cfg(not(target_arch = "arm"))]
@@ -8,3 +9,4 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
pub use affine_media_capture::*;
pub use affine_nbstore::*;
pub use affine_sqlite_v1::*;
pub use disk_sync::*;

View File

@@ -0,0 +1,588 @@
import path from 'node:path';
import type { apis } from '@affine/electron-api';
import { test } from '@affine-test/kit/electron';
import {
addDatabase,
clickNewPageButton,
getPageByTitle,
waitForAllPagesLoad,
waitForEditorLoad,
} from '@affine-test/kit/utils/page-logic';
import { clickSideBarAllPageButton } from '@affine-test/kit/utils/sidebar';
import {
createLocalWorkspace,
openWorkspaceListModal,
} from '@affine-test/kit/utils/workspace';
import { expect } from '@playwright/test';
import fs from 'fs-extra';
declare global {
interface Window {
__apis: typeof apis;
__events?: any;
}
}
// Recursively gathers every `*.md` file under `root`, skipping the
// `.affine-sync` bookkeeping directory.
async function collectMarkdownFiles(root: string): Promise<string[]> {
  const found: string[] = [];
  for (const entry of await fs.readdir(root, { withFileTypes: true })) {
    if (entry.name === '.affine-sync') {
      continue;
    }
    const entryPath = path.join(root, entry.name);
    if (entry.isDirectory()) {
      const nested = await collectMarkdownFiles(entryPath);
      found.push(...nested);
    } else if (entry.isFile() && entry.name.toLowerCase().endsWith('.md')) {
      found.push(entryPath);
    }
  }
  return found;
}
// Returns the first markdown file under `root` whose content includes
// `needle`, or null when no file matches.
async function findMarkdownFileContaining(
  root: string,
  needle: string
): Promise<string | null> {
  for (const candidate of await collectMarkdownFiles(root)) {
    const text = await fs.readFile(candidate, 'utf8');
    if (text.includes(needle)) {
      return candidate;
    }
  }
  return null;
}
// Ensures the app is showing the workspace named `name`, switching via the
// workspace selector popup when a different workspace is currently active.
async function ensureWorkspaceSelected(page: any, name: string) {
  const currentName =
    (await page
      .getByTestId('app-sidebar')
      .getByTestId('workspace-name')
      .textContent()
      .catch(() => null)) ?? '';
  // Already on the requested workspace — nothing to do.
  if (currentName.trim() === name) {
    return;
  }
  await openWorkspaceListModal(page);
  // Workspace cards are rendered in the selector popup.
  await page
    .getByTestId('workspace-card')
    .filter({ hasText: name })
    .first()
    .click();
  // Wait until the sidebar reflects the switch before continuing.
  await expect(
    page.getByTestId('app-sidebar').getByTestId('workspace-name')
  ).toHaveText(name, { timeout: 10_000 });
  await waitForEditorLoad(page);
}
// Asserts that nbstore's openStore was called with the 'disk' remote bound to
// `syncFolder`, polling logs recorded by both the renderer page and the shell
// process. On timeout, throws with the most recent logs from both sides.
//
// Fix: the two-process log-fetching logic was duplicated verbatim in the
// success and failure paths; it is now hoisted into one helper closure.
async function assertNbstoreOpenedWithDiskRemote(
  page: any,
  shell: any,
  syncFolder: string
) {
  // Either process may be the one that opened the store, so query both.
  const fetchOpenStoreLogs = async () => {
    const [pageLogs, shellLogs] = await Promise.all([
      page.evaluate(() => {
        return (globalThis as any).__e2eNbstoreOpenStoreLogs ?? [];
      }),
      shell.evaluate(() => {
        return (globalThis as any).__e2eNbstoreOpenStoreLogs ?? [];
      }),
    ]);
    return { pageLogs, shellLogs };
  };
  const opened = async () => {
    const { pageLogs, shellLogs } = await fetchOpenStoreLogs();
    return [...pageLogs, ...shellLogs].some(
      (l: any) =>
        l?.remotes?.includes?.('disk') && l?.diskSyncFolder === syncFolder
    );
  };
  try {
    await expect.poll(opened, { timeout: 20_000 }).toBe(true);
  } catch {
    // Surface the tail of both logs to make CI failures debuggable.
    const { pageLogs, shellLogs } = await fetchOpenStoreLogs();
    throw new Error(
      `nbstore.openStore did not include disk remote (expected syncFolder=${syncFolder}). ` +
        `PageLogs: ${JSON.stringify(pageLogs.slice(-10), null, 2)} ` +
        `ShellLogs: ${JSON.stringify(shellLogs.slice(-10), null, 2)}`
    );
  }
}
// End-to-end disk sync happy path: link a workspace to a folder, verify
// (1) existing docs export to markdown, (2) in-app edits update the file,
// and (3) on-disk edits import back into the workspace.
test('disk markdown sync: export/update/import', async ({
  page,
  shell,
  appInfo,
  workspace,
}) => {
  test.setTimeout(120_000);
  const runId = Date.now();
  const workspaceName = `disk-sync-e2e-${runId}`;
  await createLocalWorkspace({ name: workspaceName }, page);
  // Seed two pages with unique marker strings we can grep for on disk.
  const titleA = `disk-sync-a-${runId}`;
  const bodyA1 = `SYNC_E2E_BODY_A_${runId}`;
  await clickNewPageButton(page, titleA);
  await page.locator('affine-note').first().click();
  await page.keyboard.type(bodyA1);
  const titleB = `disk-sync-b-${runId}`;
  const bodyB1 = `SYNC_E2E_BODY_B_${runId}`;
  await clickNewPageButton(page, titleB);
  await page.locator('affine-note').first().click();
  await page.keyboard.type(bodyB1);
  const w = await workspace.current();
  const syncFolder = path.join(appInfo.sessionData, 'disk-sync-e2e', w.meta.id);
  await fs.emptyDir(syncFolder);
  // Configure via globalState directly to avoid coupling this E2E to the UI panel.
  const maybeAutoReload = page
    .waitForNavigation({ waitUntil: 'domcontentloaded', timeout: 20_000 })
    .catch(() => null);
  await page.evaluate(
    async ({ workspaceId, folder }) => {
      const apis = (window as any).__apis;
      if (!apis?.sharedStorage?.setGlobalState) {
        throw new Error('sharedStorage api is not available');
      }
      // FeatureFlagService will reload the page when the flag changes.
      // Override it temporarily so we can persist state first, then reload from the test.
      const loc = window.location as any;
      const originalReload = loc.reload?.bind(loc);
      try {
        loc.reload = () => {};
      } catch {
        // ignore if it is not writable
      }
      await apis.sharedStorage.setGlobalState(
        'workspace-engine:disk-sync-folders:v1',
        {
          [workspaceId]: folder,
        }
      );
      await apis.sharedStorage.setGlobalState(
        'affine-flag:enable_disk_sync',
        true
      );
      try {
        loc.reload = originalReload;
      } catch {
        // ignore if it is not writable
      }
    },
    { workspaceId: w.meta.id, folder: syncFolder }
  );
  await maybeAutoReload;
  // If we blocked the auto reload, reload now so workspace-engine can pick up the remote options.
  // If the app already navigated, Playwright will throw ERR_ABORTED here; just ignore.
  try {
    await page.reload({ waitUntil: 'domcontentloaded' });
  } catch {}
  await waitForEditorLoad(page);
  await ensureWorkspaceSelected(page, workspaceName);
  await workspace.current();
  // Sanity-check the persisted config before asserting sync behavior.
  const folderConfig = await page.evaluate(
    ({ workspaceId }) => {
      const gs = (globalThis as any).__sharedStorage?.globalState;
      const folders = gs?.get('workspace-engine:disk-sync-folders:v1');
      return {
        hasSharedStorage: !!gs,
        enabled: gs?.get('affine-flag:enable_disk_sync'),
        folder: folders?.[workspaceId] ?? null,
        folderKeyType: typeof folders,
      };
    },
    { workspaceId: w.meta.id }
  );
  expect(folderConfig.hasSharedStorage).toBe(true);
  expect(folderConfig.enabled).toBe(true);
  expect(folderConfig.folder).toBe(syncFolder);
  await assertNbstoreOpenedWithDiskRemote(page, shell, syncFolder);
  // Collect disk events for debugging and for asserting the import pipeline actually fired.
  await page.evaluate(() => {
    (globalThis as any).__e2eDiskEvents = [];
    const onEvent = (window as any).__events?.diskSync?.onEvent;
    if (typeof onEvent !== 'function') {
      throw new Error('diskSync event api is not available');
    }
    const off = onEvent((payload: any) => {
      const ev = payload?.event;
      const update = ev?.update;
      (globalThis as any).__e2eDiskEvents.push({
        sessionId: payload?.sessionId,
        type: ev?.type,
        origin: ev?.origin,
        docId: update?.docId ?? ev?.docId ?? null,
        timestamp: (update?.timestamp ?? ev?.timestamp ?? null)?.toString?.(),
        binLen: update?.bin?.length ?? null,
      });
    });
    (globalThis as any).__e2eDiskEventsOff = off;
  });
  // 1) First-time linking: existing workspace docs should be exported to Markdown.
  await expect
    .poll(() => findMarkdownFileContaining(syncFolder, bodyA1), {
      timeout: 30_000,
    })
    .not.toBeNull();
  await expect
    .poll(() => findMarkdownFileContaining(syncFolder, bodyB1), {
      timeout: 30_000,
    })
    .not.toBeNull();
  const fileA = await findMarkdownFileContaining(syncFolder, bodyA1);
  if (!fileA) {
    throw new Error('exported markdown for doc A not found');
  }
  // 2) Workspace changes should update the corresponding Markdown file.
  await clickSideBarAllPageButton(page);
  await waitForAllPagesLoad(page);
  await getPageByTitle(page, titleA).click();
  await waitForEditorLoad(page);
  const bodyA2 = `SYNC_E2E_BODY_A_UPDATE_${runId}`;
  await page.locator('affine-note').first().click();
  await page.keyboard.press('Enter');
  await page.keyboard.type(bodyA2);
  await expect
    .poll(async () => (await fs.readFile(fileA, 'utf8')).includes(bodyA2), {
      timeout: 30_000,
    })
    .toBe(true);
  // 3) Local Markdown changes should be imported back into the workspace.
  const mdEdit = `SYNC_E2E_MD_EDIT_${runId}`;
  const previous = await fs.readFile(fileA, 'utf8');
  await fs.writeFile(fileA, previous + `\n\n${mdEdit}\n`, 'utf8');
  // Ensure the disk import pipeline actually emitted an event for the file edit.
  await expect
    .poll(
      () =>
        page.evaluate(() => {
          const events = (globalThis as any).__e2eDiskEvents ?? [];
          return events.some(
            (e: any) =>
              e?.type === 'doc-update' && e?.origin === 'disk:file-import'
          );
        }),
      { timeout: 30_000 }
    )
    .toBe(true);
  // The appended text must eventually be visible in the editor.
  const note = page.locator('affine-note').first();
  await expect(note.getByText(mdEdit)).toBeVisible({
    timeout: 30_000,
  });
});
// Re-binding the workspace from folder A to a fresh folder B must trigger a
// full re-export: the same doc content appears in both folders.
test('disk markdown sync: switching folders re-exports existing docs', async ({
  page,
  shell,
  appInfo,
  workspace,
}) => {
  test.setTimeout(150_000);
  const runId = Date.now();
  const workspaceName = `disk-sync-switch-${runId}`;
  await createLocalWorkspace({ name: workspaceName }, page);
  const title = `disk-sync-switch-page-${runId}`;
  const body = `SYNC_E2E_SWITCH_BODY_${runId}`;
  await clickNewPageButton(page, title);
  await page.locator('affine-note').first().click();
  await page.keyboard.type(body);
  const w = await workspace.current();
  const folderA = path.join(
    appInfo.sessionData,
    'disk-sync-e2e-switch',
    w.meta.id,
    'a'
  );
  const folderB = path.join(
    appInfo.sessionData,
    'disk-sync-e2e-switch',
    w.meta.id,
    'b'
  );
  await fs.emptyDir(folderA);
  await fs.emptyDir(folderB);
  // Binds the workspace's sync folder via globalState (suppressing the
  // feature-flag auto-reload), reloads, and verifies the config stuck.
  const setFolder = async (folder: string) => {
    const maybeAutoReload = page
      .waitForNavigation({ waitUntil: 'domcontentloaded', timeout: 20_000 })
      .catch(() => null);
    await page.evaluate(
      async ({ workspaceId, folder }) => {
        const apis = (window as any).__apis;
        if (!apis?.sharedStorage?.setGlobalState) {
          throw new Error('sharedStorage api is not available');
        }
        const loc = window.location as any;
        const originalReload = loc.reload?.bind(loc);
        try {
          loc.reload = () => {};
        } catch {}
        await apis.sharedStorage.setGlobalState(
          'workspace-engine:disk-sync-folders:v1',
          {
            [workspaceId]: folder,
          }
        );
        await apis.sharedStorage.setGlobalState(
          'affine-flag:enable_disk_sync',
          true
        );
        try {
          loc.reload = originalReload;
        } catch {}
      },
      { workspaceId: w.meta.id, folder }
    );
    await maybeAutoReload;
    try {
      await page.reload({ waitUntil: 'domcontentloaded' });
    } catch {}
    await waitForEditorLoad(page);
    await ensureWorkspaceSelected(page, workspaceName);
    await workspace.current();
    const folderConfig = await page.evaluate(
      ({ workspaceId, expectedFolder }) => {
        const gs = (globalThis as any).__sharedStorage?.globalState;
        const folders = gs?.get('workspace-engine:disk-sync-folders:v1');
        return {
          hasSharedStorage: !!gs,
          enabled: gs?.get('affine-flag:enable_disk_sync'),
          folder: folders?.[workspaceId] ?? null,
          expected: expectedFolder,
        };
      },
      { workspaceId: w.meta.id, expectedFolder: folder }
    );
    expect(folderConfig.hasSharedStorage).toBe(true);
    expect(folderConfig.enabled).toBe(true);
    expect(folderConfig.folder).toBe(folder);
    await assertNbstoreOpenedWithDiskRemote(page, shell, folder);
  };
  // First bind: export should appear in folder A.
  await setFolder(folderA);
  await expect
    .poll(() => findMarkdownFileContaining(folderA, body), { timeout: 30_000 })
    .not.toBeNull();
  // Switch to a brand new empty folder: export should appear again in folder B.
  await setFolder(folderB);
  await expect
    .poll(() => findMarkdownFileContaining(folderB, body), { timeout: 30_000 })
    .not.toBeNull();
});
// A page containing a database block must survive the export → on-disk edit →
// import round trip: the appended markdown is imported, and the database
// block stays a database rather than degrading into plain markdown blocks.
test('disk markdown sync: preserves database blocks', async ({
  page,
  shell,
  appInfo,
  workspace,
}) => {
  test.setTimeout(120_000);
  const runId = Date.now();
  const workspaceName = `disk-sync-db-e2e-${runId}`;
  await createLocalWorkspace({ name: workspaceName }, page);
  const title = `disk-sync-db-${runId}`;
  const dbTitle = `SYNC_E2E_DB_TITLE_${runId}`;
  await clickNewPageButton(page, title);
  await page.locator('affine-note').first().click();
  await page.keyboard.type(`SYNC_E2E_DB_BODY_${runId}`);
  await page.keyboard.press('Enter');
  await addDatabase(page, dbTitle);
  const w = await workspace.current();
  const syncFolder = path.join(
    appInfo.sessionData,
    'disk-sync-db-e2e',
    w.meta.id
  );
  await fs.emptyDir(syncFolder);
  // Configure the sync folder via globalState, suppressing the feature-flag
  // auto-reload so state persists before we reload ourselves.
  const maybeAutoReload = page
    .waitForNavigation({ waitUntil: 'domcontentloaded', timeout: 20_000 })
    .catch(() => null);
  await page.evaluate(
    async ({ workspaceId, folder }) => {
      const apis = (window as any).__apis;
      if (!apis?.sharedStorage?.setGlobalState) {
        throw new Error('sharedStorage api is not available');
      }
      const loc = window.location as any;
      const originalReload = loc.reload?.bind(loc);
      try {
        loc.reload = () => {};
      } catch {}
      await apis.sharedStorage.setGlobalState(
        'workspace-engine:disk-sync-folders:v1',
        {
          [workspaceId]: folder,
        }
      );
      await apis.sharedStorage.setGlobalState(
        'affine-flag:enable_disk_sync',
        true
      );
      try {
        loc.reload = originalReload;
      } catch {}
    },
    { workspaceId: w.meta.id, folder: syncFolder }
  );
  await maybeAutoReload;
  try {
    await page.reload({ waitUntil: 'domcontentloaded' });
  } catch {}
  await waitForEditorLoad(page);
  await ensureWorkspaceSelected(page, workspaceName);
  await workspace.current();
  // Sanity-check the persisted config before asserting sync behavior.
  const folderConfig = await page.evaluate(
    ({ workspaceId, folder }) => {
      const gs = (globalThis as any).__sharedStorage?.globalState;
      const folders = gs?.get('workspace-engine:disk-sync-folders:v1');
      return {
        hasSharedStorage: !!gs,
        enabled: gs?.get('affine-flag:enable_disk_sync'),
        folder: folders?.[workspaceId] ?? null,
        expected: folder,
      };
    },
    { workspaceId: w.meta.id, folder: syncFolder }
  );
  expect(folderConfig.hasSharedStorage).toBe(true);
  expect(folderConfig.enabled).toBe(true);
  expect(folderConfig.folder).toBe(syncFolder);
  await assertNbstoreOpenedWithDiskRemote(page, shell, syncFolder);
  // Ensure we're viewing the target page so UI assertions below are stable.
  await clickSideBarAllPageButton(page);
  await waitForAllPagesLoad(page);
  await getPageByTitle(page, title).click();
  await waitForEditorLoad(page);
  // Record disk-sync events so we can assert the import pipeline fired.
  await page.evaluate(() => {
    (globalThis as any).__e2eDiskEvents = [];
    const onEvent = (window as any).__events?.diskSync?.onEvent;
    if (typeof onEvent !== 'function') {
      throw new Error('diskSync event api is not available');
    }
    const off = onEvent((payload: any) => {
      const ev = payload?.event;
      const update = ev?.update;
      (globalThis as any).__e2eDiskEvents.push({
        sessionId: payload?.sessionId,
        type: ev?.type,
        origin: ev?.origin,
        docId: update?.docId ?? ev?.docId ?? null,
        timestamp: (update?.timestamp ?? ev?.timestamp ?? null)?.toString?.(),
        binLen: update?.bin?.length ?? null,
      });
    });
    (globalThis as any).__e2eDiskEventsOff = off;
  });
  await expect
    .poll(() => findMarkdownFileContaining(syncFolder, dbTitle), {
      timeout: 30_000,
    })
    .not.toBeNull();
  const mdFile = await findMarkdownFileContaining(syncFolder, dbTitle);
  if (!mdFile) {
    throw new Error('exported markdown for db doc not found');
  }
  // Ensure the exported file includes the database end marker so we can append after it.
  await expect
    .poll(
      async () =>
        (await fs.readFile(mdFile, 'utf8')).includes(
          'flavour=affine:database end'
        ),
      {
        timeout: 30_000,
      }
    )
    .toBe(true);
  // Append plain markdown after the database block and wait for the import.
  const mdEdit = `SYNC_E2E_DB_MD_EDIT_${runId}`;
  const previous = await fs.readFile(mdFile, 'utf8');
  await fs.writeFile(mdFile, previous + `\n\n${mdEdit}\n`, 'utf8');
  await expect
    .poll(
      () =>
        page.evaluate(() => {
          const events = (globalThis as any).__e2eDiskEvents ?? [];
          return events.some(
            (e: any) =>
              e?.type === 'doc-update' && e?.origin === 'disk:file-import'
          );
        }),
      { timeout: 30_000 }
    )
    .toBe(true);
  await expect(
    page.locator('affine-note').first().getByText(mdEdit)
  ).toBeVisible({
    timeout: 30_000,
  });
  // Database block should remain a database, not be replaced by markdown blocks.
  await expect(page.getByTestId('dv-table-view').first()).toBeVisible({
    timeout: 30_000,
  });
});

View File

@@ -134,8 +134,12 @@ export const test = base.extend<{
}
}
env.DEBUG = 'pw:browser';
// Some environments set this for running Electron as plain Node.js.
// Playwright needs a real Electron instance to attach via DevTools.
delete env.ELECTRON_RUN_AS_NODE;
env.SKIP_ONBOARDING = '1';
env.AFFINE_E2E = env.AFFINE_E2E || '1';
const electronApp = await electron.launch({
args: [clonedDist],

View File

@@ -572,6 +572,7 @@ __metadata:
react-router-dom: "npm:^6.30.3"
typescript: "npm:^5.9.3"
uuid: "npm:^13.0.0"
vitest: "npm:^3.2.4"
webm-muxer: "npm:^5.0.3"
languageName: unknown
linkType: soft