Structure and Design

Timer

首先在 TCPSender 里面实现一个 Timer 来实现计时的功能,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Timer
{
private:
uint64_t initial_RTO;
uint64_t RTO;
uint64_t time_passed_;
bool is_running_;

public:
explicit Timer( uint64_t initial_RTO_ )
: initial_RTO( initial_RTO_ ), RTO( initial_RTO_ ), time_passed_( 0 ), is_running_( false ) {};
bool is_running() const { return is_running_; };
bool is_expired() const { return is_running_ && time_passed_ >= RTO; };
void start()
{
if ( is_running_ ) {
throw std::runtime_error( "Starting a started timer" );
} else {
is_running_ = true;
time_passed_ = 0;
}
};
void stop()
{
if ( !is_running_ ) {
throw std::runtime_error( "Stopping a not running timer" );
} else {
is_running_ = false;
}
};
void reset_time()
{
// set the timer forcely
is_running_ = true;
time_passed_ = 0;
};
void double_RTO() { RTO <<= 1; };
void reset_RTO() { RTO = initial_RTO; };
void tick( uint64_t ms_since_last_tick ) { time_passed_ += ms_since_last_tick; };
};

有了这个timer之后就可以很好地对后面进行计时了

TCPSender

TCPSender 里面,我设置了以下几个成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
ByteStream input_;
Wrap32 isn_;
uint64_t initial_RTO_ms_;
uint64_t acked_seqno_;
uint64_t next_send_seqno_;
uint64_t retransmissions_cnt_;
uint64_t unacked_cnt_;
uint16_t window_size_;
bool SYN_sent;
bool FIN_sent;
bool need_rst;
Timer timer;
std::list<std::shared_ptr<TCPSenderMessage>> segments_sent_but_not_acked;

这里面我选择用一个 list 来储存已经发了但是没有 ackSegment ,这里的变量含义如下:

  • acked_seqno_ 表示目前已经完全 acked 了的 seqno
  • next_send_seqno_ 表示下一个没发送的 seqno
  • retransmissions_cnt_ 表示重传的所有 Bytes
  • unacked_cnt_ 表示发了还没有 ack 的计数器
  • window_size_ 记录当前可用的 window size

辅助函数

为了避免 Segment 在重发的过程中需要重复生成,这里我统一使用一个接口来创建 Segment 即:

1
2
3
4
5
std::shared_ptr<TCPSenderMessage> create_segment( Wrap32 seqno,
bool SYN,
const std::string& payload,
bool FIN,
bool RST );

这里的行为就是创建一个管理 TCPSenderMessage 的指针,并且把这个指针保存到 list 里面,这样减少了拷贝次数

push 流程

总体上就是在当前发送了还没有 acked 的数据少于当前Window size 的时候,不停地想办法从 input_ 里面拉数据,然后按照手册设置timer就行了,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
void TCPSender::push( const TransmitFunction& transmit )
{
// Your code here.
(void)transmit;
TRACE_ENTRY
uint64_t available_window_size = window_size_ == 0 ? 1 : window_size_;
while ( unacked_cnt_ < available_window_size ) {

// Prepare syn bit
bool SYN = false;
if ( !SYN_sent ) {
SYN = true;
SYN_sent = true;
unacked_cnt_++;
}

// prepare seqno
Wrap32 seqno = Wrap32::wrap( next_send_seqno_, isn_ );

// prepare payload
string payload;
uint64_t payload_size = min( TCPConfig::MAX_PAYLOAD_SIZE, available_window_size - unacked_cnt_ );
read( input_.reader(), payload_size, payload );
unacked_cnt_ += payload.size();

// prepare fin bit
bool FIN = false;
if ( !FIN_sent && input_.reader().is_finished() && unacked_cnt_ < available_window_size ) {
FIN = true;
FIN_sent = true;
unacked_cnt_++;
}

// prepare RST bit
bool RST = false;
if ( need_rst ) {
RST = true;
need_rst = false;
}
if ( input_.reader().has_error() ) {
RST = true;
}

auto msg = create_segment( seqno, SYN, payload, FIN, RST );
if ( msg->sequence_length() == 0 ) {
// don't send empty message
segments_sent_but_not_acked.pop_back();
break;
}

// start sending
transmit( *msg );
next_send_seqno_ += msg->sequence_length();
if ( !timer.is_running() ) {
// Timer case 4
timer.start();
}

if ( FIN ) {
// break on closed connection
break;
}

if ( input_.reader().bytes_buffered() == 0 ) {
// break on empty input
break;
}
}
TRACE_EXIT
return;
}

