Data Connector Protocol

DCP is Slateo's Data Connector Protocol. A DCP plugin is a package that teaches Slateo how to connect to an external system, collect input, resolve credentials, fetch data, describe freshness, and expose the result as a source that agents and reports can use.

DCP separates plugin-specific code from platform responsibilities: plugin authors own source definitions, input schemas, normalization, rendering, fingerprinting, execution, and source-specific auth. Slateo owns orchestration, retries, cache key composition, artifact provenance, result validation, permissions, sandboxing, and UI behavior.

Protocol model

DCP has two layers:

  • Plugin manifest: the installable plugin definition. It declares identity, version, lifecycle, runner strategy, trust and permissions, auth model, provenance, and the sources the plugin provides.
  • Source connector: executable code for one or more sources. It validates input, normalizes it, renders a stable non-secret description, fingerprints source freshness, and returns normalized data.

First-party integrations and customer-local plugins use the same source contract. The difference is trust, packaging, and where the code is allowed to run.

How a DCP plugin is defined

A custom plugin exports a manifest and connector implementations from its entrypoint:

import { Schema } from 'effect';
import type { DcpCustomPluginManifestV2, DcpSourceConnector } from '@slateo/dcp';

const OrdersInputSchema = Schema.Struct({
  endpoint: Schema.String,
  startDate: Schema.optional(Schema.String),
});

type OrdersInput = typeof OrdersInputSchema.Type;

export const manifest = {
  protocolVersion: 'dcp-source-v2',
  pluginId: 'acme-orders',
  pluginVersion: '0.1.0',
  label: 'Acme Orders',
  description: 'Read order data from Acme internal APIs.',
  origin: 'customer_local',
  lifecycleStatus: 'draft',
  runner: {
    primary: 'desktop_sandbox',
    allowed: ['desktop_sandbox'],
    cloudRunnable: false,
  },
  trust: {
    tier: 'local_user_trusted',
    sandboxRequired: true,
    requiresAdminApprovalToPublish: false,
    permissions: {
      network: {
        level: 'allow',
        allowedHosts: ['api.acme.test'],
        allowHttps: true,
        allowHttp: false,
        blockPrivateIps: true,
        blockLocalhost: true,
      },
      secrets: ['acmeApiToken'],
    },
    allowedSecretScopes: ['orders:read'],
    allowedNetworkHosts: ['api.acme.test'],
    secretBindings: [
      {
        key: 'acmeApiToken',
        label: 'Acme API token',
        scopes: ['orders:read'],
        provider: 'desktop_secure_storage',
        visibility: 'desktop_local_only',
        valuePolicy: 'runtime_injection',
        required: true,
      },
    ],
  },
  provenance: {
    createdBy: 'agent',
    createdAt: '2026-05-15T00:00:00.000Z',
  },
  auth: { type: 'connection', connectionKinds: ['acme-api'] },
  sources: [
    {
      sourceKey: 'orders',
      label: 'Orders',
      inputSchemaKey: 'ordersInput',
      schemaStrategy: { type: 'dynamic' },
      cachePolicy: { cacheable: true, defaultStrength: 'ttl', ttlSeconds: 300 },
      capabilities: {
        outputType: 'tabular',
        queryModel: 'api',
        schemaDiscovery: { supported: false },
      },
    },
  ],
} satisfies DcpCustomPluginManifestV2;

export const connectors = {
  orders: makeOrdersConnector(),
} satisfies Record<string, DcpSourceConnector<OrdersInput, OrdersInput>>;

The manifest is data-only. It is safe for Slateo to inspect before running plugin code, which lets the desktop app show sources, request permissions, bind secrets, validate schema declarations, and decide whether the plugin must run in a sandbox.

  • Name
    protocolVersion
    Type
    'dcp-source-v2'
    Description

    Selects the plugin manifest contract. Connector implementations still use the source execution contract described below.

  • Name
    pluginId
    Type
    string
    Description

    Stable plugin identity. Use a durable, lowercase id; changing it creates a different plugin.

  • Name
    pluginVersion
    Type
    string
    Description

    Version of the plugin package. Increment it when source behavior, permissions, schema, or execution semantics change.

  • Name
    runner
    Type
    DcpPluginRunnerStrategy
    Description

    Declares whether the plugin runs in the local desktop process, the desktop sandbox, or a cloud runner. Customer-local plugins normally use desktop runners.

  • Name
    trust
    Type
    DcpPluginTrustModel
    Description

    Declares trust tier, sandbox requirements, network access, filesystem access, environment access, subprocess access, IPC access, and secret bindings.

  • Name
    sources
    Type
    DcpSourceDefinition[]
    Description

    Lists the data sources exposed by the plugin. Each source points at an input schema and declares schema, cache, and capability metadata.

