tocdepth

2

Настройка Platform V Synapse Streaming Event Processing

Для Platform V Synapse Streaming Event Processing предварительно требуется создать и настроить кластер Kafka, а затем прописать параметры подключения к кластеру Kafka в конфигурационном файле Platform V Synapse Streaming Event Processing.

В инструкции рассмотрим настройку с одним кластером Kafka, когда события направляются из топика input в топик output.

Перед началом работы

  1. Создайте кластер с публичным IP и хотя бы одну групу узлов.

  2. Подключитесь к кластеру.

Шаг 1. Установите оператор Kafka

  1. Создайте пространство имен для Kafka:

    kubectl create namespace kafka
    
  2. Выполните команду:

    kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
    

Шаг 2. Создайте кластер Kafka

  1. Создайте kafka-single-node.yaml и скопируйте спецификацию:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaNodePool
    metadata:
      name: kafka-node
      namespace: kafka
      labels:
        strimzi.io/cluster: kafka-cluster
    spec:
      replicas: 1
      roles:
        - controller
        - broker
      storage:
        type: jbod
        volumes:
          - id: 0
            type: persistent-claim
            size: 1Gi
            deleteClaim: false
            kraftMetadata: shared
    ----
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-cluster
      namespace: kafka
      annotations:
        strimzi.io/node-pools: enabled
        strimzi.io/kraft: enabled
    spec:
      kafka:
        version: 3.8.0
        metadataVersion: 3.8-IV0
        listeners:
         - name: plain
           port: 9092
           type: internal
           tls: false
         - name: tls
           port: 9093
           type: internal
           tls: true
        config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           default.replication.factor: 1
           min.insync.replicas: 1
      entityOperator:
        topicOperator: {}
        userOperator: {}
    
  2. Выполните команду:

    kubectl apply -f kafka-single-node.yaml
    

Шаг 3. Установите Platform V Synapse Streaming Event Processing

Установите плагин Platform V Synapse Streaming Event Processing в кластер Managed Kubernetes.

Шаг 4. Настройте параметры подключения к Kafka

  1. Пройдите аутентификацию в API.

  2. Выполните HTTP-запрос:

    PATCH /v2/clusters/{clusterId}/addons/synapse-evpc
    

    Где clusterIdидентификатор кластера.

    В теле запроса передайте параметры:

    {
      "clusterId": "<идентификатор_кластера>",
      "addonName": "synapse-evpc",
      "addonConfiguration": "{\"defaults\":{\"kafka_example\":{\"type\":\"kafka\",\"bootstrap.servers\":\"kafka-cluster-kafka-bootstrap.kafka.svc:9092\",\"security.protocol\":\"PLAINTEXT\",\"enable.idempotence\":\"false\",\"group.id\":\"example-group\"}}}"
    }
    

    Где:

    • type — тип пресета.

    • bootstrap.servers — bootstrap подключение к Apache Kafka вида host:port.

    • security.protocol — тип протокола подключения. PLAINTEXT — доступ к данным без аутентификации.

    • enable.idempotence — включение или отключение режима идемпотентности. Необходимо указать в настройках клиента при подключении к брокеру Kafka версии ниже третьей.

Шаг 5. Настройте конфигурации событий

  1. Выполните HTTP-запрос:

    PATCH /v2/clusters/{clusterId}/addons/synapse-evpc
    

    В теле запроса передайте параметры:

    {
      "clusterId": "<идентификатор_кластера>",
      "addonName": "synapse-evpc",
      "addonConfiguration": "{\n    \"mappings\": [\n        {\n            \"fileName\": \"flow.conf\",\n            \"content\": \"{\\n  source: {\\n    name: \\\"source\\\"\\n    type: \\\"source\\\"\\n    topic: \\\"input\\\"\\n    config: ${defaults.kafka_example}\\n    destination: ${transformationStep}\\n  }\\n\\n  destination: {\\n    name: \\\"output\\\"\\n    type: \\\"destination\\\"\\n    topic: \\\"output\\\"\\n    config: ${defaults.kafka_example}\\n  }\\n\\n  transformationStep: {\\n    name: \\\"transformation\\\"\\n    type: \\\"dsl\\\"\\n    format: {\\n      input: \\\"json\\\"\\n      output: \\\"json\\\"\\n    }\\n    dsl: {\\n      source: \\\"file\\\"\\n      path: \\\"simple.tr\\\"\\n    }\\n    destination: ${destination}\\n  }\\n\\n  flow: {\\n    name: \\\"TEST\\\"\\n    source: [${source}]\\n  }\\n}\"\n        },\n        {\n            \"fileName\": \"simple.tr\",\n            \"content\": \"out = in\"\n        }\n    ]\n}"
    }
    

    Где:

    • source — источник событий.

    • source.name — название источника.

    • source.type — тип источника.

    • source.topic — название топика.

    • source.config — конфигурация источника.

    • source.destination — следующий шаг потока.

    • destination — точка назначения событий.

    • destination.name — название приемника.

    • destination.type — тип точки назначения.

    • destination.topic — название топика.

    • destination.config — конфигурация транспорта назначения.

    • transformationStep — шаг трансформации событий.

    • transformationStep.name — название шага трансформации.

    • transformationStep.type — язык трансформации.

    • transformationStep.format — описание форматов входящего и исходящего событий.

    • transformationStep.dsl — описание расположения правил трансформации.

    • transformationStep.destination — следующий шаг потока.

    • flow — основной конфигурационный элемент.

    • flow.name — название основного конфигурационного элемента.

    • flow.source — источник событий.

  2. Перезапустите Deployment:

    kubectl restart deployment <пространство_имен_плагина> <название_deployment>
    

Теперь Platform V Synapse Streaming Event Processing готов к использованию.

Запустили Evolution free tier
для Dev & Test
Получить