Data Connector Protocol
DCP is Slateo's Data Connector Protocol. A DCP plugin is a package that teaches Slateo how to connect to an external system, collect input, resolve credentials, fetch data, describe freshness, and expose the result as a source that agents and reports can use.
DCP separates plugin-specific code from platform responsibilities: plugin authors own source definitions, input schemas, normalization, rendering, fingerprinting, execution, and source-specific auth. Slateo owns orchestration, retries, cache key composition, artifact provenance, result validation, permissions, sandboxing, and UI behavior.
Protocol model
DCP has two layers:
- Plugin manifest: the installable plugin definition. It declares identity, version, lifecycle, runner strategy, trust and permissions, auth model, provenance, and the sources the plugin provides.
- Source connector: executable code for one or more sources. It validates input, normalizes it, renders a stable non-secret description, fingerprints source freshness, and returns normalized data.
First-party integrations and customer-local plugins use the same source contract. The difference is trust, packaging, and where the code is allowed to run.
How a DCP plugin is defined
A custom plugin exports a manifest and connector implementations from its entrypoint:
import { Schema } from 'effect';
import type { DcpCustomPluginManifestV2, DcpSourceConnector } from '@slateo/dcp';

const OrdersInputSchema = Schema.Struct({
  endpoint: Schema.String,
  startDate: Schema.optional(Schema.String),
});

type OrdersInput = typeof OrdersInputSchema.Type;

export const manifest = {
  protocolVersion: 'dcp-source-v2',
  pluginId: 'acme-orders',
  pluginVersion: '0.1.0',
  label: 'Acme Orders',
  description: 'Read order data from Acme internal APIs.',
  origin: 'customer_local',
  lifecycleStatus: 'draft',
  runner: {
    primary: 'desktop_sandbox',
    allowed: ['desktop_sandbox'],
    cloudRunnable: false,
  },
  trust: {
    tier: 'local_user_trusted',
    sandboxRequired: true,
    requiresAdminApprovalToPublish: false,
    permissions: {
      network: {
        level: 'allow',
        allowedHosts: ['api.acme.test'],
        allowHttps: true,
        allowHttp: false,
        blockPrivateIps: true,
        blockLocalhost: true,
      },
      secrets: ['acmeApiToken'],
    },
    allowedSecretScopes: ['orders:read'],
    allowedNetworkHosts: ['api.acme.test'],
    secretBindings: [
      {
        key: 'acmeApiToken',
        label: 'Acme API token',
        scopes: ['orders:read'],
        provider: 'desktop_secure_storage',
        visibility: 'desktop_local_only',
        valuePolicy: 'runtime_injection',
        required: true,
      },
    ],
  },
  provenance: {
    createdBy: 'agent',
    createdAt: '2026-05-15T00:00:00.000Z',
  },
  auth: { type: 'connection', connectionKinds: ['acme-api'] },
  sources: [
    {
      sourceKey: 'orders',
      label: 'Orders',
      inputSchemaKey: 'ordersInput',
      schemaStrategy: { type: 'dynamic' },
      cachePolicy: { cacheable: true, defaultStrength: 'ttl', ttlSeconds: 300 },
      capabilities: {
        outputType: 'tabular',
        queryModel: 'api',
        schemaDiscovery: { supported: false },
      },
    },
  ],
} satisfies DcpCustomPluginManifestV2;

export const connectors = {
  orders: makeOrdersConnector(),
} satisfies Record<string, DcpSourceConnector<OrdersInput, OrdersInput>>;
The manifest is data-only. It is safe for Slateo to inspect before running plugin code, which lets the desktop app show sources, request permissions, bind secrets, validate schema declarations, and decide whether the plugin must run in a sandbox.
- protocolVersion ('dcp-source-v2'): Selects the plugin manifest contract. Connector implementations still use the source execution contract described below.
- pluginId (string): Stable plugin identity. Use a durable, lowercase id; changing it creates a different plugin.
- pluginVersion (string): Version of the plugin package. Increment it when source behavior, permissions, schema, or execution semantics change.
- runner (DcpPluginRunnerStrategy): Declares whether the plugin runs in the local desktop process, the desktop sandbox, or a cloud runner. Customer-local plugins normally use desktop runners.
- trust (DcpPluginTrustModel): Declares trust tier, sandbox requirements, network access, filesystem access, environment access, subprocess access, IPC access, and secret bindings.
- sources (DcpSourceDefinition[]): Lists the data sources exposed by the plugin. Each source points at an input schema and declares schema, cache, and capability metadata.
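Because the manifest is plain data, a host can summarize what a plugin asks for without importing any connector code. The following is a minimal sketch of that idea, not Slateo's implementation: ManifestView, InstallSummary, and summarizeManifest are hypothetical names, the view covers only a few manifest fields, and the rule that every secret binding key must also appear in permissions.secrets is an assumption made for illustration.

```typescript
// Hypothetical, trimmed-down view of the manifest fields used in this sketch.
// The real DcpCustomPluginManifestV2 type carries many more fields.
interface ManifestView {
  pluginId: string;
  pluginVersion: string;
  trust: {
    sandboxRequired: boolean;
    permissions: { network: { allowedHosts: string[] }; secrets: string[] };
    secretBindings: Array<{ key: string; label: string; required: boolean }>;
  };
  sources: Array<{ sourceKey: string; label: string }>;
}

interface InstallSummary {
  id: string;
  needsSandbox: boolean;
  hostsToApprove: string[];
  secretsToBind: string[];
  sourceLabels: string[];
  problems: string[];
}

// Reads declarative fields only; no plugin code is loaded or executed.
function summarizeManifest(manifest: ManifestView): InstallSummary {
  const problems: string[] = [];
  const declaredSecrets = manifest.trust.permissions.secrets;
  for (const binding of manifest.trust.secretBindings) {
    // Assumed consistency rule: a binding should be covered by a secret permission.
    if (declaredSecrets.indexOf(binding.key) === -1) {
      problems.push(`secret binding "${binding.key}" is not listed in permissions.secrets`);
    }
  }
  const keys = manifest.sources.map((source) => source.sourceKey);
  if (keys.some((key, index) => keys.indexOf(key) !== index)) {
    problems.push('duplicate sourceKey');
  }
  return {
    id: `${manifest.pluginId}@${manifest.pluginVersion}`,
    needsSandbox: manifest.trust.sandboxRequired,
    hostsToApprove: manifest.trust.permissions.network.allowedHosts,
    secretsToBind: manifest.trust.secretBindings
      .filter((binding) => binding.required)
      .map((binding) => binding.key),
    sourceLabels: manifest.sources.map((source) => source.label),
    problems,
  };
}

// Values copied from the acme-orders manifest above.
const installSummary = summarizeManifest({
  pluginId: 'acme-orders',
  pluginVersion: '0.1.0',
  trust: {
    sandboxRequired: true,
    permissions: { network: { allowedHosts: ['api.acme.test'] }, secrets: ['acmeApiToken'] },
    secretBindings: [{ key: 'acmeApiToken', label: 'Acme API token', required: true }],
  },
  sources: [{ sourceKey: 'orders', label: 'Orders' }],
});
```

For the acme-orders values, installSummary reports one host to approve, one secret to bind, and no problems, which is the kind of information a permission prompt could be built from.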
Plugin lifecycle
Local DCP plugins are created and managed in the DCP plugin workspace. The Slateo CLI is the control surface for scaffolding, inspecting, and validating plugins:
slateo dcp plugin create acme-orders --label "Acme Orders"
slateo dcp plugin show acme-orders --format json
slateo dcp plugin validate acme-orders --format json
The desktop app stores local drafts and installed plugin artifacts under the configured DCP plugin workspace. In desktop builds this workspace is managed for the user; in development it can be set with SLATEO_DCP_PLUGIN_WORKSPACE.
The normal lifecycle is:
- Draft: a plugin manifest and source code exist locally.
- Generated: an agent or developer has produced implementation code.
- Validated: manifest, schemas, permissions, and tests have passed validation.
- Installed: the compiled bundle is available to the desktop runtime.
- Enabled: Slateo can use the plugin when a node or connection references it.
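The stages above can be read as an ordered progression. The sketch below models that ordering under two assumptions: that a plugin advances one stage at a time, and that stage identifiers are the lowercase stage names (only 'draft' appears in the manifest example; the others are guesses). The helper names are hypothetical.

```typescript
// Ordered lifecycle stages. Only 'draft' is confirmed by the manifest example
// (lifecycleStatus: 'draft'); the remaining identifiers are assumed.
const LIFECYCLE_STAGES = ['draft', 'generated', 'validated', 'installed', 'enabled'] as const;
type LifecycleStage = (typeof LIFECYCLE_STAGES)[number];

// Returns the stage that follows `stage`, or undefined for the final stage.
function nextStage(stage: LifecycleStage): LifecycleStage | undefined {
  return LIFECYCLE_STAGES[LIFECYCLE_STAGES.indexOf(stage) + 1];
}

// Assumed rule: only a single forward step is a valid transition.
function canTransition(from: LifecycleStage, to: LifecycleStage): boolean {
  return nextStage(from) === to;
}
```

Under these assumptions, canTransition('validated', 'installed') is true, while canTransition('draft', 'installed') is false because it skips generation and validation.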
Runtime execution
When a DCP source runs, Slateo follows the same sequence for built-in connectors and custom plugins:
- Decode and validate user input with the source input schema.
- Normalize the input into a deterministic shape.
- Render a stable, non-secret source description for provenance and cache identity.
- Resolve secret handles into runtime-only values when the runner starts.
- Check network, filesystem, environment, subprocess, IPC, and secret permissions.
- Ask the connector for a fingerprint or execute the source when fresh data is required.
- Validate returned rows, schema, fingerprint, and metadata.
- Compose platform cache keys and record provenance without storing secret values.
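The sequence above can be sketched as a small, synchronous driver. This is an illustration only: the real contract is Effect-based, and SketchConnector, RunnerHost, and runSource are hypothetical names. The cache key format is also an assumption, and it is composed before the lookup here so it can be used to find a cached result.

```typescript
// Hypothetical synchronous stand-ins for the DCP types.
interface SketchFingerprint {
  keyParts: string[];
  strength: 'strong' | 'weak' | 'ttl' | 'none';
  observedAt: string;
}
interface SketchResult {
  rows: Array<Record<string, unknown>>;
  fingerprint: SketchFingerprint;
}
interface SketchConnector<Input> {
  decode(raw: unknown): Input; // throws on invalid input
  normalizeInput(input: Input): Input;
  render(input: Input): string;
  fingerprint(input: Input, now: Date): SketchFingerprint;
  execute(input: Input, now: Date, secrets: Record<string, string>): SketchResult;
}
interface RunnerHost {
  now: Date;
  resolveSecrets(): Record<string, string>;
  checkPermissions(): void; // throws when a permission is denied
  cache: Record<string, SketchResult | undefined>;
}
interface RunRecord {
  steps: string[];
  cacheKey: string;
  rowCount: number;
}

function runSource<Input>(connector: SketchConnector<Input>, raw: unknown, host: RunnerHost): RunRecord {
  const steps: string[] = [];
  const input = connector.decode(raw);
  steps.push('decode');
  const normalized = connector.normalizeInput(input);
  steps.push('normalize');
  const rendered = connector.render(normalized);
  steps.push('render');
  const secrets = host.resolveSecrets();
  steps.push('resolve-secrets');
  host.checkPermissions();
  steps.push('check-permissions');
  const fingerprint = connector.fingerprint(normalized, host.now);
  steps.push('fingerprint');
  // Built from non-secret parts only; secret values never enter the key.
  const cacheKey = [rendered].concat(fingerprint.keyParts).join('|');
  const cacheable = fingerprint.strength !== 'none';
  let result: SketchResult | undefined = cacheable ? host.cache[cacheKey] : undefined;
  if (result === undefined) {
    result = connector.execute(normalized, host.now, secrets);
    steps.push('execute');
    if (!Array.isArray(result.rows)) throw new Error('connector returned invalid rows');
    steps.push('validate');
    if (cacheable) host.cache[cacheKey] = result;
  }
  steps.push('record-provenance');
  return { steps, cacheKey, rowCount: result.rows.length };
}

const echoConnector: SketchConnector<{ url: string }> = {
  decode: (raw) => {
    const url = (raw as { url?: unknown } | null)?.url;
    if (typeof url !== 'string') throw new Error('input.url must be a string');
    return { url };
  },
  normalizeInput: (input) => ({ url: input.url.trim() }),
  render: (input) => `GET ${input.url}`,
  fingerprint: (input, now) => ({
    keyParts: [`url:${input.url}`],
    strength: 'weak',
    observedAt: now.toISOString(),
  }),
  execute: (input, now) => ({
    rows: [{ id: '0', url: input.url }],
    fingerprint: { keyParts: [`url:${input.url}`], strength: 'weak', observedAt: now.toISOString() },
  }),
};

const runnerHost: RunnerHost = {
  now: new Date('2026-05-15T00:00:00.000Z'),
  resolveSecrets: () => ({}),
  checkPermissions: () => undefined,
  cache: {},
};
const firstRun = runSource(echoConnector, { url: ' https://api.acme.test/orders ' }, runnerHost);
const secondRun = runSource(echoConnector, { url: 'https://api.acme.test/orders' }, runnerHost);
```

Both runs normalize to the same input, so they share a cache key; the first run executes the connector and the second reuses the cached result without an execute step.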
How a DCP connector is defined
A source connector exports a DcpSourceConnector with six core pieces:
- manifest (DcpSourceConnectorManifest): Public metadata for the connector: protocol version, connector id, version, label, auth model, source definitions, cache policy, schema strategy, and optional capabilities.
- inputSchemas (Effect Schema map): Effect schemas for every source input. These schemas validate inputs and can include UI annotations for generated forms.
- normalizeInput (function): Converts user input into a deterministic shape before rendering, fingerprinting, or execution. Trim strings, fill defaults, canonicalize casing, and remove ambiguity here.
- render (function): Produces a stable, non-secret source description used for cache identity and provenance. Never include credentials, tokens, or secret values.
- fingerprint (function): Returns the source freshness guarantee. Strong fingerprints are immutable content hashes or snapshot ids; weak fingerprints are things like ETags or watermarks; TTL fingerprints include expiresAt; none means no cache reuse guarantee.
- execute (function): Fetches data and returns normalized rows, a DCP schema, a fingerprint observed during execution, and execution metadata.
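To make the normalizeInput and render responsibilities concrete, here is a dependency-free sketch showing why deterministic normalization matters: two inputs that mean the same thing should render to the same description. TableInput and its fields are hypothetical and are not part of the DCP package; a real connector would wrap these results in Effect values as in the connector examples in this document.

```typescript
// Hypothetical input shape used only for this illustration.
interface TableInput {
  table: string;
  columns?: string[];
  mode?: 'summary' | 'detail';
}
interface NormalizedTableInput {
  table: string;
  columns: string[];
  mode: 'summary' | 'detail';
}

// Trim strings, canonicalize casing, dedupe and sort lists, fill defaults.
function normalizeTableInput(input: TableInput): NormalizedTableInput {
  const columns = (input.columns ?? [])
    .map((column) => column.trim().toLowerCase())
    .filter((column, index, all) => column.length > 0 && all.indexOf(column) === index)
    .sort();
  return {
    table: input.table.trim().toLowerCase(),
    columns,
    mode: input.mode ?? 'summary',
  };
}

// Stable, non-secret description built only from normalized fields.
function renderTableInput(input: NormalizedTableInput): string {
  const columnList = input.columns.length > 0 ? input.columns.join(',') : '*';
  return `table=${input.table} columns=${columnList} mode=${input.mode}`;
}

const renderedA = renderTableInput(
  normalizeTableInput({ table: ' Orders ', columns: ['Total', 'id', 'id'] }),
);
const renderedB = renderTableInput(
  normalizeTableInput({ table: 'orders', columns: ['id', 'total'], mode: 'summary' }),
);
// renderedA and renderedB are both 'table=orders columns=id,total mode=summary'
```

Because the rendered description feeds cache identity, skipping normalization would make equivalent inputs look like different sources and defeat cache reuse.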
Minimal connector
import { Effect, Schema } from 'effect';
import type {
  DcpSourceConnector,
  DcpSourceExecutionResult,
  DcpSourceFingerprint,
} from '@slateo/dcp';

interface HttpClient {
  readonly getJson: (url: string) => Effect.Effect<unknown, Error>;
  readonly head: (url: string) => Effect.Effect<Headers, Error>;
}

const InputSchema = Schema.Struct({
  url: Schema.String,
  mode: Schema.optional(Schema.Literal('summary', 'detail')),
});

type Input = typeof InputSchema.Type;

interface Services {
  readonly http: HttpClient;
}

export function makeExampleConnector(
  services: Services,
): DcpSourceConnector<Input, Input> {
  return {
    manifest: {
      protocolVersion: 'dcp-source-v1',
      connectorId: 'example-http',
      connectorVersion: '1.0.0',
      label: 'Example HTTP',
      auth: { type: 'connection', connectionKinds: ['example-http'] },
      sources: [
        {
          sourceKey: 'endpoint',
          label: 'Endpoint',
          inputSchemaKey: 'endpointInput',
          schemaStrategy: {
            type: 'declared',
            schema: {
              fields: [
                { name: 'id', type: 'string' },
                { name: 'value', type: 'number' },
              ],
              primaryKey: ['id'],
            },
          },
          cachePolicy: {
            cacheable: true,
            defaultStrength: 'weak',
            ttlSeconds: 300,
          },
          capabilities: {
            outputType: 'tabular',
            queryModel: 'api',
            schemaDiscovery: { supported: false },
          },
        },
      ],
    },
    inputSchemas: { endpointInput: InputSchema },
    normalizeInput: (input) =>
      Effect.succeed({
        url: input.url.trim(),
        mode: input.mode ?? 'summary',
      }),
    render: (input) => Effect.succeed(`GET ${input.url} mode=${input.mode}`),
    fingerprint: (input, context) =>
      Effect.gen(function* () {
        const headers = yield* services.http.head(input.url);
        const observedAt = context.now.toISOString();
        const etag = headers.get('etag');
        if (etag) {
          return {
            keyParts: [`url:${input.url}`, `etag:${etag}`],
            strength: 'weak',
            observedAt,
            diagnostics: { sourceUrl: input.url, versionToken: etag },
          } satisfies DcpSourceFingerprint;
        }
        const expiresAt = new Date(
          context.now.getTime() + 300_000,
        ).toISOString();
        return {
          keyParts: [`url:${input.url}`, `ttl-expires:${expiresAt}`],
          strength: 'ttl',
          observedAt,
          expiresAt,
          diagnostics: { sourceUrl: input.url },
        } satisfies DcpSourceFingerprint;
      }),
    execute: (input, context) =>
      Effect.gen(function* () {
        const payload = yield* services.http.getJson(input.url);
        const rows = normalizeRows(payload);
        return {
          rows,
          schema: {
            fields: [
              { name: 'id', type: 'string' },
              { name: 'value', type: 'number' },
            ],
            primaryKey: ['id'],
          },
          fingerprint: {
            keyParts: [`observed:${context.now.toISOString()}`],
            strength: 'ttl',
            observedAt: context.now.toISOString(),
            expiresAt: new Date(
              context.now.getTime() + 300_000,
            ).toISOString(),
          },
          metadata: { rowCount: rows.length },
        } satisfies DcpSourceExecutionResult;
      }),
  };
}

function normalizeRows(payload: unknown): Array<Record<string, unknown>> {
  if (!Array.isArray(payload)) {
    return [];
  }
  return payload.map((item, index) =>
    item && typeof item === 'object'
      ? { id: String(index), ...item }
      : { id: String(index), value: item },
  );
}
Fingerprints and cache behavior
DCP connectors must describe the freshness guarantee behind their data. The platform composes the final cache key, but connector fingerprints provide the source-specific freshness signal.
- Strong: immutable content hash, S3 version id, database snapshot id, or another identifier that guarantees the exact same bytes or rows.
- Weak: ETag, last-modified timestamp, SQL watermark, API revision, or another best-effort change token.
- TTL: cacheable until expiresAt; always include expiresAt.
- None: always fresh; no cache reuse guarantee.
Do not put credentials, tokens, user secrets, or other sensitive values in keyParts, rendered content, provenance, or diagnostics.
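One way to picture how these strengths could drive cache reuse is the sketch below. The platform's actual policy is not specified in this document, so the rules here are assumptions: strong and weak fingerprints are reused when the key parts match, TTL fingerprints are reused until expiresAt, and none is never reused. FingerprintView and canReuseCache are hypothetical names.

```typescript
type FingerprintStrength = 'strong' | 'weak' | 'ttl' | 'none';

// Hypothetical subset of a fingerprint, mirroring the fields used in the examples above.
interface FingerprintView {
  keyParts: string[];
  strength: FingerprintStrength;
  observedAt: string;
  expiresAt?: string;
}

function sameKeyParts(a: string[], b: string[]): boolean {
  return a.length === b.length && a.every((part, index) => part === b[index]);
}

// Assumed reuse policy; a real platform may weigh weak tokens differently.
function canReuseCache(cached: FingerprintView, current: FingerprintView, now: Date): boolean {
  if (cached.strength === 'none' || current.strength === 'none') return false;
  if (cached.strength !== current.strength) return false;
  if (cached.strength === 'ttl') {
    // TTL entries are valid purely by time, which is why expiresAt is mandatory.
    if (cached.expiresAt === undefined) return false;
    return now.getTime() < Date.parse(cached.expiresAt);
  }
  // Strong and weak entries are valid while the version token is unchanged.
  return sameKeyParts(cached.keyParts, current.keyParts);
}

const cachedEtag: FingerprintView = {
  keyParts: ['url:https://api.acme.test/orders', 'etag:"v1"'],
  strength: 'weak',
  observedAt: '2026-05-15T00:00:00.000Z',
};
const changedEtag: FingerprintView = {
  keyParts: ['url:https://api.acme.test/orders', 'etag:"v2"'],
  strength: 'weak',
  observedAt: '2026-05-15T00:01:00.000Z',
};
const cachedTtl: FingerprintView = {
  keyParts: ['url:https://api.acme.test/orders'],
  strength: 'ttl',
  observedAt: '2026-05-15T00:00:00.000Z',
  expiresAt: '2026-05-15T00:05:00.000Z',
};
```

With these values, an unchanged ETag permits reuse, a changed ETag forces a fresh execution, and the TTL entry is reusable at 00:04 but not at 00:06.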
Capability metadata
Capabilities describe the data semantics exposed by a source independently from how it connects. This is important for custom plugins: a connector can use custom auth and transport while still declaring that the underlying semantics are Trino SQL, Snowflake SQL, an API, or a mixed model.
interface DcpSourceCapabilities {
  outputType: 'tabular' | 'streaming' | 'single_value' | 'multi_table';
  queryModel: 'sql' | 'api' | 'mixed';
  sqlDialect?:
    | 'postgres'
    | 'snowflake'
    | 'trino'
    | 'redshift'
    | 'bigquery'
    | 'databricks'
    | 'mysql'
    | 'sqlite'
    | 'duckdb'
    | 'clickhouse'
    | 'generic';
  schemaDiscovery?: DcpSchemaDiscoveryCapability;
  validation?: DcpValidationCapability;
  parameterization?: DcpParameterizationCapability;
  cancellation?: DcpCancellationCapability;
  streaming?: DcpStreamingCapability;
  maxRowsPerExecution?: number;
  supportsIncrementalLoad?: boolean;
  notes?: string;
}
Capabilities affect platform behavior:
- queryModel: 'sql' plus sqlDialect enables dialect-aware editor support, validation, and agent planning.
- schemaDiscovery.supported enables schema browsing and table or column autocomplete.
- validation.supportsDryRun allows pre-execution cost or time estimates when the connector implements them.
- parameterization.style tells editors and agents how to render placeholders.
- cancellation.supported allows the platform to expose cancellation controls.
- streaming.supported allows incremental result display and backpressure-aware execution.
If queryModel is sql, always set sqlDialect. Existing connectors can omit capabilities, but they will receive less platform integration.
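The rule about sqlDialect, and the link between capability flags and platform features, can be checked mechanically. The sketch below is one hedged way to do that in a plugin test; CapabilitiesView is a reduced, hypothetical view of DcpSourceCapabilities, and the feature labels are invented for illustration.

```typescript
// Hypothetical reduced view of the capability fields discussed above.
interface CapabilitiesView {
  queryModel: 'sql' | 'api' | 'mixed';
  sqlDialect?: string;
  schemaDiscovery?: { supported: boolean };
  validation?: { supportsDryRun?: boolean };
  cancellation?: { supported: boolean };
  streaming?: { supported: boolean };
}

// Returns human-readable problems; an empty array means the declaration looks consistent.
function capabilityProblems(capabilities?: CapabilitiesView): string[] {
  if (capabilities === undefined) return [];
  const problems: string[] = [];
  if (capabilities.queryModel === 'sql' && capabilities.sqlDialect === undefined) {
    problems.push("queryModel 'sql' requires sqlDialect");
  }
  return problems;
}

// Maps declared capabilities to the platform features they unlock (labels are illustrative).
function enabledFeatures(capabilities?: CapabilitiesView): string[] {
  const features: string[] = [];
  if (capabilities === undefined) return features;
  if (capabilities.queryModel === 'sql' && capabilities.sqlDialect !== undefined) {
    features.push(`sql-editor:${capabilities.sqlDialect}`);
  }
  if (capabilities.schemaDiscovery?.supported) features.push('schema-browsing');
  if (capabilities.validation?.supportsDryRun) features.push('dry-run-estimates');
  if (capabilities.cancellation?.supported) features.push('cancel-button');
  if (capabilities.streaming?.supported) features.push('incremental-results');
  return features;
}

const trinoFeatures = enabledFeatures({
  queryModel: 'sql',
  sqlDialect: 'trino',
  schemaDiscovery: { supported: true },
  cancellation: { supported: true },
});
const missingDialect = capabilityProblems({ queryModel: 'sql' });
```

A connector that omits capabilities entirely yields no problems and no features, which matches the note above that such connectors still work but receive less platform integration.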
Custom proxy to a known SQL engine
Use capabilities when a custom connection method fronts a known engine. For example, an internal OAuth proxy to Trino can still declare full Trino semantics:
import { Effect, Schema } from 'effect';
import type { DcpSourceConnector, DcpSourceExecutionResult } from '@slateo/dcp';

// hashQuery and executeTrinoQuery are plugin-local helpers that are not shown here.

const TrinoQueryInputSchema = Schema.Struct({
  query: Schema.String.pipe(Schema.minLength(1)),
  catalog: Schema.optional(Schema.String.pipe(Schema.minLength(1))),
  schema: Schema.optional(Schema.String.pipe(Schema.minLength(1))),
  parameters: Schema.optional(
    Schema.Record({ key: Schema.String, value: Schema.Unknown }),
  ),
});

type TrinoQueryInput = typeof TrinoQueryInputSchema.Type;

export const customTrinoConnector: DcpSourceConnector<
  TrinoQueryInput,
  TrinoQueryInput
> = {
  manifest: {
    protocolVersion: 'dcp-source-v1',
    connectorId: 'custom-trino-proxy',
    connectorVersion: '1.0.0',
    label: 'Internal Trino',
    description: 'Connect to internal Trino cluster via custom auth proxy.',
    auth: { type: 'connection', connectionKinds: ['custom-trino'] },
    sources: [
      {
        sourceKey: 'query',
        label: 'SQL Query',
        inputSchemaKey: 'trinoQueryInput',
        schemaStrategy: { type: 'dynamic' },
        cachePolicy: { cacheable: true, defaultStrength: 'strong' },
        capabilities: {
          outputType: 'tabular',
          queryModel: 'sql',
          sqlDialect: 'trino',
          schemaDiscovery: {
            supported: true,
            requiresConnection: true,
            cacheableDurationSeconds: 3600,
            supportsPartialDiscovery: true,
          },
          validation: {
            supportsSyntaxValidation: true,
            supportsSemanticValidation: true,
            supportsDryRun: true,
            supportsExecutionPreview: true,
          },
          parameterization: {
            supported: true,
            style: 'question_mark',
            supportsTypeHints: true,
            supportsPreparedStatements: true,
          },
          cancellation: {
            supported: true,
            gracefulShutdown: true,
            timeoutMs: 5000,
          },
          streaming: {
            supported: true,
            chunkSize: 1000,
            backpressureSupported: true,
          },
          maxRowsPerExecution: 1_000_000,
          notes: 'Full Trino SQL via a custom internal proxy.',
        },
      },
    ],
  },
  inputSchemas: { trinoQueryInput: TrinoQueryInputSchema },
  normalizeInput: (input) => Effect.succeed(input),
  render: (input) =>
    Effect.succeed(
      `${input.catalog ? `USE ${input.catalog}; ` : ''}${input.query}`,
    ),
  fingerprint: (input, context) =>
    Effect.succeed({
      keyParts: [
        `query-hash:${hashQuery(input.query)}`,
        `catalog:${input.catalog ?? 'default'}`,
        `schema:${input.schema ?? 'default'}`,
      ],
      strength: 'strong',
      observedAt: context.now.toISOString(),
    }),
  execute: (input, context) =>
    Effect.tryPromise({
      try: async () => {
        // The secret value exists only at runtime; it never enters keyParts or render output.
        const token = context.secrets.trinoApiKey;
        const result = await executeTrinoQuery({ token, ...input });
        return {
          rows: result.rows,
          schema: result.schema,
          fingerprint: {
            keyParts: [`query-hash:${hashQuery(input.query)}`],
            strength: 'strong',
            observedAt: context.now.toISOString(),
          },
          metadata: { rowCount: result.rows.length },
        } satisfies DcpSourceExecutionResult;
      },
      catch: (error) =>
        error instanceof Error ? error : new Error(String(error)),
    }),
};
With this declaration, Slateo can treat the source as Trino for SQL highlighting, autocomplete, validation, schema discovery, and agent planning even though authentication and transport are custom.
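The connector above calls a hashQuery helper that is not shown. Purely as an illustration, a dependency-free stand-in could look like the sketch below, which uses 32-bit FNV-1a over the trimmed query text. This is an assumption, not the helper the example was written against: FNV-1a is a non-cryptographic hash with a real chance of collisions, so a production plugin would more likely use a cryptographic hash such as SHA-256.

```typescript
// Hypothetical stand-in for the hashQuery helper: 32-bit FNV-1a over UTF-16 code units.
// Deterministic and dependency-free, but NOT collision-resistant.
function hashQuery(query: string): string {
  const text = query.trim();
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    // Multiply by the FNV prime 16777619 using shifts, keeping the result in 32 bits.
    hash = (hash + (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24)) >>> 0;
  }
  // Left-pad to 8 hex digits so key parts have a stable width.
  return ('0000000' + hash.toString(16)).slice(-8);
}

const hashOfA = hashQuery('a'); // 'e40c292c', the published FNV-1a test vector for "a"
const hashOfPaddedA = hashQuery('  a  '); // same as hashOfA because input is trimmed
```

Only surrounding whitespace is normalized here, so 'SELECT 1' and 'select 1' produce different hashes. Whether queries should be canonicalized further before hashing is a design choice that belongs in normalizeInput.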
Secrets and connection bindings
DCP separates shared plugin definitions from the credentials needed to run them.
- DcpPluginSecretBinding: Declares a required secret by stable key, label, scopes, provider, visibility, and value policy.
- DcpConnectionBinding: Stores non-secret settings plus secret handles. It never stores secret values.
- DcpSecretResolverService: Resolves secret handles at runtime and returns values only to the runner process.
Secret visibility controls where values may be resolved:
- desktop_local_only: encrypted in the desktop app with OS secure storage. Available to desktop-local and sandboxed desktop runners only.
- cloud_user: future user-scoped cloud store. May be available to cloud runners if explicitly allowed.
- cloud_org: org-managed cloud store. Must not be loaded into a local desktop agent.
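The visibility rules can be expressed as a small gate that a resolver might apply before handing a value to a runner. The sketch below is hedged: 'desktop_sandbox' appears in the manifest example, but the 'desktop_local' and 'cloud' runner names, the cloudUserAllowed flag, and the conservative choice to deny cloud-scoped secrets on every desktop runner are assumptions.

```typescript
type SecretVisibility = 'desktop_local_only' | 'cloud_user' | 'cloud_org';
// 'desktop_local' and 'cloud' are assumed runner identifiers.
type RunnerKind = 'desktop_local' | 'desktop_sandbox' | 'cloud';

// Returns true when a secret with the given visibility may be resolved for the runner.
function mayResolveSecret(
  visibility: SecretVisibility,
  runner: RunnerKind,
  cloudUserAllowed: boolean = false,
): boolean {
  switch (visibility) {
    case 'desktop_local_only':
      // Desktop-local and sandboxed desktop runners only.
      return runner !== 'cloud';
    case 'cloud_user':
      // Cloud runners, and only when explicitly allowed.
      return runner === 'cloud' && cloudUserAllowed;
    case 'cloud_org':
      // Never loaded into a local desktop agent.
      return runner === 'cloud';
  }
}
```

For example, mayResolveSecret('desktop_local_only', 'desktop_sandbox') is true, while mayResolveSecret('cloud_org', 'desktop_local') is false.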
Runtime rules:
- Generated code, cache keys, provenance, org registry artifacts, and manifests receive handles only.
- Secret values are injected at runtime through the resolver service.
- Desktop local secrets are per laptop and per user. Plugin sharing does not share credentials.
- Missing required secrets should fail before execution with a binding error that names the missing secret key, not the value.
- Test connection uses the same resolver pathway as execution so stale or missing handles are caught before a node run.
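The fail-before-execution rule can be sketched as a resolver step that reports missing keys without ever echoing a value. This is a minimal illustration with hypothetical names (SecretBindingView, resolveBindings, and the in-memory store); the real DcpSecretResolverService interface is not shown in this document.

```typescript
// Hypothetical subset of a secret binding.
interface SecretBindingView {
  key: string;
  required: boolean;
}

type ResolveOutcome =
  | { ok: true; secrets: Record<string, string> }
  | { ok: false; missingKeys: string[]; message: string };

// Resolves handles to values. On failure the message names keys only, never values.
function resolveBindings(
  bindings: SecretBindingView[],
  handles: Record<string, string | undefined>,
  readSecret: (handle: string) => string | undefined,
): ResolveOutcome {
  const secrets: Record<string, string> = {};
  const missingKeys: string[] = [];
  for (const binding of bindings) {
    const handle = handles[binding.key];
    const value = handle === undefined ? undefined : readSecret(handle);
    if (value !== undefined) {
      secrets[binding.key] = value;
    } else if (binding.required) {
      missingKeys.push(binding.key);
    }
  }
  if (missingKeys.length > 0) {
    return {
      ok: false,
      missingKeys,
      message: `Missing required secret binding(s): ${missingKeys.join(', ')}`,
    };
  }
  return { ok: true, secrets };
}

// In-memory stand-in for secure storage, used only for this example.
const secretStore: Record<string, string | undefined> = { 'handle-1': 'example-token-value' };
const readFromStore = (handle: string): string | undefined => secretStore[handle];
const acmeBindings: SecretBindingView[] = [{ key: 'acmeApiToken', required: true }];

const boundOutcome = resolveBindings(acmeBindings, { acmeApiToken: 'handle-1' }, readFromStore);
const staleOutcome = resolveBindings(acmeBindings, { acmeApiToken: 'handle-gone' }, readFromStore);
```

Because a test-connection action can call the same function and inspect ok, a stale handle like the one in staleOutcome surfaces before any node run, in line with the last rule above.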
Connector checklist
- Define an Effect Schema input for each source.
- Normalize input deterministically before rendering, fingerprinting, or execution.
- Render a stable, non-secret source description.
- Return a fingerprint that matches the source freshness guarantee.
- Do not put credentials, tokens, or user secrets in keyParts, rendered content, or diagnostics.
- Return normalized tabular rows plus a DCP schema from execute.
- Add capabilities for SQL, API, schema discovery, validation, parameterization, cancellation, and streaming where supported.
- Add tests that assert the manifest, fingerprints, secret handling, and capability declarations.
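For the secret-handling item, a test helper along the following lines could scan a connector's non-secret outputs for known secret values. It is a sketch with hypothetical names (ConnectorArtifacts, findSecretLeaks) and uses plain substring matching, so it will not catch values that were encoded or transformed, or whose characters JSON serialization escapes, before being written.

```typescript
// Hypothetical bundle of the non-secret outputs a connector produces.
interface ConnectorArtifacts {
  keyParts: string[];
  rendered: string;
  diagnostics?: Record<string, unknown>;
}

// Returns the names of the artifact locations that contain any of the secret values.
function findSecretLeaks(artifacts: ConnectorArtifacts, secretValues: string[]): string[] {
  const haystacks: Array<[string, string]> = [
    ['keyParts', artifacts.keyParts.join('\n')],
    ['rendered', artifacts.rendered],
    ['diagnostics', JSON.stringify(artifacts.diagnostics ?? {})],
  ];
  const leaks: string[] = [];
  for (const [where, text] of haystacks) {
    // Empty strings are skipped because they match everything.
    const leaked = secretValues.some((secret) => secret.length > 0 && text.indexOf(secret) !== -1);
    if (leaked) leaks.push(where);
  }
  return leaks;
}

const cleanLeaks = findSecretLeaks(
  {
    keyParts: ['url:https://api.acme.test/orders', 'etag:v1'],
    rendered: 'GET https://api.acme.test/orders mode=summary',
    diagnostics: { sourceUrl: 'https://api.acme.test/orders' },
  },
  ['example-token-value'],
);
const dirtyLeaks = findSecretLeaks(
  {
    keyParts: ['url:https://api.acme.test/orders?token=example-token-value'],
    rendered: 'GET https://api.acme.test/orders',
    diagnostics: { authHeader: 'Bearer example-token-value' },
  },
  ['example-token-value'],
);
```

Here cleanLeaks is empty, while dirtyLeaks flags keyParts and diagnostics, so a test can assert that the returned array is empty for every source the plugin exposes.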