С помощью этого руководства вы сконфигурируете Managed Kafka как брокер сообщений, связав его с сервисами publisher и subscriber, работающими на виртуальной машине Ubuntu 22.04. Вы будете использовать виртуальную сеть VPC и подсети для связи виртуальной машины и сервиса Managed Kafka.
Вы будете использовать следующие сервисы:
«Виртуальные машины» — сервис, в рамках которого предоставляется виртуальная машина.
Managed Kafka — сервис для развертывания и управления кластерами Apache Kafka®.
«Публичные IP» — сервис для организации доступа к сервису через интернет.
VPC — изолированная виртуальная сеть для создания безопасной инфраструктуры.
Шаги:
Если вы уже зарегистрированы, войдите под своей учетной записью.
Создайте виртуальную сеть с названием pub-sub-VPC.
Создайте подсеть со следующими параметрами:
Название: pub-sub-subnet.
Адрес: 10.10.1.0/24.
VPC: pub-sub-VPC.
DNS-серверы: 8.8.8.8
Убедитесь, что в личном кабинете на странице сервиса VPC:
отображается сеть pub-sub-VPC;
количество подсетей — 1;
подсеть pub-sub-subnet доступна.
Создайте виртуальную машину со следующими параметрами:
Название: pub-sub.
Образ: Публичные → Ubuntu 22.04.
Метод аутентификации: SSH-ключ и пароль.
SSH-ключ: ваш SSH-ключ.
Пароль: ваш пароль.
Имя хоста: pub-sub.
Подключить публичный IP: включено.
Тип IP-адреса: Прямой.
Группы безопасности: SSH-access_ru.AZ-1.
Подсеть: pub-sub-subnet.
Гарантированная доля vCPU: 10%.
vCPU: 1.
RAM: 1.
Убедитесь, что в личном кабинете на странице сервиса «Виртуальные машины» отображается виртуальная машина pub-sub в статуса «Запущена».
Создайте кластер Managed Kafka со следующими параметрами:
Название: pub-sub.
Версия Apache Kafka®: 4.2.0.
Брокеры: 1.
vCPU: 4.
RAM: 16.
Подсеть: pub-sub-subnet.
Убедитесь, что в личном кабинете на странице сервиса Managed Kafka отображается кластер pub-sub в статусе «Доступен».
Подключитесь к виртуальной машине pub-sub через серийную консоль.
sudo cloud-init cleansudo cloud-init init
Обновите систему и установите необходимые пакеты:
sudo apt update && sudo apt upgrade -ysudo apt install -y python3 python3-venv python3-pip kafkacat
На странице кластера Managed Kafka перейдите на вкладку Пользователи и создайте пользователя с ролями cloud_writer и cloud_reader.
Создайте директорию «pubsub» и перейдите в нее:
mkdir pubsubcd pubsub
Создайте файл publisher.py с помощью команды:
nano publisher.py
Скопируйте код в файл:
import argparseimport jsonimport osimport sysimport uuidfrom datetime import datetime, timezonefrom kafka import KafkaProducerfrom dotenv import load_dotenvdef build_payload(message: str) -> str:"""Return JSON-encoded message with id and timestamp."""return json.dumps({"id": str(uuid.uuid4()),"timestamp": datetime.now(timezone.utc).isoformat(),"message": message,})def main() -> None:load_dotenv()parser = argparse.ArgumentParser(description="Publish a message to Kafka.")parser.add_argument("message",nargs="?",help="Message text; if omitted you will be prompted.",)parser.add_argument("--topic",default=os.getenv("TOPIC", "messages"),help="Kafka topic name (default: messages)",)args = parser.parse_args()msg_text = args.message or input("Enter your message: ")kafka_brokers = os.getenv("KAFKA_BROKERS", "").split(",")kafka_writer_username = os.getenv("KAFKA_WRITER_USERNAME")kafka_writer_password = os.getenv("KAFKA_WRITER_PASSWORD")if not kafka_brokers or not kafka_writer_username or not kafka_writer_password:print("Kafka brokers, writer username and writer password are required")sys.exit(1)try:producer_config = {'bootstrap_servers': kafka_brokers,'value_serializer': lambda v: v.encode('utf-8'),'security_protocol': 'SASL_PLAINTEXT', # Changed from SASL_SSL'sasl_mechanism': 'SCRAM-SHA-512','sasl_plain_username': kafka_writer_username,'sasl_plain_password': kafka_writer_password,'api_version': (2, 0, 0),}print(f"Connecting to Kafka brokers: {kafka_brokers}")producer = KafkaProducer(**producer_config)print(f"Sending message to topic: {args.topic}")future = producer.send(args.topic, build_payload(msg_text))result = future.get(timeout=30)producer.flush()producer.close()print(f"Published to topic '{args.topic}' (partition: {result.partition}, offset: {result.offset}).")except Exception as exc:print(f"Kafka connection failed: {exc}", file=sys.stderr)sys.exit(1)if __name__ == "__main__":main()
Создайте файл subscriber.py с помощью команды:
nano subscriber.py
Скопируйте код в файл:
import argparseimport jsonimport osimport sysfrom kafka import KafkaConsumer, TopicPartitionfrom dotenv import load_dotenvdef pretty_print(raw: str) -> None:try:print(json.dumps(json.loads(raw), indent=2))except json.JSONDecodeError:print(f"[non-JSON] {raw!r}")def main() -> None:load_dotenv()parser = argparse.ArgumentParser(description="Subscribe without group coordination.")parser.add_argument("--topic", default=os.getenv("TOPIC", "messages"))args = parser.parse_args()brokers = os.getenv("KAFKA_BROKERS", "").split(",")username = os.getenv("KAFKA_READER_USERNAME")password = os.getenv("KAFKA_READER_PASSWORD")if not kafka_brokers or not kafka_writer_username or not kafka_writer_password:print("Kafka brokers, writer username and writer password are required")sys.exit(1)try:consumer = KafkaConsumer(bootstrap_servers=brokers,security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username=username,sasl_plain_password=password,value_deserializer=lambda v: v.decode("utf-8"),auto_offset_reset="earliest",enable_auto_commit=False,group_id=None, # no group joinapi_version=(2, 0, 0),)parts = consumer.partitions_for_topic(args.topic)if not parts:print(f"Topic '{args.topic}' not found or no partitions.", file=sys.stderr)sys.exit(1)assignment = [TopicPartition(args.topic, p) for p in sorted(parts)]consumer.assign(assignment)consumer.seek_to_beginning(*assignment)print(f"Assigned without group to partitions: {assignment}")for msg in consumer:pretty_print(msg.value)except Exception as exc:print(f"Kafka connection failed: {exc}", file=sys.stderr)sys.exit(1)if __name__ == "__main__":main()
Создайте файл requirements.txt с помощью команды:
nano requirements.txt
Скопируйте код в файл:
kafka-python==2.0.2python-dotenv==1.0.1
Создайте файл .env с помощью команды:
nano .env
Скопируйте код в файл:
KAFKA_BROKERS=<kafka_broker_ip>:9094KAFKA_WRITER_USERNAME=<kafka_writer_username>KAFKA_WRITER_PASSWORD=<kafka_writer_username>KAFKA_READER_USERNAME=<kafka_reader_username>KAFKA_READER_PASSWORD=<kafka_reader_password>TOPIC=messagesGROUP_ID=subscriber-group
Где:
<kafka_broker_ip> — IP-адрес кластера Managed Kafka. Скопируйте его на странице информации о кластере в блоке Общие параметры.
<kafka_writer_username> — логин пользователя с ролью Writer.
<kafka_writer_password> — пароль пользователя с ролью Writer.
<kafka_reader_username> — логин пользователя с ролью Reader.
<kafka_reader_password> — пароль пользователя с ролью Reader.
Создайте и активируйте виртуальное окружение:
python3 -m venv venvsource venv/bin/activate
Установите зависимости:
pip install -r requirements.txt
Создайте файл конфигурации, например cluster.cfg:
clusters:- name: "<custom_config_name>"brokers:- <private_IP>:<port>SASL:mechanism: SCRAM-SHA-512username: "<login>"password: "<password>"TLS: nullsecurity-protocol: SASL_PLAINTEXT
Где:
<custom_config_name> — произвольное имя конфигурации.
<private_IP>, <port> — внутренний IP-адрес и порт брокера. Скопируйте их на странице информации о кластере в блоке Общие параметры.
<login>, <password> — имя пользователя и пароль, заданные при создании пользователя.
Создайте топик:
kaf topic create "messages" --config cluster.cfg --cluster "<custom_config_name>"
Где <custom_config_name> — произвольное имя конфигурации.
Также вы можете создать топик в личном кабинете.
Запустите сервис subscriber:
python subscriber.py
Откройте новое окно терминала, не закрывая текущий терминал.
Перейдите в директорию с сервисами:
cd pubsub
Активируйте виртуальное окружение:
source venv/bin/activate
Отправьте сообщение в очередь:
python publisher.py "Hello from Ubuntu!"
Переключитесь обратно на терминал 1 и проверьте, что сообщение успешно получено.
Так как для настроенного сервиса больше не требуется доступ по SSH, удалите доступ для повышения безопасности.
В личном кабинете перейдите в сервис «Виртуальные машины» и выберите машину pub-sub, созданную на первом шаге.
Перейдите в раздел Сетевые параметры.
Нажмите на Изменить группы безопасности для публичного IP-адреса.
Удалите группу «SSH-access_ru».
Нажмите Сохранить.
Попробуйте подключиться к виртуальной машине по SSH и убедитесь, что доступ отсутствует.
Вы сконфигурировали Managed Kafka для фоновой обработки задач, связали его с сервисами publisher и subscriber, работающими на виртуальной машине. Вы получили опыт работы с очередями сообщений и безопасным доступом.