aboutsummaryrefslogtreecommitdiff
path: root/slowpoke.go
blob: 317819619005f52a9a0a46381e46bc32a44972d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package slowpoke

import (
	"io"
	"net"
	"time"

	"github.com/op/go-logging"
)

// Slowpoke relays data between a client connection and a target address,
// optionally pausing for a fixed latency before forwarding each chunk.
type Slowpoke struct {
	// conn is the client-side connection passed to New.
	conn       net.Conn
	// targetAddr is the address StartTransfer dials over TCP.
	targetAddr string
	// latency is slept before each forwarded write; zero disables the delay.
	latency    time.Duration
	// isClosed is set by handleError once the first read/write error (or EOF)
	// has been reported. NOTE(review): it is read and written from both
	// transfer goroutines without synchronization.
	isClosed   bool
	// close is an unbuffered channel; handleError sends on it to wake
	// StartTransfer, which receives from it.
	close      chan bool
	// logger receives all debug, info, warning and error output.
	logger     *logging.Logger
}

// New builds a Slowpoke for the given client connection.
//
// conn is the client connection to relay, targetAddr is the address that
// StartTransfer will dial, latency is the delay applied before each forwarded
// write (zero means no delay), and logger receives all log output. The
// returned Slowpoke starts in the not-closed state with a fresh, unbuffered
// close channel.
func New(conn net.Conn, targetAddr string, latency time.Duration, logger *logging.Logger) *Slowpoke {
	s := new(Slowpoke) // isClosed starts out false via the zero value
	s.conn = conn
	s.targetAddr = targetAddr
	s.latency = latency
	s.close = make(chan bool)
	s.logger = logger
	return s
}

// StartTransfer dials the target address and relays data in both directions
// between the client connection and the target until one direction hits an
// error or EOF.
//
// It blocks until the relay has shut down: after the first close signal it
// closes both connections and waits for both transfer goroutines to return,
// so no goroutine outlives the call. If the target cannot be dialed, the
// failure is logged and the client connection is closed.
func (s *Slowpoke) StartTransfer() {
	defer s.conn.Close()
	target, err := net.Dial("tcp", s.targetAddr)
	if err != nil {
		// TODO validate target addr before this point
		s.logger.Errorf("Failed to connect to target address %s:\n%v", s.targetAddr, err)
		return
	}
	defer target.Close()
	s.logger.Debugf("Established connection to %s", target.RemoteAddr())

	// Each transfer goroutine reports on finished when it returns. The buffer
	// holds one slot per direction so reporting never blocks.
	const directions = 2
	finished := make(chan struct{}, directions)
	relay := func(source, sink net.Conn) {
		defer func() { finished <- struct{}{} }()
		s.transferWithLatency(source, sink)
	}
	go relay(s.conn, target)
	go relay(target, s.conn)

	<-s.close

	s.logger.Infof("Connection between client %s and target %s closed", s.conn.RemoteAddr(), target.RemoteAddr())

	// Close both ends right away so the direction that is still running gets
	// unblocked from its Read/Write. The deferred Close calls above then hit
	// an already-closed connection; their error is ignored as before.
	s.conn.Close()
	target.Close()

	// Wait for both goroutines to exit. While waiting, keep receiving from
	// s.close: if both directions failed at nearly the same time, the second
	// one may also try to signal, and with nobody receiving it would block
	// forever and leak.
	for remaining := directions; remaining > 0; {
		select {
		case <-finished:
			remaining--
		case <-s.close:
			// Late signal from the other direction; nothing more to do.
		}
	}
}

// createBuffer allocates the scratch buffer used for a single transfer
// direction. The returned slice is zeroed, with length and capacity 1500.
func createBuffer() []byte {
	// TODO configurable
	const bufferSize = 1500
	buf := make([]byte, bufferSize)
	return buf
}

// transferWithLatency copies data from source to target until a read or write
// fails, sleeping for s.latency before each write when the latency is
// non-zero.
//
// Every chunk that was read is logged at debug level as "sent" when source is
// the client connection and as "received" otherwise. Data returned together
// with a read error is still forwarded before the error is handled. The first
// read or write error is passed to handleError and ends the loop; a short
// write without an error is only logged as a warning.
func (s *Slowpoke) transferWithLatency(source net.Conn, target net.Conn) {
	buf := createBuffer()

	// Data read from the client is outgoing; anything else is incoming.
	logFormat := "%d bytes received"
	if source == s.conn {
		logFormat = "%d bytes sent"
	}

	for {
		n, readErr := source.Read(buf)

		if n > 0 {
			if s.latency == 0 {
				s.logger.Debugf(logFormat, n)
			} else {
				s.logger.Debugf(logFormat+" with latency of %s", n, s.latency)
				time.Sleep(s.latency)
			}

			written, writeErr := target.Write(buf[:n])
			if writeErr != nil {
				s.handleError("Error during write: %v", writeErr)
				return
			}
			if written != n {
				s.logger.Warningf("Read %d bytes but could only write %d bytes", n, written)
			}
		}

		// Checked after the write so trailing data is not dropped.
		if readErr != nil {
			s.handleError("Error during read: %v", readErr)
			return
		}
	}
}

// handleError reports the first transfer failure and signals shutdown.
//
// If isClosed is already set, the call is a no-op. Otherwise it sets
// isClosed, sends on s.close (blocking until the value is received), and then
// logs: io.EOF is logged at debug level as an expected end of stream, any
// other error is logged at error level using msg as the format string.
//
// NOTE(review): isClosed is checked and set without synchronization, and this
// method is called from both transfer goroutines. Two concurrent callers can
// both pass the check; the second send then completes only if something is
// still receiving on s.close.
func (s *Slowpoke) handleError(msg string, err error) {
	if !s.isClosed {
		// Mark closed before signaling so later callers bail out early.
		s.isClosed = true
		s.close <- true

		switch err {
		case io.EOF:
			// EOF is the normal end of a stream, not a failure.
			s.logger.Debug("Received EOF")
		default:
			s.logger.Errorf(msg, err)
		}
	}
}