mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-07-01 17:50:50 +08:00
feat: improve mobile native impl (#14481)
fix #13529 #### PR Dependency Tree * **PR #14481** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Mobile blob caching with file-backed storage for faster loads and reduced network usage * Blob decoding with lazy refresh on token-read failures for improved reliability * Full-text search/indexing exposed to mobile apps * Document sync APIs and peer clock management for robust cross-device sync * **Tests** * Added unit tests covering payload decoding, cache safety, and concurrency * **Dependencies** * Added an LRU cache dependency and a new mobile-shared package for shared mobile logic <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Generated
+11
@@ -111,10 +111,12 @@ dependencies = [
|
||||
"base64-simd",
|
||||
"chrono",
|
||||
"homedir",
|
||||
"lru",
|
||||
"objc2",
|
||||
"objc2-foundation",
|
||||
"sqlx",
|
||||
"thiserror 2.0.17",
|
||||
"tokio",
|
||||
"uniffi",
|
||||
]
|
||||
|
||||
@@ -2572,6 +2574,15 @@ dependencies = [
|
||||
"weezl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.16.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
|
||||
dependencies = [
|
||||
"hashbrown 0.16.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mac"
|
||||
version = "0.1.1"
|
||||
|
||||
@@ -46,6 +46,7 @@ resolver = "3"
|
||||
libc = "0.2"
|
||||
log = "0.4"
|
||||
loom = { version = "0.7", features = ["checkpoint"] }
|
||||
lru = "0.16"
|
||||
memory-indexer = "0.3.0"
|
||||
mimalloc = "0.1"
|
||||
mp4parse = "0.17"
|
||||
|
||||
@@ -167,14 +167,23 @@ afterEvaluate {
|
||||
}
|
||||
def buildType = "${variant.buildType.name.capitalize()}"
|
||||
tasks["generate${productFlavor}${buildType}Assets"].dependsOn(tasks["cargoBuild"])
|
||||
def variantName = "${variant.name.substring(0,1).toUpperCase()}${variant.name.substring(1)}"
|
||||
def mergeNativeLibsTask = tasks.findByName("merge${variantName}NativeLibs")
|
||||
if (mergeNativeLibsTask != null) {
|
||||
mergeNativeLibsTask.dependsOn(tasks["cargoBuild"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def localProps = new Properties()
|
||||
file("${rootProject.projectDir}/local.properties").withInputStream { localProps.load(it) }
|
||||
def cargoPath = localProps.getProperty("rust.cargoCommand") ?: 'cargo'
|
||||
android.applicationVariants.configureEach { variant ->
|
||||
def t = tasks.register("generate${variant.name.capitalize()}UniFFIBindings", Exec) {
|
||||
def variantName = "${variant.name.substring(0,1).toUpperCase()}${variant.name.substring(1)}"
|
||||
def t = tasks.register("generate${variantName}UniFFIBindings", Exec) {
|
||||
workingDir "${project.projectDir}"
|
||||
// Runs the bindings generation, note that you must have uniffi-bindgen installed and in your PATH environment variable
|
||||
commandLine 'cargo', 'run', '--bin', 'uniffi-bindgen', 'generate', '--library', "${buildDir}/rustJniLibs/android/arm64-v8a/libaffine_mobile_native.so", '--language', 'kotlin', '--out-dir', "${project.projectDir}/src/main/java"
|
||||
commandLine "${cargoPath}", 'run', '--bin', 'uniffi-bindgen', 'generate', '--library', "${buildDir}/rustJniLibs/android/arm64-v8a/libaffine_mobile_native.so", '--language', 'kotlin', '--out-dir', "${project.projectDir}/src/main/java"
|
||||
dependsOn("cargoBuild")
|
||||
}
|
||||
variant.javaCompileProvider.get().dependsOn(t)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
"@affine/core": "workspace:*",
|
||||
"@affine/env": "workspace:*",
|
||||
"@affine/i18n": "workspace:*",
|
||||
"@affine/mobile-shared": "workspace:*",
|
||||
"@affine/nbstore": "workspace:*",
|
||||
"@affine/track": "workspace:*",
|
||||
"@blocksuite/affine": "workspace:*",
|
||||
|
||||
@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
|
||||
|
||||
export interface Blob {
|
||||
key: string;
|
||||
// base64 encoded data
|
||||
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
|
||||
data: string;
|
||||
mime: string;
|
||||
size: number;
|
||||
@@ -41,6 +41,7 @@ export interface NbStorePlugin {
|
||||
pushUpdate: (options: {
|
||||
id: string;
|
||||
docId: string;
|
||||
// base64 encoded data
|
||||
data: string;
|
||||
}) => Promise<{ timestamp: number }>;
|
||||
getDocSnapshot: (options: { id: string; docId: string }) => Promise<
|
||||
@@ -55,6 +56,7 @@ export interface NbStorePlugin {
|
||||
setDocSnapshot: (options: {
|
||||
id: string;
|
||||
docId: string;
|
||||
// base64 encoded data
|
||||
bin: string;
|
||||
timestamp: number;
|
||||
}) => Promise<{ success: boolean }>;
|
||||
|
||||
@@ -2,6 +2,10 @@ import {
|
||||
base64ToUint8Array,
|
||||
uint8ArrayToBase64,
|
||||
} from '@affine/core/modules/workspace-engine';
|
||||
import {
|
||||
decodePayload,
|
||||
MOBILE_BLOB_FILE_PREFIX,
|
||||
} from '@affine/mobile-shared/nbstore/payload';
|
||||
import {
|
||||
type BlobRecord,
|
||||
type CrawlResult,
|
||||
@@ -132,14 +136,23 @@ export const NbStoreNativeDBApis: NativeDBApis = {
|
||||
id,
|
||||
key,
|
||||
});
|
||||
return record
|
||||
? {
|
||||
data: base64ToUint8Array(record.data),
|
||||
key: record.key,
|
||||
mime: record.mime,
|
||||
createdAt: new Date(record.createdAt),
|
||||
}
|
||||
: null;
|
||||
if (!record) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let refreshedBlobPromise: ReturnType<typeof NbStore.getBlob> | undefined;
|
||||
|
||||
return {
|
||||
data: await decodePayload(record.data, MOBILE_BLOB_FILE_PREFIX, {
|
||||
onTokenReadFailure: async () => {
|
||||
refreshedBlobPromise ??= NbStore.getBlob({ id, key });
|
||||
return (await refreshedBlobPromise)?.data;
|
||||
},
|
||||
}),
|
||||
key: record.key,
|
||||
mime: record.mime,
|
||||
createdAt: new Date(record.createdAt),
|
||||
};
|
||||
},
|
||||
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
|
||||
await NbStore.setBlob({
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
{ "path": "../../core" },
|
||||
{ "path": "../../../common/env" },
|
||||
{ "path": "../../i18n" },
|
||||
{ "path": "../mobile-shared" },
|
||||
{ "path": "../../../common/nbstore" },
|
||||
{ "path": "../../track" },
|
||||
{ "path": "../../../../blocksuite/affine/all" },
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
"@affine/env": "workspace:*",
|
||||
"@affine/graphql": "workspace:*",
|
||||
"@affine/i18n": "workspace:*",
|
||||
"@affine/mobile-shared": "workspace:*",
|
||||
"@affine/nbstore": "workspace:*",
|
||||
"@affine/track": "workspace:*",
|
||||
"@blocksuite/affine": "workspace:*",
|
||||
|
||||
@@ -2,7 +2,7 @@ import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
|
||||
|
||||
export interface Blob {
|
||||
key: string;
|
||||
// base64 encoded data
|
||||
// base64 encoded data, or "__AFFINE_BLOB_FILE__:<absolutePath>" for large blobs
|
||||
data: string;
|
||||
mime: string;
|
||||
size: number;
|
||||
@@ -41,6 +41,7 @@ export interface NbStorePlugin {
|
||||
pushUpdate: (options: {
|
||||
id: string;
|
||||
docId: string;
|
||||
// base64 encoded data
|
||||
data: string;
|
||||
}) => Promise<{ timestamp: number }>;
|
||||
getDocSnapshot: (options: { id: string; docId: string }) => Promise<
|
||||
@@ -55,6 +56,7 @@ export interface NbStorePlugin {
|
||||
setDocSnapshot: (options: {
|
||||
id: string;
|
||||
docId: string;
|
||||
// base64 encoded data
|
||||
bin: string;
|
||||
timestamp: number;
|
||||
}) => Promise<{ success: boolean }>;
|
||||
|
||||
@@ -2,6 +2,10 @@ import {
|
||||
base64ToUint8Array,
|
||||
uint8ArrayToBase64,
|
||||
} from '@affine/core/modules/workspace-engine';
|
||||
import {
|
||||
decodePayload,
|
||||
MOBILE_BLOB_FILE_PREFIX,
|
||||
} from '@affine/mobile-shared/nbstore/payload';
|
||||
import {
|
||||
type BlobRecord,
|
||||
type CrawlResult,
|
||||
@@ -132,14 +136,23 @@ export const NbStoreNativeDBApis: NativeDBApis = {
|
||||
id,
|
||||
key,
|
||||
});
|
||||
return record
|
||||
? {
|
||||
data: base64ToUint8Array(record.data),
|
||||
key: record.key,
|
||||
mime: record.mime,
|
||||
createdAt: new Date(record.createdAt),
|
||||
}
|
||||
: null;
|
||||
if (!record) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let refreshedBlobPromise: ReturnType<typeof NbStore.getBlob> | undefined;
|
||||
|
||||
return {
|
||||
data: await decodePayload(record.data, MOBILE_BLOB_FILE_PREFIX, {
|
||||
onTokenReadFailure: async () => {
|
||||
refreshedBlobPromise ??= NbStore.getBlob({ id, key });
|
||||
return (await refreshedBlobPromise)?.data;
|
||||
},
|
||||
}),
|
||||
key: record.key,
|
||||
mime: record.mime,
|
||||
createdAt: new Date(record.createdAt),
|
||||
};
|
||||
},
|
||||
setBlob: async function (id: string, blob: BlobRecord): Promise<void> {
|
||||
await NbStore.setBlob({
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
{ "path": "../../../common/env" },
|
||||
{ "path": "../../../common/graphql" },
|
||||
{ "path": "../../i18n" },
|
||||
{ "path": "../mobile-shared" },
|
||||
{ "path": "../../../common/nbstore" },
|
||||
{ "path": "../../track" },
|
||||
{ "path": "../../../../blocksuite/affine/all" },
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"name": "@affine/mobile-shared",
|
||||
"version": "0.26.1",
|
||||
"type": "module",
|
||||
"private": true,
|
||||
"sideEffects": false,
|
||||
"exports": {
|
||||
".": "./src/index.ts",
|
||||
"./nbstore/payload": "./src/nbstore/payload.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@affine/core": "workspace:*",
|
||||
"@capacitor/core": "^7.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"typescript": "^5.7.2",
|
||||
"vitest": "^3.2.4"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
export * from './nbstore/payload';
|
||||
@@ -0,0 +1,145 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
const { mockBase64ToUint8Array, mockConvertFileSrc } = vi.hoisted(() => ({
|
||||
mockBase64ToUint8Array: vi.fn((data: string) =>
|
||||
Uint8Array.from(data.split('').map(char => char.charCodeAt(0)))
|
||||
),
|
||||
mockConvertFileSrc: vi.fn((path: string) => `capacitor://localhost${path}`),
|
||||
}));
|
||||
|
||||
vi.mock('@affine/core/modules/workspace-engine', () => ({
|
||||
base64ToUint8Array: mockBase64ToUint8Array,
|
||||
}));
|
||||
|
||||
vi.mock('@capacitor/core', () => ({
|
||||
Capacitor: {
|
||||
convertFileSrc: mockConvertFileSrc,
|
||||
},
|
||||
}));
|
||||
|
||||
import { decodePayload, MOBILE_BLOB_FILE_PREFIX } from './payload';
|
||||
|
||||
describe('decodePayload', () => {
|
||||
const fetchMock = vi.fn<typeof fetch>();
|
||||
|
||||
beforeEach(() => {
|
||||
fetchMock.mockReset();
|
||||
mockBase64ToUint8Array.mockClear();
|
||||
mockConvertFileSrc.mockClear();
|
||||
vi.stubGlobal('fetch', fetchMock);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.unstubAllGlobals();
|
||||
});
|
||||
|
||||
it('decodes inline base64 payloads without file IO', async () => {
|
||||
const decoded = await decodePayload('ZGF0YQ==', MOBILE_BLOB_FILE_PREFIX);
|
||||
expect(decoded).toEqual(Uint8Array.from([90, 71, 70, 48, 89, 81, 61, 61]));
|
||||
expect(mockBase64ToUint8Array).toHaveBeenCalledWith('ZGF0YQ==');
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('reads valid cache file tokens', async () => {
|
||||
const expected = Uint8Array.from([1, 2, 3, 4]);
|
||||
fetchMock.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
arrayBuffer: async () => expected.buffer,
|
||||
} as Response);
|
||||
|
||||
const path =
|
||||
'/var/mobile/Containers/Data/Application/abc/Library/Caches/nbstore-blob-cache/0123456789abcdef/fedcba9876543210.blob';
|
||||
const decoded = await decodePayload(
|
||||
`${MOBILE_BLOB_FILE_PREFIX}${path}`,
|
||||
MOBILE_BLOB_FILE_PREFIX
|
||||
);
|
||||
|
||||
expect(decoded).toEqual(expected);
|
||||
expect(mockConvertFileSrc).toHaveBeenCalledWith(`file://${path}`);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('reads valid android cache file tokens', async () => {
|
||||
const expected = Uint8Array.from([4, 3, 2, 1]);
|
||||
fetchMock.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
arrayBuffer: async () => expected.buffer,
|
||||
} as Response);
|
||||
|
||||
const path =
|
||||
'/data/user/0/com.affine.app/cache/nbstore-blob-cache/0123456789abcdef/fedcba9876543210.blob';
|
||||
const decoded = await decodePayload(
|
||||
`${MOBILE_BLOB_FILE_PREFIX}${path}`,
|
||||
MOBILE_BLOB_FILE_PREFIX
|
||||
);
|
||||
|
||||
expect(decoded).toEqual(expected);
|
||||
expect(mockConvertFileSrc).toHaveBeenCalledWith(`file://${path}`);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('rejects suffix-only paths outside expected cache shape', async () => {
|
||||
const path =
|
||||
'/attacker/nbstore-blob-cache/0123456789abcdef/fedcba9876543210.blob';
|
||||
await expect(
|
||||
decodePayload(
|
||||
`${MOBILE_BLOB_FILE_PREFIX}${path}`,
|
||||
MOBILE_BLOB_FILE_PREFIX
|
||||
)
|
||||
).rejects.toThrow('Refusing to read mobile payload outside cache dir');
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('rejects random cache roots', async () => {
|
||||
const path =
|
||||
'/random/cache/nbstore-blob-cache/0123456789abcdef/fedcba9876543210.blob';
|
||||
await expect(
|
||||
decodePayload(
|
||||
`${MOBILE_BLOB_FILE_PREFIX}${path}`,
|
||||
MOBILE_BLOB_FILE_PREFIX
|
||||
)
|
||||
).rejects.toThrow('Refusing to read mobile payload outside cache dir');
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('rejects encoded traversal segments', async () => {
|
||||
const path =
|
||||
'/var/mobile/Containers/Data/Application/abc/Library/Caches/nbstore-blob-cache/%2E%2E/fedcba9876543210.blob';
|
||||
await expect(
|
||||
decodePayload(
|
||||
`${MOBILE_BLOB_FILE_PREFIX}${path}`,
|
||||
MOBILE_BLOB_FILE_PREFIX
|
||||
)
|
||||
).rejects.toThrow('Refusing to read mobile payload outside cache dir');
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('retries once with refreshed payload when token read fails', async () => {
|
||||
const path =
|
||||
'/var/mobile/Containers/Data/Application/abc/Library/Caches/nbstore-blob-cache/0123456789abcdef/fedcba9876543210.blob';
|
||||
const payload = `${MOBILE_BLOB_FILE_PREFIX}${path}`;
|
||||
const expected = Uint8Array.from([9, 8, 7]);
|
||||
|
||||
fetchMock
|
||||
.mockResolvedValueOnce({
|
||||
ok: false,
|
||||
status: 404,
|
||||
} as Response)
|
||||
.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
status: 200,
|
||||
arrayBuffer: async () => expected.buffer,
|
||||
} as Response);
|
||||
|
||||
const reloadedPayload = vi.fn(async () => payload);
|
||||
const decoded = await decodePayload(payload, MOBILE_BLOB_FILE_PREFIX, {
|
||||
onTokenReadFailure: reloadedPayload,
|
||||
});
|
||||
|
||||
expect(decoded).toEqual(expected);
|
||||
expect(reloadedPayload).toHaveBeenCalledTimes(1);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,167 @@
|
||||
import { base64ToUint8Array } from '@affine/core/modules/workspace-engine';
|
||||
import { Capacitor } from '@capacitor/core';
|
||||
|
||||
export const MOBILE_BLOB_FILE_PREFIX = '__AFFINE_BLOB_FILE__:';
|
||||
export const MOBILE_PAYLOAD_INLINE_THRESHOLD_BYTES = 1024 * 1024;
|
||||
const MOBILE_PAYLOAD_CACHE_DIR = 'nbstore-blob-cache';
|
||||
const MOBILE_PAYLOAD_BUCKET_PATTERN = /^[0-9a-f]{16}$/;
|
||||
const MOBILE_PAYLOAD_FILE_PATTERN = /^[0-9a-f]{16}\.blob$/;
|
||||
const MOBILE_PAYLOAD_PARENT_DIRS = new Set(['cache', 'Caches', 'T', 'tmp']);
|
||||
const MOBILE_ANDROID_PACKAGE_PATTERN =
|
||||
/^[A-Za-z][A-Za-z0-9_]*(\.[A-Za-z][A-Za-z0-9_]*)+$/;
|
||||
|
||||
function normalizeTokenFilePath(rawPath: string): string {
|
||||
const trimmedPath = rawPath.trim();
|
||||
if (!trimmedPath) {
|
||||
throw new Error('Invalid mobile payload token: empty file path');
|
||||
}
|
||||
|
||||
return trimmedPath.startsWith('file://')
|
||||
? trimmedPath
|
||||
: `file://${trimmedPath}`;
|
||||
}
|
||||
|
||||
function assertMobileCachePath(fileUrl: string): void {
|
||||
let pathname: string;
|
||||
try {
|
||||
const parsedUrl = new URL(fileUrl);
|
||||
if (parsedUrl.protocol !== 'file:') {
|
||||
throw new Error('unexpected protocol');
|
||||
}
|
||||
pathname = parsedUrl.pathname;
|
||||
} catch {
|
||||
throw new Error('Invalid mobile payload token: malformed file URL');
|
||||
}
|
||||
|
||||
let decodedSegments: string[];
|
||||
try {
|
||||
decodedSegments = pathname
|
||||
.split('/')
|
||||
.filter(Boolean)
|
||||
.map(segment => {
|
||||
const decoded = decodeURIComponent(segment);
|
||||
if (
|
||||
!decoded ||
|
||||
decoded === '.' ||
|
||||
decoded === '..' ||
|
||||
decoded.includes('/') ||
|
||||
decoded.includes('\\')
|
||||
) {
|
||||
throw new Error('path traversal');
|
||||
}
|
||||
return decoded;
|
||||
});
|
||||
} catch {
|
||||
throw new Error(
|
||||
`Refusing to read mobile payload outside cache dir: ${fileUrl}`
|
||||
);
|
||||
}
|
||||
|
||||
const fileName = decodedSegments.at(-1);
|
||||
const bucket = decodedSegments.at(-2);
|
||||
const cacheDir = decodedSegments.at(-3);
|
||||
const parentDir = decodedSegments.at(-4);
|
||||
const cacheParent = decodedSegments.at(-5);
|
||||
|
||||
if (
|
||||
!fileName ||
|
||||
!bucket ||
|
||||
!cacheDir ||
|
||||
!parentDir ||
|
||||
cacheDir !== MOBILE_PAYLOAD_CACHE_DIR ||
|
||||
!MOBILE_PAYLOAD_BUCKET_PATTERN.test(bucket) ||
|
||||
!MOBILE_PAYLOAD_FILE_PATTERN.test(fileName) ||
|
||||
!MOBILE_PAYLOAD_PARENT_DIRS.has(parentDir) ||
|
||||
!isAllowedCacheParent(decodedSegments, parentDir, cacheParent)
|
||||
) {
|
||||
throw new Error(
|
||||
`Refusing to read mobile payload outside cache dir: ${fileUrl}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function isAllowedCacheParent(
|
||||
parts: string[],
|
||||
parentDir: string,
|
||||
cacheParent: string | undefined
|
||||
): boolean {
|
||||
if (parentDir === 'Caches') {
|
||||
return cacheParent === 'Library' && ['var', 'private'].includes(parts[0]);
|
||||
}
|
||||
|
||||
if (parentDir === 'cache') {
|
||||
if (parts[0] !== 'data' || !cacheParent) {
|
||||
return false;
|
||||
}
|
||||
if (!MOBILE_ANDROID_PACKAGE_PATTERN.test(cacheParent)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parts[1] === 'data') {
|
||||
return true;
|
||||
}
|
||||
if (parts[1] === 'user') {
|
||||
return !!parts[2] && /^[0-9]+$/.test(parts[2]);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parentDir === 'tmp') {
|
||||
return (
|
||||
parts[0] === 'tmp' ||
|
||||
(parts[0] === 'private' && parts[1] === 'var' && parts[2] === 'tmp')
|
||||
);
|
||||
}
|
||||
|
||||
if (parentDir === 'T') {
|
||||
return parts[0] === 'var' && parts[1] === 'folders';
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async function readTokenPayload(filePath: string): Promise<Uint8Array> {
|
||||
const response = await fetch(Capacitor.convertFileSrc(filePath));
|
||||
if (!response.ok) {
|
||||
throw new Error(
|
||||
`Failed to read mobile payload file: ${filePath} (status ${response.status})`
|
||||
);
|
||||
}
|
||||
|
||||
return new Uint8Array(await response.arrayBuffer());
|
||||
}
|
||||
|
||||
export interface DecodePayloadOptions {
|
||||
onTokenReadFailure?: (error: Error) => Promise<string | null | undefined>;
|
||||
}
|
||||
|
||||
export async function decodePayload(
|
||||
data: string,
|
||||
prefix: string,
|
||||
options?: DecodePayloadOptions
|
||||
): Promise<Uint8Array> {
|
||||
if (!data.startsWith(prefix)) {
|
||||
return base64ToUint8Array(data);
|
||||
}
|
||||
|
||||
const normalizedPath = normalizeTokenFilePath(data.slice(prefix.length));
|
||||
assertMobileCachePath(normalizedPath);
|
||||
|
||||
try {
|
||||
return await readTokenPayload(normalizedPath);
|
||||
} catch (error) {
|
||||
const reloadPayload = options?.onTokenReadFailure;
|
||||
if (!reloadPayload) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const refreshedPayload = await reloadPayload(
|
||||
error instanceof Error ? error : new Error(String(error))
|
||||
);
|
||||
if (!refreshedPayload) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
return decodePayload(refreshedPayload, prefix);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"extends": "../../../../tsconfig.web.json",
|
||||
"compilerOptions": {
|
||||
"rootDir": "./src",
|
||||
"outDir": "./dist",
|
||||
"tsBuildInfoFile": "./dist/tsconfig.tsbuildinfo"
|
||||
},
|
||||
"include": ["./src"],
|
||||
"references": [{ "path": "../../core" }]
|
||||
}
|
||||
@@ -23,6 +23,7 @@ base64-simd = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
uniffi = { workspace = true, features = ["cli", "tokio"] }
|
||||
|
||||
[target.'cfg(any(target_os = "ios", target_os = "macos"))'.dependencies]
|
||||
@@ -38,5 +39,12 @@ objc2-foundation = { workspace = true, features = [
|
||||
[target.'cfg(not(any(target_os = "ios", target_os = "macos")))'.dependencies]
|
||||
homedir = { workspace = true }
|
||||
|
||||
[target.'cfg(any(target_os = "android", target_os = "ios"))'.dependencies]
|
||||
lru = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
uniffi = { workspace = true, features = ["build"] }
|
||||
|
||||
[dev-dependencies]
|
||||
lru = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
+360
@@ -0,0 +1,360 @@
|
||||
use std::{
|
||||
collections::{HashMap, hash_map::DefaultHasher},
|
||||
hash::{Hash, Hasher},
|
||||
num::NonZeroUsize,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Mutex, RwLock},
|
||||
};
|
||||
|
||||
use affine_nbstore::Blob as NbBlob;
|
||||
use lru::LruCache;
|
||||
|
||||
pub(crate) const MOBILE_PAYLOAD_INLINE_THRESHOLD_BYTES: usize = 1024 * 1024;
|
||||
const MOBILE_BLOB_MAX_READ_BYTES: u64 = 64 * 1024 * 1024;
|
||||
const MOBILE_BLOB_CACHE_CAPACITY: usize = 32;
|
||||
const MOBILE_BLOB_CACHE_DIR: &str = "nbstore-blob-cache";
|
||||
pub(crate) const MOBILE_BLOB_FILE_PREFIX: &str = "__AFFINE_BLOB_FILE__:";
|
||||
|
||||
pub(crate) fn should_cache_payload_as_file(payload_len: usize) -> bool {
|
||||
payload_len >= MOBILE_PAYLOAD_INLINE_THRESHOLD_BYTES
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MobileBlobCacheEntry {
|
||||
key: String,
|
||||
path: String,
|
||||
mime: String,
|
||||
size: i64,
|
||||
created_at: i64,
|
||||
}
|
||||
|
||||
impl MobileBlobCacheEntry {
|
||||
fn to_blob(&self) -> crate::Blob {
|
||||
crate::Blob {
|
||||
key: self.key.clone(),
|
||||
data: format!("{MOBILE_BLOB_FILE_PREFIX}{}", self.path),
|
||||
mime: self.mime.clone(),
|
||||
size: self.size,
|
||||
created_at: self.created_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct MobileBlobCache {
|
||||
workspace_dirs: RwLock<HashMap<String, PathBuf>>,
|
||||
blob_entries: Mutex<LruCache<String, MobileBlobCacheEntry>>,
|
||||
}
|
||||
|
||||
impl MobileBlobCache {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
workspace_dirs: RwLock::new(HashMap::new()),
|
||||
blob_entries: Mutex::new(LruCache::new(
|
||||
NonZeroUsize::new(MOBILE_BLOB_CACHE_CAPACITY).expect("cache capacity is non-zero"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn register_workspace(&self, universal_id: &str, database_path: &str) -> std::io::Result<()> {
|
||||
let cache_dir = Self::system_cache_dir(database_path, universal_id);
|
||||
|
||||
std::fs::create_dir_all(&cache_dir)?;
|
||||
Self::cleanup_cache_dir(&cache_dir)?;
|
||||
self
|
||||
.workspace_dirs
|
||||
.write()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.insert(universal_id.to_string(), cache_dir);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_blob(&self, universal_id: &str, key: &str) -> Option<crate::Blob> {
|
||||
let cache_key = Self::cache_key(universal_id, key);
|
||||
let mut stale_path = None;
|
||||
{
|
||||
let mut blob_entries = self.blob_entries.lock().expect("blob cache lock poisoned");
|
||||
if let Some(entry) = blob_entries.get(&cache_key).cloned() {
|
||||
if Path::new(&entry.path).exists() {
|
||||
return Some(entry.to_blob());
|
||||
}
|
||||
stale_path = blob_entries.pop(&cache_key).map(|removed| removed.path);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(path) = stale_path {
|
||||
Self::delete_blob_file(&path);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn cache_blob(&self, universal_id: &str, blob: &NbBlob) -> std::io::Result<crate::Blob> {
|
||||
let cache_key = Self::cache_key(universal_id, &blob.key);
|
||||
let cache_dir = self.get_or_create_cache_dir(universal_id)?;
|
||||
|
||||
let file_path = Self::hashed_file_path(&cache_dir, &cache_key, "blob");
|
||||
std::fs::write(&file_path, &blob.data)?;
|
||||
|
||||
let entry = MobileBlobCacheEntry {
|
||||
key: blob.key.clone(),
|
||||
path: file_path.to_string_lossy().into_owned(),
|
||||
mime: blob.mime.clone(),
|
||||
size: blob.size,
|
||||
created_at: blob.created_at.and_utc().timestamp_millis(),
|
||||
};
|
||||
|
||||
let previous_path = {
|
||||
self
|
||||
.blob_entries
|
||||
.lock()
|
||||
.expect("blob cache lock poisoned")
|
||||
.push(cache_key, entry.clone())
|
||||
.and_then(|(_previous_key, previous)| (previous.path != entry.path).then_some(previous.path))
|
||||
};
|
||||
if let Some(previous_path) = previous_path {
|
||||
Self::delete_blob_file(&previous_path);
|
||||
}
|
||||
|
||||
Ok(entry.to_blob())
|
||||
}
|
||||
|
||||
pub(crate) fn invalidate_blob(&self, universal_id: &str, key: &str) {
|
||||
let cache_key = Self::cache_key(universal_id, key);
|
||||
let removed_path = self
|
||||
.blob_entries
|
||||
.lock()
|
||||
.expect("blob cache lock poisoned")
|
||||
.pop(&cache_key)
|
||||
.map(|entry| entry.path);
|
||||
if let Some(path) = removed_path {
|
||||
Self::delete_blob_file(&path);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear_workspace_cache(&self, universal_id: &str) {
|
||||
self.evict_workspace_entries(universal_id);
|
||||
|
||||
let cache_dir = {
|
||||
self
|
||||
.workspace_dirs
|
||||
.read()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.get(universal_id)
|
||||
.cloned()
|
||||
};
|
||||
if let Some(cache_dir) = cache_dir {
|
||||
let _ = Self::cleanup_cache_dir(&cache_dir);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn invalidate_workspace(&self, universal_id: &str) {
|
||||
self.evict_workspace_entries(universal_id);
|
||||
|
||||
if let Some(cache_dir) = self
|
||||
.workspace_dirs
|
||||
.write()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.remove(universal_id)
|
||||
{
|
||||
let _ = std::fs::remove_dir_all(&cache_dir);
|
||||
if let Some(parent) = cache_dir.parent() {
|
||||
let _ = std::fs::remove_dir(parent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_workspace_entries(&self, universal_id: &str) {
|
||||
let prefix = format!("{universal_id}\u{1f}");
|
||||
|
||||
let removed_blob_paths = {
|
||||
let mut blob_entries = self.blob_entries.lock().expect("blob cache lock poisoned");
|
||||
let keys = blob_entries
|
||||
.iter()
|
||||
.filter_map(|(key, _)| key.starts_with(&prefix).then_some(key.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
keys
|
||||
.into_iter()
|
||||
.filter_map(|key| blob_entries.pop(&key).map(|entry| entry.path))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
for path in removed_blob_paths {
|
||||
Self::delete_blob_file(&path);
|
||||
}
|
||||
}
|
||||
|
||||
fn cache_key(universal_id: &str, key: &str) -> String {
|
||||
format!("{universal_id}\u{1f}{key}")
|
||||
}
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
|
||||
// Android DB lives in "<app>/files/..."; cache should live in
|
||||
// "<app>/cache/...".
|
||||
let mut current = Path::new(database_path).parent();
|
||||
while let Some(path) = current {
|
||||
if path.file_name().and_then(|n| n.to_str()) == Some("files") {
|
||||
if let Some(app_root) = path.parent() {
|
||||
return app_root
|
||||
.join("cache")
|
||||
.join(MOBILE_BLOB_CACHE_DIR)
|
||||
.join(Self::workspace_bucket(universal_id));
|
||||
}
|
||||
}
|
||||
current = path.parent();
|
||||
}
|
||||
Self::fallback_temp_cache_dir(universal_id)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "ios")]
|
||||
fn system_cache_dir(database_path: &str, universal_id: &str) -> PathBuf {
|
||||
// iOS DB lives in ".../Documents/..."; cache should live in
|
||||
// ".../Library/Caches/...".
|
||||
let mut current = Path::new(database_path).parent();
|
||||
while let Some(path) = current {
|
||||
if path.file_name().and_then(|n| n.to_str()) == Some("Documents") {
|
||||
if let Some(container_root) = path.parent() {
|
||||
return container_root
|
||||
.join("Library")
|
||||
.join("Caches")
|
||||
.join(MOBILE_BLOB_CACHE_DIR)
|
||||
.join(Self::workspace_bucket(universal_id));
|
||||
}
|
||||
}
|
||||
current = path.parent();
|
||||
}
|
||||
Self::fallback_temp_cache_dir(universal_id)
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "android", target_os = "ios")))]
|
||||
fn system_cache_dir(_database_path: &str, universal_id: &str) -> PathBuf {
|
||||
Self::fallback_temp_cache_dir(universal_id)
|
||||
}
|
||||
|
||||
fn fallback_temp_cache_dir(universal_id: &str) -> PathBuf {
|
||||
std::env::temp_dir()
|
||||
.join(MOBILE_BLOB_CACHE_DIR)
|
||||
.join(Self::workspace_bucket(universal_id))
|
||||
}
|
||||
|
||||
fn workspace_bucket(universal_id: &str) -> String {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
universal_id.hash(&mut hasher);
|
||||
format!("{:016x}", hasher.finish())
|
||||
}
|
||||
|
||||
fn get_or_create_cache_dir(&self, universal_id: &str) -> std::io::Result<PathBuf> {
|
||||
let cache_dir = self
|
||||
.workspace_dirs
|
||||
.write()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.entry(universal_id.to_string())
|
||||
.or_insert_with(|| Self::fallback_temp_cache_dir(universal_id))
|
||||
.clone();
|
||||
std::fs::create_dir_all(&cache_dir)?;
|
||||
Ok(cache_dir)
|
||||
}
|
||||
|
||||
fn hashed_file_path(cache_dir: &Path, cache_key: &str, extension: &str) -> PathBuf {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
cache_key.hash(&mut hasher);
|
||||
cache_dir.join(format!("{:016x}.{extension}", hasher.finish()))
|
||||
}
|
||||
|
||||
fn delete_blob_file(path: &str) {
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
fn cleanup_cache_dir(cache_dir: &Path) -> std::io::Result<()> {
|
||||
for entry in std::fs::read_dir(cache_dir)? {
|
||||
let entry = entry?;
|
||||
if entry.path().is_file() {
|
||||
let _ = std::fs::remove_file(entry.path());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_mobile_binary_file_token(value: &str) -> bool {
|
||||
value.starts_with(MOBILE_BLOB_FILE_PREFIX)
|
||||
}
|
||||
|
||||
impl MobileBlobCache {
|
||||
pub(crate) fn read_binary_file(&self, universal_id: &str, value: &str) -> std::io::Result<Vec<u8>> {
|
||||
let path = value
|
||||
.strip_prefix(MOBILE_BLOB_FILE_PREFIX)
|
||||
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid mobile file token"))?;
|
||||
|
||||
let path = path.strip_prefix("file://").unwrap_or(path);
|
||||
let canonical = std::fs::canonicalize(path)?;
|
||||
let workspace_dir = {
|
||||
self
|
||||
.workspace_dirs
|
||||
.read()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.get(universal_id)
|
||||
.cloned()
|
||||
}
|
||||
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "workspace cache directory not registered"))?;
|
||||
let workspace_dir = std::fs::canonicalize(workspace_dir)?;
|
||||
|
||||
if !is_valid_mobile_cache_path(&canonical, &workspace_dir) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::PermissionDenied,
|
||||
"mobile file token points outside the workspace cache directory",
|
||||
));
|
||||
}
|
||||
|
||||
let metadata = std::fs::metadata(&canonical)?;
|
||||
if !metadata.is_file() {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"mobile file token does not resolve to a file",
|
||||
));
|
||||
}
|
||||
if metadata.len() > MOBILE_BLOB_MAX_READ_BYTES {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"mobile file token exceeds max size: {} > {}",
|
||||
metadata.len(),
|
||||
MOBILE_BLOB_MAX_READ_BYTES
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
std::fs::read(canonical)
|
||||
}
|
||||
}
|
||||
|
||||
fn is_valid_mobile_cache_path(path: &Path, workspace_dir: &Path) -> bool {
|
||||
if !path.starts_with(workspace_dir) {
|
||||
return false;
|
||||
}
|
||||
|
||||
let Ok(relative) = path.strip_prefix(workspace_dir) else {
|
||||
return false;
|
||||
};
|
||||
let mut components = relative.components();
|
||||
let Some(std::path::Component::Normal(file_name)) = components.next() else {
|
||||
return false;
|
||||
};
|
||||
if components.next().is_some() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let Some(file_name) = file_name.to_str() else {
|
||||
return false;
|
||||
};
|
||||
let Some((stem, extension)) = file_name.rsplit_once('.') else {
|
||||
return false;
|
||||
};
|
||||
if extension != "blob" {
|
||||
return false;
|
||||
}
|
||||
stem.len() == 16 && stem.chars().all(|c| c.is_ascii_hexdigit())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
+333
@@ -0,0 +1,333 @@
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::{
|
||||
fs,
|
||||
io::ErrorKind,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
thread,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use affine_nbstore::Blob as NbBlob;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use super::*;
|
||||
|
||||
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
fn unique_id(prefix: &str) -> String {
|
||||
let counter = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("system clock before unix epoch")
|
||||
.as_nanos();
|
||||
format!("{prefix}-{now}-{counter}")
|
||||
}
|
||||
|
||||
fn build_blob(key: &str, data: Vec<u8>) -> NbBlob {
|
||||
NbBlob {
|
||||
key: key.to_string(),
|
||||
data: data.clone(),
|
||||
mime: "application/octet-stream".to_string(),
|
||||
size: data.len() as i64,
|
||||
created_at: DateTime::<Utc>::from_timestamp_millis(0)
|
||||
.expect("valid timestamp")
|
||||
.naive_utc(),
|
||||
}
|
||||
}
|
||||
|
||||
fn workspace_dir(cache: &MobileBlobCache, universal_id: &str) -> PathBuf {
|
||||
cache
|
||||
.workspace_dirs
|
||||
.read()
|
||||
.expect("workspace cache lock poisoned")
|
||||
.get(universal_id)
|
||||
.cloned()
|
||||
.expect("workspace should be registered")
|
||||
}
|
||||
|
||||
fn token_path(token: &str) -> PathBuf {
|
||||
token
|
||||
.strip_prefix(MOBILE_BLOB_FILE_PREFIX)
|
||||
.map(PathBuf::from)
|
||||
.expect("token should contain file path")
|
||||
}
|
||||
|
||||
fn setup_cache(prefix: &str) -> (MobileBlobCache, String, PathBuf) {
|
||||
let cache = MobileBlobCache::new();
|
||||
let universal_id = unique_id(prefix);
|
||||
let db_path = std::env::temp_dir()
|
||||
.join("affine-mobile-cache-tests")
|
||||
.join(unique_id("db"))
|
||||
.join("workspace.sqlite");
|
||||
if let Some(parent) = db_path.parent() {
|
||||
fs::create_dir_all(parent).expect("create test db parent");
|
||||
}
|
||||
cache
|
||||
.register_workspace(&universal_id, db_path.to_string_lossy().as_ref())
|
||||
.expect("register workspace should succeed");
|
||||
let workspace = workspace_dir(&cache, &universal_id);
|
||||
(cache, universal_id, workspace)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_binary_file_rejects_path_traversal_and_malformed_name() {
|
||||
let (cache, universal_id, workspace) = setup_cache("path-validation");
|
||||
|
||||
let outside_name = unique_id("outside");
|
||||
let outside_dir = workspace
|
||||
.parent()
|
||||
.expect("workspace should have parent")
|
||||
.join(&outside_name);
|
||||
fs::create_dir_all(&outside_dir).expect("create outside dir");
|
||||
let outside_file = outside_dir.join("1234567890abcdef.blob");
|
||||
fs::write(&outside_file, b"outside-data").expect("write outside file");
|
||||
|
||||
let traversal = workspace.join(format!("../{outside_name}/1234567890abcdef.blob"));
|
||||
let traversal_token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", traversal.display());
|
||||
let traversal_err = cache
|
||||
.read_binary_file(&universal_id, &traversal_token)
|
||||
.expect_err("path traversal should be rejected");
|
||||
assert_eq!(traversal_err.kind(), ErrorKind::PermissionDenied);
|
||||
|
||||
let malformed = workspace.join("invalid-name.blob");
|
||||
fs::write(&malformed, b"bad").expect("write malformed file");
|
||||
let malformed_token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", malformed.display());
|
||||
let malformed_err = cache
|
||||
.read_binary_file(&universal_id, &malformed_token)
|
||||
.expect_err("malformed cache path should be rejected");
|
||||
assert_eq!(malformed_err.kind(), ErrorKind::PermissionDenied);
|
||||
|
||||
cache.invalidate_workspace(&universal_id);
|
||||
let _ = fs::remove_dir_all(outside_dir);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn read_binary_file_rejects_symlink_escape() {
|
||||
let (cache, universal_id, workspace) = setup_cache("symlink");
|
||||
|
||||
let outside_dir = workspace
|
||||
.parent()
|
||||
.expect("workspace should have parent")
|
||||
.join(unique_id("symlink-outside"));
|
||||
fs::create_dir_all(&outside_dir).expect("create outside dir");
|
||||
let outside_file = outside_dir.join("1234567890abcdef.blob");
|
||||
fs::write(&outside_file, b"outside-data").expect("write outside file");
|
||||
|
||||
let symlink_path = workspace.join("aaaaaaaaaaaaaaaa.blob");
|
||||
symlink(&outside_file, &symlink_path).expect("create symlink");
|
||||
|
||||
let token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", symlink_path.display());
|
||||
let err = cache
|
||||
.read_binary_file(&universal_id, &token)
|
||||
.expect_err("symlink escaping cache dir should be rejected");
|
||||
assert_eq!(err.kind(), ErrorKind::PermissionDenied);
|
||||
|
||||
cache.invalidate_workspace(&universal_id);
|
||||
let _ = fs::remove_dir_all(outside_dir);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cache_blob_evicts_lru_entry_and_deletes_file() {
|
||||
let (cache, universal_id, _workspace) = setup_cache("lru-eviction");
|
||||
let mut first_path = None;
|
||||
|
||||
for i in 0..=MOBILE_BLOB_CACHE_CAPACITY {
|
||||
let key = format!("blob-{i}");
|
||||
let blob = build_blob(&key, vec![i as u8]);
|
||||
let cached = cache.cache_blob(&universal_id, &blob).expect("cache blob");
|
||||
if i == 0 {
|
||||
first_path = Some(token_path(&cached.data));
|
||||
}
|
||||
}
|
||||
|
||||
let first_path = first_path.expect("first path should exist");
|
||||
assert!(!first_path.exists(), "evicted blob file should be deleted");
|
||||
assert!(cache.get_blob(&universal_id, "blob-0").is_none());
|
||||
assert!(cache.get_blob(&universal_id, "blob-1").is_some());
|
||||
|
||||
cache.invalidate_workspace(&universal_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_binary_file_returns_not_found_for_missing_file() {
|
||||
let (cache, universal_id, _workspace) = setup_cache("missing-file");
|
||||
|
||||
let cached_blob = cache
|
||||
.cache_blob(&universal_id, &build_blob("blob", vec![9, 8, 7]))
|
||||
.expect("cache blob");
|
||||
let path = token_path(&cached_blob.data);
|
||||
fs::remove_file(&path).expect("remove cached file");
|
||||
|
||||
let err = cache
|
||||
.read_binary_file(&universal_id, &cached_blob.data)
|
||||
.expect_err("missing file should error");
|
||||
assert_eq!(err.kind(), ErrorKind::NotFound);
|
||||
|
||||
cache.invalidate_workspace(&universal_id);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn read_binary_file_returns_permission_denied_for_unreadable_file() {
|
||||
let (cache, universal_id, workspace) = setup_cache("permissions");
|
||||
|
||||
let file_path = workspace.join("1234567890abcdef.blob");
|
||||
fs::write(&file_path, b"secret").expect("write file");
|
||||
|
||||
let mut permissions = fs::metadata(&file_path).expect("read metadata").permissions();
|
||||
permissions.set_mode(0o000);
|
||||
fs::set_permissions(&file_path, permissions).expect("set restrictive permissions");
|
||||
|
||||
let token = format!("{MOBILE_BLOB_FILE_PREFIX}{}", file_path.display());
|
||||
let err = cache
|
||||
.read_binary_file(&universal_id, &token)
|
||||
.expect_err("unreadable file should error");
|
||||
assert_eq!(err.kind(), ErrorKind::PermissionDenied);
|
||||
|
||||
let mut restore = fs::metadata(&file_path).expect("read metadata").permissions();
|
||||
restore.set_mode(0o600);
|
||||
let _ = fs::set_permissions(&file_path, restore);
|
||||
cache.invalidate_workspace(&universal_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn concurrent_cache_and_read_is_consistent() {
|
||||
let cache = Arc::new(MobileBlobCache::new());
|
||||
let universal_id = Arc::new(unique_id("concurrent"));
|
||||
cache
|
||||
.register_workspace(universal_id.as_str(), ":memory:")
|
||||
.expect("register workspace");
|
||||
|
||||
let workers = 8;
|
||||
let iterations = 64;
|
||||
let mut handles = Vec::with_capacity(workers);
|
||||
|
||||
for worker in 0..workers {
|
||||
let cache = Arc::clone(&cache);
|
||||
let universal_id = Arc::clone(&universal_id);
|
||||
handles.push(thread::spawn(move || {
|
||||
// Keep key cardinality under cache capacity so this test exercises
|
||||
// concurrent read/write consistency rather than LRU eviction behavior.
|
||||
let key = format!("blob-{worker}");
|
||||
for i in 0..iterations {
|
||||
let data = vec![worker as u8, i as u8, 42];
|
||||
let blob = build_blob(&key, data.clone());
|
||||
let cached = cache
|
||||
.cache_blob(universal_id.as_str(), &blob)
|
||||
.expect("cache blob in worker");
|
||||
let read_back = cache
|
||||
.read_binary_file(universal_id.as_str(), &cached.data)
|
||||
.expect("read cached blob");
|
||||
assert_eq!(read_back, data);
|
||||
assert!(cache.get_blob(universal_id.as_str(), &key).is_some());
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.join().expect("worker thread should succeed");
|
||||
}
|
||||
|
||||
cache.invalidate_workspace(universal_id.as_str());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn concurrent_high_churn_read_tolerates_eviction_not_found() {
|
||||
let cache = Arc::new(MobileBlobCache::new());
|
||||
let universal_id = Arc::new(unique_id("concurrent-high-churn"));
|
||||
cache
|
||||
.register_workspace(universal_id.as_str(), ":memory:")
|
||||
.expect("register workspace");
|
||||
|
||||
let workers = 8;
|
||||
let iterations = 64;
|
||||
let mut handles = Vec::with_capacity(workers);
|
||||
|
||||
for worker in 0..workers {
|
||||
let cache = Arc::clone(&cache);
|
||||
let universal_id = Arc::clone(&universal_id);
|
||||
handles.push(thread::spawn(move || {
|
||||
let mut read_ok = 0usize;
|
||||
let mut read_not_found = 0usize;
|
||||
|
||||
for i in 0..iterations {
|
||||
// Use unique keys to force churn and LRU eviction under contention.
|
||||
let key = format!("blob-{worker}-{i}");
|
||||
let data = vec![worker as u8, i as u8, 77];
|
||||
let blob = build_blob(&key, data.clone());
|
||||
let cached = cache
|
||||
.cache_blob(universal_id.as_str(), &blob)
|
||||
.expect("cache blob in worker");
|
||||
|
||||
match cache.read_binary_file(universal_id.as_str(), &cached.data) {
|
||||
Ok(read_back) => {
|
||||
assert_eq!(read_back, data);
|
||||
read_ok += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
assert_eq!(
|
||||
err.kind(),
|
||||
ErrorKind::NotFound,
|
||||
"unexpected read error during high churn: {err:?}"
|
||||
);
|
||||
// Once the backing file is gone, cache lookup for this unique key
|
||||
// should no longer return an entry.
|
||||
assert!(cache.get_blob(universal_id.as_str(), &key).is_none());
|
||||
read_not_found += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(read_ok, read_not_found)
|
||||
}));
|
||||
}
|
||||
|
||||
let mut total_read_ok = 0usize;
|
||||
let mut total_read_not_found = 0usize;
|
||||
for handle in handles {
|
||||
let (read_ok, read_not_found) = handle.join().expect("worker thread should succeed");
|
||||
total_read_ok += read_ok;
|
||||
total_read_not_found += read_not_found;
|
||||
}
|
||||
|
||||
assert_eq!(total_read_ok + total_read_not_found, workers * iterations);
|
||||
|
||||
// Deterministically force eviction to validate NotFound behavior.
|
||||
let evicted_key = "evicted-target";
|
||||
let evicted_blob = cache
|
||||
.cache_blob(universal_id.as_str(), &build_blob(evicted_key, vec![9, 9, 9]))
|
||||
.expect("cache target blob for eviction");
|
||||
for i in 0..=MOBILE_BLOB_CACHE_CAPACITY {
|
||||
let pressure_key = format!("pressure-{i}");
|
||||
cache
|
||||
.cache_blob(
|
||||
universal_id.as_str(),
|
||||
&build_blob(&pressure_key, vec![i as u8, 1, 2, 3]),
|
||||
)
|
||||
.expect("cache pressure blob");
|
||||
}
|
||||
let evicted_err = cache
|
||||
.read_binary_file(universal_id.as_str(), &evicted_blob.data)
|
||||
.expect_err("evicted token should not be readable");
|
||||
assert_eq!(evicted_err.kind(), ErrorKind::NotFound);
|
||||
assert!(cache.get_blob(universal_id.as_str(), evicted_key).is_none());
|
||||
|
||||
// Cache remains healthy for subsequent writes and reads.
|
||||
let stable_blob = build_blob("post-churn", vec![1, 2, 3, 4]);
|
||||
let stable_cached = cache
|
||||
.cache_blob(universal_id.as_str(), &stable_blob)
|
||||
.expect("cache stable blob after churn");
|
||||
let stable_read = cache
|
||||
.read_binary_file(universal_id.as_str(), &stable_cached.data)
|
||||
.expect("read stable blob after churn");
|
||||
assert_eq!(stable_read, vec![1, 2, 3, 4]);
|
||||
assert!(cache.get_blob(universal_id.as_str(), "post-churn").is_some());
|
||||
|
||||
cache.invalidate_workspace(universal_id.as_str());
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
use affine_nbstore::error::Error as NbStoreError;
|
||||
|
||||
#[derive(uniffi::Error, thiserror::Error, Debug)]
|
||||
pub enum UniffiError {
|
||||
#[error("Error: {0}")]
|
||||
Err(String),
|
||||
#[error("Base64 decoding error: {0}")]
|
||||
Base64DecodingError(String),
|
||||
#[error("Timestamp decoding error")]
|
||||
TimestampDecodingError,
|
||||
}
|
||||
|
||||
impl From<NbStoreError> for UniffiError {
|
||||
fn from(err: NbStoreError) -> Self {
|
||||
Self::Err(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, UniffiError>;
|
||||
@@ -0,0 +1,243 @@
|
||||
use affine_nbstore::{
|
||||
Blob as NbBlob, Data, DocClock as NbDocClock, DocRecord as NbDocRecord, DocUpdate as NbDocUpdate,
|
||||
ListedBlob as NbListedBlob, SetBlob as NbSetBlob,
|
||||
indexer::{NativeBlockInfo, NativeCrawlResult, NativeMatch, NativeSearchHit},
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use crate::{
|
||||
Result, UniffiError,
|
||||
payload_codec::{decode_base64_data, encode_base64_data},
|
||||
};
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocRecord {
|
||||
pub doc_id: String,
|
||||
// base64 encoded data
|
||||
pub bin: String,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
impl From<NbDocRecord> for DocRecord {
|
||||
fn from(record: NbDocRecord) -> Self {
|
||||
Self {
|
||||
doc_id: record.doc_id,
|
||||
bin: encode_base64_data(&record.bin),
|
||||
timestamp: record.timestamp.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocRecord> for NbDocRecord {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(record: DocRecord) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: record.doc_id,
|
||||
bin: Into::<Data>::into(decode_base64_data(&record.bin)?),
|
||||
timestamp: DateTime::<Utc>::from_timestamp_millis(record.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocUpdate {
|
||||
pub doc_id: String,
|
||||
pub timestamp: i64,
|
||||
// base64 encoded data
|
||||
pub bin: String,
|
||||
}
|
||||
|
||||
impl From<NbDocUpdate> for DocUpdate {
|
||||
fn from(update: NbDocUpdate) -> Self {
|
||||
Self {
|
||||
doc_id: update.doc_id,
|
||||
timestamp: update.timestamp.and_utc().timestamp_millis(),
|
||||
bin: encode_base64_data(&update.bin),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocUpdate> for NbDocUpdate {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(update: DocUpdate) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: update.doc_id,
|
||||
timestamp: DateTime::<Utc>::from_timestamp_millis(update.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
bin: Into::<Data>::into(decode_base64_data(&update.bin)?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocClock {
|
||||
pub doc_id: String,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
impl From<NbDocClock> for DocClock {
|
||||
fn from(clock: NbDocClock) -> Self {
|
||||
Self {
|
||||
doc_id: clock.doc_id,
|
||||
timestamp: clock.timestamp.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocClock> for NbDocClock {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(clock: DocClock) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: clock.doc_id,
|
||||
timestamp: DateTime::<Utc>::from_timestamp_millis(clock.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct Blob {
|
||||
pub key: String,
|
||||
// base64 encoded data; on mobile large blobs this is a file-path token prefixed
|
||||
// with "__AFFINE_BLOB_FILE__:"
|
||||
pub data: String,
|
||||
pub mime: String,
|
||||
pub size: i64,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
impl From<NbBlob> for Blob {
|
||||
fn from(blob: NbBlob) -> Self {
|
||||
Self {
|
||||
key: blob.key,
|
||||
data: encode_base64_data(&blob.data),
|
||||
mime: blob.mime,
|
||||
size: blob.size,
|
||||
created_at: blob.created_at.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct SetBlob {
|
||||
pub key: String,
|
||||
// base64 encoded data; mobile file-path tokens are also accepted
|
||||
pub data: String,
|
||||
pub mime: String,
|
||||
}
|
||||
|
||||
impl TryFrom<SetBlob> for NbSetBlob {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(blob: SetBlob) -> Result<Self> {
|
||||
Ok(Self {
|
||||
key: blob.key,
|
||||
data: Into::<Data>::into(decode_base64_data(&blob.data)?),
|
||||
mime: blob.mime,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct ListedBlob {
|
||||
pub key: String,
|
||||
pub size: i64,
|
||||
pub mime: String,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
impl From<NbListedBlob> for ListedBlob {
|
||||
fn from(blob: NbListedBlob) -> Self {
|
||||
Self {
|
||||
key: blob.key,
|
||||
size: blob.size,
|
||||
mime: blob.mime,
|
||||
created_at: blob.created_at.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct BlockInfo {
|
||||
pub block_id: String,
|
||||
pub flavour: String,
|
||||
pub content: Option<Vec<String>>,
|
||||
pub blob: Option<Vec<String>>,
|
||||
pub ref_doc_id: Option<Vec<String>>,
|
||||
pub ref_info: Option<Vec<String>>,
|
||||
pub parent_flavour: Option<String>,
|
||||
pub parent_block_id: Option<String>,
|
||||
pub additional: Option<String>,
|
||||
}
|
||||
|
||||
impl From<NativeBlockInfo> for BlockInfo {
|
||||
fn from(value: NativeBlockInfo) -> Self {
|
||||
Self {
|
||||
block_id: value.block_id,
|
||||
flavour: value.flavour,
|
||||
content: value.content,
|
||||
blob: value.blob,
|
||||
ref_doc_id: value.ref_doc_id,
|
||||
ref_info: value.ref_info,
|
||||
parent_flavour: value.parent_flavour,
|
||||
parent_block_id: value.parent_block_id,
|
||||
additional: value.additional,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct CrawlResult {
|
||||
pub blocks: Vec<BlockInfo>,
|
||||
pub title: String,
|
||||
pub summary: String,
|
||||
}
|
||||
|
||||
impl From<NativeCrawlResult> for CrawlResult {
|
||||
fn from(value: NativeCrawlResult) -> Self {
|
||||
Self {
|
||||
blocks: value.blocks.into_iter().map(Into::into).collect(),
|
||||
title: value.title,
|
||||
summary: value.summary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct SearchHit {
|
||||
pub id: String,
|
||||
pub score: f64,
|
||||
pub terms: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<NativeSearchHit> for SearchHit {
|
||||
fn from(value: NativeSearchHit) -> Self {
|
||||
Self {
|
||||
id: value.id,
|
||||
score: value.score,
|
||||
terms: value.terms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct MatchRange {
|
||||
pub start: u32,
|
||||
pub end: u32,
|
||||
}
|
||||
|
||||
impl From<NativeMatch> for MatchRange {
|
||||
fn from(value: NativeMatch) -> Self {
|
||||
Self {
|
||||
start: value.start,
|
||||
end: value.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,23 +1,20 @@
|
||||
mod error;
|
||||
mod ffi_types;
|
||||
mod payload_codec;
|
||||
mod storage;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
#[cfg_attr(all(test, not(any(target_os = "android", target_os = "ios"))), allow(dead_code))]
|
||||
pub(crate) mod cache;
|
||||
use affine_common::hashcash::Stamp;
|
||||
use affine_nbstore::{Data, pool::SqliteDocStoragePool};
|
||||
|
||||
#[derive(uniffi::Error, thiserror::Error, Debug)]
|
||||
pub enum UniffiError {
|
||||
#[error("Error: {0}")]
|
||||
Err(String),
|
||||
#[error("Base64 decoding error: {0}")]
|
||||
Base64DecodingError(String),
|
||||
#[error("Timestamp decoding error")]
|
||||
TimestampDecodingError,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::error::Error> for UniffiError {
|
||||
fn from(err: affine_nbstore::error::Error) -> Self {
|
||||
Self::Err(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, UniffiError>;
|
||||
pub(crate) use error::Result;
|
||||
pub use error::UniffiError;
|
||||
pub use ffi_types::{
|
||||
Blob, BlockInfo, CrawlResult, DocClock, DocRecord, DocUpdate, ListedBlob, MatchRange, SearchHit, SetBlob,
|
||||
};
|
||||
pub use storage::{DocStoragePool, new_doc_storage_pool};
|
||||
|
||||
uniffi::setup_scaffolding!("affine_mobile_native");
|
||||
|
||||
@@ -25,769 +22,3 @@ uniffi::setup_scaffolding!("affine_mobile_native");
|
||||
pub fn hashcash_mint(resource: String, bits: u32) -> String {
|
||||
Stamp::mint(resource, Some(bits)).format()
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocRecord {
|
||||
pub doc_id: String,
|
||||
// base64 encoded data
|
||||
pub bin: String,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::DocRecord> for DocRecord {
|
||||
fn from(record: affine_nbstore::DocRecord) -> Self {
|
||||
Self {
|
||||
doc_id: record.doc_id,
|
||||
bin: base64_simd::STANDARD.encode_to_string(&record.bin),
|
||||
timestamp: record.timestamp.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocRecord> for affine_nbstore::DocRecord {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(record: DocRecord) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: record.doc_id,
|
||||
bin: Into::<Data>::into(
|
||||
base64_simd::STANDARD
|
||||
.decode_to_vec(record.bin)
|
||||
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
|
||||
),
|
||||
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(record.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocUpdate {
|
||||
pub doc_id: String,
|
||||
pub timestamp: i64,
|
||||
// base64 encoded data
|
||||
pub bin: String,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::DocUpdate> for DocUpdate {
|
||||
fn from(update: affine_nbstore::DocUpdate) -> Self {
|
||||
Self {
|
||||
doc_id: update.doc_id,
|
||||
timestamp: update.timestamp.and_utc().timestamp_millis(),
|
||||
bin: base64_simd::STANDARD.encode_to_string(&update.bin),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocUpdate> for affine_nbstore::DocUpdate {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(update: DocUpdate) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: update.doc_id,
|
||||
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(update.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
bin: Into::<Data>::into(
|
||||
base64_simd::STANDARD
|
||||
.decode_to_vec(update.bin)
|
||||
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
|
||||
),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn doc_update_roundtrip_base64() {
|
||||
let timestamp = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(1_700_000_000_000)
|
||||
.unwrap()
|
||||
.naive_utc();
|
||||
let original = affine_nbstore::DocUpdate {
|
||||
doc_id: "doc-1".to_string(),
|
||||
timestamp,
|
||||
bin: vec![1, 2, 3, 4, 5],
|
||||
};
|
||||
|
||||
let encoded: DocUpdate = original.into();
|
||||
let decoded = affine_nbstore::DocUpdate::try_from(encoded).unwrap();
|
||||
|
||||
assert_eq!(decoded.doc_id, "doc-1");
|
||||
assert_eq!(decoded.timestamp, timestamp);
|
||||
assert_eq!(decoded.bin, vec![1, 2, 3, 4, 5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doc_update_rejects_invalid_base64() {
|
||||
let update = DocUpdate {
|
||||
doc_id: "doc-2".to_string(),
|
||||
timestamp: 0,
|
||||
bin: "not-base64!!".to_string(),
|
||||
};
|
||||
|
||||
let err = match affine_nbstore::DocUpdate::try_from(update) {
|
||||
Ok(_) => panic!("expected base64 decode error"),
|
||||
Err(err) => err,
|
||||
};
|
||||
match err {
|
||||
UniffiError::Base64DecodingError(_) => {}
|
||||
other => panic!("unexpected error: {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct DocClock {
|
||||
pub doc_id: String,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::DocClock> for DocClock {
|
||||
fn from(clock: affine_nbstore::DocClock) -> Self {
|
||||
Self {
|
||||
doc_id: clock.doc_id,
|
||||
timestamp: clock.timestamp.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DocClock> for affine_nbstore::DocClock {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(clock: DocClock) -> Result<Self> {
|
||||
Ok(Self {
|
||||
doc_id: clock.doc_id,
|
||||
timestamp: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(clock.timestamp)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct Blob {
|
||||
pub key: String,
|
||||
// base64 encoded data
|
||||
pub data: String,
|
||||
pub mime: String,
|
||||
pub size: i64,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::Blob> for Blob {
|
||||
fn from(blob: affine_nbstore::Blob) -> Self {
|
||||
Self {
|
||||
key: blob.key,
|
||||
data: base64_simd::STANDARD.encode_to_string(&blob.data),
|
||||
mime: blob.mime,
|
||||
size: blob.size,
|
||||
created_at: blob.created_at.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct SetBlob {
|
||||
pub key: String,
|
||||
// base64 encoded data
|
||||
pub data: String,
|
||||
pub mime: String,
|
||||
}
|
||||
|
||||
impl TryFrom<SetBlob> for affine_nbstore::SetBlob {
|
||||
type Error = UniffiError;
|
||||
|
||||
fn try_from(blob: SetBlob) -> Result<Self> {
|
||||
Ok(Self {
|
||||
key: blob.key,
|
||||
data: Into::<Data>::into(
|
||||
base64_simd::STANDARD
|
||||
.decode_to_vec(blob.data)
|
||||
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
|
||||
),
|
||||
mime: blob.mime,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct ListedBlob {
|
||||
pub key: String,
|
||||
pub size: i64,
|
||||
pub mime: String,
|
||||
pub created_at: i64,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::ListedBlob> for ListedBlob {
|
||||
fn from(blob: affine_nbstore::ListedBlob) -> Self {
|
||||
Self {
|
||||
key: blob.key,
|
||||
size: blob.size,
|
||||
mime: blob.mime,
|
||||
created_at: blob.created_at.and_utc().timestamp_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct BlockInfo {
|
||||
pub block_id: String,
|
||||
pub flavour: String,
|
||||
pub content: Option<Vec<String>>,
|
||||
pub blob: Option<Vec<String>>,
|
||||
pub ref_doc_id: Option<Vec<String>>,
|
||||
pub ref_info: Option<Vec<String>>,
|
||||
pub parent_flavour: Option<String>,
|
||||
pub parent_block_id: Option<String>,
|
||||
pub additional: Option<String>,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::indexer::NativeBlockInfo> for BlockInfo {
|
||||
fn from(value: affine_nbstore::indexer::NativeBlockInfo) -> Self {
|
||||
Self {
|
||||
block_id: value.block_id,
|
||||
flavour: value.flavour,
|
||||
content: value.content,
|
||||
blob: value.blob,
|
||||
ref_doc_id: value.ref_doc_id,
|
||||
ref_info: value.ref_info,
|
||||
parent_flavour: value.parent_flavour,
|
||||
parent_block_id: value.parent_block_id,
|
||||
additional: value.additional,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct CrawlResult {
|
||||
pub blocks: Vec<BlockInfo>,
|
||||
pub title: String,
|
||||
pub summary: String,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::indexer::NativeCrawlResult> for CrawlResult {
|
||||
fn from(value: affine_nbstore::indexer::NativeCrawlResult) -> Self {
|
||||
Self {
|
||||
blocks: value.blocks.into_iter().map(Into::into).collect(),
|
||||
title: value.title,
|
||||
summary: value.summary,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct SearchHit {
|
||||
pub id: String,
|
||||
pub score: f64,
|
||||
pub terms: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::indexer::NativeSearchHit> for SearchHit {
|
||||
fn from(value: affine_nbstore::indexer::NativeSearchHit) -> Self {
|
||||
Self {
|
||||
id: value.id,
|
||||
score: value.score,
|
||||
terms: value.terms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct MatchRange {
|
||||
pub start: u32,
|
||||
pub end: u32,
|
||||
}
|
||||
|
||||
impl From<affine_nbstore::indexer::NativeMatch> for MatchRange {
|
||||
fn from(value: affine_nbstore::indexer::NativeMatch) -> Self {
|
||||
Self {
|
||||
start: value.start,
|
||||
end: value.end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(uniffi::Object)]
|
||||
pub struct DocStoragePool {
|
||||
inner: SqliteDocStoragePool,
|
||||
}
|
||||
|
||||
#[uniffi::export]
|
||||
pub fn new_doc_storage_pool() -> DocStoragePool {
|
||||
DocStoragePool {
|
||||
inner: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
/// Initialize the database and run migrations.
|
||||
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
|
||||
Ok(self.inner.connect(universal_id, path).await?)
|
||||
}
|
||||
|
||||
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
|
||||
self.inner.disconnect(universal_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_space_id(&self, universal_id: String, space_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.set_space_id(space_id).await?)
|
||||
}
|
||||
|
||||
pub async fn push_update(&self, universal_id: String, doc_id: String, update: String) -> Result<i64> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.push_update(
|
||||
doc_id,
|
||||
base64_simd::STANDARD
|
||||
.decode_to_vec(update)
|
||||
.map_err(|e| UniffiError::Base64DecodingError(e.to_string()))?,
|
||||
)
|
||||
.await?
|
||||
.and_utc()
|
||||
.timestamp_millis(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_doc_snapshot(&self, universal_id: String, doc_id: String) -> Result<Option<DocRecord>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_snapshot(doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_doc_snapshot(&self, universal_id: String, snapshot: DocRecord) -> Result<bool> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_doc_snapshot(snapshot.try_into()?)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_doc_updates(&self, universal_id: String, doc_id: String) -> Result<Vec<DocUpdate>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_updates(doc_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn mark_updates_merged(&self, universal_id: String, doc_id: String, updates: Vec<i64>) -> Result<u32> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.mark_updates_merged(
|
||||
doc_id,
|
||||
updates
|
||||
.into_iter()
|
||||
.map(|t| {
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(t)
|
||||
.ok_or(UniffiError::TimestampDecodingError)
|
||||
.map(|t| t.naive_utc())
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn delete_doc(&self, universal_id: String, doc_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.delete_doc(doc_id).await?)
|
||||
}
|
||||
|
||||
pub async fn get_doc_clocks(&self, universal_id: String, after: Option<i64>) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_clocks(
|
||||
after
|
||||
.map(|t| {
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(t)
|
||||
.ok_or(UniffiError::TimestampDecodingError)
|
||||
.map(|t| t.naive_utc())
|
||||
})
|
||||
.transpose()?,
|
||||
)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_doc_clock(&self, universal_id: String, doc_id: String) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_clock(doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_blob(&self, universal_id: String, key: String) -> Result<Option<Blob>> {
|
||||
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
|
||||
}
|
||||
|
||||
pub async fn set_blob(&self, universal_id: String, blob: SetBlob) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.set_blob(blob.try_into()?).await?)
|
||||
}
|
||||
|
||||
pub async fn delete_blob(&self, universal_id: String, key: String, permanently: bool) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.delete_blob(key, permanently)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn release_blobs(&self, universal_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.release_blobs().await?)
|
||||
}
|
||||
|
||||
pub async fn list_blobs(&self, universal_id: String) -> Result<Vec<ListedBlob>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.list_blobs()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_remote_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_remote_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_remote_clock(
|
||||
peer,
|
||||
doc_id,
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(clock)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pulled_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pulled_remote_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pulled_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pulled_remote_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_pulled_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_pulled_remote_clock(
|
||||
peer,
|
||||
doc_id,
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(clock)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pushed_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pushed_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pushed_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pushed_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_pushed_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_pushed_clock(
|
||||
peer,
|
||||
doc_id,
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(clock)
|
||||
.ok_or(UniffiError::TimestampDecodingError)?
|
||||
.naive_utc(),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn clear_clocks(&self, universal_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.clear_clocks().await?)
|
||||
}
|
||||
|
||||
pub async fn set_blob_uploaded_at(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
blob_id: String,
|
||||
uploaded_at: Option<i64>,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_blob_uploaded_at(
|
||||
peer,
|
||||
blob_id,
|
||||
uploaded_at
|
||||
.map(|t| {
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(t)
|
||||
.ok_or(UniffiError::TimestampDecodingError)
|
||||
.map(|t| t.naive_utc())
|
||||
})
|
||||
.transpose()?,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_blob_uploaded_at(&self, universal_id: String, peer: String, blob_id: String) -> Result<Option<i64>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_blob_uploaded_at(peer, blob_id)
|
||||
.await?
|
||||
.map(|t| t.and_utc().timestamp_millis()),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn crawl_doc_data(&self, universal_id: String, doc_id: String) -> Result<CrawlResult> {
|
||||
let result = self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.crawl_doc_data(&doc_id)
|
||||
.await?;
|
||||
Ok(result.into())
|
||||
}
|
||||
|
||||
pub async fn fts_add_document(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
text: String,
|
||||
index: bool,
|
||||
) -> Result<()> {
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_add(&index_name, &doc_id, &text, index)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_delete_document(&self, universal_id: String, index_name: String, doc_id: String) -> Result<()> {
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_delete(&index_name, &doc_id)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_get_document(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<String>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_get(&index_name, &doc_id)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_search(&self, universal_id: String, index_name: String, query: String) -> Result<Vec<SearchHit>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_search(&index_name, &query)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_get_matches(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
query: String,
|
||||
) -> Result<Vec<MatchRange>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_get_matches(&index_name, &doc_id, &query)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_flush_index(&self, universal_id: String) -> Result<()> {
|
||||
self.inner.get(universal_id).await?.flush_index().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_index_version(&self) -> Result<u32> {
|
||||
Ok(affine_nbstore::storage::SqliteDocStorage::index_version())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
use crate::{Result, UniffiError};
|
||||
|
||||
pub(crate) fn decode_base64_data(data: &str) -> Result<Vec<u8>> {
|
||||
base64_simd::STANDARD
|
||||
.decode_to_vec(data)
|
||||
.map_err(|err| UniffiError::Base64DecodingError(err.to_string()))
|
||||
}
|
||||
|
||||
pub(crate) fn encode_base64_data(data: &[u8]) -> String {
|
||||
base64_simd::STANDARD.encode_to_string(data)
|
||||
}
|
||||
@@ -0,0 +1,182 @@
|
||||
use super::*;
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
pub async fn get_blob(&self, universal_id: String, key: String) -> Result<Option<Blob>> {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
use affine_nbstore::Blob as NbBlob;
|
||||
enum BlobEncodeOutcome {
|
||||
Cached(Blob),
|
||||
Inline(NbBlob),
|
||||
}
|
||||
|
||||
let universal_id_for_cache = universal_id.clone();
|
||||
let key_for_cache = key.clone();
|
||||
if let Ok(Some(blob)) = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| Ok(cache.get_blob(&universal_id_for_cache, &key_for_cache)),
|
||||
"Failed to read mobile blob cache",
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Ok(Some(blob));
|
||||
}
|
||||
|
||||
let Some(blob) = self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.get_blob(key.clone())
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if !should_cache_payload_as_file(blob.data.len()) {
|
||||
return Ok(Some(blob.into()));
|
||||
}
|
||||
|
||||
let universal_id_for_cache = universal_id.clone();
|
||||
let key_for_fallback = key.clone();
|
||||
return match self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
Ok(match cache.cache_blob(&universal_id_for_cache, &blob) {
|
||||
Ok(cached) => BlobEncodeOutcome::Cached(cached),
|
||||
Err(_) => BlobEncodeOutcome::Inline(blob),
|
||||
})
|
||||
},
|
||||
"Failed to cache blob file",
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(BlobEncodeOutcome::Cached(cached)) => Ok(Some(cached)),
|
||||
Ok(BlobEncodeOutcome::Inline(blob)) => Ok(Some(blob.into())),
|
||||
Err(_) => Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_blob(key_for_fallback)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "android", target_os = "ios", test)))]
|
||||
{
|
||||
Ok(self.inner.get(universal_id).await?.get_blob(key).await?.map(Into::into))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_blob(&self, universal_id: String, blob: SetBlob) -> Result<()> {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
let key = blob.key.clone();
|
||||
let blob = NbSetBlob {
|
||||
key: blob.key,
|
||||
data: Into::<Data>::into(self.decode_blob_data(&universal_id, &blob.data).await?),
|
||||
mime: blob.mime,
|
||||
};
|
||||
self.inner.get(universal_id.clone()).await?.set_blob(blob).await?;
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id;
|
||||
let _ = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
cache.invalidate_blob(&universal_id_for_cache, &key);
|
||||
Ok(())
|
||||
},
|
||||
"Failed to invalidate mobile blob cache entry",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_blob(&self, universal_id: String, key: String, permanently: bool) -> Result<()> {
|
||||
self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.delete_blob(key.clone(), permanently)
|
||||
.await?;
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id;
|
||||
let _ = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
cache.invalidate_blob(&universal_id_for_cache, &key);
|
||||
Ok(())
|
||||
},
|
||||
"Failed to invalidate mobile blob cache entry",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn release_blobs(&self, universal_id: String) -> Result<()> {
|
||||
self.inner.get(universal_id.clone()).await?.release_blobs().await?;
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id;
|
||||
let _ = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
cache.clear_workspace_cache(&universal_id_for_cache);
|
||||
Ok(())
|
||||
},
|
||||
"Failed to clear mobile blob cache workspace",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_blobs(&self, universal_id: String) -> Result<Vec<ListedBlob>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.list_blobs()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_blob_uploaded_at(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
blob_id: String,
|
||||
uploaded_at: Option<i64>,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_blob_uploaded_at(peer, blob_id, uploaded_at.map(millis_to_naive_utc).transpose()?)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_blob_uploaded_at(&self, universal_id: String, peer: String, blob_id: String) -> Result<Option<i64>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_blob_uploaded_at(peer, blob_id)
|
||||
.await?
|
||||
.map(|t| t.and_utc().timestamp_millis()),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
use super::*;
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
pub async fn get_doc_snapshot(&self, universal_id: String, doc_id: String) -> Result<Option<DocRecord>> {
|
||||
let Some(record) = self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.get_doc_snapshot(doc_id)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let timestamp = record.timestamp.and_utc().timestamp_millis();
|
||||
let bin = self
|
||||
.encode_doc_data(&universal_id, &record.doc_id, timestamp, &record.bin)
|
||||
.await?;
|
||||
Ok(Some(DocRecord {
|
||||
doc_id: record.doc_id,
|
||||
bin,
|
||||
timestamp,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn set_doc_snapshot(&self, universal_id: String, snapshot: DocRecord) -> Result<bool> {
|
||||
let doc_record = NbDocRecord {
|
||||
doc_id: snapshot.doc_id,
|
||||
bin: Into::<Data>::into(self.decode_base64_payload(&snapshot.bin)?),
|
||||
timestamp: millis_to_naive_utc(snapshot.timestamp)?,
|
||||
};
|
||||
Ok(self.inner.get(universal_id).await?.set_doc_snapshot(doc_record).await?)
|
||||
}
|
||||
|
||||
pub async fn get_doc_updates(&self, universal_id: String, doc_id: String) -> Result<Vec<DocUpdate>> {
|
||||
let updates = self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.get_doc_updates(doc_id)
|
||||
.await?;
|
||||
|
||||
let mut converted = Vec::with_capacity(updates.len());
|
||||
for update in updates {
|
||||
let timestamp = update.timestamp.and_utc().timestamp_millis();
|
||||
let bin = self
|
||||
.encode_doc_data(&universal_id, &update.doc_id, timestamp, &update.bin)
|
||||
.await?;
|
||||
converted.push(DocUpdate {
|
||||
doc_id: update.doc_id,
|
||||
timestamp,
|
||||
bin,
|
||||
});
|
||||
}
|
||||
Ok(converted)
|
||||
}
|
||||
|
||||
pub async fn mark_updates_merged(&self, universal_id: String, doc_id: String, updates: Vec<i64>) -> Result<u32> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.mark_updates_merged(
|
||||
doc_id,
|
||||
updates
|
||||
.into_iter()
|
||||
.map(millis_to_naive_utc)
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn delete_doc(&self, universal_id: String, doc_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.delete_doc(doc_id).await?)
|
||||
}
|
||||
|
||||
pub async fn get_doc_clocks(&self, universal_id: String, after: Option<i64>) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_clocks(after.map(millis_to_naive_utc).transpose()?)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_doc_clock(&self, universal_id: String, doc_id: String) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_doc_clock(doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
use super::*;
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
pub async fn crawl_doc_data(&self, universal_id: String, doc_id: String) -> Result<CrawlResult> {
|
||||
let result = self
|
||||
.inner
|
||||
.get(universal_id.clone())
|
||||
.await?
|
||||
.crawl_doc_data(&doc_id)
|
||||
.await?;
|
||||
Ok(result.into())
|
||||
}
|
||||
|
||||
pub async fn fts_add_document(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
text: String,
|
||||
index: bool,
|
||||
) -> Result<()> {
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_add(&index_name, &doc_id, &text, index)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_delete_document(&self, universal_id: String, index_name: String, doc_id: String) -> Result<()> {
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_delete(&index_name, &doc_id)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_get_document(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<String>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_get(&index_name, &doc_id)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_search(&self, universal_id: String, index_name: String, query: String) -> Result<Vec<SearchHit>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_search(&index_name, &query)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_get_matches(
|
||||
&self,
|
||||
universal_id: String,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
query: String,
|
||||
) -> Result<Vec<MatchRange>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.fts_get_matches(&index_name, &doc_id, &query)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn fts_flush_index(&self, universal_id: String) -> Result<()> {
|
||||
self.inner.get(universal_id).await?.flush_index().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fts_index_version(&self) -> Result<u32> {
|
||||
Ok(SqliteDocStorage::index_version())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
use super::*;
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
/// Initialize the database and run migrations.
|
||||
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id.clone();
|
||||
let path_for_cache = path.clone();
|
||||
self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| cache.register_workspace(&universal_id_for_cache, &path_for_cache),
|
||||
"Failed to initialize mobile blob cache",
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Err(err) = self.inner.connect(universal_id.clone(), path).await {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id.clone();
|
||||
let _ = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
cache.invalidate_workspace(&universal_id_for_cache);
|
||||
Ok(())
|
||||
},
|
||||
"Failed to rollback mobile blob cache workspace",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
{
|
||||
let universal_id_for_cache = universal_id.clone();
|
||||
let _ = self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| {
|
||||
cache.invalidate_workspace(&universal_id_for_cache);
|
||||
Ok(())
|
||||
},
|
||||
"Failed to clear mobile blob cache workspace",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
self.inner.disconnect(universal_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_space_id(&self, universal_id: String, space_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.set_space_id(space_id).await?)
|
||||
}
|
||||
|
||||
pub async fn push_update(&self, universal_id: String, doc_id: String, update: String) -> Result<i64> {
|
||||
let decoded_update = self.decode_base64_payload(&update)?;
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.push_update(doc_id, decoded_update)
|
||||
.await?
|
||||
.and_utc()
|
||||
.timestamp_millis(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,90 @@
|
||||
mod blobs;
|
||||
mod docs;
|
||||
mod indexer;
|
||||
mod lifecycle;
|
||||
mod peers;
|
||||
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
use std::sync::Arc;
|
||||
|
||||
use affine_nbstore::{
|
||||
Data, DocRecord as NbDocRecord, SetBlob as NbSetBlob, pool::SqliteDocStoragePool, storage::SqliteDocStorage,
|
||||
};
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
use crate::cache::{MobileBlobCache, is_mobile_binary_file_token, should_cache_payload_as_file};
|
||||
use crate::{
|
||||
Blob, CrawlResult, DocClock, DocRecord, DocUpdate, ListedBlob, MatchRange, Result, SearchHit, SetBlob, UniffiError,
|
||||
payload_codec::{decode_base64_data, encode_base64_data},
|
||||
};
|
||||
|
||||
fn millis_to_naive_utc(millis: i64) -> Result<NaiveDateTime> {
|
||||
DateTime::<Utc>::from_timestamp_millis(millis)
|
||||
.ok_or(UniffiError::TimestampDecodingError)
|
||||
.map(|value| value.naive_utc())
|
||||
}
|
||||
|
||||
#[derive(uniffi::Object)]
|
||||
pub struct DocStoragePool {
|
||||
inner: SqliteDocStoragePool,
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
mobile_blob_cache: Arc<MobileBlobCache>,
|
||||
}
|
||||
|
||||
#[uniffi::export]
|
||||
pub fn new_doc_storage_pool() -> DocStoragePool {
|
||||
DocStoragePool {
|
||||
inner: Default::default(),
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
mobile_blob_cache: Arc::new(MobileBlobCache::new()),
|
||||
}
|
||||
}
|
||||
|
||||
impl DocStoragePool {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
async fn run_mobile_cache_io<T, F>(&self, task: F, context: &'static str) -> Result<T>
|
||||
where
|
||||
T: Send + 'static,
|
||||
F: FnOnce(Arc<MobileBlobCache>) -> std::io::Result<T> + Send + 'static,
|
||||
{
|
||||
let cache = Arc::clone(&self.mobile_blob_cache);
|
||||
tokio::task::spawn_blocking(move || task(cache))
|
||||
.await
|
||||
.map_err(|err| UniffiError::Err(format!("{context}: {err}")))?
|
||||
.map_err(|err| UniffiError::Err(format!("{context}: {err}")))
|
||||
}
|
||||
|
||||
pub(crate) fn decode_base64_payload(&self, data: &str) -> Result<Vec<u8>> {
|
||||
decode_base64_data(data)
|
||||
}
|
||||
|
||||
pub(crate) async fn decode_blob_data(&self, universal_id: &str, data: &str) -> Result<Vec<u8>> {
|
||||
#[cfg(any(target_os = "android", target_os = "ios", test))]
|
||||
if is_mobile_binary_file_token(data) {
|
||||
let universal_id = universal_id.to_string();
|
||||
let data = data.to_string();
|
||||
return self
|
||||
.run_mobile_cache_io(
|
||||
move |cache| cache.read_binary_file(&universal_id, &data),
|
||||
"Failed to read mobile file token",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
#[cfg(not(any(target_os = "android", target_os = "ios", test)))]
|
||||
let _ = universal_id;
|
||||
|
||||
self.decode_base64_payload(data)
|
||||
}
|
||||
|
||||
pub(crate) async fn encode_doc_data(
|
||||
&self,
|
||||
universal_id: &str,
|
||||
doc_id: &str,
|
||||
timestamp: i64,
|
||||
data: &[u8],
|
||||
) -> Result<String> {
|
||||
let _ = (universal_id, doc_id, timestamp);
|
||||
Ok(encode_base64_data(data))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
use super::*;
|
||||
|
||||
#[uniffi::export(async_runtime = "tokio")]
|
||||
impl DocStoragePool {
|
||||
pub async fn get_peer_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_remote_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_remote_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_remote_clock(peer, doc_id, millis_to_naive_utc(clock)?)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pulled_remote_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pulled_remote_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pulled_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pulled_remote_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_pulled_remote_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_pulled_remote_clock(peer, doc_id, millis_to_naive_utc(clock)?)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pushed_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
) -> Result<Option<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pushed_clock(peer, doc_id)
|
||||
.await?
|
||||
.map(Into::into),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_peer_pushed_clocks(&self, universal_id: String, peer: String) -> Result<Vec<DocClock>> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.get_peer_pushed_clocks(peer)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn set_peer_pushed_clock(
|
||||
&self,
|
||||
universal_id: String,
|
||||
peer: String,
|
||||
doc_id: String,
|
||||
clock: i64,
|
||||
) -> Result<()> {
|
||||
Ok(
|
||||
self
|
||||
.inner
|
||||
.get(universal_id)
|
||||
.await?
|
||||
.set_peer_pushed_clock(peer, doc_id, millis_to_naive_utc(clock)?)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn clear_clocks(&self, universal_id: String) -> Result<()> {
|
||||
Ok(self.inner.get(universal_id).await?.clear_clocks().await?)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
use std::{
|
||||
fs,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use affine_nbstore::DocUpdate as NbDocUpdate;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use crate::{UniffiError, cache, ffi_types::DocUpdate, storage::new_doc_storage_pool};
|
||||
|
||||
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
fn unique_id(prefix: &str) -> String {
|
||||
let counter = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("system clock before unix epoch")
|
||||
.as_nanos();
|
||||
format!("{prefix}-{now}-{counter}")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doc_update_roundtrip_base64() {
|
||||
let timestamp = DateTime::<Utc>::from_timestamp_millis(1_700_000_000_000)
|
||||
.unwrap()
|
||||
.naive_utc();
|
||||
let original = NbDocUpdate {
|
||||
doc_id: "doc-1".to_string(),
|
||||
timestamp,
|
||||
bin: vec![1, 2, 3, 4, 5],
|
||||
};
|
||||
|
||||
let encoded: DocUpdate = original.into();
|
||||
let decoded = NbDocUpdate::try_from(encoded).unwrap();
|
||||
|
||||
assert_eq!(decoded.doc_id, "doc-1");
|
||||
assert_eq!(decoded.timestamp, timestamp);
|
||||
assert_eq!(decoded.bin, vec![1, 2, 3, 4, 5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn doc_update_rejects_invalid_base64() {
|
||||
let update = DocUpdate {
|
||||
doc_id: "doc-2".to_string(),
|
||||
timestamp: 0,
|
||||
bin: "not-base64!!".to_string(),
|
||||
};
|
||||
|
||||
let err = match NbDocUpdate::try_from(update) {
|
||||
Ok(_) => panic!("expected base64 decode error"),
|
||||
Err(err) => err,
|
||||
};
|
||||
match err {
|
||||
UniffiError::Base64DecodingError(_) => {}
|
||||
other => panic!("unexpected error: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn decode_blob_data_rejects_out_of_workspace_path() {
|
||||
let pool = new_doc_storage_pool();
|
||||
let universal_id = unique_id("mobile-doc-outside");
|
||||
pool
|
||||
.connect(universal_id.clone(), ":memory:".to_string())
|
||||
.await
|
||||
.expect("connect should succeed");
|
||||
|
||||
let outside_dir = std::env::temp_dir().join(unique_id("mobile-doc-outside-dir"));
|
||||
fs::create_dir_all(&outside_dir).expect("create outside dir");
|
||||
let outside_file = outside_dir.join("1234567890abcdef.blob");
|
||||
fs::write(&outside_file, b"outside").expect("write outside file");
|
||||
let token = format!("{}{}", cache::MOBILE_BLOB_FILE_PREFIX, outside_file.display());
|
||||
|
||||
let err = pool
|
||||
.decode_blob_data(&universal_id, &token)
|
||||
.await
|
||||
.expect_err("decode should reject out-of-workspace token");
|
||||
let UniffiError::Err(message) = err else {
|
||||
panic!("unexpected error kind");
|
||||
};
|
||||
assert!(message.contains("outside the workspace cache directory"));
|
||||
|
||||
pool.disconnect(universal_id).await.expect("disconnect should succeed");
|
||||
let _ = fs::remove_dir_all(outside_dir);
|
||||
}
|
||||
@@ -8,6 +8,8 @@ pub enum Error {
|
||||
SqlxError(#[from] sqlx::Error),
|
||||
#[error("Migrate Error: {0}")]
|
||||
MigrateError(#[from] sqlx::migrate::MigrateError),
|
||||
#[error("Connection in progress")]
|
||||
ConnectionInProgress,
|
||||
#[error("Invalid operation")]
|
||||
InvalidOperation,
|
||||
#[error("Serialization Error: {0}")]
|
||||
|
||||
@@ -106,7 +106,7 @@ impl DocStoragePool {
|
||||
})
|
||||
}
|
||||
|
||||
async fn get(&self, universal_id: String) -> Result<Ref<'_, SqliteDocStorage>> {
|
||||
async fn get(&self, universal_id: String) -> Result<Ref<SqliteDocStorage>> {
|
||||
Ok(self.pool.get(universal_id).await?)
|
||||
}
|
||||
|
||||
@@ -504,4 +504,11 @@ mod tests {
|
||||
assert_eq!(err.status, napi::Status::GenericFailure);
|
||||
assert!(err.reason.contains("Invalid operation"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn napi_error_mapping_connection_in_progress() {
|
||||
let err: napi::Error = error::Error::ConnectionInProgress.into();
|
||||
assert_eq!(err.status, napi::Status::GenericFailure);
|
||||
assert!(err.reason.contains("Connection in progress"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,92 +1,129 @@
|
||||
use core::ops::{Deref, DerefMut};
|
||||
use std::collections::hash_map::{Entry, HashMap};
|
||||
use core::ops::Deref;
|
||||
use std::{
|
||||
collections::hash_map::{Entry, HashMap},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use tokio::sync::{RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{
|
||||
error::{Error, Result},
|
||||
storage::SqliteDocStorage,
|
||||
};
|
||||
|
||||
pub struct Ref<'a, V> {
|
||||
_guard: RwLockReadGuard<'a, V>,
|
||||
pub struct Ref<V> {
|
||||
inner: Arc<V>,
|
||||
}
|
||||
|
||||
impl<V> Deref for Ref<'_, V> {
|
||||
impl<V> Deref for Ref<V> {
|
||||
type Target = V;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self._guard.deref()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RefMut<'a, V> {
|
||||
_guard: RwLockMappedWriteGuard<'a, V>,
|
||||
}
|
||||
|
||||
impl<V> Deref for RefMut<'_, V> {
|
||||
type Target = V;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self._guard
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> DerefMut for RefMut<'_, V> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self._guard
|
||||
self.inner.deref()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SqliteDocStoragePool {
|
||||
inner: RwLock<HashMap<String, SqliteDocStorage>>,
|
||||
inner: RwLock<HashMap<String, StorageState>>,
|
||||
}
|
||||
|
||||
enum StorageState {
|
||||
Connecting(Arc<SqliteDocStorage>),
|
||||
Connected(Arc<SqliteDocStorage>),
|
||||
}
|
||||
|
||||
impl SqliteDocStoragePool {
|
||||
async fn get_or_create_storage<'a>(&'a self, universal_id: String, path: &str) -> RefMut<'a, SqliteDocStorage> {
|
||||
let lock = RwLockWriteGuard::map(self.inner.write().await, |lock| match lock.entry(universal_id) {
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
Entry::Vacant(entry) => {
|
||||
let storage = SqliteDocStorage::new(path.to_string());
|
||||
entry.insert(storage)
|
||||
}
|
||||
});
|
||||
|
||||
RefMut { _guard: lock }
|
||||
}
|
||||
|
||||
pub async fn get(&self, universal_id: String) -> Result<Ref<'_, SqliteDocStorage>> {
|
||||
let lock = RwLockReadGuard::try_map(self.inner.read().await, |lock| {
|
||||
if let Some(storage) = lock.get(&universal_id) {
|
||||
Some(storage)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
match lock {
|
||||
Ok(guard) => Ok(Ref { _guard: guard }),
|
||||
Err(_) => Err(Error::InvalidOperation),
|
||||
}
|
||||
pub async fn get(&self, universal_id: String) -> Result<Ref<SqliteDocStorage>> {
|
||||
let lock = self.inner.read().await;
|
||||
let Some(state) = lock.get(&universal_id) else {
|
||||
return Err(Error::InvalidOperation);
|
||||
};
|
||||
let StorageState::Connected(storage) = state else {
|
||||
return Err(Error::InvalidOperation);
|
||||
};
|
||||
Ok(Ref {
|
||||
inner: Arc::clone(storage),
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize the database and run migrations.
|
||||
pub async fn connect(&self, universal_id: String, path: String) -> Result<()> {
|
||||
let storage = self.get_or_create_storage(universal_id.to_owned(), &path).await;
|
||||
let storage = {
|
||||
let mut lock = self.inner.write().await;
|
||||
match lock.entry(universal_id.clone()) {
|
||||
Entry::Occupied(entry) => match entry.get() {
|
||||
StorageState::Connected(_) => return Ok(()),
|
||||
StorageState::Connecting(_) => return Err(Error::ConnectionInProgress),
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
let storage = Arc::new(SqliteDocStorage::new(path));
|
||||
entry.insert(StorageState::Connecting(Arc::clone(&storage)));
|
||||
storage
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
storage.connect().await?;
|
||||
Ok(())
|
||||
}
|
||||
if let Err(err) = storage.connect().await {
|
||||
let mut lock = self.inner.write().await;
|
||||
if matches!(
|
||||
lock.get(&universal_id),
|
||||
Some(StorageState::Connecting(existing)) if Arc::ptr_eq(existing, &storage)
|
||||
) {
|
||||
lock.remove(&universal_id);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
|
||||
let mut lock = self.inner.write().await;
|
||||
let mut transitioned = false;
|
||||
{
|
||||
let mut lock = self.inner.write().await;
|
||||
if matches!(
|
||||
lock.get(&universal_id),
|
||||
Some(StorageState::Connecting(existing)) if Arc::ptr_eq(existing, &storage)
|
||||
) {
|
||||
lock.insert(universal_id.clone(), StorageState::Connected(Arc::clone(&storage)));
|
||||
transitioned = true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Entry::Occupied(entry) = lock.entry(universal_id) {
|
||||
let storage = entry.remove();
|
||||
if !transitioned {
|
||||
let mut lock = self.inner.write().await;
|
||||
if matches!(
|
||||
lock.get(&universal_id),
|
||||
Some(StorageState::Connecting(existing)) if Arc::ptr_eq(existing, &storage)
|
||||
) {
|
||||
lock.remove(&universal_id);
|
||||
}
|
||||
drop(lock);
|
||||
storage.close().await;
|
||||
return Err(Error::InvalidOperation);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn disconnect(&self, universal_id: String) -> Result<()> {
|
||||
let storage = {
|
||||
let mut lock = self.inner.write().await;
|
||||
match lock.get(&universal_id) {
|
||||
None => return Ok(()),
|
||||
Some(StorageState::Connecting(_)) => return Err(Error::ConnectionInProgress),
|
||||
Some(StorageState::Connected(storage)) => {
|
||||
// Prevent shutting down the shared storage while requests still hold refs.
|
||||
if Arc::strong_count(storage) > 1 {
|
||||
return Err(Error::InvalidOperation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(StorageState::Connected(storage)) = lock.remove(&universal_id) else {
|
||||
return Err(Error::InvalidOperation);
|
||||
};
|
||||
storage
|
||||
};
|
||||
|
||||
storage.close().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@
|
||||
"build": "napi build -p affine_native --platform --release",
|
||||
"build:debug": "napi build -p affine_native --platform",
|
||||
"universal": "napi universal",
|
||||
"test": "ava",
|
||||
"test": "ava --no-worker-threads --concurrency=2",
|
||||
"version": "napi version"
|
||||
},
|
||||
"version": "0.26.1"
|
||||
|
||||
@@ -1248,6 +1248,7 @@ export const PackageList = [
|
||||
'packages/frontend/core',
|
||||
'packages/common/env',
|
||||
'packages/frontend/i18n',
|
||||
'packages/frontend/apps/mobile-shared',
|
||||
'packages/common/nbstore',
|
||||
'packages/frontend/track',
|
||||
'blocksuite/affine/all',
|
||||
@@ -1290,6 +1291,7 @@ export const PackageList = [
|
||||
'packages/common/env',
|
||||
'packages/common/graphql',
|
||||
'packages/frontend/i18n',
|
||||
'packages/frontend/apps/mobile-shared',
|
||||
'packages/common/nbstore',
|
||||
'packages/frontend/track',
|
||||
'blocksuite/affine/all',
|
||||
@@ -1313,6 +1315,11 @@ export const PackageList = [
|
||||
'packages/common/infra',
|
||||
],
|
||||
},
|
||||
{
|
||||
location: 'packages/frontend/apps/mobile-shared',
|
||||
name: '@affine/mobile-shared',
|
||||
workspaceDependencies: ['packages/frontend/core'],
|
||||
},
|
||||
{
|
||||
location: 'packages/frontend/apps/web',
|
||||
name: '@affine/web',
|
||||
@@ -1593,6 +1600,7 @@ export type PackageName =
|
||||
| '@affine/electron-renderer'
|
||||
| '@affine/ios'
|
||||
| '@affine/mobile'
|
||||
| '@affine/mobile-shared'
|
||||
| '@affine/web'
|
||||
| '@affine/component'
|
||||
| '@affine/core'
|
||||
|
||||
@@ -139,6 +139,7 @@
|
||||
{ "path": "./packages/frontend/apps/electron-renderer" },
|
||||
{ "path": "./packages/frontend/apps/ios" },
|
||||
{ "path": "./packages/frontend/apps/mobile" },
|
||||
{ "path": "./packages/frontend/apps/mobile-shared" },
|
||||
{ "path": "./packages/frontend/apps/web" },
|
||||
{ "path": "./packages/frontend/component" },
|
||||
{ "path": "./packages/frontend/core" },
|
||||
|
||||
@@ -255,6 +255,7 @@ __metadata:
|
||||
"@affine/core": "workspace:*"
|
||||
"@affine/env": "workspace:*"
|
||||
"@affine/i18n": "workspace:*"
|
||||
"@affine/mobile-shared": "workspace:*"
|
||||
"@affine/nbstore": "workspace:*"
|
||||
"@affine/track": "workspace:*"
|
||||
"@blocksuite/affine": "workspace:*"
|
||||
@@ -703,6 +704,7 @@ __metadata:
|
||||
"@affine/env": "workspace:*"
|
||||
"@affine/graphql": "workspace:*"
|
||||
"@affine/i18n": "workspace:*"
|
||||
"@affine/mobile-shared": "workspace:*"
|
||||
"@affine/native": "workspace:*"
|
||||
"@affine/nbstore": "workspace:*"
|
||||
"@affine/track": "workspace:*"
|
||||
@@ -764,6 +766,17 @@ __metadata:
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
|
||||
"@affine/mobile-shared@workspace:*, @affine/mobile-shared@workspace:packages/frontend/apps/mobile-shared":
|
||||
version: 0.0.0-use.local
|
||||
resolution: "@affine/mobile-shared@workspace:packages/frontend/apps/mobile-shared"
|
||||
dependencies:
|
||||
"@affine/core": "workspace:*"
|
||||
"@capacitor/core": "npm:^7.0.0"
|
||||
typescript: "npm:^5.7.2"
|
||||
vitest: "npm:^3.2.4"
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
|
||||
"@affine/mobile@workspace:packages/frontend/apps/mobile":
|
||||
version: 0.0.0-use.local
|
||||
resolution: "@affine/mobile@workspace:packages/frontend/apps/mobile"
|
||||
|
||||
Reference in New Issue
Block a user