nav-img
Evolution

Настройка Platform V Synapse Streaming Event Processing

Для Platform V Synapse Streaming Event Processing предварительно требуется создать и настроить кластер Kafka, а затем прописать параметры подключения к кластеру Kafka в конфигурационном файле Platform V Synapse Streaming Event Processing.

В инструкции рассмотрим настройку с одним кластером Kafka, когда события направляются из топика input в топик output.

Перед началом работы

Шаг 1. Установите оператор Kafka

  1. Создайте пространство имен для Kafka:

    kubectl create namespace kafka
  2. Выполните команду:

    kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Шаг 2. Создайте кластер Kafka

  1. Создайте kafka-single-node.yaml и скопируйте спецификацию:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
    name: kafka-node
    namespace: kafka
    labels:
    strimzi.io/cluster: kafka-cluster
    spec:
    replicas: 1
    roles:
    - controller
    - broker
    storage:
    type: jbod
    volumes:
    - id: 0
    type: persistent-claim
    size: 1Gi
    deleteClaim: false
    kraftMetadata: shared
    ----
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
    name: kafka-cluster
    namespace: kafka
    annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
    spec:
    kafka:
    version: 3.8.0
    metadataVersion: 3.8-IV0
    listeners:
    - name: plain
    port: 9092
    type: internal
    tls: false
    - name: tls
    port: 9093
    type: internal
    tls: true
    config:
    offsets.topic.replication.factor: 1
    transaction.state.log.replication.factor: 1
    transaction.state.log.min.isr: 1
    default.replication.factor: 1
    min.insync.replicas: 1
    entityOperator:
    topicOperator: {}
    userOperator: {}
  2. Выполните команду:

    kubectl apply -f kafka-single-node.yaml

Шаг 3. Установите Platform V Synapse Streaming Event Processing

Шаг 4. Настройте параметры подключения к Kafka

  1. Выполните HTTP-запрос:

    PATCH /v2/clusters/{clusterId}/addons/synapse-evpc

    В теле запроса передайте параметры:

    {
    "clusterId": "<идентификатор_кластера>",
    "addonName": "synapse-evpc",
    "addonConfiguration": "{\"defaults\":{\"kafka_example\":{\"type\":\"kafka\",\"bootstrap.servers\":\"kafka-cluster-kafka-bootstrap.kafka.svc:9092\",\"security.protocol\":\"PLAINTEXT\",\"enable.idempotence\":\"false\",\"group.id\":\"example-group\"}}}"
    }

    Где:

    • type — тип пресета.

    • bootstrap.servers — bootstrap подключение к Apache Kafka вида host:port.

    • security.protocol — тип протокола подключения. PLAINTEXT — доступ к данным без аутентификации.

    • enable.idempotence — включение или отключение режима идемпотентности. Необходимо указать в настройках клиента при подключении к брокеру Kafka версии ниже третьей.

Шаг 5. Настройте конфигурации событий

  1. Выполните HTTP-запрос:

    PATCH /v2/clusters/{clusterId}/addons/synapse-evpc

    В теле запроса передайте параметры:

    {
    "clusterId": "<идентификатор_кластера>",
    "addonName": "synapse-evpc",
    "addonConfiguration": "{\n \"mappings\": [\n {\n \"fileName\": \"flow.conf\",\n \"content\": \"{\\n source: {\\n name: \\\"source\\\"\\n type: \\\"source\\\"\\n topic: \\\"input\\\"\\n config: ${defaults.kafka_example}\\n destination: ${transformationStep}\\n }\\n\\n destination: {\\n name: \\\"output\\\"\\n type: \\\"destination\\\"\\n topic: \\\"output\\\"\\n config: ${defaults.kafka_example}\\n }\\n\\n transformationStep: {\\n name: \\\"transformation\\\"\\n type: \\\"dsl\\\"\\n format: {\\n input: \\\"json\\\"\\n output: \\\"json\\\"\\n }\\n dsl: {\\n source: \\\"file\\\"\\n path: \\\"simple.tr\\\"\\n }\\n destination: ${destination}\\n }\\n\\n flow: {\\n name: \\\"TEST\\\"\\n source: [${source}]\\n }\\n}\"\n },\n {\n \"fileName\": \"simple.tr\",\n \"content\": \"out = in\"\n }\n ]\n}"
    }

    Где:

    • source — источник событий.

    • source.name — название источника.

    • source.type — тип источника.

    • source.topic — название топика.

    • source.config — конфигурация источника.

    • source.destination — следующий шаг потока.

    • destination — точка назначения событий.

    • destination.name — название приемника.

    • destination.type — тип точки назначения.

    • destination.topic — название топика.

    • destination.config — конфигурация транспорта назначения.

    • transformationStep — шаг трансформации событий.

    • transformationStep.name — название шага трансформации.

    • transformationStep.type — язык трансформации.

    • transformationStep.format — описание форматов входящего и исходящего событий.

    • transformationStep.dsl — описание расположения правил трансформации.

    • transformationStep.destination — следующий шаг потока.

    • flow — основной конфигурационный элемент.

    • flow.name — название основного конфигурационного элемента.

    • flow.source — источник событий.

  2. Перезапустите Deployment:

    kubectl restart deployment <пространство_имен_плагина> <название_deployment>

Теперь Platform V Synapse Streaming Event Processing готов к использованию.