Custom Connectors
Connect any HTTP or GraphQL API with a declarative JSON spec. rawquery handles auth, pagination, rate limits, and schema evolution - no code required.
Quick Start
Create a JSON spec describing the API, then run one command:
```json
{
  "version": "1",
  "base_url": "https://api.example.com/v1",
  "auth": { "type": "bearer" },
  "streams": [
    {
      "name": "users",
      "endpoint": "/users",
      "data_path": "data",
      "primary_key": ["id"]
    }
  ]
}
```

```shell
# Connect with an API token
rq connect my-api --spec spec.json --auth-token $API_TOKEN

# With a sync schedule
rq connect my-api --spec spec.json --auth-token $API_TOKEN --schedule daily

# Update the spec later (no need to recreate)
rq connections update my-api --spec updated-spec.json
```

After creation, manage it like any other connection:

```shell
rq connections sync my-api     # trigger sync
rq connections status my-api   # per-stream status
rq connections delete my-api   # remove
```

Spec Format
The spec is a JSON document with these top-level fields:
| Field | Required | Description |
|---|---|---|
| `version` | Yes | Always `"1"` |
| `base_url` | Yes | API base URL |
| `type` | No | `"rest"` (default) or `"graphql"` |
| `auth` | Yes | Authentication configuration |
| `streams` | Yes | Array of endpoints/resources to sync |
| `rate_limit` | No | Rate limiting and retry configuration |
Each stream defines an endpoint to extract data from:
| Field | Description |
|---|---|
| `name` | Table name in rawquery |
| `endpoint` | URL path (REST); for GraphQL, defaults to `/graphql` |
| `data_path` | JSON path to the array of records (e.g. `data.results`) |
| `primary_key` | Array of field names for deduplication |
| `cursor_field` | Field for incremental sync (e.g. `updatedAt`) |
| `pagination` | Pagination configuration (see below) |
| `params` | Static query parameters |
| `query` | GraphQL query string (GraphQL specs only) |
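To make `data_path` and `primary_key` concrete, the sketch below shows one plausible reading of them: a dot-separated path walked through the decoded response, followed by deduplication on the key fields. The function names are hypothetical, and two behaviours are assumptions rather than documented rawquery semantics: a missing path yields no records, and the last record seen for a key wins.

```python
from typing import Any


def extract_records(response: dict, data_path: str) -> list[Any]:
    """Follow a dot-separated path (e.g. "data.results") into a JSON response."""
    node: Any = response
    for key in data_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return []  # assumption: a missing path means no records
        node = node[key]
    return node if isinstance(node, list) else [node]


def dedupe(records: list[dict], primary_key: list[str]) -> list[dict]:
    """Keep one record per primary-key value (assumption: the last one wins)."""
    latest: dict[tuple, dict] = {}
    for record in records:
        latest[tuple(record[field] for field in primary_key)] = record
    return list(latest.values())


response = {
    "data": {"results": [{"id": 1, "n": "a"}, {"id": 2, "n": "b"}, {"id": 1, "n": "c"}]},
    "meta": {"total": 3},
}
records = extract_records(response, "data.results")
print(dedupe(records, ["id"]))  # [{'id': 1, 'n': 'c'}, {'id': 2, 'n': 'b'}]
```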
Authentication
API Key / Bearer / Basic
These schemes use static credentials, which are passed via CLI flags and stored encrypted server-side.
```json
// API key in header
{"auth": {"type": "api_key", "header": "X-API-Key"}}

// API key in query parameter
{"auth": {"type": "api_key", "query_param": "api_key"}}

// Bearer token (Authorization: Bearer <token>)
{"auth": {"type": "bearer"}}

// Basic auth (username:password)
{"auth": {"type": "basic"}}
```

```shell
rq connect my-api --spec spec.json --auth-token $TOKEN                 # api_key or bearer
rq connect my-api --spec spec.json --auth-user me --auth-pass $PASS    # basic
```

OAuth2
Token refresh is handled automatically before each sync, and refreshed tokens are persisted.
```json
{
  "auth": {
    "type": "oauth2",
    "token_url": "https://provider.com/oauth/token",
    "refresh_url": "https://provider.com/oauth/token",
    "grant_type": "refresh_token"
  }
}
```

```shell
rq connect my-api --spec spec.json \
  --client-id $CLIENT_ID --client-secret $CLIENT_SECRET \
  --refresh-token $REFRESH_TOKEN
```

Custom Auth
For APIs that require multi-step auth (session tokens, HMAC, custom headers):
```json
{
  "auth": {
    "type": "custom",
    "pre_request": {
      "url": "https://api.example.com/auth/session",
      "method": "POST",
      "body": {"username": "{{auth.username}}", "password": "{{auth.password}}"},
      "extract": {"session_token": "response.token"}
    },
    "headers": {
      "X-Session-Token": "{{session_token}}"
    }
  }
}
```

Pagination
Configure pagination per stream. If omitted, rawquery assumes a single-page response.
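As a rough mental model for the cursor type among the options that follow, the loop below reads a cursor from each response and passes it back as a query parameter until none is returned. It is a sketch only: `paginate_cursor`, `get_path`, and `fake_fetch` are hypothetical names, the in-memory `PAGES` stands in for real HTTP calls, and stopping on an empty cursor is an assumption about rawquery's behaviour.

```python
from typing import Any, Callable, Iterator


def get_path(doc: Any, path: str) -> Any:
    """Resolve a dot-separated path; return None if any step is missing."""
    for key in path.split("."):
        if not isinstance(doc, dict):
            return None
        doc = doc.get(key)
    return doc


def paginate_cursor(
    fetch: Callable[[dict], dict], data_path: str, cursor_path: str, param: str
) -> Iterator[dict]:
    """Yield records page by page, following the cursor found at cursor_path."""
    params: dict = {}
    while True:
        page = fetch(params)
        yield from get_path(page, data_path) or []
        cursor = get_path(page, cursor_path)
        if not cursor:  # assumption: no cursor means this was the last page
            break
        params = {param: cursor}


# Stand-in for an HTTP API: maps the incoming cursor value to a response body.
PAGES = {
    None: {"data": [{"id": 1}, {"id": 2}], "meta": {"next_cursor": "abc"}},
    "abc": {"data": [{"id": 3}], "meta": {"next_cursor": None}},
}


def fake_fetch(params: dict) -> dict:
    return PAGES[params.get("cursor")]


records = list(paginate_cursor(fake_fetch, "data", "meta.next_cursor", "cursor"))
print(records)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```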
```json
// Offset-based
{"pagination": {"type": "offset", "param": "offset", "limit_param": "limit", "page_size": 100}}

// Cursor-based (read cursor from response, pass as query param)
{"pagination": {"type": "cursor", "cursor_path": "meta.next_cursor", "param": "cursor"}}

// Page number
{"pagination": {"type": "page_number", "param": "page", "page_size_param": "per_page", "page_size": 50, "start_page": 1}}

// Link header (RFC 5988, GitHub-style)
{"pagination": {"type": "link_header"}}

// Next URL in response body
{"pagination": {"type": "next_url", "next_url_path": "paging.next"}}
```

Data Transformation
API responses are rarely flat tables. These options let you shape the data before it lands in Iceberg.
Flatten Nested Objects
```json
{
  "flatten": {
    "address": "prefix",   // {"address": {"city": "Paris"}} -> {"address_city": "Paris"}
    "metadata": "lift",    // {"metadata": {"key": "val"}} -> {"key": "val"}
    "raw_json": "json"     // keeps as JSON string (default)
  }
}
```

Array Expansion
Expand nested arrays into separate child tables with foreign keys:
```json
{
  "expand": {
    "line_items": {"primary_key": "id", "parent_key": "order_id"}
  }
}
```

Field Selection and Renaming
```json
{
  "fields": {
    "include": ["id", "name", "email", "created_at"],
    "rename": {"created_at": "created_date", "id": "external_id"}
  }
}
```

Type Coercion
Override auto-inferred types when needed:
```json
{
  "types": {
    "created_at": "timestamp",
    "amount": "float",
    "is_active": "boolean",
    "metadata": "json"
  }
}
```

Supported types: string, integer, float, boolean, timestamp, date, json.
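To tie the transformation options in this section together, the sketch below applies the three flatten modes and then a type override to a single record. It illustrates the documented input/output shapes only and is not rawquery's implementation: `flatten_record`, `coerce`, and `CASTS` are hypothetical names, and the boolean parsing rule and the ISO 8601 timestamp and date formats are assumptions.

```python
import json
from datetime import date, datetime


def flatten_record(record: dict, rules: dict) -> dict:
    """Apply "prefix", "lift", or "json" (the default) to nested objects."""
    out: dict = {}
    for key, value in record.items():
        mode = rules.get(key, "json")
        if not isinstance(value, dict):
            out[key] = value  # non-object values pass through unchanged
        elif mode == "prefix":
            out.update({f"{key}_{k}": v for k, v in value.items()})
        elif mode == "lift":
            out.update(value)
        else:
            out[key] = json.dumps(value)  # keep the object as a JSON string
    return out


# One cast per supported type name; parsing rules here are assumptions.
CASTS = {
    "string": str,
    "integer": int,
    "float": float,
    "boolean": lambda v: str(v).lower() in ("true", "1"),
    "timestamp": datetime.fromisoformat,
    "date": date.fromisoformat,
    "json": json.dumps,
}


def coerce(record: dict, types: dict) -> dict:
    """Cast the listed fields to the requested types; leave nulls and others alone."""
    return {
        k: CASTS[types[k]](v) if k in types and v is not None else v
        for k, v in record.items()
    }


raw = {
    "id": 7,
    "address": {"city": "Paris"},
    "metadata": {"key": "val"},
    "raw_json": {"a": 1},
    "amount": "12.50",
    "is_active": "true",
}
flat = flatten_record(raw, {"address": "prefix", "metadata": "lift", "raw_json": "json"})
row = coerce(flat, {"amount": "float", "is_active": "boolean"})
print(row)
```

In this sketch flattening runs before coercion, so a type override can target a lifted or prefixed column name; whether rawquery orders the steps the same way is not stated above.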
GraphQL
Set "type": "graphql" at the top level. Each stream uses a query field instead of endpoint. Relay cursor pagination is supported natively.
```json
{
  "version": "1",
  "base_url": "https://api.github.com",
  "type": "graphql",
  "auth": {"type": "bearer"},
  "streams": [
    {
      "name": "repositories",
      "query": "query($cursor: String) { viewer { repositories(first: 100, after: $cursor) { nodes { id name createdAt stargazerCount } pageInfo { hasNextPage endCursor } } } }",
      "data_path": "data.viewer.repositories.nodes",
      "pagination": {
        "type": "cursor",
        "cursor_variable": "cursor",
        "cursor_path": "data.viewer.repositories.pageInfo.endCursor",
        "has_more_path": "data.viewer.repositories.pageInfo.hasNextPage"
      },
      "primary_key": ["id"]
    }
  ]
}
```

Rate Limiting
Optional. Configures request throttling, retries, and backoff:
```json
{
  "rate_limit": {
    "requests_per_second": 10,
    "retry_on": [429, 500, 502, 503],
    "backoff": "exponential",
    "max_retries": 5,
    "respect_retry_after": true
  }
}
```

The connector respects Retry-After headers, uses exponential backoff with jitter, and isolates failures per stream.
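The retry behaviour described above can be pictured with the sketch below. It assumes the "full jitter" variant of exponential backoff, a 1 second base delay, and a 60 second cap. None of those values is stated in this page, and `backoff_delay` and `should_retry` are hypothetical names.

```python
import random
from typing import Callable, Optional


def backoff_delay(
    attempt: int,
    retry_after: Optional[float] = None,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        return retry_after  # a server-provided Retry-After takes precedence
    # Full jitter: a random delay between 0 and the capped exponential bound.
    return rng() * min(cap, base * 2 ** attempt)


def should_retry(
    status: int,
    attempt: int,
    retry_on: tuple = (429, 500, 502, 503),
    max_retries: int = 5,
) -> bool:
    """Retry only listed status codes, and only while attempts remain."""
    return status in retry_on and attempt < max_retries


# Upper bound of the delay per attempt (rng fixed at 1.0 to remove randomness).
print([backoff_delay(n, rng=lambda: 1.0) for n in range(6)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
```

Jitter spreads retries from many concurrent requests over time so they do not all hit the API again at the same instant.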
Schema Evolution
rawquery detects schema changes on every sync:
- New fields - automatically added as nullable columns. Existing rows get nulls.
- Removed fields - columns kept in the table, new rows get nulls.
- Type conflicts - logged as warnings, existing types preserved.
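The three rules above amount to an additive merge of the incoming schema into the table schema. The sketch below models schemas as plain column-to-type mappings; `evolve_schema` is a hypothetical name and the warning text is illustrative, not rawquery's actual log format.

```python
def evolve_schema(table: dict, incoming: dict) -> tuple[dict, list[str]]:
    """Merge an incoming column -> type mapping into the table's schema.

    Returns the merged schema and a list of type-conflict warnings.
    """
    merged = dict(table)  # removed fields stay: nothing is ever dropped
    warnings: list[str] = []
    for column, col_type in incoming.items():
        if column not in merged:
            merged[column] = col_type  # new field becomes a nullable column
        elif merged[column] != col_type:
            # type conflict: keep the existing type and record a warning
            warnings.append(
                f"type conflict on {column}: keeping {merged[column]}, saw {col_type}"
            )
    return merged, warnings


table = {"id": "integer", "name": "string", "legacy": "string"}
incoming = {"id": "string", "name": "string", "email": "string"}

schema, warnings = evolve_schema(table, incoming)
print(schema)    # legacy is kept, email is added, id stays integer
print(warnings)  # one warning, for the id column
```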
Schema changes are reported in sync results. To update the spec (add endpoints, change fields):
```shell
rq connections update my-api --spec updated-spec.json
```

Full Example
A complete spec with OAuth2, cursor pagination, data transformation, and incremental sync:
```json
{
  "version": "1",
  "base_url": "https://api.hubspot.com",
  "auth": {
    "type": "oauth2",
    "token_url": "https://api.hubspot.com/oauth/v1/token",
    "refresh_url": "https://api.hubspot.com/oauth/v1/token",
    "grant_type": "refresh_token"
  },
  "rate_limit": {
    "requests_per_second": 10,
    "retry_on": [429, 500, 502, 503],
    "backoff": "exponential",
    "max_retries": 5
  },
  "streams": [
    {
      "name": "contacts",
      "endpoint": "/crm/v3/objects/contacts",
      "params": {"limit": 100, "properties": "firstname,lastname,email"},
      "pagination": {
        "type": "cursor",
        "cursor_path": "paging.next.after",
        "param": "after"
      },
      "data_path": "results",
      "primary_key": ["id"],
      "cursor_field": "updatedAt",
      "sync_modes": ["full_refresh", "incremental"],
      "flatten": {"properties": "lift"},
      "fields": {
        "include": ["id", "firstname", "lastname", "email", "updatedAt"]
      }
    }
  ]
}
```