Базовые команды

В разделе приведены команды для kafkacat — утилиты для работы с Kafka® через командную строку. Подробнее о работе с kafkacat читайте в официальной документации.

Примечание
  • Используйте безопасные соединения (SSL/SASL) при работе с Kafka® в облаке.

  • Воспользуйтесь сервисом мониторинга в личном кабинете, чтобы отслеживать состояние кластера.

Установка kafkacat

Обновление пакетов:

sudo apt-get update

Установка:

sudo apt install -y kafkacat

Подключение к кластеру

Для подключения к кластеру используйте параметр -b (брокер) и настройки аутентификации, если они требуются. Пример подключения с аутентификацией:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>"

Отправка сообщений в топик (Producer)

Команда kafkacat -P отправляет сообщения в указанный топик. K: — разделитель ключа и значения, например key:value.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -P \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>" \
-K:

Далее введите сообщение в формате key:value, например:

1 :foo
2 :bar

Для завершения ввода нажмите Ctrl + D.

Чтение сообщений из топика (Consumer)

Команда kafkacat -C читает сообщения из указанного топика. -f — формат вывода сообщений.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>" \
-f 'Key: %k\nValue: %s\n'

Пример вывода:

Key: 1
Value: foo
Key: 2
Value: bar
% Reached end of topic <имя_топика> [ 0 ] at offset 2

Чтение сообщений с начала топика

Команда kafkacat -C -o beginning читает сообщения с начала топика.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C -o beginning \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>"

Просмотр метаданных кластера

Команда kafkacat -L отображает список брокеров, топиков и их партиций.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -L \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>"

Пример вывода:

Metadata for all topics ( from broker -1: <внутренний_IP_брокера>:<порт_брокера> ) :
3 brokers:
broker 1 at <IP_брокера_1>:<порт>
broker 2 at <IP_брокера_2>:<порт>
broker 3 at <IP_брокера_3>:<порт>
1 topics:
topic "my_topic" with 3 partitions:
partition 0 , leader 1 , replicas: 1 ,2,3, isrs: 1 ,2,3
partition 1 , leader 2 , replicas: 2 ,3,1, isrs: 2 ,3,1
partition 2 , leader 3 , replicas: 3 ,1,2, isrs: 3 ,1,2

Чтение сообщений из определенной партиции

Команда kafkacat -C -p <номер_партиции> читает сообщения только из указанной партиции.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C -p 0 \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>"

Чтение сообщений с форматированием

Команда kafkacat -C -f читает сообщения с пользовательским форматированием.

Пример команды:

kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C \
-X security.protocol = SASL_PLAINTEXT \
-X sasl.mechanism = SCRAM-SHA-512 \
-X sasl.username = "<логин_пользователя>" \
-X sasl.password = "<пароль_пользователя>" \
-f 'Partition: %p, Offset: %o, Key: %k, Value: %s\n'

Пример вывода:

Partition: 0 , Offset: 0 , Key: key1, Value: value1
Partition: 0 , Offset: 1 , Key: key2, Value: value2
Evolution