С помощью этого руководства вы сконфигурируете Managed Kafka® как брокер сообщений, связав его с сервисами publisher и subscriber, работающими на виртуальной машине Ubuntu 22.04. Вы будете использовать виртуальную сеть VPC и подсети для связи виртуальной машины и сервиса Managed Kafka®.
Вы будете использовать следующие сервисы:
Виртуальные машины — сервис, в рамках которого предоставляется виртуальная машина.
Managed Kafka® — сервис для развертывания и управления кластерами Kafka®.
Публичный IP-адрес — для доступа к сервису через интернет.
VPC-сеть — изолированная виртуальная сеть для создания безопасной инфраструктуры.
Шаги:
Перед началом работы
-
Если вы уже зарегистрированы, войдите под своей учетной записью.
1. Разверните необходимые ресурсы в облаке
Создайте виртуальную сеть с названием pub-sub-VPC.
-
Название: pub-sub-subnet.
Адрес: 10.10.1.0/24.
VPC: pub-sub-VPC.
DNS-серверы: 8.8.8.8
Убедитесь, что в личном кабинете на странице сервиса VPC:
отображается сеть pub-sub-VPC;
количество подсетей — 1;
подсеть pub-sub-subnet доступна.
Создайте виртуальную машину со следующими параметрами:
Название: pub-sub.
Образ: Публичные → Ubuntu 22.04.
Метод аутентификации: SSH-ключ и пароль.
SSH-ключ: ваш SSH-ключ.
Пароль: ваш пароль.
Имя хоста: pub-sub.
Подключить публичный IP: включено.
Тип IP-адреса: Прямой.
Группы безопасности: SSH-access_ru.AZ-1.
Подсеть: pub-sub-subnet.
Гарантированная доля vCPU: 10%.
vCPU: 1.
RAM: 1.
Убедитесь, что в личном кабинете на странице сервиса «Виртуальные машины» отображается виртуальная машина pub-sub в статуса «Запущена».
Создайте кластер Managed Kafka® со следующими параметрами:
Название: pub-sub.
Версия Kafka: 3.9.0.
Брокеры: 1.
vCPU: 4.
RAM: 16.
Подсеть: pub-sub-subnet.
Убедитесь, что в личном кабинете на странице сервиса «Managed Kafka®» отображается кластер pub-sub в статусе «Доступен».
2. Настройте окружение на виртуальной машине
Подключитесь к виртуальной машине pub-sub через серийную консоль.
-
sudo cloud-init cleansudo cloud-init init
Обновите систему и установите необходимые пакеты:
sudo apt update && sudo apt upgrade -ysudo apt install -y python3 python3-venv python3-pip kafkacat
3. Разработайте сервисы publisher и subscriber
Создайте директорию «pubsub» и перейдите в нее:
mkdir pubsubcd pubsubСоздайте файл publisher.py с помощью команды:
nano publisher.pyСкопируйте код в файл:
import argparseimport jsonimport osimport sysimport uuidfrom datetime import datetime, timezonefrom kafka import KafkaProducerfrom dotenv import load_dotenvdef build_payload(message: str) -> str:"""Return JSON-encoded message with id and timestamp."""return json.dumps({"id": str(uuid.uuid4()),"timestamp": datetime.now(timezone.utc).isoformat(),"message": message,})def main() -> None:load_dotenv()parser = argparse.ArgumentParser(description="Publish a message to Kafka.")parser.add_argument("message",nargs="?",help="Message text; if omitted you will be prompted.",)parser.add_argument("--topic",default=os.getenv("TOPIC", "messages"),help="Kafka topic name (default: messages)",)args = parser.parse_args()msg_text = args.message or input("Enter your message: ")kafka_brokers = os.getenv("KAFKA_BROKERS", "").split(",")kafka_writer_username = os.getenv("KAFKA_WRITER_USERNAME")kafka_writer_password = os.getenv("KAFKA_WRITER_PASSWORD")if not kafka_brokers or not kafka_writer_username or not kafka_writer_password:print("Kafka brokers, writer username and writer password are required")sys.exit(1)try:producer_config = {'bootstrap_servers': kafka_brokers,'value_serializer': lambda v: v.encode('utf-8'),'security_protocol': 'SASL_PLAINTEXT', # Changed from SASL_SSL'sasl_mechanism': 'SCRAM-SHA-512','sasl_plain_username': kafka_writer_username,'sasl_plain_password': kafka_writer_password,'api_version': (2, 0, 0),}print(f"Connecting to Kafka brokers: {kafka_brokers}")producer = KafkaProducer(**producer_config)print(f"Sending message to topic: {args.topic}")future = producer.send(args.topic, build_payload(msg_text))result = future.get(timeout=30)producer.flush()producer.close()print(f"Published to topic '{args.topic}' (partition: {result.partition}, offset: {result.offset}).")except Exception as exc:print(f"Kafka connection failed: {exc}", file=sys.stderr)sys.exit(1)if __name__ == "__main__":main()Создайте файл subscriber.py с помощью команды:
nano subscriber.pyСкопируйте код в файл:
import argparseimport jsonimport osimport sysfrom kafka import KafkaConsumer, TopicPartitionfrom dotenv import load_dotenvdef pretty_print(raw: str) -> None:try:print(json.dumps(json.loads(raw), indent=2))except json.JSONDecodeError:print(f"[non-JSON] {raw!r}")def main() -> None:load_dotenv()parser = argparse.ArgumentParser(description="Subscribe without group coordination.")parser.add_argument("--topic", default=os.getenv("TOPIC", "messages"))args = parser.parse_args()brokers = os.getenv("KAFKA_BROKERS", "").split(",")username = os.getenv("KAFKA_READER_USERNAME")password = os.getenv("KAFKA_READER_PASSWORD")if not kafka_brokers or not kafka_writer_username or not kafka_writer_password:print("Kafka brokers, writer username and writer password are required")sys.exit(1)try:consumer = KafkaConsumer(bootstrap_servers=brokers,security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username=username,sasl_plain_password=password,value_deserializer=lambda v: v.decode("utf-8"),auto_offset_reset="earliest",enable_auto_commit=False,group_id=None, # no group joinapi_version=(2, 0, 0),)parts = consumer.partitions_for_topic(args.topic)if not parts:print(f"Topic '{args.topic}' not found or no partitions.", file=sys.stderr)sys.exit(1)assignment = [TopicPartition(args.topic, p) for p in sorted(parts)]consumer.assign(assignment)consumer.seek_to_beginning(*assignment)print(f"Assigned without group to partitions: {assignment}")for msg in consumer:pretty_print(msg.value)except Exception as exc:print(f"Kafka connection failed: {exc}", file=sys.stderr)sys.exit(1)if __name__ == "__main__":main()Создайте файл requirements.txt с помощью команды:
nano requirements.txtСкопируйте код в файл:
kafka-python==2.0.2python-dotenv==1.0.1Создайте файл .env с помощью команды:
nano .envСкопируйте код в файл:
KAFKA_BROKERS=<KAFKA_BROKER_IP>:9094KAFKA_WRITER_USERNAME=<KAFKA_WRITER_USERNAME>KAFKA_WRITER_PASSWORD=<KAFKA_WRITER_PASSWORD>KAFKA_READER_USERNAME=<KAFKA_READER_USERNAME>KAFKA_READER_PASSWORD=<KAFKA_READER_PASSWORD>TOPIC=messagesGROUP_ID=subscriber-groupГде:
<KAFKA_BROKER_IP> — IP-адрес сервиса Managed Kafka®.
<KAFKA_WRITER_USERNAME> — логин от кластера Managed Kafka® с ролью Writer.
<KAFKA_WRITER_PASSWORD> — пароль от кластера Managed Kafka® с ролью Writer.
<KAFKA_READER_USERNAME> — логин от кластера Managed Kafka® с ролью Reader.
<KAFKA_READER_PASSWORD> — пароль от кластера Managed Kafka® с ролью Reader.
IP-адрес, логины и пароли можно найти на странице информации о кластере в блоке Данные для подключения.
Создайте и активируйте виртуальное окружение:
python3 -m venv venvsource venv/bin/activateУстановите зависимости:
pip install -r requirements.txtСоздайте топик:
echo "test message" | kafkacat -P -b <KAFKA_BROKER_IP>:9094 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanism=SCRAM-SHA-512 -X sasl.username=<KAFKA_ADMIN_USERNAME> -X sasl.password=<KAFKA_ADMIN_PASSWORD> -t messagesГде:
<KAFKA_BROKER_IP> — IP-адрес сервиса Managed Kafka®.
<KAFKA_ADMIN_USERNAME> — логин от кластера Managed Kafka® с ролью Admin.
<KAFKA_ADMIN_PASSWORD> — пароль от кластера Managed Kafka® с ролью Admin.
IP-адрес, логины и пароли можно найти на странице информации о кластере в блоке Данные для подключения.
4. Протестируйте работу очереди сообщений с Managed Kafka®
Запустите сервис subscriber:
python subscriber.pyОткройте новое окно терминала, не закрывая текущий терминал.
Перейдите в директорию с сервисами:
cd pubsubАктивируйте виртуальное окружение:
source venv/bin/activateОтправьте сообщение в очередь:
python publisher.py "Hello from Ubuntu!"Переключитесь обратно на терминал 1 и проверьте, что сообщение успешно получено.
5. Удалите доступ по SSH для виртуальной машины
Так как для настроенного сервиса больше не требуется доступ по SSH, удалите доступ для повышения безопасности.
В личном кабинете перейдите в сервис «Виртуальные машины» и выберите машину pub-sub, созданную на первом шаге.
Перейдите в раздел Сетевые параметры.
Нажмите на Изменить группы безопасности для публичного IP-адреса.
Удалите группу «SSH-access_ru».
Нажмите Сохранить.
Попробуйте подключиться к виртуальной машине по SSH и убедитесь, что доступ отсутствует.
Результат
Вы сконфигурировали Managed Kafka® для фоновой обработки задач, связали его с сервисами publisher и subscriber, работающими на виртуальной машине. Вы получили опыт работы с очередями сообщений и безопасным доступом.
- Перед началом работы
- 1. Разверните необходимые ресурсы в облаке
- 2. Настройте окружение на виртуальной машине
- 3. Разработайте сервисы publisher и subscriber
- 4. Протестируйте работу очереди сообщений с Managed Kafka®
- 5. Удалите доступ по SSH для виртуальной машины
- Результат