Kafka Streaming
Apache Kafka 4.2.0 handles real-time ingestion of VQE simulation results, decoupling producers from consumers with durable, ordered delivery. Kafka runs in KRaft mode (no ZooKeeper dependency).
For how Kafka fits into the overall architecture, see System Design. For schema definitions and the Avro wire format, see Serialization.
Topic Design
All VQE results are published to a single topic: experiment.vqe. Previous
versions used dynamic per-molecule topics (vqe_decorated_result_{suffix}).
The current design consolidates everything into one topic, simplifying consumer
configuration and Schema Registry management. Simulation parameters (molecule,
basis set, backend) are encoded in the message payload.
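Because every result now shares one topic, a consumer that only cares about one molecule or backend has to filter on the payload rather than on the topic name. A minimal sketch of that filtering, assuming messages have already been decoded into dictionaries; the field names `molecule`, `basis_set`, and `backend` are hypothetical stand-ins for whatever the schema actually calls them:

```python
from typing import Any, Iterable, Iterator, Mapping


def filter_results(
    records: Iterable[Mapping[str, Any]],
    **wanted: Any,
) -> Iterator[Mapping[str, Any]]:
    """Yield only the decoded records whose payload fields match `wanted`."""
    for record in records:
        if all(record.get(key) == value for key, value in wanted.items()):
            yield record


# Already-decoded payloads; the field names are assumptions for illustration.
decoded = [
    {"molecule": "H2", "basis_set": "sto-3g", "backend": "aer"},
    {"molecule": "LiH", "basis_set": "sto-3g", "backend": "aer"},
]
h2_only = list(filter_results(decoded, molecule="H2"))
```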
Kafka Cluster (KRaft Mode)
The cluster is a single-node KRaft setup in which one node acts as both broker and controller.
Replication factors are 1 since there is only one broker. The internal listener
(PLAINTEXT) is used by containers on the Docker network, and the external
listener is exposed for development access from the host.
The full Kafka service definition is in
compose/docker-compose.ml.yaml.
| Listener | Port | Purpose |
|---|---|---|
| PLAINTEXT | 9092 | Internal, used by containers on the Docker network |
| CONTROLLER | 9093 | KRaft controller communication |
| EXTERNAL | 9094 | Host access for development |
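Which listener a client should use depends on where it runs. The sketch below picks a bootstrap address accordingly. The `kafka` hostname matches the one used in the Redpanda Connect config, `localhost:9094` assumes the EXTERNAL port is published on the host, and the function name is hypothetical. It also honours the `KAFKA_SERVERS` variable mentioned under Producer Configuration, which is an assumption about how an override might be applied here.

```python
import os
from typing import Mapping

INTERNAL_BOOTSTRAP = "kafka:9092"      # PLAINTEXT listener on the Docker network
EXTERNAL_BOOTSTRAP = "localhost:9094"  # EXTERNAL listener (assumed published on the host)


def pick_bootstrap_servers(
    in_container: bool,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Return the bootstrap address for the caller's network location."""
    override = env.get("KAFKA_SERVERS")
    if override:
        # An explicit setting wins over the location-based default.
        return override
    return INTERNAL_BOOTSTRAP if in_container else EXTERNAL_BOOTSTRAP
```

Port 9093 is deliberately absent: the CONTROLLER listener carries KRaft traffic only and is not a client endpoint.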
Schema Registry
Confluent Schema Registry 8.2.0 manages Avro schema lifecycle with automatic
versioning. The default compatibility mode is BACKWARD. The registry stores
schemas on the internal _schemas topic.
The service definition is in
compose/docker-compose.ml.yaml.
For details on compatibility modes, see the Schema Registry documentation.
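Under BACKWARD compatibility, a consumer using the new schema must still be able to read data written with the previous one, so a newly added field needs a default value. The sketch below illustrates only that one rule with plain dictionaries. It is a deliberately simplified check, not the registry's actual algorithm, and the schema and field names are hypothetical.

```python
from typing import Any, Dict, List


def fields_missing_defaults(old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
    """Names of fields added in `new` that lack a default (simplified check)."""
    old_names = {field["name"] for field in old["fields"]}
    return [
        field["name"]
        for field in new["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Hypothetical schemas for illustration only.
V1 = {
    "type": "record",
    "name": "Result",
    "fields": [{"name": "energy", "type": "double"}],
}
V2_OK = {
    "type": "record",
    "name": "Result",
    "fields": [
        {"name": "energy", "type": "double"},
        {"name": "backend", "type": ["null", "string"], "default": None},
    ],
}
V2_BAD = {
    "type": "record",
    "name": "Result",
    "fields": [
        {"name": "energy", "type": "double"},
        {"name": "backend", "type": "string"},  # no default: old data cannot be read
    ],
}
```

Here `fields_missing_defaults(V1, V2_OK)` is empty, while `V2_BAD` reports the `backend` field. Type changes and nested records are not covered by this check.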
Producer Configuration
VQEKafkaProducer
wraps kafka-python's KafkaProducer, adding Avro serialization and a
two-level retry mechanism. On initialization it builds the security config,
connects to Schema Registry, and sets up the
VQEDecoratedResultInterface
serializer.
| Parameter | Default | Purpose |
|---|---|---|
| bootstrap_servers | localhost:9092 | Kafka broker addresses (overridden by KAFKA_SERVERS env var at CLI level) |
| kafka_retries | 5 | Client-level retries handled by kafka-python for transient broker errors |
| retries | 3 | Application-level retries in _send_with_retry(), with configurable retry_delay (default 2s) |
| acks | all | Acknowledgment level required from brokers |
| timeout | 10 | Timeout in seconds for each send().get() call |
There are two retry layers. The kafka_retries value is passed to the
KafkaProducer constructor and governs low-level, client-side retries. The
retries value controls the outer application loop in _send_with_retry(),
which catches KafkaError exceptions and tries again after retry_delay seconds.
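The outer retry layer can be sketched as a small loop around a single send attempt. This is an illustration of the pattern, not the body of `_send_with_retry()`: the function name and signature are hypothetical, and it assumes `retries` means the total number of attempts, which the source does not confirm.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def send_with_retry(
    send_once: Callable[[], T],
    retries: int = 3,
    retry_delay: float = 2.0,
    retriable: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `send_once` up to `retries` times, pausing between failed attempts."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    for attempt in range(1, retries + 1):
        try:
            return send_once()
        except retriable:
            if attempt == retries:
                raise  # out of attempts: surface the last error to the caller
            sleep(retry_delay)
    raise AssertionError("unreachable")
```

With kafka-python, `send_once` could be something like `lambda: producer.send(topic, value).get(timeout=10)` with `retriable=(KafkaError,)` imported from `kafka.errors`. The injectable `sleep` exists only so the loop can be tested without waiting.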
The ProducerConfig
dataclass holds all producer settings and can be constructed directly or via
ProducerConfig.from_dict().
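For orientation, a settings dataclass of this kind might look roughly like the sketch below. It is not the project's `ProducerConfig`: the class name is hypothetical, the defaults are copied from the table above, and ignoring unknown keys in `from_dict()` is an assumption about the behaviour.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass
class ProducerConfigSketch:
    """Illustrative stand-in for a producer settings container."""

    bootstrap_servers: str = "localhost:9092"
    kafka_retries: int = 5     # client-level retries inside kafka-python
    retries: int = 3           # application-level retry attempts
    retry_delay: float = 2.0   # seconds between application-level attempts
    acks: str = "all"
    timeout: int = 10          # seconds per send().get() call

    @classmethod
    def from_dict(cls, raw: Mapping[str, Any]) -> "ProducerConfigSketch":
        """Build a config from a mapping, dropping keys that are not fields."""
        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in raw.items() if key in known})


config = ProducerConfigSketch.from_dict({"retries": 1, "unrelated": True})
```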
Message Format
Messages use the Confluent Wire Format, prepending metadata to the Avro binary payload for schema lookup during deserialization.
| Component | Size | Description |
|---|---|---|
| Magic Byte | 1 byte | Always 0x00, identifies Confluent wire format |
| Schema ID | 4 bytes | Big-endian integer referencing the Schema Registry |
| Avro Binary | Variable | Compact binary-encoded VQEDecoratedResult |
The serialization is handled by to_avro_bytes() on
AvroInterfaceBase.
It prepends the Confluent wire header when the schema ID is found in the
registry cache, and falls back to headerless Avro if not.
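The five-byte header in the table above can be packed and unpacked with the standard `struct` module. The sketch below shows the framing only and is not the project's `to_avro_bytes()`; both function names are hypothetical.

```python
import struct
from typing import Optional, Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian schema ID


def add_confluent_header(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro binary payload with the Confluent wire header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def split_confluent_header(message: bytes) -> Tuple[Optional[int], bytes]:
    """Return (schema_id, payload), or (None, message) if no header is detected.

    The magic-byte test is a heuristic: a headerless Avro payload can also
    begin with 0x00, so the two forms are not always distinguishable.
    """
    if len(message) >= HEADER.size and message[0] == MAGIC_BYTE:
        _, schema_id = HEADER.unpack_from(message)
        return schema_id, message[HEADER.size:]
    return None, message


framed = add_confluent_header(7, b"\x02ab")
```

Because the headerless fallback carries no schema ID, a consumer that relies on the header for schema lookup would need another way to find the schema for those messages.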
For schema structure details, see Serialization.
Redpanda Connect (S3 Sink)
Redpanda Connect replaces Kafka Connect as the S3 sink. It is a single Go binary (around 128 MB) compared to Kafka Connect's JVM footprint, which makes it a better fit for a local deployment.
It subscribes to experiment.vqe, decodes each message through Schema Registry
via schema_registry_decode, and writes the resulting JSON to the raw-results
bucket in Garage.
The full pipeline config is at
compose/redpanda-connect.yaml:
```yaml
http:
  address: 0.0.0.0:4195

input:
  kafka:
    addresses:
      - kafka:9092
    topics:
      - experiment.vqe
    consumer_group: redpanda-s3-sink
  processors:
    - schema_registry_decode:
        url: http://schema-registry:8081
        avro_raw_json: true

output:
  aws_s3:
    bucket: raw-results
    path: experiments/experiment.vqe/${!count("s3_objects")}-${!timestamp_unix_nano()}.json
    endpoint: http://garage:3901
    region: garage
    force_path_style_urls: true
    credentials:
      id: ${S3_ACCESS_KEY}
      secret: ${S3_SECRET_KEY}
    content_type: application/json
    max_in_flight: 1
    batching:
      count: 1
      period: 10s
```
Output files follow the pattern {counter}-{unix_nano_timestamp}.json.
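Downstream jobs that list the bucket can recover the counter and write time from an object key. A small sketch, assuming keys look exactly like the pattern above; the function name is hypothetical, and the timestamp is truncated to whole seconds for simplicity:

```python
import re
from datetime import datetime, timezone
from typing import Tuple

# Matches "{counter}-{unix_nano_timestamp}.json" as the final path segment.
FILENAME_PATTERN = re.compile(r"(?P<counter>\d+)-(?P<nanos>\d+)\.json")


def parse_object_key(key: str) -> Tuple[int, datetime]:
    """Split an object key into its counter and UTC write time."""
    filename = key.rsplit("/", 1)[-1]
    match = FILENAME_PATTERN.fullmatch(filename)
    if match is None:
        raise ValueError(f"unexpected object key: {key!r}")
    counter = int(match["counter"])
    seconds = int(match["nanos"]) // 1_000_000_000  # drop sub-second precision
    return counter, datetime.fromtimestamp(seconds, tz=timezone.utc)


counter, written_at = parse_object_key(
    "experiments/experiment.vqe/12-1700000000123456789.json"
)
```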
Kafka Connect Alternative
A Kafka Connect configuration is available as a fallback in
compose/docker-compose.ml.kafka-connect.yaml.
It can be enabled by scaling down Redpanda Connect and starting Kafka
Connect instead.
Security Configuration
KafkaSecurity
builds the security dictionary passed to KafkaProducer. It supports two
modes, controlled by
SecurityConfig:
| Mode | Protocol | Authentication | Status |
|---|---|---|---|
| SSL | security_protocol: SSL | Certificate-based (paths from CertConfig, resolved relative to ssl_dir, default ./secrets/) | Tested |
| SASL_SSL | security_protocol: SASL_SSL | Username/password (PLAIN, SCRAM-SHA-256/512) | Tested |
| SASL_SSL | security_protocol: SASL_SSL | Kerberos (GSSAPI) | Untested - options are exposed but no Kerberos setup has been done |
Both modes are disabled by default. When neither is enabled,
build_security_config() returns an empty dictionary and the producer connects
over PLAINTEXT. The Docker Compose stack uses PLAINTEXT within the container
network.
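The shape of the resulting dictionary can be sketched as follows. This is not the project's `build_security_config()`: the function name and its parameters are hypothetical, and Kerberos is left out. The dictionary keys, however, are real keyword arguments accepted by kafka-python's `KafkaProducer`.

```python
from typing import Any, Dict, Optional


def build_security_config_sketch(
    ssl_enabled: bool = False,
    sasl_enabled: bool = False,
    cafile: Optional[str] = None,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
    sasl_mechanism: str = "PLAIN",
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Return KafkaProducer keyword arguments for the selected security mode."""
    if not ssl_enabled and not sasl_enabled:
        return {}  # nothing enabled: the producer connects over PLAINTEXT

    config: Dict[str, Any] = {}
    if sasl_enabled:
        config["security_protocol"] = "SASL_SSL"
        config["sasl_mechanism"] = sasl_mechanism
        config["sasl_plain_username"] = username
        config["sasl_plain_password"] = password
    else:
        config["security_protocol"] = "SSL"

    # Certificate paths apply to both modes; include only those provided.
    for key, path in (
        ("ssl_cafile", cafile),
        ("ssl_certfile", certfile),
        ("ssl_keyfile", keyfile),
    ):
        if path is not None:
            config[key] = path
    return config
```

The result would be unpacked into the constructor, for example `KafkaProducer(bootstrap_servers=..., **config)`.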
For the full configuration reference, see Configuration.
Related Documentation
- System Design - Full architecture overview
- Serialization - Schema definitions and wire format
- Spark Processing - Downstream data consumption
- Iceberg Storage - Where Redpanda Connect writes raw data
- Configuration - Full configuration reference