Redis Pipelining

Redis Pipelining

Last modified on 2025-04-18 , by hjjae2

Redis Pipelining #

" How to optimize round-trip times by batching Redis commands “

여러 명령(commands)을 한번에 요청/응답하는 것

DB 에서는 Bulk 연산자를 지원하지만, 레디스에서는 Bulk 연산자를 지원하지 않는다. 대신 pipeline api 를 지원한다.

(흔히 나오는 예시) (아래)HTTP pipelining 과 비슷한 개념이다.

![/images/redis_pipelining_example.png]

문제1. RTT (Round-Trip Time) #

Redis는 TCP 기반 위에서 동작한다.

즉, 요청/응답을 위해 TCP 기반의 네트워크 I/O가 발생할 것이다. Redis 의 성능이 아무리 좋아도, RTT가 길다면 클라이언트 입장에서 처리량이 줄어들 수 밖에 없다.

문제2. Socket I/O #

RTT 뿐만 아니라, 파이프라이닝 없이 여러 커맨드를 요청했을 때 발생하는 socket I/O 비용도 크다. 한번에 요청하면 이 비용을 절약할 수 있다.

” This involves calling the read() and write() syscall, that means going from user land to kernel land. The context switch is a huge speed penalty.

When pipelining is used, many commands are usually read with a single read() system call, and multiple replies are delivered with a single write() system call. “

Spring Data Redis :: Pipelining #

(Spring Data Redis의) RedisTemplate 은 파이프라이닝 기능을 지원하는 몇 개의 메서드를 제공한다.

  • execute / executePipelined

execute #

파이프라이닝을 사용하되, 결과(result)를 신경쓰지 않는다면 execute 메서드에 pipeline 인자에 true 값만 주면 된다.

public class RedisTemplate<K, V> 
        extends RedisAccessor 
        implements RedisOperations<K, V>, BeanClassLoaderAware {
    ...
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { ... }
    ...
}

executePipelined #

public class RedisTemplate<K, V> 
        extends RedisAccessor 
        implements RedisOperations<K, V>, BeanClassLoaderAware {
    ...
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(session, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		// bind connection
		RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
		try {
			return execute((RedisCallback<List<Object>>) connection -> {
				connection.openPipeline(); // (1) : Open pipeline
				boolean pipelinedClosed = false;
				try {
					Object result = executeSession(session);
					if (result != null) {
						throw new InvalidDataAccessApiUsageException(
								"Callback cannot return a non-null value as it gets overwritten by the pipeline");
					}
					List<Object> closePipeline = connection.closePipeline(); // (3) : Close pipeline
					pipelinedClosed = true;
					return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer); // (2) : Deserilize (mixed)results
				} finally {
					if (!pipelinedClosed) {
						connection.closePipeline(); // (3) : Close pipeline
					}
				}
			});
		} finally {
			RedisConnectionUtils.unbindConnection(factory);
		}
	}

    ...

    public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) { ... }
    ...
}

pipeline 을 열고, 닫는것을 제외하고는 거의 execute 와 유사하다.

사용 예시 #

List<Object> results = stringRedisTemplate.executePipelined(
  new RedisCallback<Object>() {
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
      StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
      for(int i=0; i< batchSize; i++) {
        stringRedisConn.rPop("myqueue");
      }
    return null;
  }
});

위 예시는 하나의 요청(커넥션)에서 bulk rPop 연산을 처리하는 예시이다.

results 에는 pop 된 아이템들이 결과물로 나온다.

RedisConnection.openPipeline() #

public interface RedisConnection extends RedisCommands, AutoCloseable {
    ...

    /**
	 * Activates the pipeline mode for this connection. When pipelined, all commands return null (the reply is read at the
	 * end through {@link #closePipeline()}. Calling this method when the connection is already pipelined has no effect.
	 * Pipelining is used for issuing commands without requesting the response right away but rather at the end of the
	 * batch. While somewhat similar to MULTI, pipelining does not guarantee atomicity - it only tries to improve
	 * performance when issuing a lot of commands (such as in batching scenarios).
	 * <p>
	 * Note:
	 * </p>
	 * Consider doing some performance testing before using this feature since in many cases the performance benefits are
	 * minimal yet the impact on usage are not.
	 *
	 * @see #multi()
	 */
	void openPipeline();