Plugin lifecycle

Local DCP plugins are created and managed in the DCP plugin workspace. The Slateo CLI is the control surface for scaffolding, inspecting, and validating plugins:

slateo dcp plugin create acme-orders --label "Acme Orders"
slateo dcp plugin show acme-orders --format json
slateo dcp plugin validate acme-orders --format json

The desktop app stores local drafts and installed plugin artifacts under the configured DCP plugin workspace. In desktop builds this workspace is managed for the user; in development it can be set with SLATEO_DCP_PLUGIN_WORKSPACE.

The normal lifecycle is:

  • Draft: a plugin manifest and source code exist locally.
  • Generated: an agent or developer has produced implementation code.
  • Validated: manifest, schemas, permissions, and tests have passed validation.
  • Installed: the compiled bundle is available to the desktop runtime.
  • Enabled: Slateo can use the plugin when a node or connection references it.

Runtime execution

When a DCP source runs, Slateo follows the same sequence for built-in connectors and custom plugins:

  1. Decode and validate user input with the source input schema.
  2. Normalize the input into a deterministic shape.
  3. Render a stable, non-secret source description for provenance and cache identity.
  4. Resolve secret handles into runtime-only values when the runner starts.
  5. Check network, filesystem, environment, subprocess, IPC, and secret permissions.
  6. Ask the connector for a fingerprint or execute the source when fresh data is required.
  7. Validate returned rows, schema, fingerprint, and metadata.
  8. Compose platform cache keys and record provenance without storing secret values.

How a DCP connector is defined

A source connector exports a DcpSourceConnector with five core pieces:

  • Name
    manifest
    Type
    DcpSourceConnectorManifest
    Description

    Public metadata for the connector: protocol version, connector id, version, label, auth model, source definitions, cache policy, schema strategy, and optional capabilities.

  • Name
    inputSchemas
    Type
    Effect Schema map
    Description

    Effect schemas for every source input. These schemas validate inputs and can include UI annotations for generated forms.

  • Name
    normalizeInput
    Type
    function
    Description

    Converts user input into a deterministic shape before rendering, fingerprinting, or execution. Trim strings, fill defaults, canonicalize casing, and remove ambiguity here.

  • Name
    render
    Type
    function
    Description

    Produces a stable, non-secret source description used for cache identity and provenance. Never include credentials, tokens, or secret values.

  • Name
    fingerprint
    Type
    function
    Description

    Returns the source freshness guarantee. Strong fingerprints are immutable content hashes or snapshot ids; weak fingerprints are things like ETags or watermarks; TTL fingerprints include expiresAt; none means no cache reuse guarantee.

  • Name
    execute
    Type
    function
    Description

    Fetches data and returns normalized rows, a DCP schema, a fingerprint observed during execution, and execution metadata.

Minimal connector

import { Effect, Schema } from 'effect';
import type {
  DcpSourceConnector,
  DcpSourceExecutionResult,
  DcpSourceFingerprint,
} from '@slateo/dcp';

interface HttpClient {
  readonly getJson: (url: string) => Effect.Effect<unknown, Error>;
  readonly head: (url: string) => Effect.Effect<Headers, Error>;
}

const InputSchema = Schema.Struct({
  url: Schema.String,
  mode: Schema.optional(Schema.Literal('summary', 'detail')),
});

type Input = typeof InputSchema.Type;

interface Services {
  readonly http: HttpClient;
}

