The Challenges of Distributed Microservices
As organisations scale their applications, they often encounter several challenges:
- Scalability: As the number of services grows, so does the complexity of managing communication between them.
- Reliability: Ensuring that messages are delivered reliably and in the correct order can be difficult.
- Security: Protecting data in transit and at rest is crucial, especially in multi-tenant environments.
- Integration: Seamlessly integrating with existing systems and third-party services can be cumbersome.
What Makes NATS Suitable for Modern Asynchronous Systems?
NATS is a messaging system designed to address these challenges. Here’s how it stands out:
- Seamless Scaling: NATS clients do not need to change when deployments change. Whether you’re adding or removing backend nodes, the same client code works against anything from a single server process to a global super-cluster with leaf node servers.
- Simple Integration: With a straightforward API and minimal configuration, NATS is easy to integrate into existing systems.
- Flexible Delivery: Core NATS delivers messages “at most once.” JetStream adds “at least once” and “exactly once” delivery.
- Robust Security: NATS supports TLS, credentials, username/password, and token-based authentication. JetStream also provides encryption at rest.
- Reliability and Availability with Persistent Messaging: JetStream supports memory and file persistence, allowing messages to be replayed by time, count, or sequence number. Durable subscriptions let a consumer resume where it left off after a restart.
- Third-Party Integrations: NATS connects to WebSockets, Kafka, IBM MQ, Redis, Apache Spark, Apache Flink, and more, either natively or through bridges and connectors.
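On the publishing side, the “exactly once” guarantee relies on deduplication: JetStream drops a message whose Nats-Msg-Id header it has already seen within the stream's duplicate window. The following is a minimal sketch, assuming js is a nats-py JetStream context (any object with a compatible async publish method works) and that the caller supplies a stable ID. The publish_once name, the retry count, and the backoff are illustrative choices, not part of NATS.

```python
import asyncio


async def publish_once(js, subject, payload, msg_id, retries=3, backoff=0.1):
    """Publish with a stable Nats-Msg-Id so retried publishes are deduplicated."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    # The same ID on every attempt lets the server discard a duplicate
    # when an earlier attempt was stored but its acknowledgement was lost.
    headers = {"Nats-Msg-Id": msg_id}
    last_error = None
    for attempt in range(retries):
        try:
            return await js.publish(subject, payload, headers=headers)
        except Exception as exc:  # for example, a timeout waiting for the ack
            last_error = exc
            await asyncio.sleep(backoff * (attempt + 1))
    raise last_error
```

The ID should identify the event itself (for example, a sensor ID plus a reading counter) rather than the publish attempt, otherwise retries would not be recognised as duplicates.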
Use Cases for NATS
NATS is versatile and can be used in various scenarios, including:
- Cloud Messaging: Ideal for microservices and service mesh architectures.
- IoT and Edge: Efficiently handles telemetry, sensor data, and command/control operations.
- Event/Data Streaming: Supports real-time data processing and analytics.
Common Architecture of an Event-Based Data System

Hands-On with NATS
To get started with NATS, you can use Docker to run a NATS server. The provided nats-docker.yml file sets up a NATS server with JetStream enabled for message persistence.
version: '3.8'
services:
  nats:
    image: nats:2.10.22
    ports:
      - "4222:4222" # NATS client port
      - "8222:8222" # NATS monitoring port
    environment:
      - NATS_SERVER_NAME=nats-server
    # -js enables JetStream, -sd stores its data on the mounted volume,
    # -m serves the HTTP monitoring endpoint on port 8222
    command: ["-js", "-sd", "/data", "-m", "8222"]
    networks:
      - nats_network
    volumes:
      - nats_data:/data # Mount the volume to persist data
networks:
  nats_network:
    driver: bridge
volumes:
  nats_data:
    driver: local
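Once the container is up, the monitoring port offers a quick way to confirm that the server is reachable. Below is a minimal sketch using only the standard library. It assumes monitoring is enabled and published on port 8222 as in the compose file, and that the server exposes the /healthz endpoint. The server_is_healthy helper is a hypothetical name, not part of the codebase.

```python
import json
import urllib.request


def server_is_healthy(base_url="http://localhost:8222", timeout=2.0):
    """Return True if the NATS monitoring endpoint reports status "ok"."""
    # Talk to the server directly, ignoring any HTTP proxy set in the environment.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(f"{base_url}/healthz", timeout=timeout) as resp:
            body = json.loads(resp.read().decode())
    except (OSError, ValueError):
        # Unreachable server, non-2xx response, or a body that is not JSON
        return False
    return isinstance(body, dict) and body.get("status") == "ok"
```

Calling server_is_healthy() before creating streams avoids confusing connection timeouts when the container has not finished starting.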
Connecting to NATS
The NatsConnectionManager class in the codebase handles the connection to the NATS server. It initializes a connection and sets up a JetStream context for managing streams and subjects.
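import asyncio
import os

import nats
from nats.errors import TimeoutError as NatsTimeoutError

from constants import *


class NatsConnectionManager:
    stream_name = "clientA_sensor"
    subject_faulty = "sensors.faulty"
    subject_metadata = "sensors.metadata"
    subject_value = "sensors.value"
    subject_sos = "sensors.sos"
    sensor_consumer = "rule_executor"

    def __init__(self):
        self.nc = None
        self.js = None
        self.psub = None

    async def init(self, subject, stream, durable=""):
        # Fall back to a local server when NATS_SERVER is not set.
        nats_host = os.environ.get("NATS_SERVER", "localhost:4222")
        self.nc = await nats.connect(f"nats://{nats_host}", connect_timeout=3, max_reconnect_attempts=2)
        # Create JetStream context.
        self.js = self.nc.jetstream()
        # Create the stream and bind the sensor subjects to it.
        await self.js.add_stream(
            name=self.stream_name,
            subjects=[
                self.subject_faulty,
                self.subject_metadata,
                self.subject_value,
                self.subject_sos,
            ],
        )
        # Note: the stream and durable arguments are accepted, but the class-level names are used here.
        self.psub = await self.js.pull_subscribe(subject=subject, stream=self.stream_name, durable=self.sensor_consumer)

    async def pull_batch(self, subject, batch_size, timeout=5):
        # Minimal sketch of the helper that MessageProcessor calls; the original is not shown.
        # subject is kept for the caller's signature; the active pull subscription already filters on it.
        try:
            return await self.psub.fetch(batch_size, timeout=timeout)
        except NatsTimeoutError:
            # No messages arrived within the timeout
            return []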
Publishing and Subscribing to Messages
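async def publish_message(subject, data):
    # data must be bytes, for example json.dumps(payload).encode()
    nats_host = os.environ.get("NATS_SERVER", "localhost:4222")
    nc = await nats.connect(f"nats://{nats_host}", connect_timeout=3, max_reconnect_attempts=2)
    try:
        js = nc.jetstream()
        # publish() waits for the stream to acknowledge that the message was stored
        ack = await js.publish(subject, data)
        return ack
    finally:
        # Close the connection even if the publish fails
        await nc.close()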
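js.publish expects the payload as bytes, and the consumer later turns it back into a dictionary with json.loads. Here is a small sketch of a matching encode/decode pair. The payload schema is not shown in the original, so the sensor_id, value, and ts fields are hypothetical, as are the two helper names.

```python
import json
from datetime import datetime, timezone


def encode_reading(sensor_id, value, ts=None):
    """Serialize a sensor reading to UTF-8 JSON bytes suitable for js.publish()."""
    ts = ts or datetime.now(timezone.utc)
    payload = {"sensor_id": sensor_id, "value": value, "ts": ts.isoformat()}
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def decode_reading(data):
    """Inverse of encode_reading: bytes from msg.data back to a dict."""
    reading = json.loads(data.decode("utf-8"))
    reading["ts"] = datetime.fromisoformat(reading["ts"])
    return reading
```

A publisher could then call publish_message("sensors.value", encode_reading("s1", 21.5)), and a consumer could pass msg.data to decode_reading.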
Processing Events
import asyncio
import json
from datetime import datetime

from NatsManager import NatsConnectionManager, publish_message
from constants import *
from RuleManager import *


class MessageProcessor:
    def __init__(self):
        self.natsConnection = NatsConnectionManager()

    async def init(self, subject, stream, durable):
        await self.natsConnection.init(subject=subject, stream=stream, durable=durable)

    async def start_reading(self, subject, stream, durable, batch_size=20):
        self.natsConnection.psub = await self.natsConnection.js.pull_subscribe(subject=subject, stream=stream, durable=durable)
        # Pull a batch of messages
        msgs = await self.natsConnection.pull_batch(subject, batch_size)
        if msgs is not None:
            for msg in msgs:
                msg_txt = msg.data.decode()
                # Parse the message as JSON
                message = json.loads(msg_txt)
                print(f"message::{message}")
                # Acknowledge the message so JetStream does not redeliver it
                await msg.ack()


async def main():
    process_event = MessageProcessor()
    try:
        # Initialize the NATS connection (event_queue, stream_gms, and queue_consumer come from constants)
        await process_event.init(event_queue, stream_gms, queue_consumer)
        # Start reading messages
        await process_event.start_reading('sensors.value', 'clientA_sensor', 'rule_executor', batch_size=30)
    except Exception as e:
        print(f"Error in main: {e}")
    finally:
        # Close the connection even if reading failed
        if process_event.natsConnection.nc is not None:
            await process_event.natsConnection.nc.close()


if __name__ == '__main__':
    asyncio.run(main())
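The reading loop assumes every message parses and processes cleanly. The following is a hedged sketch of per-message outcome handling, assuming nats-py message objects with async ack(), nak(), and term() methods. The process_batch name and the handler callback are illustrative and not part of the original codebase.

```python
import asyncio
import json


async def process_batch(msgs, handler):
    """Ack handled messages, terminate malformed ones, nak failed ones."""
    counts = {"acked": 0, "terminated": 0, "nacked": 0}
    for msg in msgs:
        try:
            event = json.loads(msg.data.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            # A malformed payload will never parse, so stop redelivery.
            await msg.term()
            counts["terminated"] += 1
            continue
        try:
            await handler(event)
        except Exception:
            # Possibly a transient failure: ask JetStream to redeliver.
            await msg.nak()
            counts["nacked"] += 1
        else:
            await msg.ack()
            counts["acked"] += 1
    return counts
```

Separating parse errors from handler errors keeps one bad payload from being redelivered indefinitely while still giving temporarily failing work another chance.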
Conclusion
NATS is a strong fit for distributed microservices architectures. By implementing an event-based communication flow, you can achieve a scalable, resilient, and flexible system. The provided codebase offers a practical starting point for integrating NATS into your microservices: a JetStream-enabled server running in Docker, a connection manager, a publisher, and a pull-based consumer.