چگونه یک تولیدکننده کافکا را برای منبع داده از طریق CLI تنظیم کنیم

مقدمه

آپاچی کافکا اسکریپت های پوسته ای را برای تولید و مصرف پیام های متنی اولیه به و از یک خوشه کافکا فراهم می کند. در حالی که آنها برای کاوش و آزمایش مفید هستند، برنامه های کاربردی دنیای واقعی به صورت برنامه نویسی به کافکا دسترسی دارند. برای این منظور، کافکا کتابخانه های مشتری زیادی را برای زبان ها و محیط های برنامه نویسی پرکاربرد ارائه می دهد. در این آموزش، یک برنامه جاوا ایجاد می کنید که داده ها را در یک موضوع کافکا تولید می کند. شما با استفاده از Apache Maven که ابزاری برای ساخت و بسته بندی پروژه های جاوا است، یک پروژه جاوا ایجاد می کنید و کتابخانه مشتری کافکا را به عنوان یک وابستگی اضافه می کنید. سپس، کلاسی را پیاده‌سازی می‌کنید که با تولید پیام‌ها و بازیابی ابرداده‌های درون خوشه‌ای درباره آن‌ها، از مشتری کافکا استفاده می‌کند.

پیش نیازها
  • دستگاهی با حداقل 4 گیگابایت رم و 2 سی پی یو
  • کیت توسعه جاوا (JDK) 8 یا بالاتر روی Droplet یا دستگاه محلی شما نصب شده است
  • آپاچی کافکا روی Droplet یا ماشین محلی شما نصب و پیکربندی شده است
  • آشنایی با طرح دایرکتوری استاندارد پروژه های جاوا

مرحله 1 – ایجاد یک پروژه Maven

در این مرحله، Apache Maven را نصب می‌کنید و از آن برای ایجاد پروژه‌ای استفاده می‌کنید که از آن برای ارتباط با کافکا استفاده می‌کنید. در اوبونتو، Maven به راحتی در مخازن رسمی موجود است.

ابتدا لیست بسته های موجود خود را با اجرای:

sudo apt update

برای نصب آن دستور زیر را اجرا کنید:

sudo apt install maven

با خواندن شماره نسخه آن، تأیید کنید که نصب شده است:

mvn --version

خروجی بسته به نسخه جاوا و پلتفرم شما مشابه موارد زیر خواهد بود:

OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

در مرحله بعد، دایرکتوری ایجاد کنید که در آن پروژه های جاوا خود را برای کار با کافکا ذخیره کنید:

mkdir ~/kafka-projects

به دایرکتوری تازه ایجاد شده بروید:

cd ~/kafka-projects

سپس با اجرای زیر یک پروژه جاوا خالی تولید کنید:

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

در اینجا، به Maven دستور می دهید که پروژه جدیدی به نام dokafka با شناسه گروهی com.dokafka تولید کند. شناسه گروه به طور منحصر به فرد این پروژه را در سراسر اکوسیستم Maven شناسایی می کند. این پروژه بر اساس کهن الگوی maven-archetype-quickstart ایجاد خواهد شد، که Maven الگوها را به این شکل فراخوانی می کند.

خروجی های زیادی وجود خواهد داشت، به خصوص اگر این اولین باری باشد که Maven اجرا می شود. انتهای خروجی به شکل زیر خواهد بود:

Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven بسته های جاوا لازم را از مخزن مرکزی خود دانلود کرده و پروژه dokafka را با استفاده از الگوی maven-archetype-quickstart ایجاد کرده است.

با اجرای زیر به دایرکتوری پروژه بروید:

cd dokafka

ساختار پروژه به شکل زیر است:

├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java

به عنوان بخشی از پیش نیازها، شما در مورد ساختار استاندارد پروژه Maven که در اینجا مشاهده می کنید، یاد گرفتید. دایرکتوری src/main/java کد منبع پروژه را نگه می‌دارد، src/test/java حاوی منابع آزمایشی است و pom.xml در ریشه پروژه، فایل پیکربندی اصلی Maven است.

