feat(server): add cloud indexer with Elasticsearch and Manticoresearch providers (#11835)

close CLOUD-137

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Introduced advanced workspace-scoped search and aggregation capabilities with support for complex queries, highlights, and pagination.
  - Added pluggable search providers: Elasticsearch and Manticoresearch (see the usage sketch below the release notes).
  - New GraphQL queries, schema types, and resolver support for search and aggregation.
  - Enhanced configuration options for search providers in self-hosted and cloud deployments.
  - Added Docker Compose services and environment variables for Elasticsearch and Manticoresearch.
  - Integrated indexer service into deployment and CI workflows.

- **Bug Fixes**
  - Improved error handling with new user-friendly error messages for search provider and indexer issues.

- **Documentation**
  - Updated configuration examples and environment variable references for indexer and search providers.

- **Tests**
  - Added extensive end-to-end and provider-specific tests covering indexing, searching, aggregation, deletion, and error cases.
  - Included snapshot tests and test fixtures for search providers.

- **Chores**
  - Updated deployment scripts, Helm charts, and Kubernetes manifests to include indexer-related environment variables and secrets.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
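The snippet below is a minimal usage sketch of the new provider abstraction, for reviewers. `SearchProvider`, `SearchProviderType`, `SearchTable`, `search()`, and the result fields are taken from the diff below; the `SearchTable.doc` value, the import paths, and the query DSL contents are illustrative assumptions, not part of this PR.

```ts
import { SearchTable } from '../tables';
import { SearchProvider, SearchQueryDSL } from './def';

// Hypothetical caller: the concrete provider (Elasticsearch or Manticoresearch)
// is expected to be selected from the configured SearchProviderType and injected.
async function searchDocTitles(provider: SearchProvider) {
  // Assumed DSL: a match query plus a page size, as forwarded to the _search endpoint.
  const dsl = {
    query: { match: { title: 'hello' } },
    size: 10,
  } as unknown as SearchQueryDSL;

  // SearchTable.doc is a placeholder table name for this sketch.
  const result = await provider.search(SearchTable.doc, dsl);

  // total, nextCursor, and nodes come from SearchResult as implemented in this PR.
  console.log(result.total, result.nextCursor, result.nodes.length);
}
```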
Author: fengmk2
Date: 2025-05-14 14:52:40 +00:00
Parent: 7c22b3931f
Commit: a1bcf77447
66 changed files with 10139 additions and 10 deletions


@@ -0,0 +1,324 @@
import { Injectable } from '@nestjs/common';

import {
  InternalServerError,
  InvalidSearchProviderRequest,
} from '../../../base';
import { SearchProviderType } from '../config';
import { SearchTable, SearchTableUniqueId } from '../tables';
import {
  AggregateQueryDSL,
  AggregateResult,
  OperationOptions,
  SearchProvider,
  SearchQueryDSL,
  SearchResult,
} from './def';

interface ESSearchResponse {
  took: number;
  timed_out: boolean;
  hits: {
    total: {
      value: number;
    };
    hits: {
      _index: string;
      _id: string;
      _score: number;
      _source: Record<string, unknown>;
      fields: Record<string, unknown[]>;
      highlight?: Record<string, string[]>;
      sort: unknown[];
    }[];
  };
}

interface ESAggregateResponse extends ESSearchResponse {
  aggregations: {
    result: {
      buckets: {
        key: string;
        doc_count: number;
        result: {
          hits: {
            total: {
              value: number;
            };
            max_score: number;
            hits: {
              _index: string;
              _id: string;
              _score: number;
              _source: Record<string, unknown>;
              fields: Record<string, unknown[]>;
              highlight?: Record<string, string[]>;
            }[];
          };
        };
      }[];
    };
  };
}

@Injectable()
export class ElasticsearchProvider extends SearchProvider {
  type = SearchProviderType.Elasticsearch;

  /**
   * @see https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-create
   */
  override async createTable(
    table: SearchTable,
    mapping: string
  ): Promise<void> {
    const url = `${this.config.provider.endpoint}/${table}`;
    try {
      const result = await this.request('PUT', url, mapping);
      this.logger.log(
        `created table ${table}, result: ${JSON.stringify(result)}`
      );
    } catch (err) {
      if (
        err instanceof InvalidSearchProviderRequest &&
        err.data.type === 'resource_already_exists_exception'
      ) {
        this.logger.debug(`table ${table} already exists`);
      } else {
        throw err;
      }
    }
  }

  override async write(
    table: SearchTable,
    documents: Record<string, unknown>[],
    options?: OperationOptions
  ): Promise<void> {
    const start = Date.now();
    const records: string[] = [];
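    // Bulk NDJSON format: each document contributes two lines, an action line
    // ({ index: { _index, _id } }) followed by the document source itself.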
    for (const document of documents) {
      // @ts-expect-error ignore document type check
      const id = SearchTableUniqueId[table](document);
      records.push(
        JSON.stringify({
          index: {
            _index: table,
            _id: id,
          },
        })
      );
      records.push(JSON.stringify(document));
    }
    const query: Record<string, string> = {};
    if (options?.refresh) {
      query.refresh = 'true';
    }
    await this.requestBulk(table, records, query);
    this.logger.debug(
      `wrote ${documents.length} documents to ${table} in ${Date.now() - start}ms`
    );
  }

  /**
   * @see https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-delete-by-query
   */
  override async deleteByQuery<T extends SearchTable>(
    table: T,
    query: Record<string, any>,
    options?: OperationOptions
  ): Promise<void> {
    const start = Date.now();
    const url = new URL(
      `${this.config.provider.endpoint}/${table}/_delete_by_query`
    );
    if (options?.refresh) {
      url.searchParams.set('refresh', 'true');
    }
    const result = await this.request(
      'POST',
      url.toString(),
      JSON.stringify({ query })
    );
    this.logger.debug(
      `deleted by query ${table} ${JSON.stringify(query)} in ${Date.now() - start}ms, result: ${JSON.stringify(result)}`
    );
  }

  override async search(
    table: SearchTable,
    dsl: SearchQueryDSL
  ): Promise<SearchResult> {
    const body = this.#convertToSearchBody(dsl);
    const data = (await this.requestSearch(table, body)) as ESSearchResponse;
    return {
      took: data.took,
      timedOut: data.timed_out,
      total: data.hits.total.value,
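      // Use the last hit's "sort" values as the cursor for search_after pagination.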
      nextCursor: this.#encodeCursor(data.hits.hits.at(-1)?.sort),
      nodes: data.hits.hits.map(hit => ({
        _id: hit._id,
        _score: hit._score,
        _source: hit._source,
        fields: hit.fields,
        highlights: hit.highlight,
      })),
    };
  }

  override async aggregate(
    table: SearchTable,
    dsl: AggregateQueryDSL
  ): Promise<AggregateResult> {
    const body = this.#convertToSearchBody(dsl);
    const data = (await this.requestSearch(table, body)) as ESAggregateResponse;
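    // The response is expected to contain a bucketed aggregation named "result",
    // where each bucket embeds its own hits (see ESAggregateResponse above).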
    const buckets = data.aggregations.result.buckets;
    return {
      took: data.took,
      timedOut: data.timed_out,
      total: data.hits.total.value,
      nextCursor: this.#encodeCursor(data.hits.hits.at(-1)?.sort),
      buckets: buckets.map(bucket => ({
        key: bucket.key,
        count: bucket.doc_count,
        hits: {
          nodes: bucket.result.hits.hits.map(hit => ({
            _id: hit._id,
            _score: hit._score,
            _source: hit._source,
            fields: hit.fields,
            highlights: hit.highlight,
          })),
        },
      })),
    };
  }

  protected async requestSearch(table: SearchTable, body: Record<string, any>) {
    const url = `${this.config.provider.endpoint}/${table}/_search`;
    const jsonBody = JSON.stringify(body);
    const start = Date.now();
    try {
      return await this.request('POST', url, jsonBody);
    } finally {
      const duration = Date.now() - start;
      // log slow search
      if (duration > 1000) {
        this.logger.warn(
          `Slow search on ${table} in ${duration}ms, DSL: ${jsonBody}`
        );
      } else {
        this.logger.verbose(
          `search ${table} in ${duration}ms, DSL: ${jsonBody}`
        );
      }
    }
  }

  /**
   * @see https://www.elastic.co/docs/api/doc/elasticsearch-serverless/operation/operation-bulk-2
   */
  protected async requestBulk(
    table: SearchTable,
    records: string[],
    query?: Record<string, string>
  ) {
    const url = new URL(`${this.config.provider.endpoint}/${table}/_bulk`);
    if (query) {
      Object.entries(query).forEach(([key, value]) => {
        url.searchParams.set(key, value);
      });
    }
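    // The bulk body is NDJSON (one JSON object per line); Elasticsearch rejects
    // requests that are not terminated by a trailing newline.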
    return await this.request(
      'POST',
      url.toString(),
      records.join('\n') + '\n',
      'application/x-ndjson'
    );
  }

  protected async request(
    method: 'POST' | 'PUT',
    url: string,
    body: string,
    contentType = 'application/json'
  ) {
    const headers = {
      'Content-Type': contentType,
    } as Record<string, string>;
    if (this.config.provider.password) {
      headers.Authorization = `Basic ${Buffer.from(`${this.config.provider.username}:${this.config.provider.password}`).toString('base64')}`;
    }
    const response = await fetch(url, {
      method,
      body,
      headers,
    });
    const data = await response.json();
    // handle error, status >= 400
    // {
    //   "error": {
    //     "root_cause": [
    //       {
    //         "type": "illegal_argument_exception",
    //         "reason": "The bulk request must be terminated by a newline [\\n]"
    //       }
    //     ],
    //     "type": "illegal_argument_exception",
    //     "reason": "The bulk request must be terminated by a newline [\\n]"
    //   },
    //   "status": 400
    // }
    if (response.status >= 500) {
      this.logger.error(
        `request error, url: ${url}, body: ${body}, response status: ${response.status}, response body: ${JSON.stringify(data, null, 2)}`
      );
      throw new InternalServerError();
    }
    if (response.status >= 400) {
      this.logger.warn(
        `request failed, url: ${url}, body: ${body}, response status: ${response.status}, response body: ${JSON.stringify(data, null, 2)}`
      );
      const errorData = data as {
        error: { type: string; reason: string } | string;
      };
      let reason = '';
      let type = '';
      if (typeof errorData.error === 'string') {
        reason = errorData.error;
      } else {
        reason = errorData.error.reason;
        type = errorData.error.type;
      }
      throw new InvalidSearchProviderRequest({
        reason,
        type,
      });
    }
    this.logger.verbose(
      `request ${method} ${url}, body: ${body}, response status: ${response.status}, response body: ${JSON.stringify(data)}`
    );
    return data;
  }

  #convertToSearchBody(dsl: SearchQueryDSL | AggregateQueryDSL) {
    const data: Record<string, any> = {
      ...dsl,
    };
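    // Elasticsearch has no "cursor" parameter; translate it into search_after
    // so pagination continues from the previously returned sort values.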
    if (dsl.cursor) {
      data.cursor = undefined;
      data.search_after = this.#decodeCursor(dsl.cursor);
    }
    return data;
  }
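
  // Cursors are the raw "sort" values of a hit, round-tripped as base64-encoded JSON.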
  #decodeCursor(cursor: string) {
    return JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8'));
  }

  #encodeCursor(cursor?: unknown[]) {
    return cursor
      ? Buffer.from(JSON.stringify(cursor)).toString('base64')
      : undefined;
  }
}