نحوه راه اندازی یک خوشه کافکا چند گره با استفاده از KCraft

مقدمه

آپاچی کافکا یک رویداد توزیع شده منبع باز و پلتفرم پردازش جریانی است که به زبان جاوا نوشته شده است و برای پردازش فیدهای داده در زمان واقعی ساخته شده است. ذاتاً مقیاس پذیر است، با توان عملیاتی و در دسترس بودن بالا. به گونه ای طراحی شده است که از صدها گره در هر خوشه پشتیبانی می کند.

در این آموزش، شما یک خوشه کافکا ایجاد می کنید که از پروتکل اجماع KRaft استفاده می کند. شما یاد خواهید گرفت که چگونه گره ها را به گونه ای پیکربندی کنید که بخشی از یک خوشه باشند و مشاهده کنید که چگونه پارتیشن های موضوع به گره های مختلف اختصاص داده می شوند. همچنین یاد خواهید گرفت که چگونه موضوعات را به کارگزاران خاص در خوشه اختصاص دهید.

پیش نیازها
  • سه قطره با حداقل 4 گیگابایت رم و 2 CPU
  • یک نام دامنه کاملاً ثبت شده با سه زیر دامنه که به سمت سه قطره اشاره دارد.
  • آپاچی کافکا روی Droplets شما نصب و پیکربندی شده باشد

مرحله 1 – پیکربندی گره های کافکا

در این مرحله، سه سرور کافکا را که ایجاد کرده‌اید به عنوان بخشی از پیش‌نیازها پیکربندی می‌کنید تا بخشی از همان کلاستر KRaft باشند. با KRaft، خود گره‌ها می‌توانند وظایف اداری را بدون نیاز به هزینه اضافی و وابسته به Apache ZooKeeper سازماندهی و انجام دهند.

پیکربندی گره اول

شما با پیکربندی اولین گره شروع خواهید کرد. ابتدا سرویس را در اولین Droplet با اجرای زیر متوقف کنید:

sudo systemctl stop kafka

به عنوان کاربر کافکا، به فهرستی که کافکا در آن زندگی می کند بروید و فایل پیکربندی آن را برای ویرایش با اجرای زیر باز کنید:

vi /config/kraft/server.properties

خطوط زیر را بیابید:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

این سه پارامتر گره کافکا را طوری پیکربندی می‌کنند که هم به‌عنوان کارگزار و هم کنترل‌کننده عمل کند، به این معنی که داده‌ها (کارگزار) را دریافت و مصرف می‌کند و وظایف اداری (کنترل‌کننده) را انجام می‌دهد. این جداسازی در استقرارهای بزرگ که در آن کنترلرها را می توان برای افزایش کارایی و افزونگی جدا نگه داشت، مفید است.

node.id شناسه گره را در خوشه مشخص کرد. این اولین گره است، بنابراین باید در 1 باقی بماند. همه گره ها باید شناسه گره منحصر به فرد داشته باشند، بنابراین گره های دوم و سوم به ترتیب دارای شناسه 2 و 3 خواهند بود.

controller.quorum.voters شناسه گره ها را به آدرس ها و پورت های مربوطه خود برای ارتباط نگاشت می کند. اینجا جایی است که آدرس تمام گره های خوشه ای را مشخص می کنید تا هر گره از گره های دیگر آگاه باشد. خط را به شکل زیر تغییر دهید:

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

در اینجا، شما هر سه گره در خوشه را با شناسه مربوطه خود فهرست می کنید. به یاد داشته باشید که آدرس دامنه خود را که در طول پیش نیازها تنظیم کرده اید، جایگزین your_domain کنید.

بعد، خطوط زیر را در فایل پیدا کنید:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners آدرس هایی را که گره کافکا به آنها گوش می دهد، تعریف می کند، در حالی که advertised.listeners آدرس هایی را مشخص می کند که برای اتصال به گره به مشتریان ارسال می شود. این به شما امکان می‌دهد زیرمجموعه‌ای از آدرس‌های واقعی را که مشتریان باید استفاده کنند، مشخص کنید.

خطوط را به شکل زیر تغییر دهید و نام دامنه واقعی خود را جایگزین your_domain کنید:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

از آنجایی که این گره در یک خوشه خواهد بود، شما به صراحت آدرس ها را به قطره ای که روی آن اجرا می شود اشاره کرده اید.

سپس، تنظیمات num.partitions را پیدا کنید:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

همانطور که نظر بیان می کند، این تعداد پارتیشن های پیش فرض هر موضوع جدید را پیکربندی می کند. از آنجایی که شما سه گره دارید، آن را مضرب دو تنظیم کنید:

...
num.partitions=6
...

مقدار 6 در اینجا تضمین می کند که هر گره به طور پیش فرض دو پارتیشن موضوعی را نگه می دارد.

در مرحله بعد، ضریب تکرار را برای موضوعات داخلی پیکربندی می‌کنید، که جبران‌های مصرف‌کننده و وضعیت تراکنش‌ها را حفظ می‌کند. خطوط زیر را بیابید:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

آنها را با مقادیر زیر تنظیم کنید:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

