The Pipeline class allows you to batch multiple Redis commands into a single HTTP request, significantly improving performance when executing multiple operations.
Overview
The Upstash REST API supports command pipelining: instead of sending each command one by one, several commands are sent in a single HTTP request, and a single JSON array response is returned.
Execution is not atomic: commands in a pipeline are executed in order, but commands from other clients can interleave. Use multi() for atomic transactions.
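To make the "single HTTP request" idea concrete, here is a minimal sketch of what such a request could look like. It assumes the REST endpoint is /pipeline and that the body is a two-dimensional JSON array of commands; both should be checked against the Upstash REST API reference. The helper name buildPipelineRequest, the example URL, and the token are hypothetical. The sketch only builds the request and does not send it.

```typescript
// One Redis command expressed as an array: command name followed by its arguments.
type RedisCommand = [string, ...(string | number)[]];

interface PipelineRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Hypothetical helper (not part of @upstash/redis): builds, but does not send,
// the single HTTP request that carries every queued command.
// Assumption: the endpoint is `<base URL>/pipeline`.
function buildPipelineRequest(
  baseUrl: string,
  token: string,
  commands: RedisCommand[],
): PipelineRequest {
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/pipeline`,
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json",
    },
    // Assumption: the body is a JSON array with one inner array per command.
    body: JSON.stringify(commands),
  };
}

// Placeholder URL and token, for illustration only.
const demoRequest = buildPipelineRequest("https://example.upstash.io", "EXAMPLE_TOKEN", [
  ["SET", "key1", "value1"],
  ["GET", "key1"],
]);

console.log(demoRequest.url);  // https://example.upstash.io/pipeline
console.log(demoRequest.body); // [["SET","key1","value1"],["GET","key1"]]
```

In practice the Pipeline class builds and sends this request for you; the sketch is only meant to show why batching costs one round trip.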
Creating a Pipeline
pipeline()
Create a pipeline for batching commands (non-atomic).
const pipeline = redis.pipeline();
multi()
Create a transaction for atomic execution of commands.
const transaction = redis.multi();
Methods
exec()
Execute all commands in the pipeline and return results.
const pipeline = redis.pipeline();
pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
pipeline.get('key1');
const results = await pipeline.exec();
// results: ['OK', 'OK', 'value1']
Returns: an array of results, one per command, in the order the commands were queued.
With Type Inference
When commands are chained, TypeScript automatically infers the return types:
const results = await redis.pipeline()
  .set('key', 'value')
  .get('key')
  .incr('counter')
  .exec();
// results: ['OK', 'value', 1]
// TypeScript knows the types!
Manual Type Annotation
You can manually specify return types when needed:
interface User {
  name: string;
  age: number;
}
const p = redis.pipeline();
p.set('user:1', { name: 'Alice', age: 30 });
p.get<User>('user:1');
const results = await p.exec<['OK', User]>();
Error Handling
By default, if any command fails, the entire pipeline throws an error:
try {
  const results = await pipeline.exec();
} catch (error) {
  console.error('Pipeline failed:', error);
}
Keep Errors Option
To get individual errors for each command, use keepErrors: true:
const p = redis.pipeline();
p.get('key1');
p.get('key2');
p.get('nonexistent');
const results = await p.exec({ keepErrors: true });
// results: [
//   { result: 'value1', error: undefined },
//   { result: 'value2', error: undefined },
//   { result: null, error: undefined }
// ]
Parameters: an optional execution options object.
  keepErrors: If true, returns objects with result and error properties instead of throwing on errors.
Returns:
  When keepErrors is false (default): an array of command results.
  When keepErrors is true: an array of objects with:
    result: The command result
    error: Error message if the command failed, undefined otherwise
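As an illustration of how the keepErrors result shape might be consumed, the sketch below splits an array of { result, error } entries into successes and failures. The PipelineEntry type and the partitionResults helper are hypothetical names introduced here, and the sample data is hand-written rather than a real server response.

```typescript
// Shape of each entry when keepErrors is true, as described above.
interface PipelineEntry<T = unknown> {
  result: T;
  error?: string;
}

// Hypothetical helper: separates successful entries from failed ones,
// keeping each entry's position so it can be matched back to its command.
function partitionResults<T>(entries: PipelineEntry<T>[]) {
  const succeeded: { index: number; result: T }[] = [];
  const failed: { index: number; error: string }[] = [];
  entries.forEach((entry, index) => {
    if (entry.error !== undefined) {
      failed.push({ index, error: entry.error });
    } else {
      succeeded.push({ index, result: entry.result });
    }
  });
  return { succeeded, failed };
}

// Hand-written sample standing in for the output of exec({ keepErrors: true }).
const sampleEntries: PipelineEntry[] = [
  { result: "value1" },
  { result: undefined, error: "WRONGTYPE Operation against a key holding the wrong kind of value" },
  { result: null }, // a missing key yields null, which is not an error
];

const partitioned = partitionResults(sampleEntries);
console.log(partitioned.succeeded.length, partitioned.failed.length); // 2 1
```

Keeping the index alongside each entry makes it straightforward to retry or log only the commands that failed.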
length()
Get the number of commands in the pipeline before execution.
const p = redis.pipeline();
p.set('key1', 'value1');
p.set('key2', 'value2');
const count = p.length();
// count: 2
Returns: the number of commands queued in the pipeline.
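One possible use of length() is to avoid sending a request when nothing has been queued, for example when commands are added conditionally. The sketch below assumes you would rather skip the request than call exec() on an empty pipeline. PipelineLike and execIfNotEmpty are hypothetical names, and a stub object stands in for a real pipeline so the example runs without a network connection.

```typescript
// Minimal structural type covering only the two methods this sketch needs.
// Assumption: a real Pipeline from @upstash/redis is compatible with it.
interface PipelineLike {
  length(): number;
  exec(): Promise<unknown[]>;
}

// Hypothetical helper: returns an empty array without calling exec()
// when no commands have been queued.
async function execIfNotEmpty(p: PipelineLike): Promise<unknown[]> {
  return p.length() === 0 ? [] : p.exec();
}

// Stub pipelines used only for demonstration.
const emptyStub: PipelineLike = {
  length: () => 0,
  exec: async () => {
    throw new Error("exec() should not be called on an empty pipeline");
  },
};
const queuedStub: PipelineLike = {
  length: () => 2,
  exec: async () => ["OK", "OK"],
};

execIfNotEmpty(emptyStub).then((results) => console.log(results.length));  // 0
execIfNotEmpty(queuedStub).then((results) => console.log(results.length)); // 2
```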
Command Methods
All Redis commands available on the Redis client are also available on Pipeline. Commands are chainable:
String Commands
const p = redis.pipeline()
  .set('key', 'value')
  .get('key')
  .mset({ key1: 'value1', key2: 'value2' })
  .mget('key1', 'key2')
  .incr('counter')
  .incrby('counter', 5);
const results = await p.exec();
Hash Commands
const p = redis.pipeline()
  .hset('user:1', { name: 'Alice', age: 30 })
  .hget('user:1', 'name')
  .hgetall('user:1')
  .hincrby('user:1', 'age', 1);
const results = await p.exec();
List Commands
const p = redis.pipeline()
  .lpush('queue', 'job1', 'job2')
  .llen('queue')
  .lrange('queue', 0, -1)
  .lpop('queue');
const results = await p.exec();
Set Commands
const p = redis.pipeline()
  .sadd('tags', 'redis', 'database')
  .smembers('tags')
  .sismember('tags', 'redis');
const results = await p.exec();
Sorted Set Commands
const p = redis.pipeline()
  .zadd('leaderboard', { score: 100, member: 'player1' })
  .zadd('leaderboard', { score: 200, member: 'player2' })
  .zrange('leaderboard', 0, -1)
  .zrank('leaderboard', 'player1');
const results = await p.exec();
JSON Commands
const p = redis.pipeline()
  .json.set('user:1', '$', { name: 'Alice', age: 30 })
  .json.get('user:1')
  .json.numincrby('user:1', '$.age', 1);
const results = await p.exec();
Usage Patterns
Basic Pipelining
import { Redis } from '@upstash/redis';
const redis = Redis.fromEnv();
const pipeline = redis.pipeline();
pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
pipeline.get('key1');
pipeline.get('key2');
const [set1, set2, get1, get2] = await pipeline.exec();
// set1: 'OK'
// set2: 'OK'
// get1: 'value1'
// get2: 'value2'
Chaining Commands
const results = await redis.pipeline()
  .set('user:1:name', 'Alice')
  .set('user:1:email', 'alice@example.com')
  .get('user:1:name')
  .get('user:1:email')
  .exec();
Atomic Transactions
// Use multi() for atomic execution
const transaction = redis.multi();
transaction.set('balance', 100);
transaction.decrby('balance', 30);
transaction.incrby('balance', 50);
const results = await transaction.exec();
// All commands execute atomically
// results: ['OK', 70, 120]
Complex Operations
interface User {
  name: string;
  email: string;
  score: number;
}
const p = redis.pipeline();
// Create multiple users
p.hset('user:1', { name: 'Alice', email: 'alice@example.com' });
p.hset('user:2', { name: 'Bob', email: 'bob@example.com' });
// Add to leaderboard
p.zadd('leaderboard', { score: 100, member: 'user:1' });
p.zadd('leaderboard', { score: 150, member: 'user:2' });
// Get data
p.hgetall<User>('user:1');
p.zrange('leaderboard', 0, -1, { withScores: true });
const results = await p.exec();
Error Handling with keepErrors
const p = redis.pipeline();
p.set('key1', 'value1');
p.get('key1');
p.lpop('nonexistent-list'); // Might return null, not an error
p.get('key2');
const results = await p.exec({ keepErrors: true });
results.forEach((result, index) => {
  if (result.error) {
    console.error(`Command ${index} failed:`, result.error);
  } else {
    console.log(`Command ${index} result:`, result.result);
  }
});
Pipelining improves performance by reducing the number of network round trips. The timings in this comparison are illustrative and assume roughly 100ms per request:
// Without pipeline: 4 round trips
await redis.set('key1', 'value1');
await redis.set('key2', 'value2');
await redis.get('key1');
await redis.get('key2');
// Total time: ~400ms (100ms per request)
// With pipeline: 1 round trip
const results = await redis.pipeline()
  .set('key1', 'value1')
  .set('key2', 'value2')
  .get('key1')
  .get('key2')
  .exec();
// Total time: ~100ms
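The arithmetic behind those figures can be written as a small model. This is a deliberately simplified sketch: it assumes each HTTP request costs one fixed round trip and ignores server-side execution time and payload size, so real numbers will differ. The function name estimateTotalMs is hypothetical.

```typescript
// Simplified latency model: total time = number of HTTP requests x round-trip time.
// Assumption: server-side execution time and payload size are negligible.
function estimateTotalMs(
  commandCount: number,
  roundTripMs: number,
  pipelined: boolean,
): number {
  // A pipeline sends every command in one request; otherwise one request per command.
  const requests = pipelined ? Math.min(commandCount, 1) : commandCount;
  return requests * roundTripMs;
}

const sequentialMs = estimateTotalMs(4, 100, false);
const pipelinedMs = estimateTotalMs(4, 100, true);
console.log(`sequential: ${sequentialMs}ms, pipelined: ${pipelinedMs}ms`);
// sequential: 400ms, pipelined: 100ms
```

Under this model the saving grows linearly with the number of commands, which is why batching matters most when many small commands are issued together.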
TypeScript Examples
Type-Safe Pipeline
interface User {
  id: string;
  name: string;
  email: string;
}
interface Product {
  id: string;
  name: string;
  price: number;
}
const results = await redis.pipeline()
  .set<User>('user:1', { id: '1', name: 'Alice', email: 'alice@example.com' })
  .set<Product>('product:1', { id: '1', name: 'Widget', price: 9.99 })
  .get<User>('user:1')
  .get<Product>('product:1')
  .exec();
// results are properly typed:
// ['OK', 'OK', User | null, Product | null]
Mixing Command Types
const p = redis.pipeline();
// String operations
p.set('counter', '0');
p.incr('counter');
// Hash operations
p.hset('user:1', { name: 'Alice' });
p.hget('user:1', 'name');
// List operations
p.lpush('queue', 'job1');
p.llen('queue');
// Execute all
const [
  setResult,   // 'OK'
  incrResult,  // 1
  hsetResult,  // 1
  hgetResult,  // 'Alice'
  lpushResult, // 1
  llenResult   // 1
] = await p.exec();
Comparison: pipeline() vs multi()
| Feature | pipeline() | multi() |
|---|---|---|
| Atomicity | No - commands can interleave | Yes - all commands execute atomically |
| Use Case | Performance optimization | Transactions requiring consistency |
| Redis Command | Sends multiple commands | Wraps commands in MULTI/EXEC |
| Rollback | Not supported | All or nothing execution |
// Pipeline: Non-atomic, for performance
const pipeline = redis.pipeline();
pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
await pipeline.exec();
// Other clients can modify keys between commands
// Transaction: Atomic, for consistency
const transaction = redis.multi();
transaction.set('key1', 'value1');
transaction.set('key2', 'value2');
await transaction.exec();
// All commands execute as a single atomic operation