aboutsummaryrefslogtreecommitdiff
path: root/slowpoke.go
blob: fb491a15369ae169fde1c97b852c5e380aba56af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main

import (
	"io"
	"net"
	"time"

	"github.com/op/go-logging"
)

// Slowpoke relays data between an already-established connection and a TCP
// target, sleeping for a configurable latency before each chunk is forwarded.
type Slowpoke struct {
	// conn is the connection handed to NewSlowpoke (log messages call its
	// peer the "client"). StartTransfer closes it on return.
	conn       net.Conn
	// targetAddr is the TCP address StartTransfer dials.
	targetAddr *net.TCPAddr
	// latency is slept before every write in transferWithLatency; a zero
	// value skips the sleep.
	latency    time.Duration
	// bufferSize is the length of the read buffer allocated per direction
	// by createBuffer.
	bufferSize int
	// isClosed is set by handleError once the first error (or EOF) has been
	// reported. It is read and written without synchronization even though
	// handleError is reached from both transfer goroutines.
	isClosed   bool
	// close carries the "transfer finished" signal: handleError sends on it
	// and StartTransfer blocks receiving from it.
	close      chan bool
	// logger receives all debug, info, warning and error output.
	logger     *logging.Logger
}

// NewSlowpoke builds a Slowpoke that relays between conn and targetAddr.
//
// conn is the already-accepted connection, targetAddr is dialed later by
// StartTransfer, latency is slept before each forwarded chunk, bufferSize is
// the per-direction read buffer length, and logger receives all output.
func NewSlowpoke(conn net.Conn, targetAddr *net.TCPAddr, latency time.Duration, bufferSize int, logger *logging.Logger) *Slowpoke {
	// StartTransfer runs two transfer goroutines but receives from the close
	// channel only once. The isClosed guard in handleError is not
	// synchronized, so both goroutines may attempt to send. One buffer slot
	// per goroutine guarantees that neither send can block forever and leak
	// its goroutine.
	const transferGoroutines = 2

	return &Slowpoke{
		conn:       conn,
		targetAddr: targetAddr,
		latency:    latency,
		bufferSize: bufferSize,
		isClosed:   false,
		close:      make(chan bool, transferGoroutines),
		logger:     logger,
	}
}

// StartTransfer dials the target address, starts one relay goroutine per
// direction and then blocks until a value arrives on the close channel.
// The client connection is closed on every return path; the target
// connection is closed whenever the dial succeeded.
func (s *Slowpoke) StartTransfer() {
	// Registered first, therefore executed last on return.
	defer s.conn.Close()

	upstream, dialErr := net.DialTCP("tcp", nil, s.targetAddr)
	if dialErr != nil {
		s.logger.Errorf("Failed to connect to target address %s:\n%v", s.targetAddr, dialErr)
		return
	}
	defer upstream.Close()

	s.logger.Debugf("Established connection to %s", upstream.RemoteAddr())

	// Client -> target first, then target -> client.
	relay := s.transferWithLatency
	go relay(s.conn, upstream)
	go relay(upstream, s.conn)

	// Wait for the first direction to report an error or EOF. Returning
	// closes both connections via the deferred calls above.
	<-s.close

	s.logger.Infof("Connection between client %s and target %s closed", s.conn.RemoteAddr(), upstream.RemoteAddr())
}

// createBuffer returns a newly allocated byte slice whose length is the
// configured bufferSize. Every call yields an independent slice.
func (s *Slowpoke) createBuffer() []byte {
	size := s.bufferSize
	buf := make([]byte, size)
	return buf
}

// transferWithLatency copies data from source to target one buffer at a
// time, sleeping for the configured latency before each write. It returns
// after the first read or write error, which is passed to handleError.
func (s *Slowpoke) transferWithLatency(source net.Conn, target net.Conn) {
	buf := s.createBuffer()

	for {
		n, readErr := source.Read(buf)

		// Bytes are forwarded before the read error is examined, so data
		// returned alongside an error is still delivered.
		if n > 0 {
			chunk := buf[:n]

			s.logger.Debugf("Transferring %d bytes from %s to %s with %s added latency", n, source.RemoteAddr(), target.RemoteAddr(), s.latency)

			if s.latency != 0 {
				time.Sleep(s.latency)
			}

			written, writeErr := target.Write(chunk)
			if writeErr != nil {
				s.handleError("Error during write: %v", writeErr)
				return
			}
			if written != n {
				s.logger.Warningf("Read %d bytes but could only write %d bytes", n, written)
			}
		}

		if readErr != nil {
			s.handleError("Error during read: %v", readErr)
			return
		}
	}
}

// handleError reports the end of a transfer direction. The first call marks
// the Slowpoke as closed, sends true on the close channel and then logs err:
// io.EOF at debug level, anything else at error level using msg as the
// format string. Calls made after isClosed has been set do nothing.
//
// NOTE(review): isClosed is read and written here without synchronization,
// although StartTransfer reaches this method from two goroutines.
func (s *Slowpoke) handleError(msg string, err error) {
	alreadySignalled := s.isClosed
	if alreadySignalled {
		// The other direction has already reported; nothing left to do.
		return
	}

	s.isClosed = true
	s.close <- true

	switch err {
	case io.EOF:
		// A peer closing its side is the normal way for a transfer to end.
		s.logger.Debug("Received EOF")
	default:
		s.logger.Errorf(msg, err)
	}
}