در اینجا، شما مشخص می کنید که حداقل دو گره باید در مورد ابرداده داخلی همگام باشند. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.

پس از تنظیم شماره پارتیشن پیش‌فرض، باید ذخیره‌سازی گزارش را مجدداً تنظیم کنید. ابتدا فایل های گزارش موجود را با اجرای:

rm -rf /home/kafka/kafka-logs/*

سپس، یک شناسه خوشه جدید ایجاد کنید و آن را در یک متغیر محیطی ذخیره کنید:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

آن را در ترمینال نشان دهید:

echo $KAFKA_CLUSTER_ID

خروجی ID خواهد بود:

OutputMjj4bch9Q3-B0TEXv8_zPg

به این مقدار توجه کنید؛ برای پیکربندی گره دوم و سوم به آن نیاز دارید.

در نهایت دستور زیر را برای ایجاد ذخیره‌سازی گزارش اجرا کنید:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

خروجی مشابه این خواهد بود:

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
پیکربندی گره دوم و سوم

پیکربندی سایر گره ها بسیار شبیه به کاری است که برای اولین گره انجام داده اید. توجه داشته باشید که node.id را نیز به روز کنید:

...
node.id=node_number
...

مقادیر مناسب برای گره دوم و سوم به ترتیب 2 و 3 است و آدرس های مناسب را برای شنوندگان و advertised.listeners تنظیم می کند.

هنگام بازسازی ذخیره‌سازی گزارش، از شناسه خوشه از اولین گره دوباره استفاده کنید:

KAFKA_CLUSTER_ID="your_cluster_id"

وقتی کارتان تمام شد، سرویس کافکا را در هر سه گره با اجرای:

sudo systemctl start kafka

در این مرحله، سه گره کافکا را طوری پیکربندی کرده‌اید که بخشی از یک خوشه KRaft باشند. شما یک موضوع ایجاد می کنید و پیام هایی را در خوشه خود تولید و مصرف می کنید.

مرحله 2 – اتصال به Cluster

در این مرحله، با استفاده از اسکریپت های پوسته همراه با کافکا، به خوشه کافکا متصل خواهید شد. همچنین یک موضوع ایجاد می‌کنید و سعی می‌کنید داده‌ها را از خوشه تولید و مصرف کنید. سپس، یکی از گره ها را پایین می آورید و مشاهده می کنید که چگونه کافکا از دست دادن را کاهش می دهد.

کافکا اسکریپت kafka-metadata-quorum.sh را ارائه می دهد که اطلاعاتی در مورد خوشه و اعضای آن نشان می دهد. دستور زیر را برای اجرای آن اجرا کنید:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

شما به یکی از گره ها در پورت 9093 متصل می شوید که نقطه پایانی برای کنترلر است (اما نه برای کارگزار). به یاد داشته باشید که kafka1.your_domain را با دامنه ای که به یکی از گره های کافکا شما اشاره دارد جایگزین کنید.

خروجی مشابه این خواهد بود:

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

اسکریپت اطلاعات اولیه در مورد وضعیت خوشه را فهرست می کند. در خروجی نشان داده شده، می بینید که گره 3 به عنوان رهبر انتخاب می شود و هر سه گره ([1،2،3]) در مجموعه رای گیری هستند و بر روی آن تصمیم توافق می کنند.

با اجرای زیر موضوعی به نام اولین تاپیک ایجاد کنید:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

خروجی خواهد بود:

Created topic first-topic.

سپس اسکریپت kafka-topics.sh را اجرا کنید تا ببینید که چگونه پارتیشن ها روی گره ها چیده شده اند:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

تنظیم ضریب تکرار بر روی 2 تضمین می کند که موضوع در حداقل دو گره در دسترس خواهد بود.

خروجی مشابه این خواهد بود:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

می بینید که هر پارتیشن دارای لیدر، دو replica و دو in- sync replica set (ISR) است. رهبر پارتیشن یک گره کارگزار است که داده های پارتیشن را به مشتریان ارائه می دهد، در حالی که کپی ها فقط کپی ها را نگه می دارند. یک گره کپی اگر به طور پیش فرض با رهبر در ده ثانیه گذشته باشد، ISR در نظر گرفته می شود. این فاصله زمانی بر اساس هر موضوع قابل تنظیم است.

اکنون که موضوعی را ایجاد کرده اید، پیام های آن را با استفاده از اسکریپت kafka-console-producer.sh تولید خواهید کرد. دستور زیر را برای شروع تولید کننده اجرا کنید:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

یک اعلان خالی خواهید دید:

>

تهیه کننده منتظر ارسال پیامک شماست. تست را وارد کرده و ENTER را فشار دهید. اعلان به شکل زیر خواهد بود:

>Hello World!
>

تهیه کننده اکنون منتظر پیام بعدی است، یعنی پیام قبلی با موفقیت به کافکا ابلاغ شده است. می توانید هر تعداد پیام را برای آزمایش وارد کنید. برای خروج از سازنده، CTRL+C را فشار دهید.

شما به یک مصرف کننده نیاز دارید تا پیام های موضوع را دوباره بخواند. کافکا یک مصرف کننده ساده به عنوان kafka-console-consumer.sh فراهم می کند. آن را با اجرا اجرا کنید:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

پیام هایی که از موضوع خوانده می شوند را خواهید دید:

OutputHello World!
...

شبیه سازی شکست گره

در سومین گره کافکا، سرویس را با اجرای زیر متوقف کنید:

sudo systemctl stop kafka

سپس، موضوع را با اجرا شرح دهید:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

خروجی مشابه این خواهد بود:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

اگرچه گره 3 به عنوان یک کپی فهرست شده است، اما در مجموعه های ISR وجود ندارد زیرا در دسترس نیست. هنگامی که دوباره به خوشه می پیوندد، با گره های دیگر همگام می شود و سعی می کند جایگاه قبلی خود را به دست آورد.

دوباره سعی کنید پیام های مبحث اول را بخوانید:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

خواهید دید که طبق معمول قابل دسترسی هستند:

OutputHello World!
...

به لطف وجود کپی ها، دو گره اول کنترل را به دست می گیرند و به مصرف کننده خدمت می کنند. اکنون می توانید کافکا را در سرور سوم راه اندازی کنید:

sudo systemctl start kafka

در این مرحله، دیدید که چگونه کافکا در دسترس نبودن یک گره در خوشه را کاهش می دهد. اکنون یاد می گیرید که چگونه یک گره را از خوشه حذف کنید.

مرحله 3 – انتقال داده ها بین گره ها

در این مرحله، نحوه انتقال موضوعات بین گره‌ها در یک خوشه کافکا را خواهید آموخت. هنگام اضافه کردن گره به یک خوشه موجود با موضوعات، کافکا به طور خودکار هیچ پارتیشنی را به آن منتقل نمی کند، که ممکن است بخواهید انجام دهید. این همچنین برای حذف گره ها مفید است، زیرا پارتیشن های موجود به طور خودکار به گره های باقی مانده منتقل نمی شوند.

کافکا اسکریپتی به نام kafka-reassign-partitions.sh ارائه می دهد که می تواند برنامه های انتقال را تولید، اجرا و تأیید کند. از آن برای ایجاد طرحی برای انتقال پارتیشن های مبحث اول به دو گره اول استفاده خواهید کرد.

ابتدا باید مشخص کنید که کدام موضوعات باید جابجا شوند. اسکریپت یک فایل JSON را با تعریف موضوعات می پذیرد، بنابراین آن را برای ویرایش ایجاد و باز کنید:

vi topics-to-move.json

خطوط زیر را اضافه کنید:

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

تحت عنوان ها، یک شی را تعریف می کنید که به موضوع اول ارجاع می دهد. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.

دستور زیر را برای تولید طرح مهاجرت اجرا کنید و به جای kafka1.your_domain با دامنه ای که به یکی از گره های کافکا شما اشاره دارد، قرار دهید:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

شما “1،2” را به –broker-list منتقل می کنید که نشان دهنده شناسه کارگزاران هدف است.

خروجی مشابه این خواهد بود:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

این اسکریپت در مجموع دو طرح ایجاد کرد که به ترتیب طرح‌بندی پارتیشن فعلی و پیشنهادی را توصیف می‌کرد. اگر بخواهید بعداً تغییرات را برگردانید، اولین طرح ارائه می شود. به طرح دوم توجه کنید که در یک فایل جداگانه به نام migration-plan.json ذخیره خواهید کرد. آن را برای ویرایش ایجاد و باز کنید:

vi migration-plan.json

برنامه اجرایی دوم را اضافه کنید:

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

ذخیره کنید و فایل را ببندید. سپس دستور زیر را برای اجرای آن اجرا کنید:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

خروجی خواهد بود:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

اسکریپت اشاره کرد که جابجایی آغاز شده است. برای مشاهده پیشرفت مهاجرت، به جای آن وارد –verify شوید:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

پس از مدتی، خروجی شبیه به این خواهد شد:

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

اکنون می توانید موضوع اول را توضیح دهید تا بررسی کنید که هیچ پارتیشنی در بروکر 3 وجود ندارد:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

خروجی به شکل زیر خواهد بود:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

فقط کارگزاران 1 و 2 به عنوان کپی و ISR حضور دارند، به این معنی که مهاجرت با موفقیت انجام شد.

در این مرحله، شما یک طرح مهاجرت برای انتقال موضوع اول از کارگزار 3 به موارد باقیمانده ایجاد کرده‌اید و یاد گرفته‌اید که چگونه تأیید کنید که مهاجرت بدون مشکل انجام شده است.

نتیجه

اکنون یک خوشه کافکا دارید که از سه گره تشکیل شده است که با استفاده از پروتکل KRaft با هم ارتباط برقرار می کنند. شما همچنین یاد گرفته اید که چگونه خوشه و چیدمان پارتیشن ها را بررسی کنید. شما افزونگی خوشه را با پایین آوردن یک گره و خواندن از یک موضوع آزمایش کرده اید. در نهایت، یاد گرفتید که چگونه موضوعات را به گره‌های خوشه تغییر دهید.

[تعداد: 1   میانگین: 5/5]
دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

شاید دوست داشته باشید