مقدمه
آپاچی کافکا یک رویداد توزیع شده منبع باز و پلتفرم پردازش جریانی است که به زبان جاوا نوشته شده است و برای پردازش فیدهای داده در زمان واقعی ساخته شده است. ذاتاً مقیاس پذیر است، با توان عملیاتی و در دسترس بودن بالا. کافکا که توسط بنیاد نرمافزار آپاچی توسعه یافته است، به دلیل قابلیت اطمینان، سهولت استفاده و تحمل خطا مورد استقبال گستردهای قرار گرفته است. این توسط بزرگترین سازمان های جهان برای مدیریت حجم زیادی از داده ها به شیوه ای توزیع شده و کارآمد استفاده می شود.
در این آموزش، آپاچی کافکا را دانلود و راه اندازی می کنید. با ایجاد و حذف موضوعات و همچنین ارسال و دریافت رویدادها با استفاده از اسکریپت های ارائه شده آشنا خواهید شد. همچنین درباره پروژه های مشابه با همان هدف و نحوه مقایسه کافکا یاد خواهید گرفت.
پیش نیازها
- دستگاهی با حداقل 4 گیگابایت رم و 2 سی پی یو. در مورد سرور اوبونتو
- جاوا 8 یا بالاتر روی Droplet یا دستگاه محلی شما نصب شده است.
مرحله 1 – دانلود و پیکربندی آپاچی کافکا
در این قسمت آپاچی کافکا را بر روی دستگاه خود دانلود و استخراج خواهید کرد. برای امنیت بیشتر، آن را تحت حساب کاربری خود تنظیم خواهید کرد. سپس، آن را با استفاده از KRaft پیکربندی و اجرا خواهید کرد.
ابتدا یک کاربر جداگانه ایجاد می کنید که کافکا تحت آن اجرا می شود. با اجرای دستور زیر کاربری به نام kafka ایجاد کنید:
sudo adduser kafka
از شما رمز عبور حساب کاربری خواسته می شود. یک رمز عبور قوی وارد کنید و با فشار دادن ENTER برای هر فیلد، از تکمیل اطلاعات اضافی صرفنظر کنید.
در نهایت، به کاربر خاص کافکا بروید:
su kafka
در مرحله بعد، بسته انتشار کافکا را از صفحه رسمی دانلودها دانلود خواهید کرد. در زمان نوشتن، آخرین نسخه 3.7.0 بود که برای Scala 2.13 ساخته شده بود. اگر از macOS یا لینوکس استفاده می کنید، می توانید کافکا را با کرل دانلود کنید.
از این دستور برای دانلود کافکا استفاده کنید و آن را در /tmp قرار دهید:
curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
شما نسخه را تحت ~/kafka، در فهرست اصلی ذخیره خواهید کرد. آن را با اجرا ایجاد کنید:
mkdir ~/kafka
سپس با اجرا کردن، آن را به ~/kafka استخراج کنید:
tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1
از آنجایی که بایگانی که دانلود کردید حاوی یک پوشه ریشه با همان نام نسخه کافکا است، –strip-components=1 آن را رد می کند و همه چیز را در آن استخراج می کند.
در زمان نگارش، Kafka 3 آخرین نسخه اصلی بود که از دو سیستم برای مدیریت ابرداده پشتیبانی میکرد: Apache ZooKeeper و Kafka KRaft (مخفف Kafka Raft). ZooKeeper یک پروژه منبع باز است که یک روش استاندارد برای هماهنگی داده های توزیع شده برای برنامه ها را ارائه می دهد که توسط بنیاد نرم افزار آپاچی نیز توسعه یافته است.
با این حال، با شروع Kafka 3.3، پشتیبانی از KRaft معرفی شد. KRaft یک سیستم هدفمند برای هماهنگی فقط نمونه های کافکا است، فرآیند نصب را ساده می کند و امکان مقیاس پذیری بسیار بیشتری را فراهم می کند. با KRaft، کافکا خود مسئولیت کامل داده ها را به جای نگه داشتن ابرداده های اداری به صورت خارجی بر عهده دارد.
در حالی که هنوز در دسترس است، انتظار می رود پشتیبانی ZooKeeper از Kafka 4 و به بعد حذف شود. در این آموزش، کافکا را با استفاده از KRaft راه اندازی می کنید.
شما باید یک شناسه منحصر به فرد برای خوشه کافکا جدید خود ایجاد کنید. در حال حاضر، فقط از یک گره تشکیل شده است. به فهرستی که اکنون کافکا در آن زندگی می کند بروید:
cd ~/kafka
کافکا با KRaft پیکربندی خود را در config/kraft/server.properties ذخیره میکند، در حالی که فایل پیکربندی ZooKeeper config/server.properties است.
قبل از اجرای آن برای اولین بار، باید برخی از تنظیمات پیش فرض را لغو کنید. فایل را برای ویرایش با اجرای:
nano config/kraft/server.properties
خطوط زیر را بیابید:
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...
تنظیمات log.dirs مشخص میکند که کافکا فایلهای گزارش خود را در کجا نگه میدارد. بهطور پیشفرض، آنها را در /tmp/kafka-logs ذخیره میکند، زیرا تضمین میشود که قابل نوشتن هستند، هرچند موقتی. مقدار را با مسیر مشخص شده جایگزین کنید:
... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/home/kafka/kafka-logs ...
از آنجایی که یک کاربر جداگانه برای کافکا ایجاد کرده اید، مسیر فهرست سیاهه ها را در زیر فهرست اصلی کاربر قرار می دهید. اگر وجود نداشته باشد، کافکا آن را خلق خواهد کرد. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.
اکنون که کافکا را پیکربندی کرده اید، دستور زیر را برای ایجاد یک شناسه خوشه تصادفی اجرا کنید:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
سپس با اجرای دستور زیر و وارد کردن شناسه، برای فایل های log فضای ذخیره سازی ایجاد کنید:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
خروجی خواهد بود:
Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
در نهایت، می توانید برای اولین بار سرور کافکا را راه اندازی کنید:
bin/kafka-server-start.sh config/kraft/server.properties
انتهای خروجی مشابه این خواهد بود:
Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
خروجی نشان می دهد که کافکا با استفاده از KRaft با موفقیت مقداردهی اولیه کرده است و اتصالات را در 0.0.0.0:9092 می پذیرد.
هنگامی که CTRL + C را فشار دهید، فرآیند خارج می شود. از آنجایی که اجرای کافکا با باز نگه داشتن یک جلسه ترجیح داده نمی شود، در مرحله بعد سرویسی برای اجرای کافکا در پس زمینه ایجاد خواهید کرد.
مرحله 2 – ایجاد یک سرویس systemd برای کافکا
در این بخش، یک سرویس systemd برای اجرای کافکا در پسزمینه همیشه ایجاد میکنید. خدمات systemd را می توان به طور مداوم شروع، متوقف و مجددا راه اندازی کرد.
پیکربندی سرویس را در فایلی با نام code-server.service در فهرست /lib/systemd/system ذخیره میکنید، جایی که systemd سرویسهای خود را ذخیره میکند. با استفاده از ویرایشگر متن خود آن را ایجاد کنید:
sudo nano /etc/systemd/system/kafka.service
خطوط زیر را اضافه کنید:
[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
در اینجا ابتدا توضیحات سرویس را مشخص می کنید. سپس در قسمت [Service] نوع سرویس را تعریف می کنید (ساده به این معنی است که دستور باید به سادگی اجرا شود) و دستوری را که اجرا می شود ارائه می دهید. شما همچنین مشخص میکنید که کاربری که اجرا میشود به عنوان kafka است، و در صورت خروج کافکا، سرویس باید به طور خودکار راهاندازی مجدد شود.
بخش [نصب] به سیستم دستور می دهد تا زمانی که امکان ورود به سرور شما فراهم شد، این سرویس را راه اندازی کند. پس از اتمام، فایل را ذخیره و ببندید.
سرویس کافکا را با اجرای دستور زیر راه اندازی کنید:
sudo systemctl start kafka
با مشاهده وضعیت آن، بررسی کنید که به درستی شروع شده است:
sudo systemctl status kafka
خروجی مشابه زیر را خواهید دید:
Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.
برای شروع خودکار کافکا پس از راه اندازی مجدد سرور، سرویس آن را با اجرای دستور زیر فعال کنید:
sudo systemctl enable kafka
در این مرحله، شما یک سرویس systemd برای کافکا ایجاد کرده و آن را فعال کرده اید، به طوری که در هر بوت سرور شروع می شود. در مرحله بعد، با ایجاد و حذف موضوعات در کافکا و همچنین نحوه تولید و مصرف پیامهای متنی با استفاده از اسکریپتهای موجود آشنا خواهید شد.
مرحله 3 – تولید و مصرف پیام های موضوعی
اکنون که یک سرور کافکا راه اندازی کرده اید، با موضوعات و نحوه مدیریت آنها با استفاده از اسکریپت های ارائه شده آشنا خواهید شد. همچنین یاد خواهید گرفت که چگونه پیامها را از یک موضوع ارسال و بازگردانی کنید.
همانطور که در مقاله جریان رویداد توضیح داده شد، انتشار و دریافت پیامها با موضوعات مرتبط هستند. یک موضوع می تواند مربوط به دسته ای باشد که یک پیام به آن تعلق دارد.
اسکریپت ارائه شده kafka-topics.sh را می توان برای مدیریت موضوعات در کافکا از طریق CLI استفاده کرد. برای ایجاد موضوعی به نام first-topic دستور زیر را اجرا کنید:
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092
همه اسکریپت های کافکا ارائه شده نیاز دارند که آدرس سرور را با –bootstrap-server مشخص کنید.
خروجی خواهد بود:
Output
Created topic first-topic.
برای فهرست کردن همه موضوعات موجود، به جای –create در –list بفرستید:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
موضوعی را که ایجاد کرده اید می بینید:
Output
first-topic
شما می توانید اطلاعات و آمار دقیق در مورد موضوع را با عبور در –describe دریافت کنید:
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092
خروجی شبیه به این خواهد بود:
Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
خط اول نام موضوع، شناسه و ضریب تکرار را مشخص می کند که 1 است زیرا موضوع فقط در ماشین فعلی وجود دارد. خط دوم عمداً تورفتگی دارد و اطلاعاتی را در مورد پارتیشن اول (و تنها) موضوع نشان می دهد. کافکا به شما اجازه می دهد تا موضوع را پارتیشن بندی کنید، به این معنی که بخش های مختلف یک موضوع را می توان در سرورهای مختلف توزیع کرد و مقیاس پذیری را افزایش داد. در اینجا، تنها یک پارتیشن وجود دارد.
اکنون که موضوعی را ایجاد کرده اید، با استفاده از اسکریپت kafka-console-producer.sh پیام هایی برای آن تولید خواهید کرد. دستور زیر را برای شروع تولید کننده اجرا کنید:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092
یک اعلان خالی خواهید دید:
>
تهیه کننده منتظر ارسال پیامک شماست. تست را وارد کرده و ENTER را فشار دهید. اعلان به شکل زیر خواهد بود:
>test
>
تهیه کننده اکنون منتظر پیام بعدی است، یعنی پیام قبلی با موفقیت به کافکا ابلاغ شده است. می توانید هر تعداد پیام را برای آزمایش وارد کنید. برای خروج از سازنده، CTRL+C را فشار دهید.
برای بازخوانی پیام های موضوع، به یک مصرف کننده نیاز دارید. کافکا یک مصرف کننده ساده را در قالب kafka-console-consumer.sh ارائه می دهد. آن را با اجرا اجرا کنید:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092
با این حال، خروجی وجود نخواهد داشت. دلیل آن این است که مصرف کننده در حال پخش داده ها از موضوع است و در حال حاضر چیزی تولید و ارسال نمی شود. برای مصرف پیام هایی که قبل از شروع مصرف کننده تولید کرده اید، باید موضوع را از ابتدا با اجرای زیر بخوانید:
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092
مصرف کننده همه رویدادهای موضوع را دوباره پخش می کند و پیام ها را واکشی می کند:
Outputtest
...
همانند سازنده، برای خروج، CTRL+C را فشار دهید.
برای تأیید اینکه مصرف کننده واقعاً داده ها را پخش می کند، آن را در یک جلسه ترمینال جداگانه باز می کنید. یک جلسه SSH ثانویه را باز کنید و مصرف کننده را در پیکربندی پیش فرض اجرا کنید:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092
در جلسه اولیه، سازنده را اجرا کنید:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092
سپس پیام های دلخواه خود را وارد کنید:
>second test
>third test
>
بلافاصله دریافت آنها توسط مصرف کننده را مشاهده خواهید کرد:
Output
second test
third test
پس از اتمام آزمایش، هم تولیدکننده و هم مصرف کننده را خاتمه دهید.
برای حذف اولین تاپیک، –delete را به kafka-topics.sh منتقل کنید:
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092
خروجی وجود نخواهد داشت. میتوانید موضوعات را فهرست کنید تا تأیید کنید که واقعاً حذف شده است:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
خروجی خواهد بود:
Output
__consumer_offsets
__معادل_مصرفکننده موضوعی درونی برای کافکا است که میزان مطالعه یک موضوع توسط مصرفکننده را ذخیره میکند.
در این مرحله، شما یک موضوع کافکا ایجاد کردهاید و پیامهایی در آن ایجاد کردهاید. سپس، پیام ها را با استفاده از اسکریپت ارائه شده مصرف کرده و در نهایت آنها را به صورت بلادرنگ دریافت کرده اید. در مرحله بعد، با نحوه مقایسه کافکا با سایر کارگزاران رویداد و نرم افزارهای مشابه آشنا خواهید شد.
مقایسه با معماری های مشابه
آپاچی کافکا راه حلی واقعی برای موارد استفاده از جریان رویداد در نظر گرفته می شود. با این حال، Apache Pulsar و RabbitMQ نیز به طور گسترده مورد استفاده قرار می گیرند و به عنوان گزینه های همه کاره، البته با تفاوت در رویکردشان، برجسته می شوند.
تفاوت اصلی بین صف پیام و جریان رویداد در این است که وظیفه اصلی اولی رساندن پیام ها به مشتریان در سریع ترین روش ممکن و بدون توجه به سفارش آنها است. چنین سیستم هایی معمولاً پیام ها را تا زمانی که توسط مصرف کنندگان تأیید شود در حافظه ذخیره می کنند. فیلتر کردن و مسیریابی پیام ها جنبه مهمی دارد، زیرا مصرف کنندگان می توانند به دسته های خاصی از داده ها علاقه نشان دهند. RabbitMQ یک نمونه قوی از یک سیستم پیام رسانی سنتی است که در آن چندین مصرف کننده می توانند در یک موضوع مشترک شوند و چندین نسخه از یک پیام را دریافت کنند.
از سوی دیگر، جریان رویداد بر پایداری متمرکز است. رویدادها باید بایگانی شوند، مرتب نگهداری شوند و یک بار پردازش شوند. مسیریابی آنها به مصرف کنندگان خاص مهم نیست، زیرا ایده این است که همه مصرف کنندگان رویدادها را یکسان پردازش می کنند.
Apache Pulsar یک سیستم پیامرسان منبع باز است که توسط بنیاد نرمافزار آپاچی توسعه یافته و از جریان رویداد پشتیبانی میکند. برخلاف کافکا، که از ابتدا با آن ساخته شد، Pulsar به عنوان یک راهحل سنتی صفبندی پیام شروع به کار کرد و بعداً قابلیتهای جریان رویداد را به دست آورد. بنابراین Pulsar زمانی مفید است که ترکیبی از هر دو روش مورد نیاز باشد، بدون نیاز به استقرار برنامه های کاربردی جداگانه.
نتیجه
شما اکنون آپاچی کافکا را به طور ایمن در پسزمینه سرور خود دارید که به عنوان یک سرویس سیستم پیکربندی شده است. شما همچنین یاد گرفتهاید که چگونه موضوعات را از خط فرمان دستکاری کنید، و همچنین پیامها را تولید و مصرف کنید. با این حال، جذابیت اصلی کافکا، تنوع گسترده مشتریان برای ادغام آن در برنامه های شما است.