Ingestion from Kafka

Overview

QuestDB provides a first-party Kafka Connect connector for streaming data from Apache Kafka into QuestDB tables. The connector handles serialization, fault tolerance, and batching automatically, making it the recommended approach for most use cases.

Choosing an integration strategy

There are three ways to get data from Kafka into QuestDB:

| Strategy | Recommended for | Complexity |
| --- | --- | --- |
| QuestDB Kafka connector | Most users | Low |
| Stream processing (Flink) | Complex transformations | Medium |
| Custom program | Special requirements | High |

For most users, the QuestDB Kafka connector is the best choice. It provides excellent performance (100,000+ rows/second), handles fault tolerance automatically, and requires minimal configuration.

QuestDB Kafka connector

The QuestDB Kafka connector is built on the Kafka Connect framework and uses InfluxDB Line Protocol for high-performance data transfer. It works with Kafka-compatible systems like Redpanda.

Quick start

This guide walks through setting up the connector to read JSON data from Kafka and write it to QuestDB.

Prerequisites

  • Apache Kafka (or compatible system)
  • QuestDB instance with HTTP endpoint accessible
  • Java 17+ (JDK)

Step 1: Install the connector

Download and install the connector JAR files:

curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest |
jq -r '.assets[]|select(.content_type == "application/zip")|.browser_download_url'|
wget -qi -

Extract and copy to your Kafka installation:

unzip kafka-questdb-connector-*-bin.zip
cd kafka-questdb-connector
cp ./*.jar /path/to/kafka_*.*-*.*.*/libs

info

The connector is also available from Confluent Hub. For Confluent platform users, see the Confluent Docker images sample.

Step 2: Configure the connector

Create a configuration file at /path/to/kafka/config/questdb-connector.properties:

questdb-connector.properties
name=questdb-sink
connector.class=io.questdb.kafka.QuestDBSinkConnector

# QuestDB connection
client.conf.string=http::addr=localhost:9000;

# Kafka source
topics=example-topic

# Target table (optional - defaults to topic name)
table=example_table

# Message format
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
include.key=false

Step 3: Start the services

Run these commands from your Kafka installation directory (single-node KRaft):

# Generate a unique cluster ID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format storage directories (run once)
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

# Start Kafka
bin/kafka-server-start.sh config/server.properties

# Start the connector (from another terminal)
bin/connect-standalone.sh config/connect-standalone.properties config/questdb-connector.properties

Step 4: Test the pipeline

Publish a test message:

bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092

Enter this JSON (as a single line):

{"symbol": "AAPL", "price": 192.34, "volume": 1200}

Verify the data in QuestDB:

curl -G --data-urlencode "query=select * from 'example_table'" http://localhost:9000/exp

Expected output:

"symbol","price","volume","timestamp"
"AAPL",192.34,1200,"2026-02-03T15:10:00.000000Z"

The timestamp is assigned by QuestDB on ingestion, so the value you see will reflect your own ingest time rather than the timestamp shown above.

Configuration reference

The connector configuration has two parts:

  • Client configuration string: How the connector connects to QuestDB
  • Connector options: How the connector processes Kafka messages

Connector options

| Name | Type | Example | Default | Description |
| --- | --- | --- | --- | --- |
| client.conf.string | string | http::addr=localhost:9000; | N/A | Client configuration string |
| topics | string | orders,audit | N/A | Kafka topics to read from |
| table | string | my_table | Topic name | Target table in QuestDB |
| key.converter | string | org.apache.kafka.connect.storage.StringConverter | N/A | Converter for Kafka keys |
| value.converter | string | org.apache.kafka.connect.json.JsonConverter | N/A | Converter for Kafka values |
| include.key | boolean | false | true | Include message key in target table |
| key.prefix | string | from_key | key | Prefix for key fields |
| value.prefix | string | from_value | N/A | Prefix for value fields |
| symbols | string | instrument,stock | N/A | Columns to create as symbol type |
| doubles | string | volume,price | N/A | Columns to always send as double type |
| timestamp.field.name | string | pickup_time | N/A | Designated timestamp field. Use comma-separated names for composed timestamps |
| timestamp.units | string | micros | auto | Timestamp field units: nanos, micros, millis, seconds, auto |
| timestamp.kafka.native | boolean | true | false | Use Kafka message timestamps as designated timestamps |
| timestamp.string.fields | string | creation_time | N/A | String fields containing textual timestamps |
| timestamp.string.format | string | yyyy-MM-dd HH:mm:ss.SSSUUU z | yyyy-MM-ddTHH:mm:ss.SSSUUUZ | Format for parsing string timestamps |
| skip.unsupported.types | boolean | false | false | Skip unsupported types instead of failing |
| allowed.lag | int | 250 | 1000 | Milliseconds to wait before flushing when no new events |

The connector uses Kafka Connect converters for deserialization and works with any format they support, including JSON, Avro, and Protobuf. When using Schema Registry, configure the appropriate converter (e.g., io.confluent.connect.avro.AvroConverter).
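
For example, with Confluent Schema Registry and Avro-encoded values, the value converter configuration might look like this (a sketch assuming a registry at localhost:8081 and the Avro converter JARs on the Connect plugin path):

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081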

Client configuration string

The client.conf.string option configures how the connector communicates with QuestDB. You can also set this via the QDB_CLIENT_CONF environment variable.

Format:

<protocol>::<key>=<value>;<key>=<value>;...;

Note the trailing semicolon.

Supported protocols: http, https

Required keys:

  • addr - QuestDB hostname and port (port defaults to 9000)

Examples:

# Minimal configuration
client.conf.string=http::addr=localhost:9000;

# With HTTPS and retry timeout
client.conf.string=https::addr=questdb.example.com:9000;retry_timeout=60000;

# With authentication token from environment variable
client.conf.string=http::addr=localhost:9000;token=${QUESTDB_TOKEN};

See the Java Client configuration guide for all available client options.

danger

The QuestDB client also supports TCP transport, but it is not recommended for Kafka Connect because the TCP transport offers no delivery guarantees.

Environment variable expansion

The client.conf.string supports ${VAR} syntax for environment variable expansion, useful for injecting secrets in Kubernetes environments:

| Pattern | Result |
| --- | --- |
| ${VAR} | Replaced with environment variable value |
| $$ | Escaped to literal $ |
| $${VAR} | Escaped to literal ${VAR} (not expanded) |
| $VAR | Not expanded (braces required) |
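
For example, a deployment might inject both the address and an authentication token from the environment (the variable names here are illustrative; a literal dollar sign elsewhere in the string would be written as $$):

# QUESTDB_ADDR and QUESTDB_TOKEN are hypothetical variables, e.g. from a Kubernetes Secret
client.conf.string=http::addr=${QUESTDB_ADDR};token=${QUESTDB_TOKEN};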

The connector fails to start if:

  • A referenced environment variable is not defined
  • A variable reference is malformed (e.g., unclosed braces ${VAR)
  • A variable name is empty (${}) or invalid (must start with letter or underscore, followed by letters, digits, or underscores)

warning

Environment variable values containing semicolons (;) will break the configuration string parsing.

How data is mapped

The connector converts each Kafka message field to a QuestDB column. Nested structures and maps are flattened with underscores.

Example input:

{
  "firstname": "John",
  "lastname": "Doe",
  "age": 30,
  "address": {
    "street": "Main Street",
    "city": "New York"
  }
}

Resulting table:

| firstname | lastname | age | address_street | address_city |
| --- | --- | --- | --- | --- |
| John | Doe | 30 | Main Street | New York |

Designated timestamps

The connector supports four strategies for designated timestamps:

| Strategy | Configuration | Use case |
| --- | --- | --- |
| Server-assigned | (default) | QuestDB assigns timestamp on receipt |
| Message payload | timestamp.field.name=fieldname | Use a field from the message |
| Kafka metadata | timestamp.kafka.native=true | Use Kafka's message timestamp |
| Composed | timestamp.field.name=date,time | Combine multiple fields |

These strategies are mutually exclusive.

Using a message field

If your message contains a timestamp field:

timestamp.field.name=event_time
timestamp.units=millis # or: nanos, micros, seconds, auto

When timestamp.units is set to auto, the connector infers the units from the value's magnitude; this detection works for timestamps after April 26, 1970.

Using Kafka timestamps

To use Kafka's built-in message timestamp:

timestamp.kafka.native=true

Parsing string timestamps

For timestamps stored as strings:

timestamp.field.name=created_at
timestamp.string.fields=updated_at,deleted_at
timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z

The timestamp.field.name field becomes the designated timestamp. Fields in timestamp.string.fields are parsed as regular timestamp columns.

See QuestDB timestamp format for format patterns.

Composed timestamps

Some data sources split timestamps across multiple fields (common with KDB-style data):

{
  "symbol": "BTC-USD",
  "date": "20260202",
  "time": "135010207"
}

Configure the connector to concatenate and parse them:

timestamp.field.name=date,time
timestamp.string.format=yyyyMMddHHmmssSSS

The fields date and time are concatenated into 20260202135010207, parsed to produce 2026-02-02T13:50:10.207000Z. The source fields are consumed and do not appear as columns in the output.

All listed fields must be present in each message.

Fault tolerance

The connector automatically retries recoverable errors (network issues, server unavailability, timeouts). Non-recoverable errors (invalid data, authentication failures) are not retried.

Configure retry behavior via the client configuration:

# Retry for up to 60 seconds
client.conf.string=http::addr=localhost:9000;retry_timeout=60000;

Default retry timeout is 10,000 ms.

Exactly-once delivery

Retries may cause duplicate rows. To ensure exactly-once delivery, enable deduplication on your target table. Deduplication requires a designated timestamp from the message payload or Kafka metadata.
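
A minimal sketch of such a table, reusing the quick-start names and assuming the designated timestamp comes from the message payload (timestamp.field.name) or Kafka metadata rather than being server-assigned:

CREATE TABLE example_table (
  timestamp TIMESTAMP,
  symbol SYMBOL,
  price DOUBLE,
  volume LONG
) TIMESTAMP(timestamp) PARTITION BY DAY WAL
DEDUP UPSERT KEYS(timestamp, symbol);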

Dead letter queue

For messages that fail due to non-recoverable errors (invalid data, schema mismatches), configure a Dead Letter Queue to prevent the connector from stopping. These settings must be configured in the Kafka Connect worker configuration (e.g., connect-standalone.properties or connect-distributed.properties), not in the connector configuration:

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-questdb
errors.deadletterqueue.topic.replication.factor=1

Failed messages are sent to the DLQ topic for later inspection.
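
To inspect failed messages, you can consume the DLQ topic directly (assuming the topic name configured above and a broker on localhost:9092):

bin/kafka-console-consumer.sh --topic dlq-questdb --from-beginning --bootstrap-server localhost:9092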

See the Confluent DLQ documentation for details.

Performance tuning

Batch size

The connector batches messages before sending. Default batch size is 75,000 rows. For low-throughput scenarios, reduce this to lower latency:

client.conf.string=http::addr=localhost:9000;auto_flush_rows=1000;

Flush interval

The connector flushes data when:

  • Batch size is reached
  • No new events for allowed.lag milliseconds (default: 1000)
  • Kafka Connect commits offsets

# Flush after 250ms of no new events
allowed.lag=250

Configure offset commit frequency in Kafka Connect via offset.flush.interval.ms. See Kafka Connect configuration.
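
For example, in the worker configuration (connect-standalone.properties in the quick start), committing offsets every 5 seconds might look like this (the default interval is 60 seconds):

# connect-standalone.properties (worker configuration)
# Offset commits also trigger a connector flush, so this bounds end-to-end latency
offset.flush.interval.ms=5000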

Type handling

Symbol columns

Use the symbols option to create columns as symbol type for better performance on repeated string values:

symbols=instrument,exchange,currency

Numeric type inference

Without a schema, the connector infers types from values. This can cause issues when a field is sometimes an integer and sometimes a float:

{"volume": 42}      // Inferred as long
{"volume": 42.5} // Error: column is long, value is double

Solutions:

  1. Use the doubles option to force double type:
    doubles=volume,price
  2. Pre-create the table with explicit column types using CREATE TABLE

Target table options

Table naming

By default, the table name matches the Kafka topic name. Override with:

table=my_custom_table

The table option supports templating:

table=kafka_${topic}_${partition}

Available variables: ${topic}, ${key}, ${partition}

If ${key} is used and the message has no key, it resolves to null.

Schema management

Tables are created automatically when they don't exist. This is convenient for development, but in production you should pre-create tables using CREATE TABLE for control over partitioning, indexes, and column types.
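
For example, a table matching the quick-start schema could be created ahead of time through QuestDB's /exec HTTP endpoint (table name, column types, and partitioning here are illustrative):

curl -G --data-urlencode "query=CREATE TABLE example_table (timestamp TIMESTAMP, symbol SYMBOL, price DOUBLE, volume LONG) TIMESTAMP(timestamp) PARTITION BY DAY" http://localhost:9000/exec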

Transformations

OrderBookToArray

The connector includes an OrderBookToArray Single Message Transform (SMT) for converting arrays of structs into arrays of arrays. This is useful for order book data or tabular data stored as rows that needs to be pivoted into columnar form.

For querying order book data stored as arrays, see Order book analytics using arrays.

Input:

{
  "symbol": "BTC-USD",
  "buy_entries": [
    { "price": 100.5, "size": 10.0 },
    { "price": 99.8, "size": 25.0 }
  ]
}

Output:

{
  "symbol": "BTC-USD",
  "bids": [
    [100.5, 99.8],
    [10.0, 25.0]
  ]
}

Configuration:

transforms=orderbook
transforms.orderbook.type=io.questdb.kafka.OrderBookToArray$Value
transforms.orderbook.mappings=buy_entries:bids:price,size;sell_entries:asks:price,size

The mappings format is sourceField:targetField:field1,field2;...

Behavior:

  • All extracted values are converted to double
  • Missing source fields are skipped (no error)
  • Empty source arrays are skipped
  • Null values inside structs cause an error
  • If the target field name already exists in the input, it is replaced
  • Works with both schema-based and schemaless messages

note

QuestDB requires all inner arrays to have the same length. The OrderBookToArray SMT satisfies this naturally since each inner array comes from the same source entries.

Sample projects

Additional examples, including the Confluent and Debezium samples referenced in this guide, are available in the kafka-questdb-connector repository on GitHub.

Stream processing

Stream processing engines like Apache Flink provide rich APIs for data transformation, enrichment, and filtering with built-in fault tolerance.

QuestDB offers a Flink connector for users who need complex transformations while ingesting from Kafka.

Use stream processing when you need:

  • Complex stateful transformations
  • Joining multiple data streams
  • Windowed aggregations before writing to QuestDB

Custom program

Writing a dedicated program to read from Kafka and write to QuestDB offers maximum flexibility for arbitrary transformations and filtering.

Trade-offs:

  • Full control over serialization, error handling, and batching
  • Highest implementation complexity
  • Must handle Kafka consumer groups, offset management, and retries

This approach is only recommended for advanced use cases where the Kafka connector or stream processing cannot meet your requirements.
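
As a rough sketch, a dedicated consumer built on the QuestDB Java client could look like the following. The topic, table, and column handling are placeholders, and JSON parsing is omitted; it assumes the Kafka clients library and the QuestDB Java client are on the classpath.

// Minimal sketch: consume from Kafka and write to QuestDB over ILP/HTTP.
import io.questdb.client.Sender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class KafkaToQuestDB {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "questdb-writer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Sender sender = Sender.fromConfig("http::addr=localhost:9000;")) {
            consumer.subscribe(List.of("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Real code would parse the payload into typed columns;
                    // here the raw value is stored as a single string column.
                    sender.table("example_table")
                          .stringColumn("payload", record.value())
                          .atNow();
                }
                sender.flush();        // send the batch to QuestDB
                consumer.commitSync(); // commit offsets only after a successful flush
            }
        }
    }
}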

FAQ

Does the connector work with Schema Registry?

Yes. The connector relies on Kafka Connect converters for deserialization. Configure converters using key.converter and value.converter options. It works with Avro, JSON Schema, and other formats supported by Schema Registry.

Does the connector work with Debezium?

Yes. QuestDB works well with Debezium for change data capture. Since QuestDB is append-only, updates become new rows preserving history.

Use Debezium's ExtractNewRecordState transformation to extract the new record state. DELETE events are dropped by default.
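
A minimal sketch of the relevant settings on the Debezium source connector (the transform alias unwrap is arbitrary):

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Tombstone records emitted for DELETEs are dropped (Debezium's default behavior)
transforms.unwrap.drop.tombstones=true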

See the Debezium sample project and the blog post Change Data Capture with QuestDB and Debezium.

Typical pattern: Use a relational database for current state and QuestDB for change history. For example, PostgreSQL holds current stock prices while QuestDB stores the complete price history for analytics.

How do I select which fields to include?

Use Kafka Connect's ReplaceField transformation:

{
  "transforms": "removeFields",
  "transforms.removeFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.removeFields.blacklist": "address,internal_id"
}

See ReplaceField documentation.

I'm getting a JsonConverter schema error

If you see:

JsonConverter with schemas.enable requires 'schema' and 'payload' fields

Your JSON data doesn't include a schema. Add to your configuration:

value.converter.schemas.enable=false

Or for keys:

key.converter.schemas.enable=false

See also