Structure and Design Timer 首先在 TCPSender
里面实现一个 Timer
来实现计时的功能,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class Timer { private : uint64_t initial_RTO; uint64_t RTO; uint64_t time_passed_; bool is_running_; public : explicit Timer ( uint64_t initial_RTO_ ) : initial_RTO( initial_RTO_ ), RTO( initial_RTO_ ), time_passed_( 0 ), is_running_( false ) { }; bool is_running () const { return is_running_; }; bool is_expired () const { return is_running_ && time_passed_ >= RTO; }; void start () { if ( is_running_ ) { throw std::runtime_error ( "Starting a started timer" ); } else { is_running_ = true ; time_passed_ = 0 ; } }; void stop () { if ( !is_running_ ) { throw std::runtime_error ( "Stopping a not running timer" ); } else { is_running_ = false ; } }; void reset_time () { is_running_ = true ; time_passed_ = 0 ; }; void double_RTO () { RTO <<= 1 ; }; void reset_RTO () { RTO = initial_RTO; }; void tick ( uint64_t ms_since_last_tick ) { time_passed_ += ms_since_last_tick; }; };
有了这个timer之后就可以很好地对后面进行计时了
TCPSender 在 TCPSender
里面,我设置了以下几个成员:
1 2 3 4 5 6 7 8 9 10 11 12 13 ByteStream input_; Wrap32 isn_; uint64_t initial_RTO_ms_;uint64_t acked_seqno_;uint64_t next_send_seqno_;uint64_t retransmissions_cnt_;uint64_t unacked_cnt_;uint16_t window_size_;bool SYN_sent;bool FIN_sent;bool need_rst;Timer timer; std::list<std::shared_ptr<TCPSenderMessage>> segments_sent_but_not_acked;
这里面我选择用一个 list
来储存已经发了但是没有 ack
的 Segment
,这里的变量含义如下:
acked_seqno_
表示目前已经完全 acked
了的 seqno
next_send_seqno_
表示下一个没发送的 seqno
retransmissions_cnt_
表示重传的所有 Bytes
unacked_cnt_
表示发了还没有 ack
的计数器
window_size_
记录当前可用的 window size
辅助函数 为了避免 Segment
在重发的过程中需要重复生成,这里我统一使用一个接口来创建 Segment
即:
1 2 3 4 5 std::shared_ptr<TCPSenderMessage> create_segment ( Wrap32 seqno, bool SYN, const std::string& payload, bool FIN, bool RST ) ;
这里的行为就是创建一个管理 TCPSenderMessage
的指针,并且把这个指针保存到 list
里面,这样减少了拷贝次数
push 流程 总体上就是在当前发送了还没有 acked
的数据少于当前Window size 的时候,不停地想办法从 input_
里面拉数据,然后按照手册设置timer就行了,具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 void TCPSender::push ( const TransmitFunction& transmit ) { (void )transmit; TRACE_ENTRY uint64_t available_window_size = window_size_ == 0 ? 1 : window_size_; while ( unacked_cnt_ < available_window_size ) { bool SYN = false ; if ( !SYN_sent ) { SYN = true ; SYN_sent = true ; unacked_cnt_++; } Wrap32 seqno = Wrap32::wrap ( next_send_seqno_, isn_ ); string payload; uint64_t payload_size = min ( TCPConfig::MAX_PAYLOAD_SIZE, available_window_size - unacked_cnt_ ); read ( input_.reader (), payload_size, payload ); unacked_cnt_ += payload.size (); bool FIN = false ; if ( !FIN_sent && input_.reader ().is_finished () && unacked_cnt_ < available_window_size ) { FIN = true ; FIN_sent = true ; unacked_cnt_++; } bool RST = false ; if ( need_rst ) { RST = true ; need_rst = false ; } if ( input_.reader ().has_error () ) { RST = true ; } auto msg = create_segment ( seqno, SYN, payload, FIN, RST ); if ( msg->sequence_length () == 0 ) { segments_sent_but_not_acked.pop_back (); break ; } transmit ( *msg ); next_send_seqno_ += msg->sequence_length (); if ( !timer.is_running () ) { timer.start (); } if ( FIN ) { break ; } if ( input_.reader ().bytes_buffered () == 0 ) { break ; } } TRACE_EXIT return ; }
receive 流程 这里首先就是遍历整个 list
看有没有块被全部ack了,如果有就把它移除,并且按照要求重置 timer
和设置 RTO
,具体的坑下面讲
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 void TCPSender::receive ( const TCPReceiverMessage& msg ) { (void )msg; TRACE_ENTRY window_size_ = msg.window_size; if ( msg.RST ) { need_rst = true ; input_.set_error (); } if ( msg.ackno.has_value () ) { uint64_t ackno = msg.ackno.value ().unwrap ( isn_, next_send_seqno_ ); if ( ackno > next_send_seqno_ ) { return ; } acked_seqno_ = ackno > acked_seqno_ ? ackno : acked_seqno_; for ( auto it = segments_sent_but_not_acked.begin (); it != segments_sent_but_not_acked.end (); ) { if ( ( **it ).sequence_length () + ( **it ).seqno.unwrap ( isn_, next_send_seqno_ ) <= acked_seqno_ ) { unacked_cnt_ -= ( **it ).sequence_length (); it = segments_sent_but_not_acked.erase ( it ); timer.reset_RTO (); if ( !segments_sent_but_not_acked.empty () ) { if ( unacked_cnt_ == 0 ) { throw std::runtime_error ( "unacked_cnt_ is 0 when the list is not empty" ); } timer.reset_time (); } retransmissions_cnt_ = 0 ; } else { it++; } } if ( segments_sent_but_not_acked.empty () ) { if ( timer.is_running () ) { timer.stop (); } } } TRACE_EXIT }
tick流程 tick
整体比较简单,就只需要实现手册上面说的就行了,主要是手册第六条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void TCPSender::tick ( uint64_t ms_since_last_tick, const TransmitFunction& transmit ) { (void )ms_since_last_tick; (void )transmit; (void )initial_RTO_ms_; timer.tick ( ms_since_last_tick ); bool do_retransmit = false ; if ( timer.is_expired () ) { if ( !segments_sent_but_not_acked.empty () ) { transmit ( *segments_sent_but_not_acked.front () ); do_retransmit = true ; } if ( window_size_ != 0 ) { if ( do_retransmit ) { retransmissions_cnt_++; } timer.double_RTO (); } timer.reset_time (); } }
Implementation Challenges 这里主要有以下几个坑的地方:
这里的RST位和流的 has_error
应该是一致的,但是这里手册没有说,导致我自己去记录了 need_rst
多此一举还麻烦
case 6 (a) 里面可能出现当前全部ack了刚好超时的看清了,所以这里要特判
发回来的包里面的 ackno
是有可能大于当前已经发送的,此时应该什么都不做,认为这个包在传递过程中出错了,我开始是抛出异常,后面是直接设置 RST
但按照样例看应该直接return
边遍历边删除的时候记得更新迭代器
Remaining Bugs None
最后测试截图如下: 输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 [0/1] Re-running CMake... -- Building in 'Debug' mode. -- Configuring done (1.1s) -- Generating done (0.0s) -- Build files have been written to: /home/eric/minnow/build [1/1] cd /home/eric/minnow/build && /usr/local/bin/ctest --output-on-failure --stop-on-failure --timeout 12 -R '^byte_stream_|^reassembler_|^wrapping|^recv|^send' Test project /home/eric/minnow/build Start 1: compile with bug-checkers 1/36 Test Start 3: byte_stream_basics 2/36 Test Start 4: byte_stream_capacity 3/36 Test Start 5: byte_stream_one_write 4/36 Test Start 6: byte_stream_two_writes 5/36 Test Start 7: byte_stream_many_writes 6/36 Test Start 8: byte_stream_stress_test 7/36 Test Start 9: reassembler_single 8/36 Test Start 10: reassembler_cap 9/36 Test Start 11: reassembler_seq 10/36 Test Start 12: reassembler_dup 11/36 Test Start 13: reassembler_holes 12/36 Test Start 14: reassembler_overlapping 13/36 Test Start 15: reassembler_win 14/36 Test Start 16: wrapping_integers_cmp 15/36 Test Start 17: wrapping_integers_wrap 16/36 Test Start 18: wrapping_integers_unwrap 17/36 Test Start 19: wrapping_integers_roundtrip 18/36 Test Start 20: wrapping_integers_extra 19/36 Test Start 21: recv_connect 20/36 Test Start 22: recv_transmit 21/36 Test Start 23: recv_window 22/36 Test Start 24: recv_reorder 23/36 Test Start 25: recv_reorder_more 24/36 Test Start 26: recv_close 25/36 Test Start 27: recv_special 26/36 Test Start 28: send_connect 27/36 Test Start 29: send_transmit 28/36 Test Start 30: send_retx 29/36 Test Start 31: send_window 30/36 Test Start 32: send_ack 31/36 Test Start 33: send_close 32/36 Test Start 34: send_extra 33/36 Test Start 37: compile with optimization 34/36 Test Start 38: byte_stream_speed_test 35/36 Test Start 39: reassembler_speed_test 36/36 Test 100% tests passed, 0 tests failed out of 36 Total Test time (real) = 11.47 sec