Skip to main content

Overview

Redis pub/sub enables real-time messaging between different parts of your application. The Upstash Redis SDK provides a Subscriber class that handles channel subscriptions and message delivery over HTTP using Server-Sent Events (SSE).
Pub/sub in Upstash Redis uses HTTP streaming with Server-Sent Events, not the traditional Redis protocol. This makes it compatible with serverless and edge environments.

Basic Usage

Subscribe to a Channel

import { Redis } from '@upstash/redis';

const redis = new Redis({
  url: process.env.UPSTASH_REDIS_REST_URL!,
  token: process.env.UPSTASH_REDIS_REST_TOKEN!,
});

// Subscribe to a single channel
const subscriber = redis.subscribe<string>('notifications');

subscriber.on('message', (data) => {
  console.log('Received:', data.message);
  console.log('Channel:', data.channel);
});

Subscribe to Multiple Channels

const subscriber = redis.subscribe<string>([
  'channel1',
  'channel2',
  'channel3'
]);

subscriber.on('message', (data) => {
  console.log(`Message from ${data.channel}: ${data.message}`);
});

Publishing Messages

// Publish to a channel
const receiverCount = await redis.publish('notifications', 'Hello World');
console.log(`Message delivered to ${receiverCount} subscribers`);

// Publish complex objects (automatically serialized)
await redis.publish('events', {
  type: 'user.login',
  userId: '123',
  timestamp: Date.now(),
});

Type Safety

Specify the message type when creating a subscriber:
interface ChatMessage {
  user: string;
  message: string;
  timestamp: number;
}

const subscriber = redis.subscribe<ChatMessage>('chat:room1');

subscriber.on('message', (data) => {
  // data.message is typed as ChatMessage
  console.log(`${data.message.user}: ${data.message.message}`);
});

Event Listeners

Message Events

Listen for all messages across subscribed channels:
subscriber.on('message', (data) => {
  console.log('Channel:', data.channel);
  console.log('Message:', data.message);
});

Channel-Specific Events

Listen to messages from a specific channel:
const subscriber = redis.subscribe(['news', 'sports', 'weather']);

subscriber.on('message:news', (data) => {
  console.log('News update:', data.message);
});

subscriber.on('message:sports', (data) => {
  console.log('Sports update:', data.message);
});

Subscription Events

Handle subscription lifecycle events:
subscriber.on('subscribe', (count) => {
  console.log(`Subscribed. Total subscriptions: ${count}`);
});

subscriber.on('unsubscribe', (count) => {
  console.log(`Unsubscribed. Remaining subscriptions: ${count}`);
});

Error Events

Handle errors during subscription:
subscriber.on('error', (error) => {
  console.error('Subscription error:', error);
});

Pattern Subscriptions

Subscribe to channels matching a pattern using psubscribe:
// Subscribe to all channels starting with 'user:'
const subscriber = redis.psubscribe<string>('user:*');

subscriber.on('pmessage', (data) => {
  console.log('Pattern:', data.pattern);
  console.log('Channel:', data.channel);
  console.log('Message:', data.message);
});

Multiple Patterns

const subscriber = redis.psubscribe([
  'user:*',
  'order:*',
  'payment:*'
]);

subscriber.on('pmessage', (data) => {
  if (data.pattern === 'user:*') {
    console.log('User event:', data.message);
  } else if (data.pattern === 'order:*') {
    console.log('Order event:', data.message);
  }
});

Pattern-Specific Events

const subscriber = redis.psubscribe('notification:*');

subscriber.on('pmessage:notification:*', (data) => {
  console.log(`Notification on ${data.channel}:`, data.message);
});

Managing Subscriptions

Unsubscribe from Channels

const subscriber = redis.subscribe(['channel1', 'channel2', 'channel3']);

// Unsubscribe from specific channels
await subscriber.unsubscribe(['channel1']);

// Unsubscribe from all channels
await subscriber.unsubscribe();

Get Subscribed Channels

const channels = subscriber.getSubscribedChannels();
console.log('Subscribed to:', channels);

Remove All Listeners

subscriber.removeAllListeners();

Practical Examples

Real-Time Chat

interface ChatMessage {
  user: string;
  message: string;
  timestamp: number;
}

const chatRoom = 'room:lobby';

// Subscribe to chat messages
const subscriber = redis.subscribe<ChatMessage>(chatRoom);

subscriber.on('message', (data) => {
  const msg = data.message;
  console.log(`[${new Date(msg.timestamp).toLocaleTimeString()}] ${msg.user}: ${msg.message}`);
});

