Базовые команды
В разделе приведены команды для kafkacat — утилиты для работы с Kafka® через командную строку. Подробнее о работе с kafkacat читайте в официальной документации.
Используйте безопасные соединения (SSL/SASL) при работе с Kafka® в облаке.
Воспользуйтесь сервисом мониторинга в личном кабинете, чтобы отслеживать состояние кластера.
Установка kafkacat
Обновление пакетов:
sudo apt-get update
Установка:
sudo apt install -y kafkacat
Подключение к кластеру
Для подключения к кластеру используйте параметр -b (брокер) и настройки аутентификации, если они требуются. Пример подключения с аутентификацией:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>"
Отправка сообщений в топик (Producer)
Команда kafkacat -P отправляет сообщения в указанный топик. K: — разделитель ключа и значения, например key:value.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -P \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>" \-K:
Далее введите сообщение в формате key:value, например:
1:foo2:bar
Для завершения ввода нажмите Ctrl + D.
Чтение сообщений из топика (Consumer)
Команда kafkacat -C читает сообщения из указанного топика. -f — формат вывода сообщений.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>" \-f 'Key: %k\nValue: %s\n'
Пример вывода:
Key: 1Value: fooKey: 2Value: bar% Reached end of topic <имя_топика> [0] at offset 2
Чтение сообщений с начала топика
Команда kafkacat -C -o beginning читает сообщения с начала топика.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C -o beginning \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>"
Просмотр метаданных кластера
Команда kafkacat -L отображает список брокеров, топиков и их партиций.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -L \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>"
Пример вывода:
Metadata for all topics (from broker -1: <внутренний_IP_брокера>:<порт_брокера>):3 brokers:broker 1 at <IP_брокера_1>:<порт>broker 2 at <IP_брокера_2>:<порт>broker 3 at <IP_брокера_3>:<порт>1 topics:topic "my_topic" with 3 partitions:partition 0, leader 1, replicas: 1,2,3, isrs: 1,2,3partition 1, leader 2, replicas: 2,3,1, isrs: 2,3,1partition 2, leader 3, replicas: 3,1,2, isrs: 3,1,2
Чтение сообщений из определенной партиции
Команда kafkacat -C -p <номер_партиции> читает сообщения только из указанной партиции.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C -p 0 \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>"
Чтение сообщений с форматированием
Команда kafkacat -C -f читает сообщения с пользовательским форматированием.
Пример команды:
kafkacat -b <внутренний_IP_брокера>:<порт_брокера> -t <имя_топика> -C \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanism=SCRAM-SHA-512 \-X sasl.username="<логин_пользователя>" \-X sasl.password="<пароль_пользователя>" \-f 'Partition: %p, Offset: %o, Key: %k, Value: %s\n'
Пример вывода:
Partition: 0, Offset: 0, Key: key1, Value: value1Partition: 0, Offset: 1, Key: key2, Value: value2