آشنایی با Kafka

مقدمه

آپاچی کافکا یک رویداد توزیع شده منبع باز و پلتفرم پردازش جریانی است که به زبان جاوا نوشته شده است و برای پردازش فیدهای داده در زمان واقعی ساخته شده است. ذاتاً مقیاس پذیر است، با توان عملیاتی و در دسترس بودن بالا. کافکا که توسط بنیاد نرم‌افزار آپاچی توسعه یافته است، به دلیل قابلیت اطمینان، سهولت استفاده و تحمل خطا مورد استقبال گسترده‌ای قرار گرفته است. این توسط بزرگترین سازمان های جهان برای مدیریت حجم زیادی از داده ها به شیوه ای توزیع شده و کارآمد استفاده می شود.

در این آموزش، آپاچی کافکا را دانلود و راه اندازی می کنید. با ایجاد و حذف موضوعات و همچنین ارسال و دریافت رویدادها با استفاده از اسکریپت های ارائه شده آشنا خواهید شد. همچنین درباره پروژه های مشابه با همان هدف و نحوه مقایسه کافکا یاد خواهید گرفت.

پیش نیازها
  • دستگاهی با حداقل 4 گیگابایت رم و 2 سی پی یو. در مورد سرور اوبونتو
  • جاوا 8 یا بالاتر روی Droplet یا دستگاه محلی شما نصب شده است.

مرحله 1 – دانلود و پیکربندی آپاچی کافکا

در این قسمت آپاچی کافکا را بر روی دستگاه خود دانلود و استخراج خواهید کرد. برای امنیت بیشتر، آن را تحت حساب کاربری خود تنظیم خواهید کرد. سپس، آن را با استفاده از KRaft پیکربندی و اجرا خواهید کرد.

ابتدا یک کاربر جداگانه ایجاد می کنید که کافکا تحت آن اجرا می شود. با اجرای دستور زیر کاربری به نام kafka ایجاد کنید:

sudo adduser kafka

از شما رمز عبور حساب کاربری خواسته می شود. یک رمز عبور قوی وارد کنید و با فشار دادن ENTER برای هر فیلد، از تکمیل اطلاعات اضافی صرفنظر کنید.

در نهایت، به کاربر خاص کافکا بروید:

su kafka

در مرحله بعد، بسته انتشار کافکا را از صفحه رسمی دانلودها دانلود خواهید کرد. در زمان نوشتن، آخرین نسخه 3.7.0 بود که برای Scala 2.13 ساخته شده بود. اگر از macOS یا لینوکس استفاده می کنید، می توانید کافکا را با کرل دانلود کنید.

از این دستور برای دانلود کافکا استفاده کنید و آن را در /tmp قرار دهید:

curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

شما نسخه را تحت ~/kafka، در فهرست اصلی ذخیره خواهید کرد. آن را با اجرا ایجاد کنید:

mkdir ~/kafka

سپس با اجرا کردن، آن را به ~/kafka استخراج کنید:

tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1

از آنجایی که بایگانی که دانلود کردید حاوی یک پوشه ریشه با همان نام نسخه کافکا است، –strip-components=1 آن را رد می کند و همه چیز را در آن استخراج می کند.

در زمان نگارش، Kafka 3 آخرین نسخه اصلی بود که از دو سیستم برای مدیریت ابرداده پشتیبانی می‌کرد: Apache ZooKeeper و Kafka KRaft (مخفف Kafka Raft). ZooKeeper یک پروژه منبع باز است که یک روش استاندارد برای هماهنگی داده های توزیع شده برای برنامه ها را ارائه می دهد که توسط بنیاد نرم افزار آپاچی نیز توسعه یافته است.

با این حال، با شروع Kafka 3.3، پشتیبانی از KRaft معرفی شد. KRaft یک سیستم هدفمند برای هماهنگی فقط نمونه های کافکا است، فرآیند نصب را ساده می کند و امکان مقیاس پذیری بسیار بیشتری را فراهم می کند. با KRaft، کافکا خود مسئولیت کامل داده ها را به جای نگه داشتن ابرداده های اداری به صورت خارجی بر عهده دارد.

