gRPC Client Connection Pooling



Original article was published on my website: https://promisefemi.vercel.app/blog/grpc-client-connection-pooling

Recently, I was working on a daemon that consumes MQTT messages sent from thousands of IoT devices. One of the requirements was that the daemon needed to handle about 15k messages per second (I know, right 😮).

Without going into too much detail (that would be for another blog post), we decided to cut out the round trip to our MySQL database completely and introduce a gRPC server that would act as a buffer between our daemon and the database. That way, our daemon would simply drop messages off to the gRPC server, and the server would queue them and write to the database on its own time.

Building the gRPC server was the easy part; the hard part was managing the needs of the daemon. A number of goroutines would need to send messages to the gRPC server, but sharing a single client connection would significantly affect performance, since new requests would have to wait for the connection to finish whatever task it was currently handling. Just imagine 14k+ messages waiting for one connection to become free. At this point it was clear that using one connection was a no-go, and creating a new connection per request was out of the picture (creating new connections is very expensive here), so we decided to use a connection pool.

A lot of the ideas here were inspired by This Blog; I strongly recommend it.

If you don’t know what a connection pool is I strongly advise that you read the blog I linked above.

Jumping In

If you don’t want to go through everything, I have already created a Go package that can be used right away: https://github.com/promisefemi/grpc-client-pool

Before anything else, let's talk about how to create new gRPC client connections.

conn, err := grpc.Dial(":9000", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
    // Handle error
}
// create a new client and use connection

or with context

ctx, cancelCtx := context.WithTimeout(context.Background(), 3*time.Second)
defer cancelCtx()
conn, err := grpc.DialContext(ctx, ":9000", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
    // Handle error
}
// create a new client and use connection

Pool Structure

At its core, a connection pool has three important properties: maxOpenConnection, maxIdleConnection, and idleConnections.

maxOpenConnection - how many connections we are allowed to open before we have to start queuing connection requests.

maxIdleConnection - how many idle connections we are allowed to keep before we start closing connections.

idleConnections - a list of connections that can be reused.

There are also two important methods, Get and put, responsible for getting and caching connections. Together, these properties and methods manage our connection pool.

type ClientPool struct {
    mu sync.Mutex //mutex to protect the idleConnections and numOfOpenConnection properties

    address             string // gRPC server address
    configOptions       []grpc.DialOption // gRPC dial configurations
    maxOpenConnection   int // maximum number of open connections allowed
    maxIdleConnection   int // maximum number of idle connections allowed
    idleConnections     map[string]*ClientCon // list of idle connections
    numOfOpenConnection int // number of currently open connections
}

ClientCon is a custom type, so let's create that as well

type ClientCon struct {
    id   string
    pool *ClientPool
    Conn *grpc.ClientConn
}

Now let us proceed with our put and get methods

Put method

The responsibility of the put method is very simple: it takes a connection and either returns it to the list of idle connections (the pool) or closes it if there is no room.

func (cp *ClientPool) put(conn *ClientCon) {
    // Lock the pool and defer the unlock
    cp.mu.Lock()
    defer cp.mu.Unlock()
    // either way, the connection is no longer in use
    cp.numOfOpenConnection--
    // check to see if there is room to return the connection to idleConnections
    if len(cp.idleConnections) < cp.maxIdleConnection {
        cp.idleConnections[conn.id] = conn
    } else {
        // if we already hold the maximum number of idle connections, close it
        _ = conn.Conn.Close()
    }
}

And that's about it: the put method is done.

Get method

The get method is a little different; it has to handle three scenarios:

  1. Return a connection from the list of idle connections if there is any available

  2. Queue connection request if we have surpassed the number of connections we can create

  3. Create a new connection

Let's see how this translates into code.

