مقدمه
الگوی Pub/Sub یک الگوی پیام رسانی یک طرفه همه کاره است که در آن یک ناشر داده/پیام تولید می کند و مشترک برای دریافت انواع خاصی از پیام ها ثبت نام می کند. می توان آن را با استفاده از معماری همتا به همتا یا یک واسطه پیام برای میانجیگری ارتباطات پیاده سازی کرد.
تصویر بالا مدل Peer-to-Peer Pub/Sub را نشان میدهد، جایی که یک ناشر بدون واسطه پیامها را مستقیماً برای مشترکین ارسال میکند. مشترکین برای دریافت پیام باید آدرس یا نقطه پایانی ناشر را بدانند.
در تصویر بالا، مدل Pub/Sub از یک واسطه پیام به عنوان مرکز مرکزی برای ارسال پیام بین ناشران و مشترکین استفاده می کند. کارگزار در تبادل پیام میانجیگری می کند و پیام های ناشران را بین مشترکین توزیع می کند. گره های مشترک به جای ناشر مستقیماً در کارگزار مشترک می شوند. حضور یک کارگزار جدا شدن گره های سیستم را بهبود می بخشد زیرا هم ناشر و هم مشترکین فقط با کارگزار تعامل دارند. در این آموزش، یک برنامه چت بلادرنگ برای نشان دادن بیشتر این الگو خواهید ساخت.
پیش نیازها
- Node.js (نسخه >= 12) روی سیستم عامل شما نصب شده است.
- یک ویرایشگر کد مانند VSCode
- Redis روی دستگاه شما نصب شده است
- درک اولیه HTML، DOM، VanillaJS و WebSocket.
مرحله 1 – پیاده سازی سمت سرور
برای شروع اجرای سمت سرور، یک برنامه اصلی Nodejs را با استفاده از دستور اولیه اولیه می کنیم:
npm init -y
دستور بالا یک فایل package.json پیش فرض ایجاد می کند.
در مرحله بعد، بسته وابستگی WebSocket (ws) را که در طول کل دوره این ساخت مورد نیاز است نصب می کنیم:
npm install ws
پیاده سازی سمت سرور یک برنامه چت ساده سمت سرور خواهد بود. روند کار زیر را دنبال می کنیم:
- یک سرور راه اندازی کنید
- فایل HTML را بخوانید تا در مرورگر رندر شود
- یک اتصال WebSocket را راه اندازی کنید.
راه اندازی سرور
یک فایل به نام app.js در دایرکتوری خود ایجاد کنید و کد زیر را داخل آن قرار دهید:
const http = require("http");
const server = http.createServer((req, res) => {
res.end("Hello Chat App");
});
const PORT = 3459;
server.listen(PORT, () => {
console.log(`Server up and running on port ${PORT}`);
});
متد createServer
در ماژول http
داخلی Node.js
برای راه اندازی سرور استفاده خواهد شد. پورتی که در آن سرور باید به درخواستها گوش دهد تنظیم شد و روش گوش دادن در نمونه سرور ایجاد شده برای گوش دادن به درخواستهای ورودی در پورت مشخص شده فراخوانی شد.
دستور node app.js
را در ترمینال خود اجرا کنید و باید پاسخی مانند این داشته باشید:
OutputServer is up and running on port 3459
اگر از این پورت در مرورگر خود درخواست می کنید، باید چیزی شبیه به این را به عنوان پاسخ خود داشته باشید:
فایل HTML را بخوانید تا در مرورگر رندر شود
یک فایل به نام index.html در پوشه اصلی ایجاد کنید و کد زیر را کپی کنید:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Document</title>
</head>
<body>
<p>Serving HTML file</p>
</body>
</html>
این یک فایل html اولیه است که Hello را ارائه می دهد. اکنون، ما باید این فایل را بخوانیم و هر زمان که درخواست HTTP به سرور ما ارسال شد، آن را به عنوان پاسخ ارائه کنیم.
// app.js
const server = http.createServer((req, res) => {
const htmlFilePath = path.join(__dirname, "index.html");
fs.readFile(htmlFilePath, (err, data) => {
if (err) {
res.writeHead(500);
res.end("Error occured while reading file");
}
res.writeHead(200, { "Content-Type": "text/html" });
res.end(data);
});
});
در اینجا، ما از ماژول مسیر داخلی و تابع join
برای الحاق بخش های مسیر به یکدیگر استفاده می کنیم. سپس از تابع readFile
برای خواندن فایل index.html
به صورت ناهمزمان استفاده می شود. به دو آرگومان نیاز دارد: مسیر فایل برای خواندن و یک بازخوانی. کد وضعیت 500
به سربرگ پاسخ ارسال می شود و پیام خطا به مشتری ارسال می شود. اگر داده ها با موفقیت خوانده شوند، یک کد وضعیت موفقیت 200
را به سربرگ پاسخ ارسال می کنیم و داده های پاسخ را برای مشتری که در این حالت محتوای فایل است، ارسال می کنیم. اگر هیچ کدگذاری مشخص نشده باشد، مانند رمزگذاری UTF-8، بافر خام برگردانده می شود. در غیر این صورت فایل HTML
برگردانده می شود.
یک درخواست از سرور در مرورگر خود داشته باشید، و باید این را داشته باشید:
راه اندازی اتصال WebSocket
// app.js
const webSocketServer = new WebSocket.Server({ server });
webSocketServer.on("connection", (client) => {
console.log("successfully connected to the client");
client.on("message", (streamMessage) => {
console.log("message", streamMessage);
distributeClientMessages(streamMessage);
});
});
const distributeClientMessages = (message) => {
for (const client of webSocketServer.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
};
در خط قبلی کد، یک سرور WebSocket جدید به نام webSocketServer
ایجاد می کنیم و آن را به سرور HTTP
موجود خود متصل می کنیم. این به ما امکان می دهد هم درخواست های استاندارد HTTP و هم اتصالات WebSocket را در یک پورت 3459 مدیریت کنیم.
رویداد اتصال on() زمانی فعال می شود که یک اتصال WebSocket موفق برقرار شود. کلاینت در تابع callback
یک شی اتصال WebSocket است که نشان دهنده اتصال به مشتری است. از آن برای ارسال و دریافت پیام و گوش دادن به رویدادهایی مانند پیام مشتری استفاده می شود.
تابع distrubuteClientMessages در اینجا برای ارسال پیام های دریافتی به همه مشتریان متصل استفاده می شود. یک آرگومان پیام را می گیرد و روی کلاینت های متصل به سرور ما تکرار می شود. سپس وضعیت اتصال هر کلاینت را بررسی می کند (readyState === WebSocket.OPEN). این برای اطمینان از اینکه سرور فقط به مشتریانی که اتصالات باز دارند پیام ارسال می کند. اگر اتصال کلاینت باز باشد، سرور با استفاده از روش client.send(message) پیام را برای آن کلاینت ارسال می کند.
مرحله 2 – پیاده سازی سمت مشتری
برای اجرای سمت کلاینت، فایل index.html
خود را کمی تغییر می دهیم.
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" /> <title>Document</title> </head> <body> <p>Pub/Sub Pattern with Chat Messaging</p> <div id="messageContainer"></div> <form id="messageForm"> <form id="messageForm"> <input type="text" id="messageText" placeholder="Send a message" style=" padding: 10px; margin: 5px; border-radius: 5px; border: 1px solid #ccc; outline: none; " onfocus="this.style.borderColor='#007bff';" onblur="this.style.borderColor='#ccc';" /> <input type="button" value="Send Message" style=" padding: 10px; margin: 5px; border-radius: 5px; background-color: #007bff; color: white; border: none; cursor: pointer; " onmouseover="this.style.backgroundColor='#0056b3';" onmouseout="this.style.backgroundColor='#007bff';" /> </form> </form> <script> const url = window.location.host; const socket = new WebSocket(`ws://${url}`); </script> </body> </html>
در این قطعه کد، یک عنصر فرم اضافه کردیم که دارای ورودی و دکمه ای برای ارسال پیام است. اتصالات WebSocket توسط کلاینتها آغاز میشوند و برای برقراری ارتباط با یک سرور دارای WebSocket که در ابتدا راهاندازی کردهایم، باید نمونهای از شی WebSocket ایجاد کنیم که ws://url را مشخص کند که سروری را که میخواهیم استفاده کنیم را مشخص میکند. متغیر URL، هنگامی که وارد سیستم شوید، اتصال URL به پورت 3459 را خواهد داشت که سرور ما در حال گوش دادن است.
// app.js
console.log("url", url); // localhost:3459
console.log("socket", socket); // { url: "ws://localhost:3459/", readyState: 0, bufferedAmount: 0, onopen: null, onerror: null, onclose: null, extensions: "", protocol: "", onmessage: null, binaryType: "blob" }
بنابراین، هنگامی که در مرورگر خود درخواست را به سرور وارد می کنید، باید این را ببینید:
بیایید اسکریپت خود را ارتقا دهیم تا بتوانیم از مشتری به سرور پیام بفرستیم و از سرور پیام دریافت کنیم.
// index.html
<script>
const url = window.location.host;
const socket = new WebSocket(`ws://${url}`);
const messageContainer = document.getElementById("messageContainer");
socket.onmessage = function (eventMessage) {
eventMessage.data.text().then((text) => {
const messageContent = document.createElement("p");
messageContent.innerHTML = text;
document
.getElementById("messageContainer")
.appendChild(messageContent);
});
};
const form = document.getElementById("messageForm");
form.addEventListener("submit", (event) => {
event.preventDefault();
const message = document.getElementById("messageText").value;
socket.send(message);
document.getElementById("messageText").value = "";
});
</script>
همانطور که قبلا ذکر شد، ما URLی را که درخواستی را به سرور ما ارسال می کند از سمت مشتری (مرورگر) بازیابی می کنیم و یک نمونه شی WebSocket جدید با URL ایجاد می کنیم. سپس با کلیک روی دکمه Send Message
یک رویداد در عنصر فرم ایجاد می کنیم. متن وارد شده توسط کاربر در رابط کاربری از عنصر ورودی استخراج می شود و روش ارسال در نمونه سوکت برای ارسال پیام به سرور فراخوانی می شود.
رویداد onmessage
که بر روی شی سوکت فراخوانی می شود، هنگامی که پیامی از سرور دریافت می شود، راه اندازی می شود. این برای به روز رسانی رابط کاربری یک پیام دریافتی استفاده می شود. پارامتر eventMessage
در تابع callback
داده (پیام) ارسال شده از سرور را دارد، اما به صورت Blob برمی گردد. سپس از متد ()text بر روی دادههای Blob استفاده میشود، که یک وعده را برمیگرداند و با استفاده از then() برای دریافت متن واقعی از سرور حل میشود.
بیایید آنچه را که داریم آزمایش کنیم. سرور را با اجرا راه اندازی کنید
node app.js
سپس، دو تب مختلف مرورگر را باز کنید، http://localhost:3459/
را باز کنید و سعی کنید برای آزمایش، پیامهایی را بین برگهها ارسال کنید:
مرحله 3 – مقیاس کردن برنامه
فرض کنید برنامه ما شروع به رشد می کند، و سعی می کنیم با داشتن چندین نمونه از سرور چت خود، آن را مقیاس بندی کنیم. چیزی که میخواهیم به آن برسیم این است که دو کاربر مختلف که به دو سرور مختلف متصل هستند باید بتوانند پیامهای متنی را با موفقیت به یکدیگر ارسال کنند. در حال حاضر ما فقط یک سرور داریم و اگر سرور دیگری را درخواست کنیم، مثلاً http://localhost:3460/
، پیام های سرور در پورت 3459
را نخواهیم داشت. یعنی فقط کاربران متصل به 3460
می توانند با خودشان چت کنند. پیاده سازی فعلی به گونه ای کار می کند که وقتی یک پیام چت بر روی نمونه سرور در حال کار ما ارسال می شود، پیام به صورت محلی فقط برای مشتریان متصل به آن سرور خاص توزیع می شود، همانطور که نشان داده شده است زمانی که http://localhost:3459/
را روی دو باز می کنیم. مرورگرهای مختلف حال، بیایید ببینیم چگونه میتوانیم دو سرور مختلف آنها را ادغام کنند تا بتوانند با یکدیگر صحبت کنند.
مرحله 4 – Redis به عنوان یک کارگزار پیام
Redis یک ذخیره ساز ساختار داده در حافظه سریع و انعطاف پذیر است. اغلب به عنوان یک پایگاه داده یا یک سرور کش برای ذخیره داده ها استفاده می شود. علاوه بر این، می توان از آن برای اجرای یک الگوی تبادل پیام متمرکز Pub/Sub استفاده کرد. سرعت و انعطاف پذیری Redis آن را به یک انتخاب بسیار محبوب برای اشتراک گذاری داده ها در یک سیستم توزیع شده تبدیل کرده است.
هدف در اینجا ادغام سرورهای چت ما با استفاده از Redis به عنوان واسطه پیام است. هر نمونه سرور هر پیام دریافتی از مشتری (مرورگر) را به طور همزمان برای کارگزار پیام منتشر می کند. کارگزار پیام برای هر پیامی که از نمونه های سرور ارسال می شود مشترک می شود.
بیایید فایل app.js
خود را تغییر دهیم:
//app.js
const http = require("http");
const fs = require("fs");
const path = require("path");
const WebSocket = require("ws");
const Redis = require("ioredis");
const redisPublisher = new Redis();
const redisSubscriber = new Redis();
const server = http.createServer((req, res) => {
const htmlFilePath = path.join(__dirname, "index.html");
fs.readFile(htmlFilePath, (err, data) => {
if (err) {
res.writeHead(500);
res.end("Error occured while reading file");
}
res.writeHead(200, { "Content-Type": "text/html" });
res.end(data);
});
});
const webSocketServer = new WebSocket.Server({ server });
webSocketServer.on("connection", (client) => {
console.log("succesfully connected to the client");
client.on("message", (streamMessage) => {
redisPublisher.publish("chat_messages", streamMessage);
});
});
redisSubscriber.subscribe("chat_messages");
console.log("sub", redisSubscriber.subscribe("messages"));
redisSubscriber.on("message", (channel, message) => {
console.log("redis", channel, message);
for (const client of webSocketServer.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
});
const PORT = process.argv[2] || 3459;
server.listen(PORT, () => {
console.log(`Server up and running on port ${PORT}`);
});
در اینجا، ما از قابلیتهای انتشار/اشتراک Redis
استفاده میکنیم. دو نمونه اتصال مختلف، یک بار برای انتشار پیام ها و دیگری برای اشتراک در یک کانال، ایجاد شد. هنگامی که پیامی از مشتری ارسال می شود، آن را با استفاده از روش ناشر در نمونه redisPublisher در یک کانال Redis به نام "chat_messages"
منتشر می کنیم. روش subscribe در نمونه redisSubscribe فراخوانی می شود تا در همان کانال chat_message مشترک شوید. هر زمان که پیامی در این کانال منتشر می شود، شنونده رویداد redisSubscriber.on فعال می شود. این شنونده رویداد روی تمام کلاینتهای WebSocket متصل فعلی تکرار میشود و پیام دریافتی را برای هر مشتری ارسال میکند. این برای اطمینان از این است که وقتی یک کاربر پیامی را ارسال می کند، همه کاربران دیگر متصل به هر نمونه سرور آن پیام را در زمان واقعی دریافت می کنند.
اگر دو سرور مختلف راه اندازی می کنید، بگویید:
node app.js 3459
node app.js 3460
وقتی متن چت روی یک نمونه ارسال میشود، اکنون میتوانیم پیامها را در سرورهای متصل خود به جای تنها یک سرور خاص پخش کنیم. میتوانید این را با اجرای http://localhost:3459/
و http://localhost:3460/
آزمایش کنید، سپس چتهایی را بین آنها ارسال کنید و ببینید که پیامها در زمان واقعی در دو سرور پخش میشوند.
می توانید پیام های منتشر شده در یک کانال را از redis-cli
نظارت کنید و همچنین برای دریافت پیام های مشترک در کانال مشترک شوید:
دستور redis-cli
را اجرا کنید. سپس وارد MONITOR
شوید. به مرورگر خود برگردید و چت را شروع کنید. در ترمینال خود، باید چیزی شبیه به این را ببینید، با فرض اینکه یک متن چت Wow ارسال کنید:
برای مشاهده پیام های منتشر شده مشترک، همان دستور redis-cli
را اجرا کنید و SUBSCRIBE channelName
را وارد کنید. channelName در مورد ما chat_messages
خواهد بود. اگر یک متن ارسال کنید باید چیزی شبیه به این را در ترمینال خود داشته باشید: عالی از مرورگر:
اکنون، ما میتوانیم چندین نمونه از سرور خود را در پورتهای مختلف یا حتی ماشینهای مختلف اجرا کنیم، و تا زمانی که آنها در یک کانال Redis مشترک شوند، میتوانند پیامها را برای همه مشتریان متصل دریافت و پخش کنند، و اطمینان حاصل شود که کاربران میتوانند به طور یکپارچه در بین نمونهها چت کنند.
به خاطر دارید که در بخش مقدمه درباره اجرای الگوی Pub/Sub با استفاده از یک واسطه پیام صحبت کردیم؟ این مثال کاملاً آن را خلاصه می کند.
در شکل بالا 2 کلاینت (مرورگر) مختلف به سرورهای چت متصل هستند. سرورهای چت نه به طور مستقیم، بلکه از طریق یک نمونه Redis به هم متصل هستند. این بدان معنی است که در حالی که آنها اتصالات مشتری را به طور مستقل مدیریت می کنند، اطلاعات (پیام های چت) را از طریق یک رسانه مشترک (Redis) به اشتراک می گذارند. هر سرور چت بالا به Redis متصل می شود. این اتصال برای انتشار پیامها به Redis و اشتراک در کانالهای Redis برای دریافت پیامها استفاده میشود. هنگامی که کاربر پیامی را ارسال می کند، سرور چت آن را در کانال مشخص شده در Redis منتشر می کند.
هنگامی که Redis یک پیام منتشر شده را دریافت می کند، این پیام را برای همه سرورهای چت مشترک پخش می کند. سپس هر سرور چت پیام را به همه کلاینتهای متصل ارسال میکند و اطمینان حاصل میکند که هر کاربر پیامهای ارسال شده توسط هر کاربر را دریافت میکند، صرف نظر از اینکه به کدام سرور متصل است.
این معماری به ما این امکان را می دهد که برنامه چت خود را به صورت افقی با افزودن نمونه های بیشتر سرور در صورت نیاز، مقیاس کنیم. به لطف قابلیتهای سیستم انتشار/اشتراک Redis، که توزیع یکنواخت پیام را در همه نمونهها تضمین میکند، هر نمونه میتواند مجموعهای از مشتریان متصل خود را مدیریت کند. این راه اندازی برای مدیریت تعداد زیادی کاربر همزمان کارآمد است و در دسترس بودن برنامه شما را تضمین می کند.
نتیجه
در این آموزش، ضمن ایجاد یک برنامه چت ساده برای نشان دادن این الگو، با استفاده از Redis به عنوان واسطه پیام، الگوی Publish/Subscribe را یاد گرفتیم. مرحله بعدی یادگیری نحوه پیاده سازی یک سیستم پیام رسانی همتا به همتا در مواردی است که کارگزار پیام ممکن است بهترین راه حل نباشد، به عنوان مثال، در سیستم های پیچیده توزیع شده که در آن یک نقطه شکست واحد (Broker) یک گزینه نیست.