subscriber.on('error', (error) => {
  console.error('Chat error:', error);
});

// Send a chat message
const sendMessage = async (user: string, message: string) => {
  await redis.publish(chatRoom, {
    user,
    message,
    timestamp: Date.now(),
  });
};

await sendMessage('Alice', 'Hello everyone!');
await sendMessage('Bob', 'Hi Alice!');

Notification System

interface Notification {
  type: 'info' | 'warning' | 'error';
  title: string;
  message: string;
}

class NotificationService {
  private subscriber;
  
  constructor(private redis: Redis, private userId: string) {
    // Subscribe to user-specific notifications
    this.subscriber = redis.subscribe<Notification>(`notifications:${userId}`);
    this.setupListeners();
  }
  
  private setupListeners() {
    this.subscriber.on('message', (data) => {
      const notification = data.message;
      this.handleNotification(notification);
    });
    
    this.subscriber.on('error', (error) => {
      console.error('Notification error:', error);
    });
  }
  
  private handleNotification(notification: Notification) {
    const icon = notification.type === 'error' ? '❌' : 
                 notification.type === 'warning' ? '⚠️' : 'ℹ️';
    console.log(`${icon} ${notification.title}: ${notification.message}`);
  }
  
  async sendNotification(targetUserId: string, notification: Notification) {
    await this.redis.publish(`notifications:${targetUserId}`, notification);
  }
  
  async cleanup() {
    await this.subscriber.unsubscribe();
  }
}

// Usage
const notificationService = new NotificationService(redis, 'user123');

await notificationService.sendNotification('user123', {
  type: 'info',
  title: 'New Message',
  message: 'You have a new message from Bob',
});

Event Broadcasting

interface SystemEvent {
  eventType: string;
  data: any;
  timestamp: number;
}

// Subscribe to all system events
const subscriber = redis.psubscribe<SystemEvent>('events:*');

subscriber.on('pmessage', (data) => {
  const event = data.message;
  const eventCategory = data.channel.split(':')[1];
  
  console.log(`[${eventCategory}] ${event.eventType}:`, event.data);
});

// Publish different types of events
await redis.publish('events:user', {
  eventType: 'user.login',
  data: { userId: '123', ip: '192.168.1.1' },
  timestamp: Date.now(),
});

await redis.publish('events:order', {
  eventType: 'order.created',
  data: { orderId: '456', amount: 99.99 },
  timestamp: Date.now(),
});

Multi-Channel Monitoring

const subscriber = redis.subscribe([
  'logs:error',
  'logs:warning',
  'logs:info'
]);

subscriber.on('message:logs:error', (data) => {
  console.error('🔴 ERROR:', data.message);
  // Send alert, log to monitoring service, etc.
});

subscriber.on('message:logs:warning', (data) => {
  console.warn('🟡 WARNING:', data.message);
});

subscriber.on('message:logs:info', (data) => {
  console.info('🟢 INFO:', data.message);
});

Message Serialization

Automatic Deserialization

By default, messages are automatically deserialized from JSON:
const subscriber = redis.subscribe<{ count: number }>('stats');

subscriber.on('message', (data) => {
  // data.message is automatically parsed
  console.log(data.message.count);
});

await redis.publish('stats', { count: 42 });

Disable Automatic Deserialization

To receive raw message strings:
const redisWithoutDeserialization = new Redis({
  url: process.env.UPSTASH_REDIS_REST_URL!,
  token: process.env.UPSTASH_REDIS_REST_TOKEN!,
  automaticDeserialization: false,
});

const subscriber = redisWithoutDeserialization.subscribe<string>('channel');

subscriber.on('message', (data) => {
  // data.message is a string
  const parsed = JSON.parse(data.message);
});

Important Notes

Pub/sub messages are fire-and-forget. If a subscriber is offline when a message is published, it will not receive that message. For persistent messaging, consider using Redis Streams instead.

Connection Management

  • Subscriptions use HTTP Server-Sent Events (SSE) for real-time updates
  • Each subscription maintains a persistent HTTP connection
  • Properly clean up subscriptions when done to avoid resource leaks
// Always cleanup when done
process.on('SIGINT', async () => {
  await subscriber.unsubscribe();
  process.exit(0);
});

Performance Considerations

  • Pattern subscriptions (psubscribe) are more expensive than exact matches
  • Limit the number of concurrent subscriptions per client
  • Use channel-specific event handlers for better performance

See Also