این پروژه تنها حاوی یک فایل منبع، App.java است. محتویات آن را نشان دهید تا ببینید Maven چه چیزی تولید کرده است:

cat src/main/java/com/dokafka/App.java

خروجی خواهد بود:

package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}

برای اجرای این کد، ابتدا باید پروژه را با اجرای زیر بسازید:

mvn package

Maven کد را کامپایل کرده و آن را در یک فایل JAR بسته بندی می کند تا اجرا شود. انتهای خروجی به این صورت خواهد بود که به معنای تکمیل آن است:

Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Maven فایل JAR به دست آمده را زیر فهرست هدف قرار داد. برای اجرای کلاس App که به تازگی ساخته اید، دستور زیر را اجرا کنید و شناسه کامل کلاس را وارد کنید:

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

خروجی خواهد بود:

OutputHello World!

شما Maven را نصب کرده اید و یک پروژه جاوا خالی ایجاد کرده اید. در مرحله بعد وابستگی های لازم را برای کافکا اضافه خواهید کرد.

مرحله 2 – افزودن وابستگی های Maven

اکنون کلاینت جاوا کافکا و همچنین وابستگی های دیگر را برای لاگ به پروژه خود اضافه خواهید کرد. همچنین می‌توانید Maven را طوری پیکربندی کنید که این وابستگی‌ها را در طول بسته‌بندی لحاظ کند. ابتدا، وابستگی مشتریان کافکا را اضافه می کنید. به صفحه مخزن Maven برای مشتری جاوا در مرورگر خود بروید و آخرین نسخه موجود را انتخاب کنید، سپس قطعه XML ارائه شده را برای Maven کپی کنید. در زمان نگارش، آخرین نسخه کتابخانه کلاینت جاوا 3.7.0 بود.

وابستگی ها به pom.xml در ریشه پروژه شما اضافه می شوند. آن را برای ویرایش باز کنید:

nano pom.xml

بخش <dependencies> را پیدا کنید و تعریف وابستگی را اضافه کنید:

...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>

این کار کتابخانه مشتری کافکا را در اختیار پروژه شما قرار می دهد. با این حال، خود کتابخانه به دو وابستگی دیگر نیاز دارد که باید به صورت دستی اضافه کنید. آنها از کتابخانه SLF4J که برای ثبت پیام‌ها استفاده می‌کند، سرچشمه می‌گیرند، زیرا از بسیاری از کتابخانه‌های ورود به سیستم پشتیبانی می‌کند و به توسعه‌دهنده اجازه می‌دهد تا در مورد نحوه پردازش پیام‌های گزارش انعطاف‌پذیر باشد. دو وابستگی که باید اضافه کنید عبارتند از:

  • slf4j-api که خود کتابخانه است
  • slf4j-simple، که لاگ ها را پردازش کرده و به ترمینال خروجی می دهد

هنگامی که وابستگی ها را تعریف کردید، باید آنها را در کنار JAR نهایی ساخته شده در دسترس قرار دهید. بخش &lt;build> pom.xml را پیدا کنید و خطوط برجسته را اضافه کنید:

...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...

در اینجا، افزونه maven-dependency-plugin را برای کپی کردن همه وابستگی ها در زمان بسته بندی پیکربندی می کنید. فایل‌های JAR وابستگی‌ها، در این پیکربندی پروژه، تحت target/lib قرار خواهند گرفت. توجه داشته باشید که نباید بخش &lt;plugins> موجود را در <pluginManagement&gt; تغییر دهید. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.

پروژه را بسازید تا مطمئن شوید همه چیز به درستی پیکربندی شده است:

mvn package

انتهای خروجی باید مشابه این باشد:

Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

می‌توانید فایل‌ها را در زیر target/lib فهرست کنید تا مطمئن شوید که وابستگی‌ها واقعاً کپی شده‌اند:

Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar

