- tocdepth
2
Настройка Platform V Synapse Streaming Event Processing
Для Platform V Synapse Streaming Event Processing предварительно требуется создать и настроить кластер Kafka, а затем прописать параметры подключения к кластеру Kafka в конфигурационном файле Platform V Synapse Streaming Event Processing.
В инструкции рассмотрим настройку с одним кластером Kafka, когда события направляются из топика input
в топик output
.
Перед началом работы
Создайте кластер с публичным IP и хотя бы одну групу узлов.
Шаг 1. Установите оператор Kafka
Создайте пространство имен для Kafka:
kubectl create namespace kafka
Выполните команду:
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Шаг 2. Создайте кластер Kafka
Создайте
kafka-single-node.yaml
и скопируйте спецификацию:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaNodePool metadata: name: kafka-node namespace: kafka labels: strimzi.io/cluster: kafka-cluster spec: replicas: 1 roles: - controller - broker storage: type: jbod volumes: - id: 0 type: persistent-claim size: 1Gi deleteClaim: false kraftMetadata: shared ---- apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-cluster namespace: kafka annotations: strimzi.io/node-pools: enabled strimzi.io/kraft: enabled spec: kafka: version: 3.8.0 metadataVersion: 3.8-IV0 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 entityOperator: topicOperator: {} userOperator: {}
Выполните команду:
kubectl apply -f kafka-single-node.yaml
Шаг 3. Установите Platform V Synapse Streaming Event Processing
Установите плагин Platform V Synapse Streaming Event Processing в кластер Managed Kubernetes.
Шаг 4. Настройте параметры подключения к Kafka
Пройдите аутентификацию в API.
Выполните HTTP-запрос:
PATCH /v2/clusters/{clusterId}/addons/synapse-evpc
Где
clusterId
— идентификатор кластера.В теле запроса передайте параметры:
{ "clusterId": "<идентификатор_кластера>", "addonName": "synapse-evpc", "addonConfiguration": "{\"defaults\":{\"kafka_example\":{\"type\":\"kafka\",\"bootstrap.servers\":\"kafka-cluster-kafka-bootstrap.kafka.svc:9092\",\"security.protocol\":\"PLAINTEXT\",\"enable.idempotence\":\"false\",\"group.id\":\"example-group\"}}}" }
Где:
type
— тип пресета.bootstrap.servers
— bootstrap подключение к Apache Kafka вида host:port.security.protocol
— тип протокола подключения. PLAINTEXT — доступ к данным без аутентификации.enable.idempotence
— включение или отключение режима идемпотентности. Необходимо указать в настройках клиента при подключении к брокеру Kafka версии ниже третьей.
Шаг 5. Настройте конфигурации событий
Выполните HTTP-запрос:
PATCH /v2/clusters/{clusterId}/addons/synapse-evpc
В теле запроса передайте параметры:
{ "clusterId": "<идентификатор_кластера>", "addonName": "synapse-evpc", "addonConfiguration": "{\n \"mappings\": [\n {\n \"fileName\": \"flow.conf\",\n \"content\": \"{\\n source: {\\n name: \\\"source\\\"\\n type: \\\"source\\\"\\n topic: \\\"input\\\"\\n config: ${defaults.kafka_example}\\n destination: ${transformationStep}\\n }\\n\\n destination: {\\n name: \\\"output\\\"\\n type: \\\"destination\\\"\\n topic: \\\"output\\\"\\n config: ${defaults.kafka_example}\\n }\\n\\n transformationStep: {\\n name: \\\"transformation\\\"\\n type: \\\"dsl\\\"\\n format: {\\n input: \\\"json\\\"\\n output: \\\"json\\\"\\n }\\n dsl: {\\n source: \\\"file\\\"\\n path: \\\"simple.tr\\\"\\n }\\n destination: ${destination}\\n }\\n\\n flow: {\\n name: \\\"TEST\\\"\\n source: [${source}]\\n }\\n}\"\n },\n {\n \"fileName\": \"simple.tr\",\n \"content\": \"out = in\"\n }\n ]\n}" }
Где:
source
— источник событий.source.name
— название источника.source.type
— тип источника.source.topic
— название топика.source.config
— конфигурация источника.source.destination
— следующий шаг потока.destination
— точка назначения событий.destination.name
— название приемника.destination.type
— тип точки назначения.destination.topic
— название топика.destination.config
— конфигурация транспорта назначения.transformationStep
— шаг трансформации событий.transformationStep.name
— название шага трансформации.transformationStep.type
— язык трансформации.transformationStep.format
— описание форматов входящего и исходящего событий.transformationStep.dsl
— описание расположения правил трансформации.transformationStep.destination
— следующий шаг потока.flow
— основной конфигурационный элемент.flow.name
— название основного конфигурационного элемента.flow.source
— источник событий.
Перезапустите Deployment:
kubectl restart deployment <пространство_имен_плагина> <название_deployment>
Теперь Platform V Synapse Streaming Event Processing готов к использованию.
для Dev & Test