Автор выбрал фонд Free and Open Source Fund для получения пожертвования в рамках программы Write for DOnations.
Apache Kafka — популярный распределенный брокер сообщений, предназначенный для обработки больших объемов данных в реальном времени. Кластер Kafka отличается масштабируемостью и отказоустойчивостью и имеет более высокую пропускную способность по сравнению с другими брокерами сообщений, такими как ActiveMQ и RabbitMQ. Хотя он обычно используется в качестве системы рассылки сообщений издатель/подписчик, многие организации также используют этого брокера для агрегации логов, поскольку он предоставляет надежное хранение для публикуемых сообщений.
Система рассылки сообщений издатель/подписчик позволяет одному или нескольким производителям публиковать сообщения без учета количества потребителей или того, как эти потребители будут обрабатывать сообщения. Подписанные клиенты уведомляются автоматически об обновлениях и создании новых сообщений. Эта система является более эффективной и масштабируемой по сравнению с системами, где клиенты периодически выполняют опрос, чтобы определить, доступны ли новые сообщения.
В этом обучающем руководстве вы выполните защищенную установку и настройку Apache Kafka 2.1.1 на сервере Debian 10, а затем протестируете настройки посредством создания и получения сообщения Hello World
. Затем вы сможете при желании провести установку KafkaT для мониторинга Kafka и настройки кластера Kafka с несколькими узлами.
Чтобы выполнить описанные ниже шаги, вам потребуется следующее:
Примечание. Установка без 4 ГБ оперативной памяти может вызвать сбои в работе Kafka, когда виртуальная машина Java (JVM) будет выдавать ошибку Out Of Memory
при запуске.
Поскольку Kafka может обрабатывать запросы через сеть, лучше всего создать для этого отдельного пользователя. Это позволит минимизировать ущерб для системы в случае взлома сервера Kafka. На этом шаге вы создадите отдельного пользователя kafka.
Выполните вход с помощью пользователя без прав root с привилегиями sudo и создайте пользователя kafka
с помощью команды useradd
:
- sudo useradd kafka -m
Флаг -m
гарантирует, что для пользователя будет создана домашняя директория. Домашняя директория /home/kafka
будет выступать в качестве директории рабочего пространства для последующего выполнения команд.
Установите пароль с помощью команды passwd
:
- sudo passwd kafka
Введите пароль, который вы хотите использовать для этого пользователя.
Затем добавьте пользователя kafka в группу sudo
с помощью команды adduser
для предоставления ему необходимых привилегий для установки зависимостей Kafka:
- sudo adduser kafka sudo
Ваш пользователь kafka готов к работе. Выполните вход в учетную запись с помощью команды su
:
- su -l kafka
После создания пользователя для Kafka можно перейти к загрузке и извлечению двоичных файлов Kafka.
На этом шаге мы выполним загрузку и извлечение двоичных файлов Kafka в специально отведенные для этого папки домашней директории пользователя kafka.
Сначала создайте директорию в /home/kafka
с названием Downloads
, чтобы сохранить там загруженные данные:
- mkdir ~/Downloads
Затем установите curl
с помощью команды apt-get
для получения возможности загрузки удаленных файлов:
- sudo apt-get update && sudo apt-get install curl
После получения соответствующего предупреждения введите Y
для подтверждения загрузки curl
.
После установки curl
мы используем его для загрузки двоичных файлов Kafka:
- curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
Создайте директорию с названием kafka
и замените ее на эту директорию. Она будет служить базовой директорией для установки Kafka:
- mkdir ~/kafka && cd ~/kafka
Извлеките архив, который вы загрузили, с помощью команды tar
:
- tar -xvzf ~/Downloads/kafka.tgz --strip 1
Мы указали флаг --strip 1
, чтобы обеспечить извлечение архива в директорию ~/kafka/
, а не в другую директорию, например ~/kafka/kafka_2.12-2.1.1/
.
После успешной загрузки и извлечения двоичных файлов мы можем перейти к настройке Kafka для удаления темы.
Поведение Kafka по умолчанию не позволяет нам удалять название темы, категории, группы или ветки, где были опубликованы сообщения. Для изменения нужно отредактировать файл конфигурации.
Опции конфигурации Kafka указаны в файле server.properties
. Откройте файл в nano
или вашем любимом редакторе:
- nano ~/kafka/config/server.properties
Давайте добавим настройку, которая позволит нам удалять темы Kafka. Добавим в конец файла следующую выделенную строку:
...
group.initial.rebalance.delay.ms
delete.topic.enable = true
Сохраните файл и закройте nano
. После настройки Kafka можно создать файлы элементов systemd
для запуска и активации Kafka при загрузке системы.
В этом разделе мы научимся создавать файлы элементов systemd
для службы Kafka. Это поможет нам выполнять стандартные действия со службой, в том числе запускать, останавливать и перезапускать службу Kafka аналогично другим службам Linux.
Kafka использует службу ZooKeeper для управления состоянием кластера и конфигурациями. Она обычно используется в качестве неотъемлемого компонента распределенных систем. В этом обучающем руководстве мы научимся использовать Zookeeper для управления этими аспектами работы Kafka. Если вам требуется дополнительная информация, воспользуйтесь официальной документацией ZooKeeper.
Вначале мы создадим файл элемента для zookeeper
:
- sudo nano /etc/systemd/system/zookeeper.service
Введите следующее определение элемента в файле:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В разделе [Unit]
указывается, что для запуска ZooKeeper требуется готовность сети и файловой системы.
В разделе [Service]
указывается, что systemd
использует файлы оболочки zookeeper-server-start.sh
и zookeeper-server-stop.sh
для запуска и остановки службы. Также указывается, что в случае аномального выхода ZooKeeper следует перезапускать.
Затем создайте сервисный файл systemd
для kafka
:
- sudo nano /etc/systemd/system/kafka.service
Введите следующее определение элемента в файле:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В разделе [Unit]
указывается, что этот элемент является зависимым от zookeeper.service
. Это позволит автоматически запускать zookeeper
после запуска службы kafka
.
В разделе [Service]
указывается, что systemd
использует файлы оболочки kafka-server-start.sh
и kafka-server-stop.sh
для запуска и остановки службы. В нем также указывается, что Kafka автоматически перезапускается в случае аварийного завершения работы.
После определения всех элементов выполните запуск Kafka с помощью следующей команды:
- sudo systemctl start kafka
Чтобы убедиться, что сервер успешно запущен, проверьте логи журнала для элемента kafka
:
- sudo journalctl -u kafka
Вы увидите примерно следующий результат:
OutputMar 23 13:31:48 kafka systemd[1]: Started kafka.service.
Теперь у нас имеется сервер Kafka, прослушивающий порт 9092
, который используется Kafka по умолчанию.
Мы запустили службу kafka
, но если мы теперь перезагрузим сервер, она не запустится автоматически. Чтобы активировать запуск kafka
при загрузке сервера, выполните следующую команду:
- sudo systemctl enable kafka
После запуска и активации служб следует проверить установку.
Опубликуем и примем сообщение Hello World
, чтобы проверить правильность работы сервера Kafka. Для публикации сообщений в Kafka требуется следующее:
Сначала создайте тему с названием TutorialTopic
, введя следующую команду:
- ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Вы можете создать издателя из командной строки с помощью скрипта kafka-console-producer.sh
. Ему требуется имя хоста сервера Kafka, порт и название темы в качестве аргументов.
Опубликуем строку Hello, World
в теме TutorialTopic
:
- echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Флаг --broker-list
указывает список брокеров сообщений, которым будет отправлено сообщение, в данном случае это localhost:9092
. Флаг --topic
обозначает тему как TutorialTopic
.
Затем вы можете создать подписчика Kafka с помощью скрипта kafka-console-consumer.sh
. Команда ожидает получить в качестве аргументов имя хоста и номер порта сервера ZooKeeper.
Следующая команда получает сообщения из TutorialTopic
. Обратите внимание, что использование флага --from-beginning
позволяет получать сообщения, которые были опубликованы до запуска подписчика:
- ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning
Флаг --bootstrap-server
указывает список точек входа в кластер Kafka. В данном случае мы используем localhost:9092
.
Теперь мы должны увидеть на терминале сообщение Hello, World
:
OutputHello, World
Скрипт продолжит работать и будет ждать, когда будут опубликованы новые сообщения для темы. Вы можете открыть новый терминал и запустить издателя для нескольких новых сообщений. Вы должны увидеть все сообщения в выводе подписчика. Дополнительную информацию по использованию Kafka можно найти в официальной документации по Kafka.
После окончания тестирование нажмите CTRL+C
, чтобы остановить скрипт подписчика. Мы протестировали установку и теперь можем перейти к установке KafkaT, чтобы нам было удобнее администрировать кластер Kafka.
KafkaT — это инструмент от Airbnb, который упрощает просмотр данных о кластере Kafka и выполняет некоторые административные задачи из командной строки. Поскольку он написан на Ruby, для его использования вам потребуется Ruby. Также вам потребуется пакет build-essential
для создания других «бриллиантов», используемых в качестве зависимостей. Выполните установку с помощью apt
:
- sudo apt install ruby ruby-dev build-essential
Теперь вы можете выполнить установку KafkaT с помощью команды gem
:
- sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
Опция CFLAGS=-Wno-error=format-overflow
отключает предупреждения о переполнении формата и требуется для зависимости ZooKeeper (зависимость KafkaT).
KafkaT использует .kafkatcfg
в качестве файла конфигурации для определения директорий и логов на вашем сервере Kafka. Также нужно добавить запись, указывающую KafkaT путь к экземпляру ZooKeeper.
Создайте новый файл .kafkatcfg
:
- nano ~/.kafkatcfg
Добавьте следующие строки для указания требуемой информации о вашем сервере Kafka и экземпляре Zookeeper:
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
Теперь вы можете использовать KafkaT. Для начала продемонстрируем, как вы можете использовать его, чтобы просмотреть подробную информацию обо всех разделах Kafka:
- kafkat partitions
Результат будет выглядеть следующим образом:
OutputTopic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
На экране результатов будет показано сообщение TutorialTopic
, а также внутренняя тема __consumer_offsets
, используемая Kafka для хранения данных клиентов. Вы можете спокойно игнорировать строки, начинающиеся с __consumer_offsets
.
Чтобы узнать больше о KafkaT, перейдите в соответствующий репозиторий на GitHub.
Мы установили KafkaT и теперь можем настроить Kafka на кластере Debian 10 для создания кластера из нескольких узлов.
Если вы хотите создать кластер с несколькими брокерами, используя дополнительные серверы Debian 10, повторите шаги 1, 4 и 5 на каждом новом сервере. Также необходимо внести следующие изменения в файл ~/kafka/config/server.properties
для каждой системы:
Изменим значение свойства broker.id
так, что оно будет уникальным внутри всего кластера. Это свойство служит уникальным идентификатором каждого сервера в кластере и может содержать любую строку в качестве значения. Например, в качестве идентификаторов могут использоваться "server1"
, "server2"
и т. д.
Значение свойства zookeeper.connect
следует изменить таким образом, чтобы все узлы указывали на один экземпляр ZooKeeper. Это свойство указывает адрес экземпляра ZooKeeper и имеет формат <HOSTNAME/IP_ADDRESS>:<PORT>
. Для этого обучающего руководства мы используем your_first_server_IP:2181
, заменив your_first_server_IP
IP-адресом уже настроенного сервера Debian 10.
Если вы хотите использовать несколько экземпляров ZooKeeper для вашего кластера, то значение свойства zookeeper.connect
должно представлять собой одинаковую, разделенную запятыми строку со списком IP-адресов и номеров портов для всех экземпляров ZooKeeper.
Примечание. Если на сервере Debian 10 с установленным Zookeeper работает брандмауэр, необходимо открыть на нем порт 2181
для получения входящих запросов от других узлов кластера.
Теперь, когда все установки выполнены, вы можете удалить права администратора пользователя kafka
. Прежде чем сделать это, выйдите и войдите снова с помощью любого пользователя без прав root с привилегиями sudo. Если вы все еще используете один сеанс командной строки, который вы запустили в начале прохождения руководства, просто введите exit
.
Удалите пользователя kafka
из группы sudo:
- sudo deluser kafka sudo
Чтобы дополнительно повысить безопасность вашего сервера Kafka, заблокируйте пароль пользователя kafka
с помощью команды passwd
. Это гарантирует, что никто не сможет напрямую выполнить вход на сервер с помощью этой учетной записи:
- sudo passwd kafka -l
На данный момент только пользователь root или sudo может выполнить вход как пользователь kafka
, воспользовавшись следующей командой:
- sudo su - kafka
В будущем, если вы захотите разблокировать его, используйте passwd
с флагом -u
:
- sudo passwd kafka -u
Вы успешно ограничили права администратора пользователя kafka
.
Теперь Apache Kafka работает на вашем сервере Debian в безопасном режиме. Вы можете использовать ее в своих проектах, создав издателей и подписчиков Kafka с помощью клиентов Kafka, которые доступны для большинства языков программирования. Дополнительную информацию о Kafka можно найти в документации по Apache Kafka.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!
Sign up for Infrastructure as a Newsletter.
Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.
Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.