شما وابستگی های لازم را به پروژه Maven خود اضافه کرده اید. اکنون می خواهید به کافکا متصل شوید و پیام ها را به صورت برنامه ریزی شده تولید کنید.

مرحله 3 – ایجاد یک تولید کننده کافکا در جاوا

در این مرحله، یک تولید کننده کافکا در جاوا راه اندازی می کنید و برای یک موضوع پیام می نویسید.

مطابق ساختار پروژه، کد منبع در زیر src/main/java/com/dokafka ذخیره می‌شود. از آنجایی که برای بقیه آموزش نیازی به App.java نخواهید داشت، با اجرای زیر آن را حذف کنید:

rm src/main/java/com/dokafka/App.java

شما کد سازنده را در کلاسی به نام ProducerDemo ذخیره خواهید کرد. فایل همراه را برای ویرایش ایجاد و باز کنید:

nano src/main/java/com/dokafka/ProducerDemo.java

خطوط زیر را اضافه کنید:

package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}

ابتدا کلاس ProducerDemo را تعریف می کنید، کلاس های استفاده شده را وارد می کنید و یک Logger ایجاد می کنید. در روش اصلی ابتدا آدرس خوشه کافکا (bootstrapServers) و نام موضوع تولید پیام (topicName) را اعلام می کنید.

سپس، یک شیء Properties را نمونه‌سازی می‌کنید، که شبیه به یک فرهنگ لغت کلید-مقدار است و پیکربندی را برای عملکرد تولیدکننده کافکا شما نگه می‌دارد. شما ویژگی BOOTSTRAP_SERVERS_CONFIG را با آدرس خوشه کافکا پر می کنید. همچنین ورودی‌های KEY_SERIALIZER_CLASS_CONFIG و VALUE_SERIALIZER_CLASS_CONFIG را روی StringSerializer.class.getName () تنظیم کنید.

این ویژگی ها مشخص می کنند که کدام سریال سازها باید برای پردازش کلیدها و مقادیر پیام های تولید شده استفاده شوند. سریال سازها کلاس هایی هستند که ورودی را می پذیرند و آرایه ای از بایت ها را به عنوان خروجی پس می دهند و آماده انتقال از طریق شبکه هستند. Deserializers برعکس عمل می کنند و شی اصلی را از جریان بایت ها بازسازی می کنند. در اینجا، هم کلید و هم مقدار با استفاده از StringSerializer داخلی به‌صورت رشته‌ها سریال‌سازی می‌شوند.

در مرحله بعد، شما یک KafkaProducer را اعلام و نمونه‌سازی می‌کنید:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

سازنده کلیدها و مقادیری از نوع String را با خصوصیات همراه برای پیکربندی می پذیرد. برای ارسال پیام به یک موضوع، KafkaProducer یک ProducerRecord را می پذیرد که با نام موضوع و خود پیام که Hello World! است، نمونه برداری می کنید. توجه داشته باشید که خود سازنده به موضوع خاصی وابسته نیست.

پس از ارسال پیام، تولید کننده را فلاش کرده و می بندید. فراخوانی producer.send() ناهمزمان است، به این معنی که جریان کنترل به متد اصلی باز می گردد در حالی که پیام در رشته دیگری ارسال می شود. از آنجایی که این برنامه نمونه می‌خواهد بعد از آن خارج شود، شما تولیدکننده را مجبور می‌کنید تا با فلاشینگ، هر آنچه را که باقی مانده است، ارسال کند. سپس، آن را می‌بندید () و به کافکا نشان می‌دهد که سازنده در حال نابودی است.

در مرحله بعد، یک اسکریپت ایجاد خواهید کرد که ساختمان و اجرای ProducerDemo را مدیریت می کند. شما آن را در فایلی به نام run-producer.sh ذخیره خواهید کرد. آن را برای ویرایش ایجاد و باز کنید:

