Leveraging NATS for Distributed Microservices: Implementing Event-Driven Architecture

The Challenges of Distributed Microservices

As organisations scale their applications, they often encounter several challenges:

  • Scalability: As the number of services grows, so does the complexity of managing communication between them.
  • Reliability: Ensuring that messages are delivered reliably and in the correct order can be difficult.
  • Security: Protecting data in transit and at rest is crucial, especially in multi-tenant environments.
  • Integration: Seamlessly integrating with existing systems and third-party services can be cumbersome.

What Makes NATS Suitable for Modern Asynchronous Systems?

NATS is a messaging system designed to address these challenges. Here’s how it stands out:

  • Seamless Scaling: NATS clients do not need to change when deployments change. Whether you’re adding or removing backend nodes, NATS scales effortlessly from a single process to a global super-cluster with leaf node servers.
  • Simple Integration: With a straightforward API and minimal configuration, NATS is easy to integrate into existing systems.
  • Flexible Delivery: Core NATS delivers messages “at most once”; JetStream adds “at least once” and “exactly once” delivery.
  • Robust Security: NATS supports TLS, credentials, username/password, and token-based authentication. JetStream also provides encryption at rest.
  • Reliability and Availability with Persistent Messaging: JetStream supports memory and file persistence, allowing messages to be replayed by time, count, or sequence number. Durable subscriptions let a consumer resume where it left off after a restart.
  • Third-Party Integrations: NATS supports WebSockets, Kafka, IBM MQ, Redis, Apache Spark, Apache Flink, and more.
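
The “exactly once” guarantee mentioned above relies in part on publish-side de-duplication: JetStream discards a message whose Nats-Msg-Id header repeats within the stream's duplicate window. The sketch below shows one way to attach that header. It assumes `js` is a JetStream context from the nats-py client; the helper name `publish_once` and the ID format in the usage note are hypothetical.

```python
async def publish_once(js, subject: str, payload: bytes, msg_id: str):
    """Publish with a Nats-Msg-Id header so JetStream can drop duplicates.

    `js` is assumed to be a nats-py JetStream context (nc.jetstream()).
    `msg_id` should be stable across retries of the same logical message.
    """
    headers = {"Nats-Msg-Id": msg_id}
    # A retry that reuses msg_id inside the duplicate window is not stored twice.
    return await js.publish(subject, payload, headers=headers)
```

A caller might derive the ID from business data, for example `await publish_once(js, "sensors.value", data, "sensor-1:42")`, so that a retried publish carries the same ID.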

Use Cases for NATS

NATS is versatile and can be used in various scenarios, including:

  • Cloud Messaging: Ideal for microservices and service mesh architectures.
  • IoT and Edge: Efficiently handles telemetry, sensor data, and command/control operations.
  • Event/Data Streaming: Supports real-time data processing and analytics.

Common Architecture of an Event-Based Data System

Hands-On with NATS

To get started with NATS, you can use Docker to run a NATS server. The provided nats-docker.yml file sets up a NATS server with JetStream enabled for message persistence.

version: '3.8'
services:
  nats:
    image: nats:2.10.22
    ports:
      - "4222:4222"  # NATS client port
      - "8222:8222"  # NATS monitoring port
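    # -js enables JetStream, -sd stores its data on the mounted volume,
    # -m enables HTTP monitoring on 8222, --name sets the server name.
    command: ["-js", "-sd", "/data", "-m", "8222", "--name", "nats-server"]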
    networks:
      - nats_network
    volumes:
      - nats_data:/data  # Mount the volume to persist data
networks:
  nats_network:
    driver: bridge
volumes:
  nats_data:
    driver: local 
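
Once the container is running, the monitoring port can be used to confirm that the server is up. The following is a small sketch using only the standard library. It assumes HTTP monitoring is enabled on the server (for example with the -m 8222 flag) and reachable on localhost; `nats_is_healthy` and its `fetch` parameter are hypothetical names, the latter being a hook that lets the function be exercised without a live server.

```python
import json
from urllib.request import urlopen


def _http_get(url: str) -> bytes:
    # Plain HTTP GET with a short timeout.
    with urlopen(url, timeout=3) as resp:
        return resp.read()


def nats_is_healthy(host: str = "localhost", port: int = 8222, fetch=_http_get) -> bool:
    """Return True if the server's /healthz endpoint reports status "ok"."""
    try:
        body = json.loads(fetch(f"http://{host}:{port}/healthz"))
    except (OSError, ValueError):
        # Connection refused, HTTP error status, or a non-JSON response.
        return False
    return isinstance(body, dict) and body.get("status") == "ok"
```

Calling `nats_is_healthy()` before starting publishers or consumers gives a quick signal that the port mapping and server flags are correct.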

Connecting to NATS

The NatsConnectionManager class in the codebase handles the connection to the NATS server. It initializes a connection and sets up a JetStream context for managing streams and subjects.

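import asyncio
import os

import nats
from constants import *


class NatsConnectionManager:
    # Stream, subject, and consumer names used throughout the example.
    stream_name = "clientA_sensor"
    subject_faulty = "sensors.faulty"
    subject_metadata = "sensors.metadata"
    subject_value = "sensors.value"
    subject_sos = "sensors.sos"
    sensor_consumer = "rule_executor"

    def __init__(self):
        self.nc = None    # NATS connection
        self.js = None    # JetStream context
        self.psub = None  # Pull subscription

    async def init(self, subject, stream, durable=""):
        # Server address; the NATS_SERVER environment variable overrides the local default.
        nats_host = os.environ.get("NATS_SERVER", "localhost:4222")
        self.nc = await nats.connect(f"nats://{nats_host}", connect_timeout=3, max_reconnect_attempts=2)
        # Create JetStream context.
        self.js = self.nc.jetstream()
        # Create the stream that captures all sensor subjects.
        await self.js.add_stream(
            name=self.stream_name,
            subjects=[
                self.subject_faulty,
                self.subject_metadata,
                self.subject_value,
                self.subject_sos,
            ]
        )
        # Bind a durable pull consumer to the requested subject.
        # Note: this example always uses stream_name and sensor_consumer,
        # regardless of the stream and durable arguments.
        self.psub = await self.js.pull_subscribe(subject=subject, stream=self.stream_name, durable=self.sensor_consumer)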
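
The stream above lists its four subjects explicitly. NATS subjects are dot-separated tokens, and subscriptions and stream definitions may also use wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens, so a pattern such as sensors.* would cover all four. The function below is an illustrative pure-Python re-implementation of that matching rule, not part of the nats-py client; `subject_matches` is a hypothetical helper name.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style `pattern`."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without '>', pattern and subject must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

For example, `subject_matches("sensors.*", "sensors.value")` is true, while `subject_matches("sensors.*", "sensors.value.raw")` is false because `*` covers a single token only.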

Publishing and Subscribing to Messages

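The publish_message helper opens a short-lived connection, publishes one message to a JetStream subject, and closes the connection.

async def publish_message(subject, data):
    # data must be bytes, e.g. json.dumps(payload).encode().
    nats_host = os.environ.get("NATS_SERVER", "localhost:4222")
    nc = await nats.connect(f"nats://{nats_host}", connect_timeout=3, max_reconnect_attempts=2)
    try:
        js = nc.jetstream()
        # The returned ack confirms that the stream stored the message.
        ack = await js.publish(subject, data)
        return ack
    finally:
        # Close the connection even if publishing fails.
        await nc.close()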
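
The publish call expects the payload as bytes, while the consumer in the next section decodes each message and parses it as JSON. A minimal sketch of the encoding side follows. The field names (`sensor_id`, `value`, `ts`) are hypothetical, since the article does not define a message schema, and `encode_reading` is an illustrative helper name.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def encode_reading(sensor_id: str, value: float, ts: Optional[datetime] = None) -> bytes:
    """Serialize one sensor reading to UTF-8 JSON bytes for publishing."""
    ts = ts or datetime.now(timezone.utc)
    record = {"sensor_id": sensor_id, "value": value, "ts": ts.isoformat()}
    return json.dumps(record).encode("utf-8")
```

It could be combined with the helper above as `await publish_message("sensors.value", encode_reading("s-1", 21.5))`.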

Processing Events

The MessageProcessor class pulls a batch of messages from the durable consumer, decodes each one as JSON, and acknowledges it.

import asyncio
import json

import nats.errors

from NatsManager import NatsConnectionManager, publish_message
from constants import *
from RuleManager import *


class MessageProcessor:
    def __init__(self):
        self.natsConnection = NatsConnectionManager()

    async def init(self, subject, stream, durable):
        await self.natsConnection.init(subject=subject, stream=stream, durable=durable)

    async def start_reading(self, subject, stream, durable, batch_size=20):
        # Bind the pull subscription to the requested subject and consumer.
        self.natsConnection.psub = await self.natsConnection.js.pull_subscribe(subject=subject, stream=stream, durable=durable)
        # Pull a batch of messages; fetch raises TimeoutError if none arrive in time.
        try:
            msgs = await self.natsConnection.psub.fetch(batch_size, timeout=5)
        except nats.errors.TimeoutError:
            msgs = []
        for msg in msgs:
            msg_txt = msg.data.decode()
            # Parse the message as JSON
            message = json.loads(msg_txt)
            print(f"message::{message}")
            # Acknowledge so JetStream does not redeliver the message.
            await msg.ack()


async def main():
    process_event = MessageProcessor()
    try:
        # Initialize the NATS connection
        await process_event.init(event_queue, stream_gms, queue_consumer)
        # Start reading messages
        await process_event.start_reading('sensors.value', 'clientA_sensor', 'rule_executor', batch_size=30)
    except Exception as e:
        print(f"Error in main: {e}")
    finally:
        # Close the connection even if processing failed.
        if process_event.natsConnection.nc is not None:
            await process_event.natsConnection.nc.close()


if __name__ == '__main__':
    asyncio.run(main())
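
With a pull consumer, how each message is acknowledged determines whether JetStream redelivers it. One possible policy is sketched below: acknowledge messages that are processed, terminate those that are not valid JSON (redelivery would fail the same way), and negatively acknowledge those whose handler raises so they can be retried. `handle_batch` and the `handler` callback are hypothetical names; `msgs` is assumed to be the list returned by a nats-py pull subscription's fetch call.

```python
import json


async def handle_batch(msgs, handler) -> int:
    """Process a batch of JetStream messages; return how many were acked."""
    processed = 0
    for msg in msgs:
        try:
            payload = json.loads(msg.data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Malformed payload: tell the server not to redeliver it.
            await msg.term()
            continue
        try:
            handler(payload)
        except Exception:
            # Processing failed: ask the server to redeliver later.
            await msg.nak()
            continue
        await msg.ack()
        processed += 1
    return processed
```

Separating parse failures from handler failures keeps permanently bad messages from cycling through the consumer while still retrying transient errors.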

Conclusion

NATS is a powerful tool for building distributed microservices architectures. By implementing an event-based communication flow, you can achieve a scalable, resilient, and flexible system. The provided codebase offers a practical example of how to integrate NATS into your microservices, demonstrating its potential to enhance your application’s communication capabilities.