receive 流程

这里首先就是遍历整个 list 看有没有块被全部ack了,如果有就把它移除,并且按照要求重置 timer 和设置 RTO ,具体的坑下面讲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void TCPSender::receive( const TCPReceiverMessage& msg )
{
// Your code here.
(void)msg;
TRACE_ENTRY
window_size_ = msg.window_size;
if ( msg.RST ) {
need_rst = true;
input_.set_error();
}

if ( msg.ackno.has_value() ) {
uint64_t ackno = msg.ackno.value().unwrap( isn_, next_send_seqno_ );
if ( ackno > next_send_seqno_ ) {
// This case means the receiver send the wrong message
return;
}
acked_seqno_ = ackno > acked_seqno_ ? ackno : acked_seqno_;

for ( auto it = segments_sent_but_not_acked.begin(); it != segments_sent_but_not_acked.end(); ) {
if ( ( **it ).sequence_length() + ( **it ).seqno.unwrap( isn_, next_send_seqno_ ) <= acked_seqno_ ) {
// This segment is fully acked
unacked_cnt_ -= ( **it ).sequence_length();
it = segments_sent_but_not_acked.erase( it );

// Timer case 7 (a)
timer.reset_RTO();
if ( !segments_sent_but_not_acked.empty() ) {
if ( unacked_cnt_ == 0 ) {
throw std::runtime_error( "unacked_cnt_ is 0 when the list is not empty" );
}
// Timer case 7 (b)
timer.reset_time();
}

// case 7 (c)
retransmissions_cnt_ = 0;

} else {
it++;
}
}

if ( segments_sent_but_not_acked.empty() ) {
// Timer case 5
if ( timer.is_running() ) {
timer.stop();
}
}
}

TRACE_EXIT
}

tick流程

tick 整体比较简单,就只需要实现手册上面说的就行了,主要是手册第六条

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
// Your code here.
(void)ms_since_last_tick;
(void)transmit;
(void)initial_RTO_ms_;
timer.tick( ms_since_last_tick );
bool do_retransmit = false;
if ( timer.is_expired() ) {
// Timer case 6 (a)
if ( !segments_sent_but_not_acked.empty() ) {
transmit( *segments_sent_but_not_acked.front() );
do_retransmit = true;
}

if ( window_size_ != 0 ) {
if ( do_retransmit ) {
// Timer case 6 (b) i
retransmissions_cnt_++;
}

// Timer case 6 (b) ii
timer.double_RTO();
}
// Timer case 6 (c)
timer.reset_time();
}
}

Implementation Challenges

这里主要有以下几个坑的地方:

  • 这里的RST位和流的 has_error 应该是一致的,但是这里手册没有说,导致我自己去记录了 need_rst 多此一举还麻烦
  • case 6 (a) 里面可能出现当前全部ack了刚好超时的看清了,所以这里要特判
  • 发回来的包里面的 ackno 是有可能大于当前已经发送的,此时应该什么都不做,认为这个包在传递过程中出错了,我开始是抛出异常,后面是直接设置 RST 但按照样例看应该直接return
  • 边遍历边删除的时候记得更新迭代器

Remaining Bugs

None

Experimental results and performance.

