Kafka Streaming

Overview

Apache Kafka handles real-time ingestion of VQE simulation results, decoupling producers from consumers with durable, ordered, at-least-once delivery.

  • Receive serialized VQE results from simulation containers
  • Validate schemas through Confluent Schema Registry
  • Buffer messages for reliable delivery
  • Feed Kafka Connect to write Avro files to MinIO

Related pages: System Design | Avro Serialization


Topic Naming Convention

Topics follow a naming pattern encoding the data type and schema version:

vqe_decorated_result_{suffix}

The suffix is generated by VQEDecoratedResult.get_schema_suffix() and encodes simulation parameters. Each unique combination of molecule, iteration count, basis set, and backend produces a distinct topic.

Suffix Components

The suffix follows the format _mol{id}_{symbols}_it{iters}_bs_{basis_set}_bk_{backend}:

Component        Description                              Example
Molecule ID      Identifier for the simulated molecule    1, 2, 42
Symbols          Atomic symbols in the molecule           H2, LiH
Iteration Count  Number of optimizer iterations           100, 500
Basis Set        Quantum chemistry basis set used         sto3g, cc-pvdz
Backend          Qiskit simulation backend                aer_simulator
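The suffix can be reproduced with a short function. The sketch below is an illustrative re-implementation, not the actual get_schema_suffix code; the hyphen-to-underscore normalization for basis sets is inferred from the cc_pvdz example topic.

```python
def schema_suffix(mol_id: int, symbols: str, iterations: int,
                  basis_set: str, backend: str) -> str:
    """Illustrative re-implementation of the topic suffix format (assumption)."""
    # Hyphens do not appear in the example topic names: cc-pvdz -> cc_pvdz (inferred)
    basis = basis_set.replace('-', '_')
    return f'_mol{mol_id}_{symbols}_it{iterations}_bs_{basis}_bk_{backend}'

topic = 'vqe_decorated_result' + schema_suffix(0, 'H2', 100, 'sto3g', 'aer_simulator')
# -> 'vqe_decorated_result_mol0_H2_it100_bs_sto3g_bk_aer_simulator'
```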

Example Topic Names

vqe_decorated_result_mol0_H2_it100_bs_sto3g_bk_aer_simulator
vqe_decorated_result_mol1_LiH_it500_bs_cc_pvdz_bk_aer_simulator

Kafka Connect subscribes to all VQE result topics using the regex pattern vqe_decorated_result_.*, matching any suffix regardless of simulation parameters.


Schema Versioning

The Confluent Schema Registry manages Avro schema lifecycle with automatic versioning and compatibility checking. For details on Schema Registry concepts, see the Confluent Schema Registry documentation.

Schema Registry Configuration

schema-registry:
  image: confluentinc/cp-schema-registry:${SCHEMA_REGISTRY_VERSION:-latest}
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "${KAFKA_SERVERS}"
    SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "${SCHEMA_REGISTRY_TOPIC}"
    SCHEMA_REGISTRY_HOST_NAME: "${SCHEMA_REGISTRY_HOSTNAME:-schema-registry}"
    SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"

The Schema Registry uses the Confluent default compatibility mode (BACKWARD). For details on compatibility modes, see the Schema Registry documentation.
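Because BACKWARD is the registry default, no compatibility setting appears in the compose snippet above. If a different global mode were required, it could be pinned with an environment variable. This is a hedged sketch, assuming the standard cp-schema-registry env-var mapping (config property dots become underscores, prefixed with SCHEMA_REGISTRY_) and that FULL is the mode wanted:

```yaml
schema-registry:
  environment:
    # Maps to schema.compatibility.level; valid modes include NONE, BACKWARD,
    # FORWARD, FULL, and their *_TRANSITIVE variants (FULL chosen for illustration)
    SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "FULL"
```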


Producer Configuration

The producer is configured for reliability and data integrity.

self.producer = KafkaProducer(
    bootstrap_servers=self.config.servers,
    value_serializer=lambda v: v,  # payloads arrive pre-serialized as Avro bytes
    retries=self.config.kafka_retries,
    acks=self.config.acks,
    **security_config,  # SSL/SASL settings built by KafkaSecurity
)

Key Parameters

Parameter Purpose
bootstrap_servers Kafka broker addresses for initial cluster connection
retries Number of automatic retries for failed sends
acks Acknowledgment level required from brokers before considering a write successful
security_config SSL/SASL settings built by KafkaSecurity for secure broker connections

The producer is implemented in kafka_interface.py. Security configuration (SSL/SASL) is handled by KafkaSecurity and merged into the producer at initialization.


Message Format

Messages use the Confluent Wire Format, prepending metadata to the Avro binary payload for schema lookup during deserialization.

Wire Format Structure

+------------------+------------------+---------------------------+
| Magic Byte (1B)  | Schema ID (4B)   | Avro Binary Payload       |
| 0x00             | Big-endian int   | Serialized VQEDecResult   |
+------------------+------------------+---------------------------+

Component    Size      Description
Magic Byte   1 byte    Always 0x00, identifies Confluent wire format
Schema ID    4 bytes   Big-endian integer referencing the Schema Registry
Avro Binary  Variable  Compact binary-encoded VQEDecoratedResult
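The framing can be exercised with a few lines of standard-library Python; the schema ID and payload bytes below are placeholders, not real registry values.

```python
import struct

schema_id = 7                # placeholder registry ID
avro_payload = b'\x02abc'    # placeholder Avro-encoded bytes

# Producer side: magic byte + 4-byte big-endian schema ID + payload
message = bytes([0]) + struct.pack('>I', schema_id) + avro_payload

# Consumer side: strip the 5-byte header before Avro decoding
magic = message[0]
decoded_id = struct.unpack('>I', message[1:5])[0]
body = message[5:]
assert (magic, decoded_id, body) == (0, 7, avro_payload)
```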

Serialization Code

import io
import json

import avro.schema
from avro.io import BinaryEncoder, DatumWriter


def to_avro_bytes(self, obj: T, schema_name: str = 'vqe_decorated_result') -> bytes:
    """Convert object to Avro binary format with Confluent wire header."""
    schema = self.schema
    parsed_schema = avro.schema.parse(json.dumps(schema))

    writer = DatumWriter(parsed_schema)
    bytes_writer = io.BytesIO()

    # Confluent wire format header
    bytes_writer.write(bytes([0]))  # Magic byte
    bytes_writer.write(self.registry.id_cache[schema_name].to_bytes(4, byteorder='big'))

    encoder = BinaryEncoder(bytes_writer)
    writer.write(self.serialize(obj), encoder)
    return bytes_writer.getvalue()

Serialization source: quantum_pipeline/stream/serialization/interfaces/vqe.py


Dynamic Topic Subscription

Kafka Connect uses regex-based subscription to discover and consume all VQE result topics automatically.

Configuration

{
  "topics.regex": "vqe_decorated_result_.*",
  "refresh.topics.enabled": "true"
}

Full connector configuration: docker/connectors/minio-sink-config.json

How It Works

  1. Kafka Connect periodically scans the broker for topics matching the regex pattern vqe_decorated_result_.*.
  2. When a new simulation run creates a new topic (e.g., vqe_decorated_result_mol0_H2_it100_bs_sto3g_bk_aer_simulator), Kafka Connect detects it automatically.
  3. The connector begins consuming from the new topic without any configuration changes or restarts.
  4. Messages from all matched topics are written to MinIO with a directory structure that separates data by topic name.
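The discovery step amounts to filtering the broker's topic list against the pattern. Kafka Connect evaluates the Java regex internally; an equivalent check in Python looks like this (the topic names are the examples from above plus one internal topic):

```python
import re

# Same pattern as the connector's topics.regex setting
TOPIC_REGEX = re.compile(r'vqe_decorated_result_.*')

broker_topics = [
    'vqe_decorated_result_mol0_H2_it100_bs_sto3g_bk_aer_simulator',
    'vqe_decorated_result_mol1_LiH_it500_bs_cc_pvdz_bk_aer_simulator',
    '__consumer_offsets',    # internal topic, never matched
]

subscribed = [t for t in broker_topics if TOPIC_REGEX.fullmatch(t)]
# subscribed contains only the two VQE result topics
```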

Parallel Version Support

Because each combination of simulation parameters maps to its own topic, multiple simulation versions can run in parallel (e.g., A/B testing or canary releases) without data structure conflicts.


Security and Cluster Configuration

Kafka configuration follows Confluent best practices. For SSL/TLS encryption, SASL authentication, and production cluster tuning, see the Confluent Security documentation and Broker Configuration reference.