func (cp *ClientPool) Get() (*ClientCon, error) {
    // Lock the pool
    cp.mu.Lock()

    // check if we have any idle connection available
    if len(cp.idleConnections) > 0 {
        // return the first available connection and remove it from idleConnections
        for _, val := range cp.idleConnections {
            delete(cp.idleConnections, val.id)
            // increment the number of open connections
            cp.numOfOpenConnection++
            cp.mu.Unlock()
            return val, nil
        }
    }
...

For the first step, we check if there are any idle connections; if there are, we return the first one and increment the number of open connections.

If there are no available connections we move to the next step.

Before we start creating new connections, it's important to check that we have not surpassed the number of open connections we are allowed to create; if we have, we need to queue the connection request and resolve it at a later time.

Go channels come to the rescue here. But before we do that, we need to introduce a new type that will act as the channel's element type.

The queueChan struct contains two properties: connectionChan, a channel used to receive a ClientCon once the request is fulfilled, and errorChan, a channel that receives an error if the connection request could not be fulfilled.

// A queue struct (the struct is not a channel, I just named it that way)
type queueChan struct {
    connectionChan chan *ClientCon
    errorChan      chan error
}

Let's also update the ClientPool to contain the channel that will act as the queue.

type ClientPool struct {
    ...
    connectionQueue     chan *queueChan // channel for queuing connection requests when there are no idle connections left. 
}

Back in the get method, we create a new queue request, pass it to the pool's connection queue, and then block with a select until the request is resolved.

...
// Check if we are allowed to open new connections
if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
    // create a new queue request
    queueRequest := &queueChan{
        connectionChan: make(chan *ClientCon),
        errorChan:      make(chan error),
    }
    // unlock before blocking, otherwise handleConnectionQueue could
    // never acquire the mutex and we would deadlock
    cp.mu.Unlock()
    // pass the request into the connection queue
    cp.connectionQueue <- queueRequest

    // block with a select until the request is fulfilled or an error occurs
    select {
    case conn := <-queueRequest.connectionChan:
        cp.mu.Lock()
        cp.numOfOpenConnection++
        cp.mu.Unlock()
        return conn, nil
    case err := <-queueRequest.errorChan:
        return nil, err
    }
}
...

With that, we have successfully queued the connection request; when an idle connection becomes available, the request will be resolved. We will come back to how requests from the connection queue are resolved.

The last part of the get method is very simple: at this point there is no idle connection, and we are allowed to create a new one (instead of queuing).

    conn, err := cp.openConnection()
    if err != nil {
        cp.mu.Unlock()
        return nil, err
    }

    cp.numOfOpenConnection++
    cp.mu.Unlock()
    return conn, nil
}

func (cp *ClientPool) openConnection() (*ClientCon, error) {
    // Dial a new gRPC client connection (or use DialContext if that works for you)
    newConn, err := grpc.Dial(cp.address, cp.configOptions...)
    if err != nil {
        return nil, err
    }
    // create a new ClientCon; UnixNano keeps ids from colliding when two
    // connections are opened within the same second
    return &ClientCon{
        id:   fmt.Sprintf("%v", time.Now().UnixNano()),
        pool: cp,
        Conn: newConn,
    }, nil
}

And all together:

// A queue struct (the struct is not a channel, I just named it that way)
type queueChan struct {
    connectionChan chan *ClientCon
    errorChan      chan error
}

func (cp *ClientPool) Get() (*ClientCon, error) {
    // Lock the pool
    cp.mu.Lock()

    // check if we have any idle connection available
    if len(cp.idleConnections) > 0 {
        // return the first available connection and remove it from idleConnections
        for _, val := range cp.idleConnections {
            delete(cp.idleConnections, val.id)
            // increment the number of open connections
            cp.numOfOpenConnection++
            cp.mu.Unlock()
            return val, nil
        }
    }
    // Check if we are allowed to open new connections
    if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
        // create a new queue request
        queueRequest := &queueChan{
            connectionChan: make(chan *ClientCon),
            errorChan:      make(chan error),
        }
        // unlock before blocking, otherwise handleConnectionQueue could
        // never acquire the mutex and we would deadlock
        cp.mu.Unlock()
        // pass the request into the connection queue
        cp.connectionQueue <- queueRequest

        // block with a select until the request is fulfilled or an error occurs
        select {
        case conn := <-queueRequest.connectionChan:
            cp.mu.Lock()
            cp.numOfOpenConnection++
            cp.mu.Unlock()
            return conn, nil
        case err := <-queueRequest.errorChan:
            return nil, err
        }
    }
    conn, err := cp.openConnection()
    if err != nil {
        cp.mu.Unlock()
        return nil, err
    }

    cp.numOfOpenConnection++
    cp.mu.Unlock()
    return conn, nil
}