در حالی که هنوز در دسترس است، انتظار می رود پشتیبانی ZooKeeper از Kafka 4 و به بعد حذف شود. در این آموزش، کافکا را با استفاده از KRaft راه اندازی می کنید.

شما باید یک شناسه منحصر به فرد برای خوشه کافکا جدید خود ایجاد کنید. در حال حاضر، فقط از یک گره تشکیل شده است. به فهرستی که اکنون کافکا در آن زندگی می کند بروید:

cd ~/kafka

کافکا با KRaft پیکربندی خود را در config/kraft/server.properties ذخیره می‌کند، در حالی که فایل پیکربندی ZooKeeper config/server.properties است.

قبل از اجرای آن برای اولین بار، باید برخی از تنظیمات پیش فرض را لغو کنید. فایل را برای ویرایش با اجرای:

nano config/kraft/server.properties

خطوط زیر را بیابید:

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...

تنظیمات log.dirs مشخص می‌کند که کافکا فایل‌های گزارش خود را در کجا نگه می‌دارد. به‌طور پیش‌فرض، آن‌ها را در /tmp/kafka-logs ذخیره می‌کند، زیرا تضمین می‌شود که قابل نوشتن هستند، هرچند موقتی. مقدار را با مسیر مشخص شده جایگزین کنید:

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...

از آنجایی که یک کاربر جداگانه برای کافکا ایجاد کرده اید، مسیر فهرست سیاهه ها را در زیر فهرست اصلی کاربر قرار می دهید. اگر وجود نداشته باشد، کافکا آن را خلق خواهد کرد. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.

اکنون که کافکا را پیکربندی کرده اید، دستور زیر را برای ایجاد یک شناسه خوشه تصادفی اجرا کنید:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

سپس با اجرای دستور زیر و وارد کردن شناسه، برای فایل های log فضای ذخیره سازی ایجاد کنید:

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

خروجی خواهد بود:

Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.

در نهایت، می توانید برای اولین بار سرور کافکا را راه اندازی کنید:

bin/kafka-server-start.sh config/kraft/server.properties

انتهای خروجی مشابه این خواهد بود:

Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

خروجی نشان می دهد که کافکا با استفاده از KRaft با موفقیت مقداردهی اولیه کرده است و اتصالات را در 0.0.0.0:9092 می پذیرد.

هنگامی که CTRL + C را فشار دهید، فرآیند خارج می شود. از آنجایی که اجرای کافکا با باز نگه داشتن یک جلسه ترجیح داده نمی شود، در مرحله بعد سرویسی برای اجرای کافکا در پس زمینه ایجاد خواهید کرد.

مرحله 2 – ایجاد یک سرویس systemd برای کافکا

در این بخش، یک سرویس systemd برای اجرای کافکا در پس‌زمینه همیشه ایجاد می‌کنید. خدمات systemd را می توان به طور مداوم شروع، متوقف و مجددا راه اندازی کرد.

پیکربندی سرویس را در فایلی با نام code-server.service در فهرست /lib/systemd/system ذخیره می‌کنید، جایی که systemd سرویس‌های خود را ذخیره می‌کند. با استفاده از ویرایشگر متن خود آن را ایجاد کنید:

sudo nano /etc/systemd/system/kafka.service

خطوط زیر را اضافه کنید:

[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

در اینجا ابتدا توضیحات سرویس را مشخص می کنید. سپس در قسمت [Service] نوع سرویس را تعریف می کنید (ساده به این معنی است که دستور باید به سادگی اجرا شود) و دستوری را که اجرا می شود ارائه می دهید. شما همچنین مشخص می‌کنید که کاربری که اجرا می‌شود به عنوان kafka است، و در صورت خروج کافکا، سرویس باید به طور خودکار راه‌اندازی مجدد شود.

بخش [نصب] به سیستم دستور می دهد تا زمانی که امکان ورود به سرور شما فراهم شد، این سرویس را راه اندازی کند. پس از اتمام، فایل را ذخیره و ببندید.

سرویس کافکا را با اجرای دستور زیر راه اندازی کنید:

sudo systemctl start kafka

با مشاهده وضعیت آن، بررسی کنید که به درستی شروع شده است:

sudo systemctl status kafka

خروجی مشابه زیر را خواهید دید:

Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.

برای شروع خودکار کافکا پس از راه اندازی مجدد سرور، سرویس آن را با اجرای دستور زیر فعال کنید:

sudo systemctl enable kafka

در این مرحله، شما یک سرویس systemd برای کافکا ایجاد کرده و آن را فعال کرده اید، به طوری که در هر بوت سرور شروع می شود. در مرحله بعد، با ایجاد و حذف موضوعات در کافکا و همچنین نحوه تولید و مصرف پیام‌های متنی با استفاده از اسکریپت‌های موجود آشنا خواهید شد.

مرحله 3 – تولید و مصرف پیام های موضوعی

اکنون که یک سرور کافکا راه اندازی کرده اید، با موضوعات و نحوه مدیریت آنها با استفاده از اسکریپت های ارائه شده آشنا خواهید شد. همچنین یاد خواهید گرفت که چگونه پیام‌ها را از یک موضوع ارسال و بازگردانی کنید.

همانطور که در مقاله جریان رویداد توضیح داده شد، انتشار و دریافت پیام‌ها با موضوعات مرتبط هستند. یک موضوع می تواند مربوط به دسته ای باشد که یک پیام به آن تعلق دارد.

اسکریپت ارائه شده kafka-topics.sh را می توان برای مدیریت موضوعات در کافکا از طریق CLI استفاده کرد. برای ایجاد موضوعی به نام first-topic دستور زیر را اجرا کنید:

bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

همه اسکریپت های کافکا ارائه شده نیاز دارند که آدرس سرور را با –bootstrap-server مشخص کنید.

خروجی خواهد بود:

Output
Created topic first-topic.

برای فهرست کردن همه موضوعات موجود، به جای –create در –list بفرستید:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

موضوعی را که ایجاد کرده اید می بینید:

Output
first-topic

شما می توانید اطلاعات و آمار دقیق در مورد موضوع را با عبور در –describe دریافت کنید:

bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

خروجی شبیه به این خواهد بود:

Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

خط اول نام موضوع، شناسه و ضریب تکرار را مشخص می کند که 1 است زیرا موضوع فقط در ماشین فعلی وجود دارد. خط دوم عمداً تورفتگی دارد و اطلاعاتی را در مورد پارتیشن اول (و تنها) موضوع نشان می دهد. کافکا به شما اجازه می دهد تا موضوع را پارتیشن بندی کنید، به این معنی که بخش های مختلف یک موضوع را می توان در سرورهای مختلف توزیع کرد و مقیاس پذیری را افزایش داد. در اینجا، تنها یک پارتیشن وجود دارد.

اکنون که موضوعی را ایجاد کرده اید، با استفاده از اسکریپت kafka-console-producer.sh پیام هایی برای آن تولید خواهید کرد. دستور زیر را برای شروع تولید کننده اجرا کنید:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

یک اعلان خالی خواهید دید:

>

تهیه کننده منتظر ارسال پیامک شماست. تست را وارد کرده و ENTER را فشار دهید. اعلان به شکل زیر خواهد بود:

>test
>

تهیه کننده اکنون منتظر پیام بعدی است، یعنی پیام قبلی با موفقیت به کافکا ابلاغ شده است. می توانید هر تعداد پیام را برای آزمایش وارد کنید. برای خروج از سازنده، CTRL+C را فشار دهید.

برای بازخوانی پیام های موضوع، به یک مصرف کننده نیاز دارید. کافکا یک مصرف کننده ساده را در قالب kafka-console-consumer.sh ارائه می دهد. آن را با اجرا اجرا کنید:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

با این حال، خروجی وجود نخواهد داشت. دلیل آن این است که مصرف کننده در حال پخش داده ها از موضوع است و در حال حاضر چیزی تولید و ارسال نمی شود. برای مصرف پیام هایی که قبل از شروع مصرف کننده تولید کرده اید، باید موضوع را از ابتدا با اجرای زیر بخوانید:

bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092

مصرف کننده همه رویدادهای موضوع را دوباره پخش می کند و پیام ها را واکشی می کند:

Outputtest
...

همانند سازنده، برای خروج، CTRL+C را فشار دهید.

برای تأیید اینکه مصرف کننده واقعاً داده ها را پخش می کند، آن را در یک جلسه ترمینال جداگانه باز می کنید. یک جلسه SSH ثانویه را باز کنید و مصرف کننده را در پیکربندی پیش فرض اجرا کنید:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

در جلسه اولیه، سازنده را اجرا کنید:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

سپس پیام های دلخواه خود را وارد کنید:

>second test
>third test
>

بلافاصله دریافت آنها توسط مصرف کننده را مشاهده خواهید کرد:

Output
second test
third test

پس از اتمام آزمایش، هم تولیدکننده و هم مصرف کننده را خاتمه دهید.

برای حذف اولین تاپیک، –delete را به kafka-topics.sh منتقل کنید:

bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092

خروجی وجود نخواهد داشت. می‌توانید موضوعات را فهرست کنید تا تأیید کنید که واقعاً حذف شده است:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

خروجی خواهد بود:

Output
__consumer_offsets

__معادل_مصرف‌کننده موضوعی درونی برای کافکا است که میزان مطالعه یک موضوع توسط مصرف‌کننده را ذخیره می‌کند.

در این مرحله، شما یک موضوع کافکا ایجاد کرده‌اید و پیام‌هایی در آن ایجاد کرده‌اید. سپس، پیام ها را با استفاده از اسکریپت ارائه شده مصرف کرده و در نهایت آنها را به صورت بلادرنگ دریافت کرده اید. در مرحله بعد، با نحوه مقایسه کافکا با سایر کارگزاران رویداد و نرم افزارهای مشابه آشنا خواهید شد.

مقایسه با معماری های مشابه

آپاچی کافکا راه حلی واقعی برای موارد استفاده از جریان رویداد در نظر گرفته می شود. با این حال، Apache Pulsar و RabbitMQ نیز به طور گسترده مورد استفاده قرار می گیرند و به عنوان گزینه های همه کاره، البته با تفاوت در رویکردشان، برجسته می شوند.

تفاوت اصلی بین صف پیام و جریان رویداد در این است که وظیفه اصلی اولی رساندن پیام ها به مشتریان در سریع ترین روش ممکن و بدون توجه به سفارش آنها است. چنین سیستم هایی معمولاً پیام ها را تا زمانی که توسط مصرف کنندگان تأیید شود در حافظه ذخیره می کنند. فیلتر کردن و مسیریابی پیام ها جنبه مهمی دارد، زیرا مصرف کنندگان می توانند به دسته های خاصی از داده ها علاقه نشان دهند. RabbitMQ یک نمونه قوی از یک سیستم پیام رسانی سنتی است که در آن چندین مصرف کننده می توانند در یک موضوع مشترک شوند و چندین نسخه از یک پیام را دریافت کنند.

از سوی دیگر، جریان رویداد بر پایداری متمرکز است. رویدادها باید بایگانی شوند، مرتب نگهداری شوند و یک بار پردازش شوند. مسیریابی آنها به مصرف کنندگان خاص مهم نیست، زیرا ایده این است که همه مصرف کنندگان رویدادها را یکسان پردازش می کنند.

Apache Pulsar یک سیستم پیام‌رسان منبع باز است که توسط بنیاد نرم‌افزار آپاچی توسعه یافته و از جریان رویداد پشتیبانی می‌کند. برخلاف کافکا، که از ابتدا با آن ساخته شد، Pulsar به عنوان یک راه‌حل سنتی صف‌بندی پیام شروع به کار کرد و بعداً قابلیت‌های جریان رویداد را به دست آورد. بنابراین Pulsar زمانی مفید است که ترکیبی از هر دو روش مورد نیاز باشد، بدون نیاز به استقرار برنامه های کاربردی جداگانه.

نتیجه

شما اکنون آپاچی کافکا را به طور ایمن در پس‌زمینه سرور خود دارید که به عنوان یک سرویس سیستم پیکربندی شده است. شما همچنین یاد گرفته‌اید که چگونه موضوعات را از خط فرمان دستکاری کنید، و همچنین پیام‌ها را تولید و مصرف کنید. با این حال، جذابیت اصلی کافکا، تنوع گسترده مشتریان برای ادغام آن در برنامه های شما است.

[تعداد: 1   میانگین: 5/5]
دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

شاید دوست داشته باشید