# SQL Batch

The SQL Batch connector is supported as a source only. It is built into dsync/dsynct and reads data from SQL databases using configurable queries. It supports SQL Server, PostgreSQL, and Oracle.

## Configuration

Specify `sqlbatch` as the source with a `--config` flag pointing to a YAML configuration file:

```shell
sqlbatch --config=config.yml
```

## Data Type

The SQL Batch connector uses the **JSON** data type.
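
As an illustration (the exact field mapping is an assumption, not spelled out above), a row returned by `SELECT id, name, email FROM users` would typically become a JSON document keyed by column name:

```json
{
  "id": 1,
  "name": "Ada",
  "email": "ada@example.com"
}
```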

## Config File

The configuration file is a YAML file with the following top-level fields:

```yaml
id: my-source
driver: postgres
connectionstring: "postgres://user:password@host:5432/dbname"
mappings:
  - namespace: db.users
    query: "SELECT id, name, email FROM users"
    partitionquery: "SELECT id FROM users WHERE id % 4 = 0 ORDER BY id"
    cols: [id]
    limit: 1000
```

### Top-Level Fields

| Field              | Required | Description                                                             |
| ------------------ | -------- | ----------------------------------------------------------------------- |
| `id`               | No       | Identifier for the connector. Defaults to `"sql"` if not set.           |
| `driver`           | Yes      | SQL driver to use. Supported values: `sqlserver`, `postgres`, `oracle`. |
| `connectionstring` | Yes      | Connection string for the database.                                     |
| `mappings`         | Yes      | List of mapping definitions (see below).                                |

### Mapping Fields

Each entry in `mappings` defines a namespace (analogous to a collection or table) that the connector will read.

| Field            | Required | Description                                                                                                                                                                                                    |
| ---------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `namespace`      | Yes      | Name used to identify this data set (e.g. `db.users`). This is the namespace that dsync will use.                                                                                                              |
| `query`          | Yes      | SQL query that returns the data. Must include all key columns from `cols`.                                                                                                                                     |
| `partitionquery` | Yes      | SQL query that returns partition boundary values. The columns must match `cols`. Results must be sorted in ascending order and contain no duplicates. Used to split the initial sync into parallel partitions. |
| `cols`           | Yes      | List of key column names. Used for partitioning, ordering, and identifying rows.                                                                                                                               |
| `limit`          | Yes      | Batch size for reading data. Must be at least 1.                                                                                                                                                               |
| `countquery`     | No       | SQL query that returns a single count value. If omitted, defaults to `WITH QUERY AS (<query>) SELECT COUNT(*) FROM QUERY`.                                                                                     |
| `nocount`        | No       | If `true`, skips the count query entirely.                                                                                                                                                                     |
| `decodejson`     | No       | List of column names whose values should be decoded from JSON strings into structured data.                                                                                                                    |
| `fetchers`       | No       | Number of parallel fetchers for change streaming. Defaults to 1.                                                                                                                                               |
| `changes`        | No       | List of change tracking configurations for streaming updates (see below).                                                                                                                                      |

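For instance, a mapping that overrides the default count query and decodes a JSON column might look like this (the table, the column names, and the `pg_class` estimate trick are illustrative, not prescribed by the connector):

```yaml
mappings:
  - namespace: db.events
    query: "SELECT id, payload FROM events"
    partitionquery: "SELECT id FROM events WHERE id % 4 = 0 ORDER BY id"
    cols: [id]
    limit: 1000
    # Approximate count from Postgres statistics; cheaper than the
    # default COUNT(*) over the full query on a large table.
    countquery: "SELECT reltuples::bigint FROM pg_class WHERE relname = 'events'"
    # 'payload' is stored as a JSON string; decode it into structured data.
    decodejson: [payload]
```
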
### Change Tracking

If you want the connector to stream ongoing changes rather than perform only an initial sync, configure the `changes` field. Each entry polls for changes using its own query.

The changes `query` receives the current cursor value as a single positional parameter. The placeholder syntax depends on the driver:

| Driver      | Placeholder |
| ----------- | ----------- |
| `sqlserver` | `@p1`       |
| `postgres`  | `$1`        |
| `oracle`    | `:1`        |

```yaml
mappings:
  - namespace: db.users
    query: "SELECT id, name, email FROM users"
    partitionquery: "SELECT id FROM users WHERE id % 4 = 0 ORDER BY id"
    cols: [id]
    limit: 1000
    changes:
      - initialcursorquery: "SELECT MAX(updated_at) FROM users"
        query: "SELECT id, 'U', updated_at FROM users WHERE updated_at > $1 ORDER BY updated_at LIMIT 1000"
        interval: 5s
```

| Field                | Required | Description                                                                                                       |
| -------------------- | -------- | ----------------------------------------------------------------------------------------------------------------- |
| `initialcursorquery` | Yes      | Query that returns a single value to use as the starting cursor.                                                  |
| `query`              | Yes      | Query that returns changed rows (see format below). Use the driver-specific placeholder for the cursor parameter. |
| `interval`           | No       | How long to wait before polling again when no changes are found. Defaults to `5s`.                                |

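Note that if the source table can be empty, a `MAX(...)`-based `initialcursorquery` returns `NULL`. Depending on how your driver binds a `NULL` parameter, you may want to coalesce to a sentinel value instead (this guard is a suggestion, not a connector requirement):

```yaml
initialcursorquery: "SELECT COALESCE(MAX(updated_at), TIMESTAMP '1970-01-01') FROM users"
```
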
#### Changes Query Format

The changes `query` is critical to get right. It must return columns in this exact order:

1. **Key columns** -- the same columns listed in `cols`, used to identify which row changed.
2. **Update type** -- a string column: `"D"` for deletes, any other value (e.g. `"U"`, `"I"`) for upserts.
3. **Cursor value** -- a monotonically increasing value (e.g. a version number or timestamp) that becomes the input for the next poll cycle.

For example, with `cols: [id]`, the query must return exactly 3 columns: `id`, update type, cursor value.
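
With a composite key, the key columns come first, in the same order as `cols`. A hypothetical Postgres mapping with `cols: [tenant_id, id]` would return 4 columns:

```yaml
changes:
  - initialcursorquery: "SELECT MAX(updated_at) FROM users"
    # Columns: tenant_id, id (keys), update type, cursor value.
    query: "SELECT tenant_id, id, 'U', updated_at FROM users WHERE updated_at > $1 ORDER BY updated_at LIMIT 1000"
    interval: 5s
```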

The connector polls in a loop: it passes the current cursor as the placeholder parameter, reads all returned rows, advances the cursor to the last row's cursor value, and repeats until no more rows are returned. It then waits for `interval` before polling again.

**Important considerations:**

* The query should filter using `> cursor` (not `>=`) to avoid reprocessing the same row, unless the cursor value is set up to guarantee no overlap.
* Results should be ordered by the cursor column in ascending order so that the connector advances through changes sequentially.
* The number of rows returned per poll is bounded by `limit` from the mapping. If the query returns `limit` or more rows, the connector immediately polls again without waiting for `interval`.
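
One cycle of the loop described above can be sketched as follows. This is an illustrative Python model of the connector's behavior, not its actual implementation; `fetch_changes` and `apply` are hypothetical stand-ins for running the changes query and delivering rows downstream.

```python
def poll_once(fetch_changes, apply, cursor, limit):
    """One cycle of the change-polling loop (illustrative sketch).

    fetch_changes(cursor) runs the changes query with the cursor bound to the
    driver's positional placeholder and returns rows shaped as
    (key columns..., update type, cursor value).

    Returns (new_cursor, exhausted): exhausted is True when fewer than
    `limit` rows came back, meaning the connector should wait for
    `interval` before polling again; otherwise it polls again immediately.
    """
    rows = fetch_changes(cursor)
    for row in rows:
        apply(row)              # row[:-2] = keys, row[-2] = "D"/"U"/..., row[-1] = cursor
    if rows:
        cursor = rows[-1][-1]   # advance to the last row's cursor value
    return cursor, len(rows) < limit
```

The connector would repeat `poll_once` until `exhausted` is true, sleep for `interval`, and start over with the advanced cursor.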

## Full Example

```yaml
id: my-sql-source
driver: sqlserver
connectionstring: "sqlserver://user:password@host:1433?database=mydb"
mappings:
  - namespace: dbo.orders
    query: "SELECT order_id, customer_id, total, details FROM orders"
    partitionquery: "SELECT order_id FROM orders WHERE order_id % 8 = 0 ORDER BY order_id"
    cols: [order_id]
    limit: 5000
    decodejson: [details]
    changes:
      - initialcursorquery: "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
        query: "SELECT CT.order_id, CT.SYS_CHANGE_OPERATION, CT.SYS_CHANGE_VERSION FROM CHANGETABLE(CHANGES dbo.orders, @p1) AS CT ORDER BY CT.SYS_CHANGE_VERSION"
        interval: 10s

  - namespace: dbo.customers
    query: "SELECT customer_id, name, email FROM customers"
    partitionquery: "SELECT customer_id FROM customers WHERE customer_id % 4 = 0 ORDER BY customer_id"
    cols: [customer_id]
    limit: 2000
    nocount: true
```