export function makeExampleConnector(
  services: Services,
): DcpSourceConnector<Input, Input> {
  return {
    manifest: {
      protocolVersion: 'dcp-source-v1',
      connectorId: 'example-http',
      connectorVersion: '1.0.0',
      label: 'Example HTTP',
      auth: { type: 'connection', connectionKinds: ['example-http'] },
      sources: [
        {
          sourceKey: 'endpoint',
          label: 'Endpoint',
          inputSchemaKey: 'endpointInput',
          schemaStrategy: {
            type: 'declared',
            schema: {
              fields: [
                { name: 'id', type: 'string' },
                { name: 'value', type: 'number' },
              ],
              primaryKey: ['id'],
            },
          },
          cachePolicy: {
            cacheable: true,
            defaultStrength: 'weak',
            ttlSeconds: 300,
          },
          capabilities: {
            outputType: 'tabular',
            queryModel: 'api',
            schemaDiscovery: { supported: false },
          },
        },
      ],
    },
    inputSchemas: { endpointInput: InputSchema },
    normalizeInput: (input) =>
      Effect.succeed({
        url: input.url.trim(),
        mode: input.mode ?? 'summary',
      }),
    render: (input) => Effect.succeed(`GET ${input.url} mode=${input.mode}`),
    fingerprint: (input, context) =>
      Effect.gen(function* () {
        const headers = yield* services.http.head(input.url);
        const observedAt = context.now.toISOString();
        const etag = headers.get('etag');

        if (etag) {
          return {
            keyParts: [`url:${input.url}`, `etag:${etag}`],
            strength: 'weak',
            observedAt,
            diagnostics: { sourceUrl: input.url, versionToken: etag },
          } satisfies DcpSourceFingerprint;
        }

        const expiresAt = new Date(
          context.now.getTime() + 300_000,
        ).toISOString();

        return {
          keyParts: [`url:${input.url}`, `ttl-expires:${expiresAt}`],
          strength: 'ttl',
          observedAt,
          expiresAt,
          diagnostics: { sourceUrl: input.url },
        } satisfies DcpSourceFingerprint;
      }),
    execute: (input, context) =>
      Effect.gen(function* () {
        const payload = yield* services.http.getJson(input.url);
        const rows = normalizeRows(payload);

        return {
          rows,
          schema: {
            fields: [
              { name: 'id', type: 'string' },
              { name: 'value', type: 'number' },
            ],
            primaryKey: ['id'],
          },
          fingerprint: {
            keyParts: [`observed:${context.now.toISOString()}`],
            strength: 'ttl',
            observedAt: context.now.toISOString(),
            expiresAt: new Date(
              context.now.getTime() + 300_000,
            ).toISOString(),
          },
          metadata: { rowCount: rows.length },
        } satisfies DcpSourceExecutionResult;
      }),
  };
}

function normalizeRows(payload: unknown): Array<Record<string, unknown>> {
  if (!Array.isArray(payload)) {
    return [];
  }

  return payload.map((item, index) =>
    item && typeof item === 'object'
      ? { id: String(index), ...item }
      : { id: String(index), value: item },
  );
}

Fingerprints and cache behavior

DCP connectors must describe the freshness guarantee behind their data. The platform composes the final cache key, but connector fingerprints provide the source-specific freshness signal.

  • Strong: immutable content hash, S3 version id, database snapshot id, or another identifier that guarantees the exact same bytes or rows.
  • Weak: ETag, last-modified timestamp, SQL watermark, API revision, or another best-effort change token.
  • TTL: cacheable until expiresAt; always include expiresAt.
  • None: always fresh; no cache reuse guarantee.

Do not put credentials, tokens, user secrets, or other sensitive values in keyParts, rendered content, provenance, or diagnostics.

Capability metadata

Capabilities describe the data semantics exposed by a source independently from how it connects. This is important for custom plugins: a connector can use custom auth and transport while still declaring that the underlying semantics are Trino SQL, Snowflake SQL, an API, or a mixed model.

interface DcpSourceCapabilities {
  outputType: 'tabular' | 'streaming' | 'single_value' | 'multi_table';
  queryModel: 'sql' | 'api' | 'mixed';
  sqlDialect?:
    | 'postgres'
    | 'snowflake'
    | 'trino'
    | 'redshift'
    | 'bigquery'
    | 'databricks'
    | 'mysql'
    | 'sqlite'
    | 'duckdb'
    | 'clickhouse'
    | 'generic';
  schemaDiscovery?: DcpSchemaDiscoveryCapability;
  validation?: DcpValidationCapability;
  parameterization?: DcpParameterizationCapability;
  cancellation?: DcpCancellationCapability;
  streaming?: DcpStreamingCapability;
  maxRowsPerExecution?: number;
  supportsIncrementalLoad?: boolean;
  notes?: string;
}

Capabilities affect platform behavior:

  • queryModel: 'sql' plus sqlDialect enables dialect-aware editor support, validation, and agent planning.
  • schemaDiscovery.supported enables schema browsing and table or column autocomplete.
  • validation.supportsDryRun allows pre-execution cost or time estimates when the connector implements them.
  • parameterization.style tells editors and agents how to render placeholders.
  • cancellation.supported allows the platform to expose cancellation controls.
  • streaming.supported allows incremental result display and backpressure-aware execution.

Custom proxy to a known SQL engine

Use capabilities when a custom connection method fronts a known engine. For example, an internal OAuth proxy to Trino can still declare full Trino semantics:

const TrinoQueryInputSchema = Schema.Struct({
  query: Schema.String.pipe(Schema.minLength(1)),
  catalog: Schema.optional(Schema.String.pipe(Schema.minLength(1))),
  schema: Schema.optional(Schema.String.pipe(Schema.minLength(1))),
  parameters: Schema.optional(
    Schema.Record({ key: Schema.String, value: Schema.Unknown }),
  ),
});

type TrinoQueryInput = typeof TrinoQueryInputSchema.Type;

export const customTrinoConnector: DcpSourceConnector<
  TrinoQueryInput,
  TrinoQueryInput