最后测试截图如下:

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
[0/1] Re-running CMake...
-- Building in 'Debug' mode.
-- Configuring done (1.1s)
-- Generating done (0.0s)
-- Build files have been written to: /home/eric/minnow/build
[1/1] cd /home/eric/minnow/build && /usr/local/bin/ctest --output-on-failure --stop-on-failure --timeout 12 -R '^byte_stream_|^reassembler_|^wrapping|^recv|^send'
Test project /home/eric/minnow/build
Start 1: compile with bug-checkers
1/36 Test #1: compile with bug-checkers ........ Passed 6.83 sec
Start 3: byte_stream_basics
2/36 Test #3: byte_stream_basics ............... Passed 0.01 sec
Start 4: byte_stream_capacity
3/36 Test #4: byte_stream_capacity ............. Passed 0.01 sec
Start 5: byte_stream_one_write
4/36 Test #5: byte_stream_one_write ............ Passed 0.01 sec
Start 6: byte_stream_two_writes
5/36 Test #6: byte_stream_two_writes ........... Passed 0.01 sec
Start 7: byte_stream_many_writes
6/36 Test #7: byte_stream_many_writes .......... Passed 0.04 sec
Start 8: byte_stream_stress_test
7/36 Test #8: byte_stream_stress_test .......... Passed 0.08 sec
Start 9: reassembler_single
8/36 Test #9: reassembler_single ............... Passed 0.01 sec
Start 10: reassembler_cap
9/36 Test #10: reassembler_cap .................. Passed 0.01 sec
Start 11: reassembler_seq
10/36 Test #11: reassembler_seq .................. Passed 0.01 sec
Start 12: reassembler_dup
11/36 Test #12: reassembler_dup .................. Passed 0.03 sec
Start 13: reassembler_holes
12/36 Test #13: reassembler_holes ................ Passed 0.01 sec
Start 14: reassembler_overlapping
13/36 Test #14: reassembler_overlapping .......... Passed 0.01 sec
Start 15: reassembler_win
14/36 Test #15: reassembler_win .................. Passed 0.36 sec
Start 16: wrapping_integers_cmp
15/36 Test #16: wrapping_integers_cmp ............ Passed 0.01 sec
Start 17: wrapping_integers_wrap
16/36 Test #17: wrapping_integers_wrap ........... Passed 0.01 sec
Start 18: wrapping_integers_unwrap
17/36 Test #18: wrapping_integers_unwrap ......... Passed 0.01 sec
Start 19: wrapping_integers_roundtrip
18/36 Test #19: wrapping_integers_roundtrip ...... Passed 0.74 sec
Start 20: wrapping_integers_extra
19/36 Test #20: wrapping_integers_extra .......... Passed 0.15 sec
Start 21: recv_connect
20/36 Test #21: recv_connect ..................... Passed 0.01 sec
Start 22: recv_transmit
21/36 Test #22: recv_transmit .................... Passed 0.23 sec
Start 23: recv_window
22/36 Test #23: recv_window ...................... Passed 0.01 sec
Start 24: recv_reorder
23/36 Test #24: recv_reorder ..................... Passed 0.01 sec
Start 25: recv_reorder_more
24/36 Test #25: recv_reorder_more ................ Passed 0.97 sec
Start 26: recv_close
25/36 Test #26: recv_close ....................... Passed 0.01 sec
Start 27: recv_special
26/36 Test #27: recv_special ..................... Passed 0.01 sec
Start 28: send_connect
27/36 Test #28: send_connect ..................... Passed 0.01 sec
Start 29: send_transmit
28/36 Test #29: send_transmit .................... Passed 0.37 sec
Start 30: send_retx
29/36 Test #30: send_retx ........................ Passed 0.01 sec
Start 31: send_window
30/36 Test #31: send_window ...................... Passed 0.07 sec
Start 32: send_ack
31/36 Test #32: send_ack ......................... Passed 0.01 sec
Start 33: send_close
32/36 Test #33: send_close ....................... Passed 0.01 sec
Start 34: send_extra
33/36 Test #34: send_extra ....................... Passed 0.03 sec
Start 37: compile with optimization
34/36 Test #37: compile with optimization ........ Passed 1.13 sec
Start 38: byte_stream_speed_test
35/36 Test #38: byte_stream_speed_test ........... Passed 0.07 sec
Start 39: reassembler_speed_test
36/36 Test #39: reassembler_speed_test ........... Passed 0.14 sec

100% tests passed, 0 tests failed out of 36

Total Test time (real) = 11.47 sec