nano run-producer.sh

خطوط زیر را اضافه کنید:

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

وقتی کارتان تمام شد، فایل را ذخیره و ببندید. قسمت برجسته مشخص می کند که وابستگی ها در کجا قرار دارند.

سپس آن را به عنوان اجرایی علامت بزنید:

chmod +x run-producer.sh

در نهایت، سعی کنید یک Hello World را تولید کنید! با اجرای آن پیام دهید:

./run-producer.sh

خروجی طولانی خواهد بود و انتهای آن باید به این صورت باشد:

Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

KafkaProducer ثبت شد که با موفقیت ایجاد شد و بعداً ثبت نشد. اکنون پیام در موضوع java_demo نوشته شده است و می توانید آن را با استفاده از اسکریپت kafka-console-consumer.sh بازیابی کنید.

در یک پوسته جداگانه، به دایرکتوری نصب کافکا خود بروید و دستور زیر را برای خواندن موضوع اجرا کنید:

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

خروجی خواهد بود:

OutputHello World!

برای خروج می توانید CTRL+C را فشار دهید.

در این مرحله، به صورت برنامه‌نویسی، پیامی را در موضوع java_demo تولید کرده‌اید و با استفاده از اسکریپت bash ارائه‌شده توسط کافکا، آن را دوباره بخوانید. اکنون می آموزید که چگونه از اطلاعاتی که کافکا پس از ارسال موفقیت آمیز پیام برمی گرداند، استفاده کنید.

مرحله 4 – بازیابی متادیتا با استفاده از Callbacks

متد send() KafkaProducer تماس‌های برگشتی را می‌پذیرد، که به شما امکان می‌دهد بر اساس رویدادهایی که رخ می‌دهند، مانند زمانی که رکورد دریافت می‌شود، عمل کنید. این برای بازیابی اطلاعات در مورد نحوه مدیریت رکورد توسط خوشه مفید است.

برای گسترش تماس send() با یک callback، ابتدا ProducerDemo را برای ویرایش باز کنید:

nano src/main/java/com/dokafka/ProducerDemo.java

کد را به شکل زیر تغییر دهید:

...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...

اکنون یک پیاده سازی از واسط Callback را به متد send() منتقل می کنید و onCompletion() را پیاده سازی می کنید که RecordMetadata و به صورت اختیاری یک Exception را دریافت می کند. سپس، اگر خطایی رخ داد، آن را ثبت کنید. در غیر این صورت، مُهر زمانی، شماره پارتیشن و افست رکورد را که اکنون در خوشه است ثبت می‌کنید. از آنجایی که ارسال پیام از این طریق ناهمزمان است، زمانی که خوشه رکورد را بپذیرد، کد شما فراخوانی می‌شود، بدون اینکه شما صریحاً منتظر بمانید تا این اتفاق بیفتد.

وقتی کارتان تمام شد، فایل را ذخیره و ببندید، سپس سازنده را اجرا کنید:

./run-producer.sh

به یک پیام جدید در انتهای خروجی توجه کنید:

Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

پیامی که به تازگی تولید شده بود توسط خوشه پذیرفته شد و در پارتیشن 0 ذخیره شد.

اگر دوباره آن را اجرا کنید، متوجه خواهید شد که افست یک بزرگتر است، که نشان دهنده مکان پیام در پارتیشن است:

Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3

نتیجه

در این مقاله، شما یک پروژه جاوا را با استفاده از Maven ایجاد کرده‌اید و آن را به وابستگی‌هایی برای ارتباط با کافکا مجهز کرده‌اید. سپس، کلاسی ایجاد کرده‌اید که پیام‌هایی را به خوشه کافکا شما تولید می‌کند و آن را برای بازیابی اطلاعات سوابق ارسالی گسترش می‌دهد.

[تعداد: 1   میانگین: 5/5]
دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

شاید دوست داشته باشید