func (cp *ClientPool) openConnection() (*ClientCon, error) {
    // Dial a new gRPC client connection (or use DialContext if that works for you)
    newConn, err := grpc.Dial(cp.address, cp.configOptions...)
    if err != nil {
        return nil, err
    }
    // create a new ClientCon; UnixNano keeps ids from colliding when two
    // connections are opened within the same second
    return &ClientCon{
        id:   fmt.Sprintf("%v", time.Now().UnixNano()),
        pool: cp,
        Conn: newConn,
    }, nil
}

And with all that, we are almost done. Remember when we queued connection requests? Well, it's time to resolve those requests.

To do that, we need a method that runs on a separate goroutine, constantly waiting for queued connection requests and the chance to resolve them. Let's call this method handleConnectionQueue.

Handling Connection Queue

There are three basic things we need to care about when handling connection queue requests:

  1. Watching for new requests in the queue

  2. Checking for a newly idle connection

  3. Creating a connection if there are no idle connections and if we are allowed to do so.

As you may have noticed, this has a lot of similarities with the Get method, but note that we do not return an error immediately after a connection attempt fails; instead, we keep retrying within a given time frame before deciding that the request failed.

Let us begin. Note that ErrConnectionWaitTimeout is a sentinel error we declare ourselves, e.g. var ErrConnectionWaitTimeout = errors.New("timed out waiting for a free connection").

func (cp *ClientPool) handleConnectionQueue() {
    for rq := range cp.connectionQueue {

        var (
            hasTimedOut  = false
            hasCompleted = false
            timeout      = time.After(time.Duration(3) * time.Second)
        )
        //continually try to get/create a connection until timeout or connection completed
        for {

            if hasCompleted || hasTimedOut {
                break
            }
            //continually check for timeout or try to get/create a connection
            select {
            case <-timeout:
                hasTimedOut = true
                rq.errorChan <- ErrConnectionWaitTimeout
            default:
            ...

And within the default: case, we do two things: try to get an idle connection, or try to create a new one.

                ...
                // first check if an idle connection is available
                cp.mu.Lock()
                numberOfIdleConnections := len(cp.idleConnections)
                if numberOfIdleConnections > 0 {
                    for _, val := range cp.idleConnections {
                        delete(cp.idleConnections, val.id)
                        cp.mu.Unlock()
                        rq.connectionChan <- val
                        hasCompleted = true
                        break
                    }
                } else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
                    // the pool has not exceeded the number of allowed open connections:
                    // reserve a slot by incrementing numOfOpenConnection, then unlock
                    // the mutex to free up the pool for other requests, since creating
                    // a new connection could take a while
                    cp.numOfOpenConnection++
                    cp.mu.Unlock()

                    conn, err := cp.openConnection()
                    // release the reserved slot; the receiver in Get re-increments on success
                    cp.mu.Lock()
                    cp.numOfOpenConnection--
                    cp.mu.Unlock()
                    // ignore the dial error itself: the only error we surface is
                    // the timeout, so on failure we simply retry
                    if err == nil {
                        rq.connectionChan <- conn
                        hasCompleted = true
                    }

                } else {
                    // unlock the pool and retry
                    cp.mu.Unlock()
                }
            }
        }
    }
}

All together:

