مقدمه
آپاچی کافکا اسکریپت های پوسته ای را برای تولید و مصرف پیام های متنی اولیه به و از یک خوشه کافکا فراهم می کند. در حالی که آنها برای کاوش و آزمایش مفید هستند، برنامه های کاربردی دنیای واقعی به صورت برنامه نویسی به کافکا دسترسی دارند. برای این منظور، کافکا کتابخانه های مشتری زیادی را برای زبان ها و محیط های برنامه نویسی پرکاربرد ارائه می دهد. در این آموزش، یک برنامه جاوا ایجاد می کنید که داده ها را در یک موضوع کافکا تولید می کند. شما با استفاده از Apache Maven که ابزاری برای ساخت و بسته بندی پروژه های جاوا است، یک پروژه جاوا ایجاد می کنید و کتابخانه مشتری کافکا را به عنوان یک وابستگی اضافه می کنید. سپس، کلاسی را پیادهسازی میکنید که با تولید پیامها و بازیابی ابردادههای درون خوشهای درباره آنها، از مشتری کافکا استفاده میکند.
پیش نیازها
- دستگاهی با حداقل 4 گیگابایت رم و 2 سی پی یو
- کیت توسعه جاوا (JDK) 8 یا بالاتر روی Droplet یا دستگاه محلی شما نصب شده است
- آپاچی کافکا روی Droplet یا ماشین محلی شما نصب و پیکربندی شده است
- آشنایی با طرح دایرکتوری استاندارد پروژه های جاوا
مرحله 1 – ایجاد یک پروژه Maven
در این مرحله، Apache Maven را نصب میکنید و از آن برای ایجاد پروژهای استفاده میکنید که از آن برای ارتباط با کافکا استفاده میکنید. در اوبونتو، Maven به راحتی در مخازن رسمی موجود است.
ابتدا لیست بسته های موجود خود را با اجرای:
sudo apt update
برای نصب آن دستور زیر را اجرا کنید:
sudo apt install maven
با خواندن شماره نسخه آن، تأیید کنید که نصب شده است:
mvn --version
خروجی بسته به نسخه جاوا و پلتفرم شما مشابه موارد زیر خواهد بود:
OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"
در مرحله بعد، دایرکتوری ایجاد کنید که در آن پروژه های جاوا خود را برای کار با کافکا ذخیره کنید:
mkdir ~/kafka-projects
به دایرکتوری تازه ایجاد شده بروید:
cd ~/kafka-projects
سپس با اجرای زیر یک پروژه جاوا خالی تولید کنید:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false
در اینجا، به Maven دستور می دهید که پروژه جدیدی به نام dokafka
با شناسه گروهی com.dokafka
تولید کند. شناسه گروه به طور منحصر به فرد این پروژه را در سراسر اکوسیستم Maven
شناسایی می کند. این پروژه بر اساس کهن الگوی maven-archetype-quickstart
ایجاد خواهد شد، که Maven الگوها را به این شکل فراخوانی می کند.
خروجی های زیادی وجود خواهد داشت، به خصوص اگر این اولین باری باشد که Maven اجرا می شود. انتهای خروجی به شکل زیر خواهد بود:
Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------
Maven بسته های جاوا لازم را از مخزن مرکزی خود دانلود کرده و پروژه dokafka
را با استفاده از الگوی maven-archetype-quickstart
ایجاد کرده است.
با اجرای زیر به دایرکتوری پروژه بروید:
cd dokafka
ساختار پروژه به شکل زیر است:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java
به عنوان بخشی از پیش نیازها، شما در مورد ساختار استاندارد پروژه Maven که در اینجا مشاهده می کنید، یاد گرفتید. دایرکتوری src/main/java
کد منبع پروژه را نگه میدارد، src/test/java
حاوی منابع آزمایشی است و pom.xml
در ریشه پروژه، فایل پیکربندی اصلی Maven است.
این پروژه تنها حاوی یک فایل منبع، App.java
است. محتویات آن را نشان دهید تا ببینید Maven چه چیزی تولید کرده است:
cat src/main/java/com/dokafka/App.java
خروجی خواهد بود:
package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}
برای اجرای این کد، ابتدا باید پروژه را با اجرای زیر بسازید:
mvn package
Maven کد را کامپایل کرده و آن را در یک فایل JAR بسته بندی می کند تا اجرا شود. انتهای خروجی به این صورت خواهد بود که به معنای تکمیل آن است:
Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------
Maven فایل JAR
به دست آمده را زیر فهرست هدف قرار داد. برای اجرای کلاس App
که به تازگی ساخته اید، دستور زیر را اجرا کنید و شناسه کامل کلاس را وارد کنید:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App
خروجی خواهد بود:
OutputHello World!
شما Maven را نصب کرده اید و یک پروژه جاوا خالی ایجاد کرده اید. در مرحله بعد وابستگی های لازم را برای کافکا اضافه خواهید کرد.
مرحله 2 – افزودن وابستگی های Maven
اکنون کلاینت جاوا کافکا و همچنین وابستگی های دیگر را برای لاگ به پروژه خود اضافه خواهید کرد. همچنین میتوانید Maven را طوری پیکربندی کنید که این وابستگیها را در طول بستهبندی لحاظ کند. ابتدا، وابستگی مشتریان کافکا را اضافه می کنید. به صفحه مخزن Maven برای مشتری جاوا در مرورگر خود بروید و آخرین نسخه موجود را انتخاب کنید، سپس قطعه XML ارائه شده را برای Maven کپی کنید. در زمان نگارش، آخرین نسخه کتابخانه کلاینت جاوا 3.7.0
بود.
وابستگی ها به pom.xml
در ریشه پروژه شما اضافه می شوند. آن را برای ویرایش باز کنید:
nano pom.xml
بخش <dependencies>
; را پیدا کنید و تعریف وابستگی را اضافه کنید:
...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>
این کار کتابخانه مشتری کافکا را در اختیار پروژه شما قرار می دهد. با این حال، خود کتابخانه به دو وابستگی دیگر نیاز دارد که باید به صورت دستی اضافه کنید. آنها از کتابخانه SLF4J که برای ثبت پیامها استفاده میکند، سرچشمه میگیرند، زیرا از بسیاری از کتابخانههای ورود به سیستم پشتیبانی میکند و به توسعهدهنده اجازه میدهد تا در مورد نحوه پردازش پیامهای گزارش انعطافپذیر باشد. دو وابستگی که باید اضافه کنید عبارتند از:
- slf4j-api که خود کتابخانه است
- slf4j-simple، که لاگ ها را پردازش کرده و به ترمینال خروجی می دهد
هنگامی که وابستگی ها را تعریف کردید، باید آنها را در کنار JAR نهایی ساخته شده در دسترس قرار دهید. بخش <build>
pom.xml
را پیدا کنید و خطوط برجسته را اضافه کنید:
...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...
در اینجا، افزونه maven-dependency-plugin
را برای کپی کردن همه وابستگی ها در زمان بسته بندی پیکربندی می کنید. فایلهای JAR وابستگیها، در این پیکربندی پروژه، تحت target/lib
قرار خواهند گرفت. توجه داشته باشید که نباید بخش <plugins>
موجود را در <pluginManagement>
; تغییر دهید. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.
پروژه را بسازید تا مطمئن شوید همه چیز به درستی پیکربندی شده است:
mvn package
انتهای خروجی باید مشابه این باشد:
Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------
میتوانید فایلها را در زیر target/lib
فهرست کنید تا مطمئن شوید که وابستگیها واقعاً کپی شدهاند:
Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar
شما وابستگی های لازم را به پروژه Maven خود اضافه کرده اید. اکنون می خواهید به کافکا متصل شوید و پیام ها را به صورت برنامه ریزی شده تولید کنید.
مرحله 3 – ایجاد یک تولید کننده کافکا در جاوا
در این مرحله، یک تولید کننده کافکا در جاوا راه اندازی می کنید و برای یک موضوع پیام می نویسید.
مطابق ساختار پروژه، کد منبع در زیر src/main/java/com/dokafka
ذخیره میشود. از آنجایی که برای بقیه آموزش نیازی به App.java
نخواهید داشت، با اجرای زیر آن را حذف کنید:
rm src/main/java/com/dokafka/App.java
شما کد سازنده را در کلاسی به نام ProducerDemo ذخیره خواهید کرد. فایل همراه را برای ویرایش ایجاد و باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.java
خطوط زیر را اضافه کنید:
package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}
ابتدا کلاس ProducerDemo
را تعریف می کنید، کلاس های استفاده شده را وارد می کنید و یک Logger
ایجاد می کنید. در روش اصلی ابتدا آدرس خوشه کافکا (bootstrapServers) و نام موضوع تولید پیام (topicName) را اعلام می کنید.
سپس، یک شیء Properties
را نمونهسازی میکنید، که شبیه به یک فرهنگ لغت کلید-مقدار است و پیکربندی را برای عملکرد تولیدکننده کافکا شما نگه میدارد. شما ویژگی BOOTSTRAP_SERVERS_CONFIG
را با آدرس خوشه کافکا پر می کنید. همچنین ورودیهای KEY_SERIALIZER_CLASS_CONFIG
و VALUE_SERIALIZER_CLASS_CONFIG
را روی StringSerializer.class.getName ()
تنظیم کنید.
این ویژگی ها مشخص می کنند که کدام سریال سازها باید برای پردازش کلیدها و مقادیر پیام های تولید شده استفاده شوند. سریال سازها کلاس هایی هستند که ورودی را می پذیرند و آرایه ای از بایت ها را به عنوان خروجی پس می دهند و آماده انتقال از طریق شبکه هستند. Deserializers
برعکس عمل می کنند و شی اصلی را از جریان بایت ها بازسازی می کنند. در اینجا، هم کلید و هم مقدار با استفاده از StringSerializer
داخلی بهصورت رشتهها سریالسازی میشوند.
در مرحله بعد، شما یک KafkaProducer
را اعلام و نمونهسازی میکنید:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
سازنده کلیدها و مقادیری از نوع String
را با خصوصیات همراه برای پیکربندی می پذیرد. برای ارسال پیام به یک موضوع، KafkaProducer
یک ProducerRecord را می پذیرد که با نام موضوع و خود پیام که Hello World!
است، نمونه برداری می کنید. توجه داشته باشید که خود سازنده به موضوع خاصی وابسته نیست.
پس از ارسال پیام، تولید کننده را فلاش کرده و می بندید. فراخوانی producer.send()
ناهمزمان است، به این معنی که جریان کنترل به متد اصلی باز می گردد در حالی که پیام در رشته دیگری ارسال می شود. از آنجایی که این برنامه نمونه میخواهد بعد از آن خارج شود، شما تولیدکننده را مجبور میکنید تا با فلاشینگ، هر آنچه را که باقی مانده است، ارسال کند. سپس، آن را میبندید () و به کافکا نشان میدهد که سازنده در حال نابودی است.
در مرحله بعد، یک اسکریپت ایجاد خواهید کرد که ساختمان و اجرای ProducerDemo را مدیریت می کند. شما آن را در فایلی به نام run-producer.sh ذخیره خواهید کرد. آن را برای ویرایش ایجاد و باز کنید:
nano run-producer.sh
خطوط زیر را اضافه کنید:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo
وقتی کارتان تمام شد، فایل را ذخیره و ببندید. قسمت برجسته مشخص می کند که وابستگی ها در کجا قرار دارند.
سپس آن را به عنوان اجرایی علامت بزنید:
chmod +x run-producer.sh
در نهایت، سعی کنید یک Hello World را تولید کنید! با اجرای آن پیام دهید:
./run-producer.sh
خروجی طولانی خواهد بود و انتهای آن باید به این صورت باشد:
Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
KafkaProducer ثبت شد که با موفقیت ایجاد شد و بعداً ثبت نشد. اکنون پیام در موضوع java_demo نوشته شده است و می توانید آن را با استفاده از اسکریپت kafka-console-consumer.sh بازیابی کنید.
در یک پوسته جداگانه، به دایرکتوری نصب کافکا خود بروید و دستور زیر را برای خواندن موضوع اجرا کنید:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092
خروجی خواهد بود:
OutputHello World!
برای خروج می توانید CTRL+C
را فشار دهید.
در این مرحله، به صورت برنامهنویسی، پیامی را در موضوع java_demo
تولید کردهاید و با استفاده از اسکریپت bash ارائهشده توسط کافکا، آن را دوباره بخوانید. اکنون می آموزید که چگونه از اطلاعاتی که کافکا پس از ارسال موفقیت آمیز پیام برمی گرداند، استفاده کنید.
مرحله 4 – بازیابی متادیتا با استفاده از Callbacks
متد send() KafkaProducer
تماسهای برگشتی را میپذیرد، که به شما امکان میدهد بر اساس رویدادهایی که رخ میدهند، مانند زمانی که رکورد دریافت میشود، عمل کنید. این برای بازیابی اطلاعات در مورد نحوه مدیریت رکورد توسط خوشه مفید است.
برای گسترش تماس send()
با یک callback، ابتدا ProducerDemo
را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.java
کد را به شکل زیر تغییر دهید:
...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...
اکنون یک پیاده سازی از واسط Callback را به متد send()
منتقل می کنید و onCompletion()
را پیاده سازی می کنید که RecordMetadata
و به صورت اختیاری یک Exception
را دریافت می کند. سپس، اگر خطایی رخ داد، آن را ثبت کنید. در غیر این صورت، مُهر زمانی، شماره پارتیشن و افست رکورد را که اکنون در خوشه است ثبت میکنید. از آنجایی که ارسال پیام از این طریق ناهمزمان است، زمانی که خوشه رکورد را بپذیرد، کد شما فراخوانی میشود، بدون اینکه شما صریحاً منتظر بمانید تا این اتفاق بیفتد.
وقتی کارتان تمام شد، فایل را ذخیره و ببندید، سپس سازنده را اجرا کنید:
./run-producer.sh
به یک پیام جدید در انتهای خروجی توجه کنید:
Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...
پیامی که به تازگی تولید شده بود توسط خوشه پذیرفته شد و در پارتیشن 0 ذخیره شد.
اگر دوباره آن را اجرا کنید، متوجه خواهید شد که افست یک بزرگتر است، که نشان دهنده مکان پیام در پارتیشن است:
Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3
نتیجه
در این مقاله، شما یک پروژه جاوا را با استفاده از Maven ایجاد کردهاید و آن را به وابستگیهایی برای ارتباط با کافکا مجهز کردهاید. سپس، کلاسی ایجاد کردهاید که پیامهایی را به خوشه کافکا شما تولید میکند و آن را برای بازیابی اطلاعات سوابق ارسالی گسترش میدهد.