Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection remains in INACTIVE state for prolonged period of time. #2930

Open
himanshu0791 opened this issue Jul 26, 2024 · 1 comment
Open
Labels
for: team-attention An issue we need to discuss as a team to make progress status: waiting-for-triage

Comments

@himanshu0791
Copy link

himanshu0791 commented Jul 26, 2024

Bug Report

I noticed that some of the cached write connection from getWriteConnection(slot) from PooledClusterConnectionProvider.java remain in inactive state for indefinite period of time. This implementation is different from getReadConnection(slot) where we check if a connection is active before returning it to the caller.

Current Behavior

Connections remains in inactive state leading to Currently not connected. Commands are rejected for all requests based on config rejectCommandsWhenInactive.

In order to rule out any infra related issues where a netty channel never becomes active marking a connection as inactive, I tried Jedis client in parallel which do not run into any connection related issues. This is primarily to see if there are variables outside of the client affecting it. I tried versions 5.3.7, 6.2.2 and they both observed same issue.

Possible Solution

I made some changes in the getWriteConnection(slot) similar to read path and that effectively fixed the issue. Essentially whenever a node connection is requested, I check for the status and return a new one instead if its marked as inative. I understand this is a bit inefficient in the sense that actual connections which may become active as also closed as part of this code fix.
below are the code snippets of the code I have modified.

From PooledClusterConnectionProvider.java

        private CompletableFuture<StatefulRedisConnection<K, V>> getWriteConnection(int slot) {

        CompletableFuture<StatefulRedisConnection<K, V>> writer;// avoid races when reconfiguring partitions.
        synchronized (stateLock) {
            writer = writers[slot];
        }

        if (writer == null) {
        // writer instance is already completed. We can call get() without any blocking.
        if (writer == null || !writer.join().isOpen()) {
            synchronized (stateLock) {
                // reset slot connection so a new one can be saved. Connection if open, will be closed by
                // the underlying async connection provider.
                writers[slot] = null;
            }
            RedisClusterNode master = partitions.getMasterBySlot(slot);
            if (master == null) {
                clusterEventListener.onUncoveredSlot(slot);
                return Futures.failed(new PartitionSelectorException("Cannot determine a partition for slot " + slot + ".",
                        partitions.clone()));
            }

            // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
            // host because the nodeId can be handled by a different host.
            RedisURI uri = master.getUri();
            ConnectionKey key = new ConnectionKey(ConnectionIntent.WRITE, uri.getHost(), uri.getPort());

            ConnectionFuture<StatefulRedisConnection<K, V>> future = getConnectionAsync(key);

            return future.thenApply(connection -> {

                synchronized (stateLock) {
                    if (writers[slot] == null) {
                        writers[slot] = CompletableFuture.completedFuture(connection);
                    }
                }

                return connection;
            }).toCompletableFuture();
        }

        return writer;
    }
    
   protected ConnectionFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionKey key) {

        ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectionProvider.getConnection(key);
        ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectionProvider.getConnection(key,
                StatefulConnection::isOpen, StatefulConnection::close);
        CompletableFuture<StatefulRedisConnection<K, V>> result = new CompletableFuture<>();

        connectionFuture.handle((connection, throwable) -> {

            if (throwable != null) {

                result.completeExceptionally(
                        RedisConnectionException.create(connectionFuture.getRemoteAddress(), Exceptions.bubble(throwable)));
            } else {
                result.complete(connection);
            }

            return null;
        });

        return ConnectionFuture.from(connectionFuture.getRemoteAddress(), result);
    }

    public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(ConnectionIntent connectionIntent,
           String host, int port) {

        try {
            beforeGetConnection(connectionIntent, host, port);

            return connectionProvider.getConnection(new ConnectionKey(connectionIntent, host, port)).toCompletableFuture();
            return connectionProvider.getConnection(new ConnectionKey(connectionIntent, host, port),
                            StatefulConnection::isOpen, StatefulConnection::close)
                    .toCompletableFuture();
        } catch (RedisException e) {
            throw e;
        } catch (RuntimeException e) {
            throw new RedisException(e);
        }
    }

From AsyncConnectionProvider.java

    public F getConnection(K key, Predicate<T> isActive, Consumer<T> closeConnection) {
       return getSynchronizer(key, isActive, closeConnection).getConnection();
   }

   private Sync<K, T, F> getSynchronizer(K key, Predicate<T> isActive, Consumer<T> closeConnection) {

       if (closed) {
           throw new IllegalStateException("ConnectionProvider is already closed");
       }

       Sync<K, T, F> sync = connections.get(key);

       if (sync != null) {
           if (sync.isInProgress() || (sync.isComplete() && isActive.test(sync.connection))) {
               return sync;
           }
           // closing to avoid leak in case connection is deactivated and not closed.
           closeConnection.accept(sync.connection);
       }
       // reset key so a new connection can be cached.
       connections.remove(key);

       AtomicBoolean atomicBoolean = new AtomicBoolean();

       sync = connections.computeIfAbsent(key, connectionKey -> {

           Sync<K, T, F> createdSync = new Sync<>(key, connectionFactory.apply(key));

           if (closed) {
               createdSync.cancel();
           }

           return createdSync;
       });

       if (atomicBoolean.compareAndSet(false, true)) {

           sync.getConnection().whenComplete((c, t) -> {

               if (t != null) {
                   connections.remove(key);
               }
           });
       }

       return sync;
   }

  //Added a new method   
  private boolean isInProgress() {
      return PHASE.get(this) == PHASE_IN_PROGRESS;
  }  

   protected CompletableFuture<StatefulRedisConnection<K, V>> getConnection(RedisNodeDescription redisNodeDescription) {

       RedisURI uri = redisNodeDescription.getUri();

       return connectionProvider.getConnection(toConnectionKey(uri), StatefulConnection::isOpen, StatefulConnection::close).toCompletableFuture();
   }  

Additional context

We baked this fix for over a month on an industry standard code with QPS in order of millions without any issues. Logging did show us that there were multiple instances where this fix helped a rquest go through instead of failing with above mentioned exception.

I would like your thoughts on this and if this is something we can turn into a pull request.

@himanshu0791 himanshu0791 changed the title Connection remain in INACTIVE state for prolonged period of time. Connection remains in INACTIVE state for prolonged period of time. Jul 26, 2024
@himanshu0791
Copy link
Author

@mp911de Appreciate your comment on this.

@tishun tishun added for: team-attention An issue we need to discuss as a team to make progress status: waiting-for-triage labels Jul 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for: team-attention An issue we need to discuss as a team to make progress status: waiting-for-triage
Projects
None yet
Development

No branches or pull requests

2 participants