func (cp *ClientPool) handleConnectionQueue() {
    for rq := range cp.connectionQueue {

        var (
            hasTimedOut  = false
            hasCompleted = false
            timeout      = time.After(time.Duration(3) * time.Second)
        )
        // continually try to get/create a connection until timeout or completion
        for {
            if hasCompleted || hasTimedOut {
                break
            }
            // check for a timeout or try to get/create a connection
            select {
            case <-timeout:
                hasTimedOut = true
                rq.errorChan <- ErrConnectionWaitTimeout
            default:
                // first check if an idle connection is available
                cp.mu.Lock()
                numberOfIdleConnections := len(cp.idleConnections)
                if numberOfIdleConnections > 0 {
                    for _, val := range cp.idleConnections {
                        delete(cp.idleConnections, val.id)
                        cp.mu.Unlock()
                        rq.connectionChan <- val
                        hasCompleted = true
                        break
                    }
                } else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
                    // the pool has not exceeded the number of allowed open connections:
                    // reserve a slot, then unlock the mutex since dialing could take a while
                    cp.numOfOpenConnection++
                    cp.mu.Unlock()

                    conn, err := cp.openConnection()
                    // release the reserved slot; the receiver in Get re-increments on success
                    cp.mu.Lock()
                    cp.numOfOpenConnection--
                    cp.mu.Unlock()
                    // ignore the dial error itself: the only error we surface is the timeout
                    if err == nil {
                        rq.connectionChan <- conn
                        hasCompleted = true
                    }
                } else {
                    // unlock the pool and retry
                    cp.mu.Unlock()
                }
            }
        }
    }
}

That is most of what we need to build a functional pool.

We still need a method to release a connection when you're done using it; for ease of use, we can bind that method to ClientCon. All it does is call the ClientPool put method, which, as stated above, returns the connection to the list of idle connections or closes it completely.

func (c *ClientCon) Release() {
    c.pool.put(c)
}

And for the last part, we want a function that creates a new pool, configures it, and starts the handleConnectionQueue goroutine.

type PoolConfig struct {
    MaxOpenConnection     int // maximum number of open connections
    MaxIdleConnection     int // maximum number of idle connections
    ConnectionQueueLength int // buffer capacity of the queue channel
    Address               string // gRPC server address
    ConfigOptions         []grpc.DialOption // gRPC connection options
}

func NewClientPool(config *PoolConfig) *ClientPool {
    clientPool := &ClientPool{
        mu:                  sync.Mutex{},
        address:             config.Address,
        configOptions:       config.ConfigOptions,
        maxOpenConnection:   config.MaxOpenConnection,
        maxIdleConnection:   config.MaxIdleConnection,
        numOfOpenConnection: 0,
        connectionQueue:     make(chan *queueChan, config.ConnectionQueueLength),
        idleConnections:     make(map[string]*ClientCon),
    }
    // start the goroutine that resolves queued connection requests
    go clientPool.handleConnectionQueue()
    return clientPool
}

For real now, that's it: we have successfully created a gRPC client connection pool. To create and use a new pool:

...
// Create a new pool config
poolConfig := &pool.PoolConfig{
    MaxOpenConnection:     10,
    MaxIdleConnection:     10,
    ConnectionQueueLength: 10000,
    Address:               ":9000",
    ConfigOptions: []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    },
}

// create a new client pool
connPool := pool.NewClientPool(poolConfig)

// get a ClientCon from the pool
client, err := connPool.Get()
if err != nil {
    log.Fatalf("%s", err)
}

// set up a new gRPC client and send a request using the connection
userMessage := &proto.UserMessage{
    FirstName: "Promise",
    LastName:  "Femi",
    Email:     "",
}

uc := proto.NewUserClient(client.Conn)
response, err := uc.Set(context.Background(), userMessage)
if err != nil {
    fmt.Printf("error unable to set user -- %s -- %d\n", err, connPool.GetNumberOfOpenConnections())
} else {
    fmt.Printf("%+v -- %d \n", response, connPool.GetNumberOfOpenConnections())
}
// remember to release connections
client.Release()

We are at the end of a great journey 🙇🏾. Thank you for reading my article, and you can reach out to me if you have any feedback ✌🏾.

I have already created a package that implements all the features outlined in this article @ https://github.com/promisefemi/grpc-client-pool