> = {
  manifest: {
    protocolVersion: 'dcp-source-v1',
    connectorId: 'custom-trino-proxy',
    connectorVersion: '1.0.0',
    label: 'Internal Trino',
    description: 'Connect to internal Trino cluster via custom auth proxy.',
    auth: { type: 'connection', connectionKinds: ['custom-trino'] },
    sources: [
      {
        sourceKey: 'query',
        label: 'SQL Query',
        inputSchemaKey: 'trinoQueryInput',
        schemaStrategy: { type: 'dynamic' },
        cachePolicy: { cacheable: true, defaultStrength: 'strong' },
        capabilities: {
          outputType: 'tabular',
          queryModel: 'sql',
          sqlDialect: 'trino',
          schemaDiscovery: {
            supported: true,
            requiresConnection: true,
            cacheableDurationSeconds: 3600,
            supportsPartialDiscovery: true,
          },
          validation: {
            supportsSyntaxValidation: true,
            supportsSemanticValidation: true,
            supportsDryRun: true,
            supportsExecutionPreview: true,
          },
          parameterization: {
            supported: true,
            style: 'question_mark',
            supportsTypeHints: true,
            supportsPreparedStatements: true,
          },
          cancellation: {
            supported: true,
            gracefulShutdown: true,
            timeoutMs: 5000,
          },
          streaming: {
            supported: true,
            chunkSize: 1000,
            backpressureSupported: true,
          },
          maxRowsPerExecution: 1_000_000,
          notes: 'Full Trino SQL via a custom internal proxy.',
        },
      },
    ],
  },
  inputSchemas: { trinoQueryInput: TrinoQueryInputSchema },
  normalizeInput: (input) => Effect.succeed(input),
  render: (input) =>
    Effect.succeed(
      `${input.catalog ? `USE ${input.catalog}; ` : ''}${input.query}`,
    ),
  fingerprint: (input, context) =>
    Effect.succeed({
      keyParts: [
        `query-hash:${hashQuery(input.query)}`,
        `catalog:${input.catalog ?? 'default'}`,
        `schema:${input.schema ?? 'default'}`,
      ],
      strength: 'strong',
      observedAt: context.now.toISOString(),
    }),
  execute: (input, context) =>
    Effect.tryPromise({
      try: async () => {
        const token = context.secrets.trinoApiKey;
        const result = await executeTrinoQuery({ token, ...input });

        return {
          rows: result.rows,
          schema: result.schema,
          fingerprint: {
            keyParts: [`query-hash:${hashQuery(input.query)}`],
            strength: 'strong',
            observedAt: context.now.toISOString(),
          },
          metadata: { rowCount: result.rows.length },
        } satisfies DcpSourceExecutionResult;
      },
      catch: (error) =>
        error instanceof Error ? error : new Error(String(error)),
    }),
};

With this declaration, Slateo can treat the source as Trino for SQL highlighting, autocomplete, validation, schema discovery, and agent planning even though authentication and transport are custom.

Secrets and connection bindings

DCP separates shared plugin definitions from the credentials needed to run them.

  • Name
    DcpPluginSecretBinding
    Description

    Declares a required secret by stable key, label, scopes, provider, visibility, and value policy.

  • Name
    DcpConnectionBinding
    Description

    Stores non-secret settings plus secret handles. It never stores secret values.

  • Name
    DcpSecretResolverService
    Description

    Resolves secret handles at runtime and returns values only to the runner process.

Secret visibility controls where values may be resolved:

  • desktop_local_only: encrypted in the desktop app with OS secure storage. Available to desktop-local and sandboxed desktop runners only.
  • cloud_user: future user-scoped cloud store. May be available to cloud runners if explicitly allowed.
  • cloud_org: org-managed cloud store. Must not be loaded into a local desktop agent.

Runtime rules:

  • Generated code, cache keys, provenance, org registry artifacts, and manifests receive handles only.
  • Secret values are injected at runtime through the resolver service.
  • Desktop local secrets are per laptop and per user. Plugin sharing does not share credentials.
  • Missing required secrets should fail before execution with a binding error that names the missing secret key, not the value.
  • Test connection uses the same resolver pathway as execution so stale or missing handles are caught before a node run.

Connector checklist

  • Define an Effect Schema input for each source.
  • Normalize input deterministically before rendering, fingerprinting, or execution.
  • Render a stable, non-secret source description.
  • Return a fingerprint that matches the source freshness guarantee.
  • Do not put credentials, tokens, or user secrets in keyParts, rendered content, or diagnostics.
  • Return normalized tabular rows plus a DCP schema from execute.
  • Add capabilities for SQL, API, schema discovery, validation, parameterization, cancellation, and streaming where supported.
  • Add tests that assert the manifest, fingerprints, secret handling, and capability declarations.

Was this page helpful?

Was this page helpful?