diff options
Diffstat (limited to 'src/network/connection.h')
-rw-r--r-- | src/network/connection.h | 392 |
1 files changed, 185 insertions, 207 deletions
diff --git a/src/network/connection.h b/src/network/connection.h index 9febfb2b7..47b0805ce 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -41,8 +41,7 @@ namespace con class ConnectionReceiveThread; class ConnectionSendThread; -typedef enum MTProtocols -{ +typedef enum MTProtocols { MTP_PRIMARY, MTP_UDP, MTP_MINETEST_RELIABLE_UDP @@ -54,43 +53,49 @@ typedef enum MTProtocols inline bool seqnum_higher(u16 totest, u16 base) { - if (totest > base) { - if ((totest - base) > (SEQNUM_MAX / 2)) + if (totest > base) + { + if ((totest - base) > (SEQNUM_MAX/2)) return false; return true; } - if ((base - totest) > (SEQNUM_MAX / 2)) + if ((base - totest) > (SEQNUM_MAX/2)) return true; return false; } -inline bool seqnum_in_window(u16 seqnum, u16 next, u16 window_size) +inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) { u16 window_start = next; - u16 window_end = (next + window_size) % (SEQNUM_MAX + 1); + u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); if (window_start < window_end) { return ((seqnum >= window_start) && (seqnum < window_end)); } + return ((seqnum < window_end) || (seqnum >= window_start)); } static inline float CALC_DTIME(u64 lasttime, u64 curtime) { - float value = (curtime - lasttime) / 1000.0; - return MYMAX(MYMIN(value, 0.1), 0.0); + float value = ( curtime - lasttime) / 1000.0; + return MYMAX(MYMIN(value,0.1),0.0); } struct BufferedPacket { - BufferedPacket(u8 *a_data, u32 a_size) : data(a_data, a_size) {} - BufferedPacket(u32 a_size) : data(a_size) {} - Buffer<u8> data; // Data of the packet, including headers - float time = 0.0f; // Seconds from buffering the packet or re-sending + BufferedPacket(u8 *a_data, u32 a_size): + data(a_data, a_size) + {} + BufferedPacket(u32 a_size): + data(a_size) + {} + Buffer<u8> data; // Data of the packet, including headers + float time = 0.0f; // Seconds from buffering the packet or re-sending float totaltime = 0.0f; // Seconds from buffering the packet u64 absolute_send_time = -1; Address address; // Sender or destination @@ -98,8 +103,8 @@ struct BufferedPacket }; // This adds the base headers to the data and makes a packet out of it -BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, u32 protocol_id, - session_t sender_peer_id, u8 channel); +BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, + u32 protocol_id, session_t sender_peer_id, u8 channel); // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet // Increments split_seqnum if a split packet is made @@ -111,7 +116,8 @@ SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum); struct IncomingSplitPacket { - IncomingSplitPacket(u32 cc, bool r) : chunk_count(cc), reliable(r) {} + IncomingSplitPacket(u32 cc, bool r): + chunk_count(cc), reliable(r) {} IncomingSplitPacket() = delete; @@ -119,7 +125,10 @@ struct IncomingSplitPacket u32 chunk_count; bool reliable; // If true, isn't deleted on timeout - bool allReceived() const { return (chunks.size() == chunk_count); } + bool allReceived() const + { + return (chunks.size() == chunk_count); + } bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata); SharedBuffer<u8> reassemble(); @@ -210,8 +219,7 @@ with a buffer in the receiving and transmitting end. #define RELIABLE_HEADER_SIZE 3 #define SEQNUM_INITIAL 65500 -enum PacketType : u8 -{ +enum PacketType: u8 { PACKET_TYPE_CONTROL = 0, PACKET_TYPE_ORIGINAL = 1, PACKET_TYPE_SPLIT = 2, @@ -230,20 +238,22 @@ class ReliablePacketBuffer public: ReliablePacketBuffer() = default; - bool getFirstSeqnum(u16 &result); + bool getFirstSeqnum(u16& result); BufferedPacket popFirst(); BufferedPacket popSeqnum(u16 seqnum); void insert(BufferedPacket &p, u16 next_expected); void incrementTimeouts(float dtime); - std::list<BufferedPacket> getTimedOuts(float timeout, unsigned int max_packets); + std::list<BufferedPacket> getTimedOuts(float timeout, + unsigned int max_packets); void print(); bool empty(); RPBSearchResult notFound(); u32 size(); + private: RPBSearchResult findPacket(u16 seqnum); // does not perform locking @@ -272,7 +282,7 @@ public: private: // Key is seqnum - std::map<u16, IncomingSplitPacket *> m_buf; + std::map<u16, IncomingSplitPacket*> m_buf; std::mutex m_map_mutex; }; @@ -286,16 +296,17 @@ struct OutgoingPacket bool ack; OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_, - bool reliable_, bool ack_ = false) : - peer_id(peer_id_), - channelnum(channelnum_), data(data_), reliable(reliable_), - ack(ack_) + bool reliable_,bool ack_=false): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_), + ack(ack_) { } }; -enum ConnectionCommandType -{ +enum ConnectionCommandType{ CONNCMD_NONE, CONNCMD_SERVE, CONNCMD_CONNECT, @@ -341,7 +352,10 @@ struct ConnectionCommand type = CONNCMD_CONNECT; address = address_; } - void disconnect() { type = CONNCMD_DISCONNECT; } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } void disconnect_peer(session_t peer_id_) { type = CONNCMD_DISCONNECT_PEER; @@ -387,7 +401,7 @@ public: u16 readNextIncomingSeqNum(); u16 incNextIncomingSeqNum(); - u16 getOutgoingSequenceNumber(bool &successfull); + u16 getOutgoingSequenceNumber(bool& successfull); u16 readOutgoingSequenceNumber(); bool putBackSequenceNumber(u16); @@ -401,10 +415,10 @@ public: // re-send them if no ACK is received ReliablePacketBuffer outgoing_reliables_sent; - // queued reliable packets + //queued reliable packets std::queue<BufferedPacket> queued_reliables; - // queue commands prior splitting to packets + //queue commands prior splitting to packets std::deque<ConnectionCommand> queued_commands; IncomingSplitBuffer incoming_splits; @@ -414,65 +428,37 @@ public: void UpdatePacketLossCounter(unsigned int count); void UpdatePacketTooLateCounter(); - void UpdateBytesSent(unsigned int bytes, unsigned int packages = 1); + void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); void UpdateBytesLost(unsigned int bytes); void UpdateBytesReceived(unsigned int bytes); void UpdateTimers(float dtime); const float getCurrentDownloadRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return cur_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return cur_kbps; }; const float getMaxDownloadRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return max_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return max_kbps; }; const float getCurrentLossRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return cur_kbps_lost; - }; + { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; const float getMaxLossRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return max_kbps_lost; - }; + { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; const float getCurrentIncomingRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return cur_incoming_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; const float getMaxIncomingRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return max_incoming_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; const float getAvgDownloadRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return avg_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return avg_kbps; }; const float getAvgLossRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return avg_kbps_lost; - }; + { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; const float getAvgIncomingRateKB() - { - MutexAutoLock lock(m_internal_mutex); - return avg_incoming_kbps; - }; + { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; const unsigned int getWindowSize() const { return window_size; }; void setWindowSize(unsigned int size) { window_size = size; }; - private: std::mutex m_internal_mutex; int window_size = MIN_RELIABLE_WINDOW_SIZE; @@ -510,14 +496,14 @@ class PeerHelper { public: PeerHelper() = default; - PeerHelper(Peer *peer); + PeerHelper(Peer* peer); ~PeerHelper(); - PeerHelper &operator=(Peer *peer); - Peer *operator->() const; - bool operator!(); - Peer *operator&() const; - bool operator!=(void *ptr); + PeerHelper& operator=(Peer* peer); + Peer* operator->() const; + bool operator!(); + Peer* operator&() const; + bool operator!=(void* ptr); private: Peer *m_peer = nullptr; @@ -525,8 +511,7 @@ private: class Connection; -typedef enum -{ +typedef enum { CUR_DL_RATE, AVG_DL_RATE, CUR_INC_RATE, @@ -535,144 +520,140 @@ typedef enum AVG_LOSS_RATE, } rate_stat_type; -class Peer -{ -public: - friend class PeerHelper; - - Peer(Address address_, u16 id_, Connection *connection) : - id(id_), m_connection(connection), address(address_), - m_last_timeout_check(porting::getTimeMs()){}; - - virtual ~Peer() - { - MutexAutoLock usage_lock(m_exclusive_access_mutex); - FATAL_ERROR_IF(m_usage != 0, "Reference counting failure"); - }; - - // Unique id of the peer - u16 id; - - void Drop(); - - virtual void PutReliableSendCommand( - ConnectionCommand &c, unsigned int max_packet_size){}; - - virtual bool getAddress(MTProtocols type, Address &toset) = 0; - - bool isPendingDeletion() - { - MutexAutoLock lock(m_exclusive_access_mutex); - return m_pending_deletion; - }; - - void ResetTimeout() - { - MutexAutoLock lock(m_exclusive_access_mutex); - m_timeout_counter = 0.0; - }; - - bool isTimedOut(float timeout); - - unsigned int m_increment_packets_remaining = 0; - - virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; - virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum){}; - virtual SharedBuffer<u8> addSplitPacket( - u8 channel, const BufferedPacket &toadd, bool reliable) - { - errorstream << "Peer::addSplitPacket called," - << " this is supposed to be never called!" << std::endl; - return SharedBuffer<u8>(0); - }; - - virtual bool Ping(float dtime, SharedBuffer<u8> &data) { return false; }; - - virtual float getStat(rtt_stat_type type) const - { - switch (type) { - case MIN_RTT: - return m_rtt.min_rtt; - case MAX_RTT: - return m_rtt.max_rtt; - case AVG_RTT: - return m_rtt.avg_rtt; - case MIN_JITTER: - return m_rtt.jitter_min; - case MAX_JITTER: - return m_rtt.jitter_max; - case AVG_JITTER: - return m_rtt.jitter_avg; +class Peer { + public: + friend class PeerHelper; + + Peer(Address address_,u16 id_,Connection* connection) : + id(id_), + m_connection(connection), + address(address_), + m_last_timeout_check(porting::getTimeMs()) + { + }; + + virtual ~Peer() { + MutexAutoLock usage_lock(m_exclusive_access_mutex); + FATAL_ERROR_IF(m_usage != 0, "Reference counting failure"); + }; + + // Unique id of the peer + u16 id; + + void Drop(); + + virtual void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size) {}; + + virtual bool getAddress(MTProtocols type, Address& toset) = 0; + + bool isPendingDeletion() + { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; }; + + void ResetTimeout() + {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; }; + + bool isTimedOut(float timeout); + + unsigned int m_increment_packets_remaining = 0; + + virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; + virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {}; + virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd, + bool reliable) + { + errorstream << "Peer::addSplitPacket called," + << " this is supposed to be never called!" << std::endl; + return SharedBuffer<u8>(0); + }; + + virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; }; + + virtual float getStat(rtt_stat_type type) const { + switch (type) { + case MIN_RTT: + return m_rtt.min_rtt; + case MAX_RTT: + return m_rtt.max_rtt; + case AVG_RTT: + return m_rtt.avg_rtt; + case MIN_JITTER: + return m_rtt.jitter_min; + case MAX_JITTER: + return m_rtt.jitter_max; + case AVG_JITTER: + return m_rtt.jitter_avg; + } + return -1; } - return -1; - } + protected: + virtual void reportRTT(float rtt) {}; -protected: - virtual void reportRTT(float rtt){}; + void RTTStatistics(float rtt, + const std::string &profiler_id = "", + unsigned int num_samples = 1000); - void RTTStatistics(float rtt, const std::string &profiler_id = "", - unsigned int num_samples = 1000); + bool IncUseCount(); + void DecUseCount(); - bool IncUseCount(); - void DecUseCount(); + std::mutex m_exclusive_access_mutex; - std::mutex m_exclusive_access_mutex; + bool m_pending_deletion = false; - bool m_pending_deletion = false; + Connection* m_connection; - Connection *m_connection; + // Address of the peer + Address address; - // Address of the peer - Address address; - - // Ping timer - float m_ping_timer = 0.0f; + // Ping timer + float m_ping_timer = 0.0f; + private: -private: - struct rttstats - { - float jitter_min = FLT_MAX; - float jitter_max = 0.0f; - float jitter_avg = -1.0f; - float min_rtt = FLT_MAX; - float max_rtt = 0.0f; - float avg_rtt = -1.0f; + struct rttstats { + float jitter_min = FLT_MAX; + float jitter_max = 0.0f; + float jitter_avg = -1.0f; + float min_rtt = FLT_MAX; + float max_rtt = 0.0f; + float avg_rtt = -1.0f; - rttstats() = default; - }; + rttstats() = default; + }; - rttstats m_rtt; - float m_last_rtt = -1.0f; + rttstats m_rtt; + float m_last_rtt = -1.0f; - // current usage count - unsigned int m_usage = 0; + // current usage count + unsigned int m_usage = 0; - // Seconds from last receive - float m_timeout_counter = 0.0f; + // Seconds from last receive + float m_timeout_counter = 0.0f; - u64 m_last_timeout_check; + u64 m_last_timeout_check; }; class UDPPeer : public Peer { public: + friend class PeerHelper; friend class ConnectionReceiveThread; friend class ConnectionSendThread; friend class Connection; - UDPPeer(u16 a_id, Address a_address, Connection *connection); + UDPPeer(u16 a_id, Address a_address, Connection* connection); virtual ~UDPPeer() = default; - void PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size); + void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size); - bool getAddress(MTProtocols type, Address &toset); + bool getAddress(MTProtocols type, Address& toset); u16 getNextSplitSequenceNumber(u8 channel); void setNextSplitSequenceNumber(u8 channel, u16 seqnum); - SharedBuffer<u8> addSplitPacket( - u8 channel, const BufferedPacket &toadd, bool reliable); + SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd, + bool reliable); protected: /* @@ -681,39 +662,34 @@ protected: */ void reportRTT(float rtt); - void RunCommandQueues(unsigned int max_packet_size, unsigned int maxcommands, - unsigned int maxtransfer); + void RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxcommands, + unsigned int maxtransfer); float getResendTimeout() - { - MutexAutoLock lock(m_exclusive_access_mutex); - return resend_timeout; - } + { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } void setResendTimeout(float timeout) - { - MutexAutoLock lock(m_exclusive_access_mutex); - resend_timeout = timeout; - } - bool Ping(float dtime, SharedBuffer<u8> &data); + { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + bool Ping(float dtime,SharedBuffer<u8>& data); Channel channels[CHANNEL_COUNT]; bool m_pending_disconnect = false; - private: // This is changed dynamically float resend_timeout = 0.5; bool processReliableSendCommand( - ConnectionCommand &c, unsigned int max_packet_size); + ConnectionCommand &c, + unsigned int max_packet_size); }; /* Connection */ -enum ConnectionEventType -{ +enum ConnectionEventType{ CONNEVENT_NONE, CONNEVENT_DATA_RECEIVED, CONNEVENT_PEER_ADDED, @@ -733,7 +709,7 @@ struct ConnectionEvent std::string describe() { - switch (type) { + switch(type) { case CONNEVENT_NONE: return "CONNEVENT_NONE"; case CONNEVENT_DATA_RECEIVED: @@ -767,7 +743,10 @@ struct ConnectionEvent timeout = timeout_; address = address_; } - void bindFailed() { type = CONNEVENT_BIND_FAILED; } + void bindFailed() + { + type = CONNEVENT_BIND_FAILED; + } }; class PeerHandler; @@ -791,7 +770,7 @@ public: void Connect(Address address); bool Connected(); void Disconnect(); - void Receive(NetworkPacket *pkt); + void Receive(NetworkPacket* pkt); bool TryReceive(NetworkPacket *pkt); void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable); session_t GetPeerID() const { return m_peer_id; } @@ -804,10 +783,10 @@ public: protected: PeerHelper getPeerNoEx(session_t peer_id); - u16 lookupPeer(Address &sender); + u16 lookupPeer(Address& sender); - u16 createPeer(Address &sender, MTProtocols protocol, int fd); - UDPPeer *createServerPeer(Address &sender); + u16 createPeer(Address& sender, MTProtocols protocol, int fd); + UDPPeer* createServerPeer(Address& sender); bool deletePeer(session_t peer_id, bool timeout); void SetPeerID(session_t id) { m_peer_id = id; } @@ -830,7 +809,6 @@ protected: void putEvent(ConnectionEvent &e); void TriggerSend(); - private: MutexedQueue<ConnectionEvent> m_event_queue; |