Use ETCD Service Discovery in test infra

Recently I gave a talk to my team about service discovery with ETCD. The same technique can also be used in your test infra to hide test-server changes from it.

Some slides from my presentation

Screenshot 2019-11-25 at 9.16.55 AM

Screenshot 2019-11-25 at 9.17.04 AM

Screenshot 2019-11-25 at 9.17.12 AM

Screenshot 2019-11-25 at 9.17.19 AM

Below is the architecture. (I also used MySQL, Kafka and ES for other requirements; you can ignore them.)

Screenshot 2019-11-25 at 9.35.22 AM

How to utilise ETCD to hide test server changes from your infra

Consider the following test infra requirements:

  • You are doing automated API testing. Besides verifying the response, you also want to check that the server's *.log files contain the correct log entries. For example, when a request fails, the server responds with a generic 5xx error code, so you want to confirm that the log contains something like ‘balabala duplicated. Transaction xxx’ – that way you can be sure the request really failed because of that specific error.
  • Your servers are deployed in a Docker/microservices environment, so their IPs may keep changing.
    • You might think of asking IT to configure DNS for them. But keep in mind that you may have more than two servers serving the same APIs.

In this scenario you can use ETCD to achieve the goal:

  • Firstly, start a very basic TCP server on every server instance. The TCP server listens for requests containing a search text and returns the log entries that contain it. The logic is very simple: it accepts the search text, greps the log file and returns the matching log entries to the client.
    • func startFileParseServer(port string, file string) {
         ln, err := net.Listen("tcp", ":" + port)
         if err != nil {
            eChan <- err
         }
         defer ln.Close()
         for {
            conn, err := ln.Accept()
            if err != nil {
               eChan <- err
            }
      
            buf := make([]byte, 1024)
            // Read the incoming connection into the buffer.
            _, err = conn.Read(buf)
            if err != nil {
               log.Println("Error reading:", err.Error())
            }
      
            buf = getValidByte([]byte(buf))
      
            log.Println("#" + string(buf) + "#")
            readFile, err := os.Open(file)
            if err != nil {
               log.Fatalf("failed to open file: %s", err)
            }
            fileScanner := bufio.NewScanner(readFile)
            fileScanner.Split(bufio.ScanLines)
            var fileTextLines []string
            for fileScanner.Scan() {
               fileTextLines = append(fileTextLines, fileScanner.Text())
            }
            _ = readFile.Close()
            for _, l := range fileTextLines {
               log.Println(l)
               if strings.Contains(l, string(buf)) {
                  io.WriteString(conn, l)
               }
            }
            // io.WriteString(conn, "")
      
            conn.Close()
         }
      }
      
      // getValidByte returns a copy of src with every NUL (0x00) byte removed,
      // e.g. the zero padding left in a partially filled fixed-size read buffer.
      // src itself is not modified. Adapted from
      // https://studygolang.com/articles/2911
      func getValidByte(src []byte) []byte {
         // The result is never longer than src, so size it once up front.
         valid := make([]byte, 0, len(src))
         for _, b := range src {
            if b != 0 {
               valid = append(valid, b)
            }
         }
         return valid
      }
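  • Secondly, register your TCP servers in ETCD. The service registry below loads the addresses registered under each key prefix and watches those prefixes for changes.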
    • package serviceregistry
      
      import (
         "context"
         "crypto/rand"
         "errors"
         "log"
         "math/big"
         "sync"
         "time"

         "go.etcd.io/etcd/clientv3"
         "go.etcd.io/etcd/mvcc/mvccpb"
         errUtil "xxx/component/err"
         configFactory "xxx/config"
      )
      
      var (
         conf              = configFactory.Config()
         tcpUploadServices = make(map[string]string)
         tcpSearchServices = make(map[string]string)
         warn              = "[WARN]SvcR"
         // cli is the etcd client shared by init, initServices and every Watch
         // goroutine; it is kept open for the life of the process.
         cli *clientv3.Client

         // mu guards tcpUploadServices and tcpSearchServices. Watch goroutines
         // write them while Resolver reads them from concurrently running
         // callers; unsynchronized map access is a data race that the Go
         // runtime may abort with "concurrent map read and map write".
         mu sync.RWMutex
      )

      // init connects the package-wide etcd client and loads the services that
      // are already registered under the upload and search prefixes.
      func init() {
         log.Println("Initializing Service Registry...")
         var err error
         if cli, err = clientv3.New(clientv3.Config{
            Endpoints:   []string{conf.Etcd.Url},
            DialTimeout: 5 * time.Second,
         }); err != nil {
            log.Fatal(err)
         }
         initServices(conf.Etcd.Etcd_tcp_upload_prefix_tcp)
         initServices(conf.Etcd.Etcd_tcp_search_prefix_tcp)
      }

      // servicesFor returns the map that tracks prefix (upload is checked first,
      // then search), or nil for any other prefix. The returned map may only be
      // read or written while holding mu.
      func servicesFor(prefix string) map[string]string {
         switch prefix {
         case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
            return tcpUploadServices
         case conf.Etcd.Etcd_tcp_search_prefix_tcp:
            return tcpSearchServices
         }
         return nil
      }

      // initServices copies every key/value currently stored under prefix into
      // the matching map (etcd key -> the value Resolver hands out). A Get
      // error is logged and leaves the map unchanged.
      func initServices(prefix string) {
         resp, err := cli.Get(context.Background(), prefix, clientv3.WithPrefix())
         if err != nil {
            log.Println(err)
            return // resp must not be used on error (it was dereferenced before)
         }
         mu.Lock()
         defer mu.Unlock()
         services := servicesFor(prefix)
         for _, kv := range resp.Kvs {
            if services != nil {
               services[string(kv.Key)] = string(kv.Value)
            }
            log.Println("*** Available Service: " + string(kv.Key) + string(kv.Value))
         }
      }

      // Watch applies etcd PUT/DELETE events for keys under prefix to the
      // matching map and blocks until the watch channel is closed, so run it in
      // its own goroutine.
      //
      // Watch does not close cli: the client is shared by all watchers, and
      // closing it when one watch ended would silently stop every other one.
      // NOTE(review): events between initServices' Get and this Watch are not
      // replayed; WithRev(getRevision+1) would close that gap.
      func Watch(prefix string) {
         log.Printf("Service Registry is listening for services %s.* updates...", prefix)
         rch := cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
         for n := range rch {
            if err := n.Err(); err != nil {
               log.Printf("%s: watch on %s reported: %s\n", warn, prefix, err)
            }
            mu.Lock()
            services := servicesFor(prefix)
            for _, ev := range n.Events {
               key, value := string(ev.Kv.Key), string(ev.Kv.Value)
               switch ev.Type {
               case mvccpb.PUT:
                  if services != nil {
                     services[key] = value
                  }
                  log.Println("*** Service is PUT: " + key + "#" + value)
               case mvccpb.DELETE:
                  delete(services, key) // no-op on a nil map
                  log.Println("*** Service is DELETED: " + key + "#" + value)
               }
            }
            mu.Unlock()
         }
         log.Printf("%s: watch on %s ended; its services are no longer updated\n", warn, prefix)
      }

      // Resolver returns one registered value for prefix (upload or search
      // prefix). It fails with SERVER_ERR_SERVER_UNAVAILABLE when nothing is
      // registered and SERVER_ERR_UNSUPPORTED_OPERATION for any other prefix.
      // Safe to call concurrently with Watch.
      func Resolver(prefix string) (string, error) {
         mu.RLock()
         defer mu.RUnlock()
         services := servicesFor(prefix)
         if services == nil {
            return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_UNSUPPORTED_OPERATION))
         }
         return fetchServer(services)
      }

      // fetchServer returns the value of one uniformly random entry of services.
      // The caller must hold mu (at least for reading).
      func fetchServer(services map[string]string) (string, error) {
         // TODO: can implement a real strategy here, say a weighted round robin algorithm
         if len(services) == 0 {
            return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_SERVER_UNAVAILABLE))
         }
         i, err := rand.Int(rand.Reader, big.NewInt(int64(len(services))))
         if err != nil {
            return "", err
         }
         target := i.Int64()
         var idx int64
         for k, v := range services {
            if idx == target {
               // Return right away; previously the loop kept re-picking every later entry.
               log.Printf("*** Picked up [%s]:[%s] to serve", k, v)
               return v, nil
            }
            idx++
         }
         // Unreachable while mu is held: target < len(services).
         return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_SERVER_UNAVAILABLE))
      }
  • Thirdly, write a simple load balancer (LB) that provides a single endpoint for your test infra to consume.
    • package main
      
      import (
         "io"
         "log"
         "net"
         configFactory "xxxx/config"
         "xxxxx/serviceregistry"
         "sync"
         "time"
      )
      
      // conf is the shared configuration; it supplies the listener settings and
      // the etcd key prefixes used below.
      var conf = configFactory.Config()

      // warn prefixes every warning this load balancer logs.
      var warn = "[WARN]LB"
      
      func handleConnection(in net.Conn, prefix string) {
      
         dest, err := serviceregistry.Resolver(prefix)
         if err != nil {
            log.Println(err.Error())
            _ = in.Close() // inform client
            return
         }
         log.Printf("Connecting to service: %s...\n", dest)
      
         out, err := net.Dial("tcp", dest)
         defer func() {
            err1 := in.Close();
            err2 := out.Close();
            if err1 != nil || err2 != nil {
               log.Printf("%s: unable to close tcp for %s. Ignoring...\n", warn, dest)
            } else {
               log.Printf("Closed tcp for %s.\n", dest)
            }
            // try to recover from it - when there are many requests the io.Copy randomly will crash due to NPE
            if r := recover(); r != nil {
               log.Printf("%s: while tried to recover from: %s", warn, r)
            }
         }()
      
         eChan := make(chan error, 2)
      
         cp := func(dst io.Writer, src io.Reader) {
            _, err := io.Copy(dst, src)
            eChan <- err
         }
      
         go cp(out, in)
         go cp(in, out)
      
         err = <-eChan
         if err != nil && err != io.EOF {
            log.Printf("%s: %s\n", warn, err)
            return
         }
      }
      
      func init() {
         go serviceregistry.Watch(conf.Etcd.Etcd_tcp_upload_prefix_tcp)
         go serviceregistry.Watch(conf.Etcd.Etcd_tcp_search_prefix_tcp)
      
         // await service registry initialization
         time.Sleep(2 * time.Second)
      }
      
      // main runs the upload and the search load balancers concurrently and
      // returns once both have stopped (for example, both failed to listen).
      func main() {
         var wg sync.WaitGroup
         wg.Add(2)

         // wg.Done was never called before, and the unbuffered sends on an
         // unread channel blocked forever, so main could never exit.
         go func() {
            defer wg.Done()
            loadBalance(conf.Upload.Protocol, conf.Upload.Host, conf.Upload.Port, conf.Etcd.Etcd_tcp_upload_prefix_tcp)
         }()

         go func() {
            defer wg.Done()
            loadBalance(conf.Tcpsearch.Protocol, conf.Tcpsearch.Host, conf.Tcpsearch.Port, conf.Etcd.Etcd_tcp_search_prefix_tcp)
         }()

         wg.Wait()
         log.Printf("%s: all load balancers stopped; exiting\n", warn)
      }
      
      // loadBalance listens with net.Listen(Protocol, HOST:PORT) and hands every
      // accepted connection to handleConnection for prefix in its own goroutine.
      // It returns (closing the listener) when listening or accepting fails.
      func loadBalance(Protocol string, HOST string, PORT string, prefix string) {
         // JoinHostPort brackets IPv6 literals ("[::1]:8080"); plain
         // concatenation yields "::1:8080", which net.Listen rejects.
         l, err := net.Listen(Protocol, net.JoinHostPort(HOST, PORT))
         if err != nil {
            log.Println(err)
            return
         }
         defer func() {
            if err := l.Close(); err != nil {
               log.Printf("%s: unable to shutdown server. Ignoring...\n", warn)
            }
         }()
         log.Printf("Load Balancer is serving services %s.* updates...", prefix)
         for {
            c, err := l.Accept()
            if err != nil {
               log.Println(err)
               return
            }
            go handleConnection(c, prefix)
         }
      }
  • Lastly, your test infra/test code can simply connect to the LB to grep logs from the remote servers.
    • func GetDeductServerInfoLog(finder string) string {
         // connect to this socket
         conn, _ := net.Dial("tcp", <YOUR_LB_ENDPOINT> + ":<PORT>")
         conn.Write([]byte(finder))
         message, _ := bufio.NewReader(conn).ReadString('\n')
         log.Print("GetDeductServerInfoLog: " + message)
         defer conn.Close()
         return message
      }