From 1509996c093e41a76be6ad5fd9429f100d65f60c Mon Sep 17 00:00:00 2001 From: James Barnett Date: Sun, 23 Dec 2018 20:30:06 +0000 Subject: Add basic TCP proxy with injected latency --- README.md | 22 +++++++++++- cmd/main.go | 81 +++++++++++++++++++++++++++++++++++++++++++ slowpoke.go | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 cmd/main.go create mode 100644 slowpoke.go diff --git a/README.md b/README.md index 2d93ea1..41fdad3 100644 --- a/README.md +++ b/README.md @@ -1 +1,21 @@ -# slowpoke \ No newline at end of file +# Slowpoke +Slowpoke is a simple TCP proxy which can introduce a configurable latency between packet transfers. +This allows you to test and profile how your application behaves with different levels of latency between services such as databases or caches. + +# Running +TODO once binaries built + +# Use cases +The primary use case for this tool is to simulate running you application on a slower network than the one you develop on. If you know your production environment has non-trivial latency on TCP connections, it's useful to run with these latencies on your development environment to ensure your application is still performant. + +For example, in your production environment database connections may have to go through a VPN to cross different regions/AZs, which adds 5ms of latency to every packet. +By using Slowpoke you can simulate this 5ms overhead on your development env which would otherwise probably have near zero latency, especially if you run you DB on the same machine as your app connected via the loopback interface. +This may highlight areas of your application which perform poorly with this limitation, which otherwise would not have been apparent until deployed in production. +E.g. you may have a screen which needs to deserialise 200 DB entities and currently fetches each entity in it's own query, rather than fetching them all in bulk. On your development env without the network latency this may have no noticeable performance impact, but with 5ms of packet latency that's now ~2 seconds added to your page load time and highlights something that needs to be fixed. + + +# Note on packets +In order to keep Slowpoke simple and portable it doesn't actually add latency between each _packet_. Instead, it adds latency between writes of a configurably sized byte buffer. By default this buffer size is 1500 to match the standard ethernet MTU size, but can be changed depending on your use case. + +Incidentally this also lets you simulate the MTU size change between running on the loopback interface (localhost) usually at the maximum 65536, and going over ethernet at 1500, which is sometimes another subtle difference between dev and prod deployments of applications. + diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..92ec8bd --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "net" + "os" + "time" + + "github.com/jessevdk/go-flags" + "github.com/op/go-logging" + + "github.com/jamesbarnett91/slowpoke" +) + +var log = logging.MustGetLogger("main") + +var opts struct { + TargetAddress string `short:"t" long:"target" description:"TODO" required:"true"` + Port int `short:"p" long:"port" description:"TODO" required:"true"` + Verbose []bool `short:"v" long:"verbose" description:"TODO"` + Latency time.Duration `short:"l" long:"latency" default:"0ms" description:"TODO"` +} + +func init() { + _, err := flags.Parse(&opts) + if err != nil { + log.Error(err) + os.Exit(1) + } + + configureLogger() +} + +func configureLogger() { + logBackend := logging.NewLogBackend(os.Stderr, "", 0) + logFormat := logging.MustStringFormatter(`%{color}%{time:15:04:05.000} [%{level:.3s}]%{color:reset} - %{message}`) + logger := logging.AddModuleLevel(logging.NewBackendFormatter(logBackend, logFormat)) + + if len(opts.Verbose) == 0 { + logger.SetLevel(logging.WARNING, "") + } else if len(opts.Verbose) == 1 { + logger.SetLevel(logging.INFO, "") + } else { + logger.SetLevel(logging.DEBUG, "") + } + + logging.SetBackend(logger) +} + +func main() { + log.Infof("Proxying from :%d to %s with latency of %s", opts.Port, opts.TargetAddress, opts.Latency) + + listener := getListener(opts.Port) + waitForClients(listener) +} + +func getListener(port int) net.Listener { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Errorf("Failed to start listening on port %d:\n%v", port, err) + os.Exit(1) + } + log.Debugf("Waiting for connections on port %d", port) + + return listener +} + +func waitForClients(listener net.Listener) { + for { + client, err := listener.Accept() + if err != nil { + log.Errorf("Failed to accept connection:\n%v", err) + break + } + log.Infof("Accepted connection from client %v\n", client.RemoteAddr()) + + s := slowpoke.New(client, opts.TargetAddress, opts.Latency, log) + + go s.StartTransfer() + } +} diff --git a/slowpoke.go b/slowpoke.go new file mode 100644 index 0000000..3178196 --- /dev/null +++ b/slowpoke.go @@ -0,0 +1,112 @@ +package slowpoke + +import ( + "io" + "net" + "time" + + "github.com/op/go-logging" +) + +type Slowpoke struct { + conn net.Conn + targetAddr string + latency time.Duration + isClosed bool + close chan bool + logger *logging.Logger +} + +func New(conn net.Conn, targetAddr string, latency time.Duration, logger *logging.Logger) *Slowpoke { + return &Slowpoke{ + conn: conn, + targetAddr: targetAddr, + latency: latency, + isClosed: false, + close: make(chan bool), + logger: logger, + } +} + +func (s *Slowpoke) StartTransfer() { + defer s.conn.Close() + target, err := net.Dial("tcp", s.targetAddr) + if err != nil { + // TODO validate target addr before this point + s.logger.Errorf("Failed to connect to target address %s:\n%v", s.targetAddr, err) + return + } + defer target.Close() + s.logger.Debugf("Established connection to %s", target.RemoteAddr()) + + go s.transferWithLatency(s.conn, target) + go s.transferWithLatency(target, s.conn) + + <-s.close + + s.logger.Infof("Connection between client %s and target %s closed", s.conn.RemoteAddr(), target.RemoteAddr()) +} + +func createBuffer() []byte { + // TODO configurable + return make([]byte, 1500) +} + +func (s *Slowpoke) transferWithLatency(source net.Conn, target net.Conn) { + byteBuffer := createBuffer() + + var transferDirection string + // If the data source is the client then we are sending + if source == s.conn { + transferDirection = "%d bytes sent" + } else { + transferDirection = "%d bytes received" + } + + for { + + bytesRead, readError := source.Read(byteBuffer) + + if bytesRead > 0 { + + if s.latency != 0 { + s.logger.Debugf(transferDirection+" with latency of %s", bytesRead, s.latency) + time.Sleep(s.latency) + } else { + s.logger.Debugf(transferDirection, bytesRead) + } + + bytesWritten, writeError := target.Write(byteBuffer[0:bytesRead]) + + if writeError != nil { + s.handleError("Error during write: %v", writeError) + break + } + if bytesRead != bytesWritten { + s.logger.Warningf("Read %d bytes but could only write %d bytes", bytesRead, bytesWritten) + } + } + if readError != nil { + s.handleError("Error during read: %v", readError) + break + } + } + +} + +func (s *Slowpoke) handleError(msg string, err error) { + if s.isClosed { + // One of the send/receive streams was already closed. Nothing to do. + return + } + + s.isClosed = true + s.close <- true + + if err == io.EOF { + // EOF is expected and not really an error + s.logger.Debug("Received EOF") + } else { + s.logger.Errorf(msg, err) + } +} -- cgit v1.2.3