    ...
}
public class LettuceConnection extends AbstractRedisConnection {

    ...

    @Override
	public void openPipeline() {
		if (!isPipelined) {
			isPipelined = true;
			ppline = new ArrayList<>();
			flushState = this.pipeliningFlushPolicy.newPipeline();
			flushState.onOpen(this.getOrCreateDedicatedConnection());
		}
	}

    ...
}

특징을 살펴보면,

  1. (해당 커넥션에) 파이프라인 모드를 활성화한다.
    • 이미 파이프라인이 오픈된 상태에서는 아무 효과가 없다.
  2. 파이프라인 동안에는, 모든 명령어는 null 을 반환한다. (null 반환하지 않을 경우 Exception 터지는 로직이 있기도 함)
  3. MULTI 와 비슷할 수 있지만, pipelining 은 atomicity 를 보장하지는 않는다.
    • 파이프라이닝은 다수의 명령어에 대한 성능 향상이 목적이다.



RedisConnection.closePipeline() #

public interface RedisConnection extends RedisCommands, AutoCloseable {
    ...

    /**
	 * Executes the commands in the pipeline and returns their result. If the connection is not pipelined, an empty
	 * collection is returned.
	 *
	 * @throws RedisPipelineException if the pipeline contains any incorrect/invalid statements
	 * @return the result of the executed commands.
	 */
	List<Object> closePipeline() throws RedisPipelineException;

    ...
}
public class LettuceConnection extends AbstractRedisConnection {

    ...

    @Override
	public List<Object> closePipeline() {

		if (!isPipelined) {
			return Collections.emptyList();
		}

		flushState.onClose(this.getOrCreateDedicatedConnection());
		flushState = null;
		isPipelined = false;
		List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
		for (LettuceResult<?, ?> result : ppline) {
			futures.add(result.getResultHolder());
		}

		try {
			boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
					futures.toArray(new RedisFuture[futures.size()]));

			List<Object> results = new ArrayList<>(futures.size());

			Exception problem = null;

			if (done) {
				for (LettuceResult<?, ?> result : ppline) {

					if (result.getResultHolder().getOutput().hasError()) {

						Exception err = new InvalidDataAccessApiUsageException(result.getResultHolder().getOutput().getError());
						// remember only the first error
						if (problem == null) {
							problem = err;
						}
						results.add(err);
					} else if (!result.isStatus()) {

						try {
							results.add(result.conversionRequired() ? result.convert(result.get()) : result.get());
						} catch (DataAccessException e) {
							if (problem == null) {
								problem = e;
							}
							results.add(e);
						}
					}
				}
			}
			ppline.clear();

			if (problem != null) {
				throw new RedisPipelineException(problem, results);
			}

			if (done) {
				return results;
			}

			throw new RedisPipelineException(new QueryTimeoutException("Redis command timed out"));
		} catch (Exception e) {
			throw new RedisPipelineException(e);
		}
	}

    ...
}

특징을 살펴보면,

  1. 파이프라인 안에서 실행한 명령어들에 대한 결과(results)가 반환된다.
    • 파이프라이닝 상태가 아니었다면, 빈 컬렉션(empty collection)이 반환된다.

특징 #

  1. command 의 순서가 변경되지 않는다.

Redis pipelining 은 단순히 요청을 한번에 보내는 것일 뿐 요청(커맨드)의 순서에 영향을 주지 않는다.
공식 문서에서 이것과 관련된 내용은 못찾았다. 다만, 여기를 포함해 다른 글들을 참고할 수 있을 것 같다.

항상 파이프라이닝을 적용하면 될까? #

보통의 경우 파이프라이닝 적용을 통해 성능의 이점을 취할 수 있다. 다만, 여러 개의 명령어를 처리해야 하는 만큼 하나의 커넥션을 점유해 사용하는 시간이 길어질 수 있다. 이로 인해, 커넥션 경합 현상이 발생할 수 있다.

출처 #