Evolution
Тема интерфейса

Чтение сообщений из топиков Managed Kafka®

С помощью этого руководства вы настроите чтение сообщений из топика Managed Kafka® и отображение полученных данных в логах задачи Spark. Вы создадите две задачи Spark c использованием скриптов для разового и для непрерывного чтения данных.

В результате вы получите возможность просматривать сообщения из топиков Managed Kafka® в логах задачи Spark.

Вы будете использовать следующие сервисы:

  • Object Storage — сервис для хранения данных любого типа и объема. Будет использоваться в качестве хранилища для скриптов.

  • Managed Spark — сервис, который позволяет развернуть кластерное вычислительное решение на основе Apache Spark для распределенной обработки данных.

  • Managed Kafka® — сервис для развертывания и управления кластерами Kafka® в инфраструктуре платформы Evolution.

Шаги:

Перед началом работы

  1. Если вы уже зарегистрированы, войдите под своей учетной записью.

  2. Убедитесь, что в проекте, где будет запускаться задача Spark, доступен сервис Managed Kafka®.

  3. Создайте кластер Managed Kafka®. На шаге Сетевые настройки в списке Подсеть выберите подсеть, указанную при создании инстанса Spark.

  4. Подключитесь к кластеру Managed Kafka® и отправьте несколько сообщений в топик.

1. Подготовьте скрипт задачи

На этом шаге вы загрузите в хранилище Object Storage файлы, содержащие скрипты для чтения топика Managed Kafka®. Скрипт из файла kafka_spark.py выполняет однократное чтение сообщений из топика, а скрипт из файла kafka_spark_streaming.py — непрерывное.

  1. Скопируйте скрипт и назовите файл kafka_spark.py.

    from pyspark.sql import SparkSession
    import os
    kafka_user = os.environ["KAFKA_USER"]
    kafka_pass = os.environ["KAFKA_PASS"]
    kafka_topic = os.environ["KAFKA_TOPIC"]
    kafka_server = os.environ["KAFKA_SERVER"]
    spark = SparkSession.builder.appName("kafka").getOrCreate()
    df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option(
    "kafka.sasl.jaas.config",
    f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{kafka_user}" password="{kafka_pass}";',
    )
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
    )
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    df.show(truncate=False)
    spark.stop()
  2. Скопируйте скрипт и назовите файл kafka_spark_streaming.py.

    from pyspark.sql import SparkSession
    import os
    kafka_user = os.environ["KAFKA_USER"]
    kafka_pass = os.environ["KAFKA_PASS"]
    kafka_topic = os.environ["KAFKA_TOPIC"]
    kafka_server = os.environ["KAFKA_SERVER"]
    spark = (
    SparkSession.builder.appName("kafka")
    .getOrCreate()
    )
    df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option(
    "kafka.sasl.jaas.config",
    f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{kafka_user}" password="{kafka_pass}";',
    )
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
    )
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    сonsole = (df
    .writeStream
    .outputMode('append')
    .format('console')
    .start()
    )
    console.awaitTermination()
    spark.stop()
  3. Откройте сервис Object Storage.

  4. Нажмите Загрузить и загрузите файлы со скриптами.

2. Создайте задачу Spark

На этом шаге вы создадите задачу Spark с использованием подготовленного скрипта. Скрипт выполнит чтение сообщений, отправленных в топик Managed Kafka®, и выведет данные из них в логи задачи Spark.

Для продолжения работы убедитесь, что статус инстанса Spark изменился на «Готов».

  1. Перейдите в сервис Managed Spark.

  2. Откройте созданный ранее инстанс.

  3. Перейдите на вкладку Задачи.

  4. Нажмите Создать задачу.

  5. В блоке Образ выберите базовый образ Spark-3.5.0.

  6. В блоке Скрипт приложения:

    • В поле Тип запускаемой задачи выберите Python.

    • В поле Путь к запускаемому файлу укажите путь к файлу kafka_spark.py.

  7. В блоке Настройки активируйте опцию Добавить параметры окружения. Добавьте следующие параметры и их значения:

    Параметр

    Значение

    KAFKA_USER

    Логин для подключения к кластеру Managed Kafka®, например, cloud-admin.

    KAFKA_PASS

    Пароль для подключения к кластеру Managed Kafka® с указанным логином.

    KAFKA_TOPIC

    Имя топика Managed Kafka®.

    KAFKA_SERVER

    Внутренний IP-адрес кластера Managed Kafka®.

    Чтобы узнать внутренний IP-адрес, логин и пароль, откройте сервис Managed Kafka® в отдельной вкладке, в списке кластеров нажмите на название созданного ранее кластера и перейдите в блок Данные для подключения.

  8. В блоке Настройки активируйте опцию Добавить Spark конфигурацию (–conf).

    • В поле Аргумент укажите spark.jars.packages.

    • В поле Значение укажите org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0.

  9. Нажмите Создать.

Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.

Подробнее о развертывании на официальном сайте.

3. Проверьте логи

На этом шаге вы проверите логи задачи Spark и отображение в них данных из топика Managed Kafka®.

Для продолжения работы убедитесь, что статус задачи Spark изменился на «Завершена».

  1. В строке задачи нажмите Горизонтальное меню и выберите Перейти к логам.

  2. Используйте фильтр, чтобы найти логи, содержащие сообщения из топика Managed Kafka®.

Пример данных, полученных из топика Managed Kafka®:

Данные, полученные из топика Managed Kafka®.

4. Запустите непрерывное чтение топика Managed Kafka®

На этом шаге вы создадите вторую задачу Spark с использованием скрипта, который будет непрерывно поддерживать соединение с топиком Managed Kafka® и выполнять чтение поступающих в него сообщений.

  1. В строке задачи Spark, выполненной ранее, нажмите Горизонтальное меню и выберите Скопировать задачу.

  2. В блоке Скрипт приложения в поле Путь к запускаемому файлу укажите путь к файлу kafka_spark_streaming.py.

  3. Нажмите Создать.

  4. Дождитесь, пока статус задачи изменится на «Выполняется».

  5. В строке задачи нажмите Горизонтальное меню и выберите Перейти к логам.

  6. Используйте фильтр, чтобы найти логи, содержащие сообщения из топика Managed Kafka®. Если в топик Managed Kafka® не поступают новые данные, в логах будут только отправленные ранее сообщения и информация об ожидании.

  7. Отправьте новое сообщение в топик Managed Kafka®.

  8. Посмотрите в логах задачи Spark информацию о новом сообщении.

  9. Задача Spark c данными параметрами будет выполняться, пока вы ее не завершите. Чтобы завершить задачу, в строке задачи нажмите Горизонтальное меню и выберите Остановить.

Результат

Вы настроили чтение сообщений из топика Managed Kafka® и вывод полученных данных в логи задачи Spark с помощью скриптов.