diff options
Diffstat (limited to 'src/network/connection.cpp')
-rw-r--r-- | src/network/connection.cpp | 525 |
1 files changed, 258 insertions, 267 deletions
diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 3692e45a9..dbcece99d 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -41,29 +41,29 @@ namespace con /* defines used for debugging and profiling */ /******************************************************************************/ #ifdef NDEBUG - #define LOG(a) a - #define PROFILE(a) +#define LOG(a) a +#define PROFILE(a) #else - #if 0 +#if 0 /* this mutex is used to achieve log message consistency */ std::mutex log_message_mutex; - #define LOG(a) \ - { \ - MutexAutoLock loglock(log_message_mutex); \ - a; \ - } - #else - // Prevent deadlocks until a solution is found after 5.2.0 (TODO) - #define LOG(a) a - #endif +#define LOG(a) \ + { \ + MutexAutoLock loglock(log_message_mutex); \ + a; \ + } +#else +// Prevent deadlocks until a solution is found after 5.2.0 (TODO) +#define LOG(a) a +#endif - #define PROFILE(a) a +#define PROFILE(a) a #endif #define PING_TIMEOUT 5.0 -BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, - u32 protocol_id, session_t sender_peer_id, u8 channel) +BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, u32 protocol_id, + session_t sender_peer_id, u8 channel) { u32 packet_size = data.getSize() + BASE_HEADER_SIZE; BufferedPacket p(packet_size); @@ -123,8 +123,7 @@ void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum start = end + 1; chunk_num++; - } - while (end != data.getSize() - 1); + } while (end != data.getSize() - 1); for (SharedBuffer<u8> &chunk : *chunks) { // Write chunk_count @@ -167,11 +166,11 @@ SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum) void ReliablePacketBuffer::print() { MutexAutoLock listlock(m_list_mutex); - LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); + LOG(dout_con << "Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for (BufferedPacket &bufferedPacket : m_list) { - u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1])); - LOG(dout_con<<index<< ":" << s << std::endl); + u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE + 1])); + LOG(dout_con << index << ":" << s << std::endl); index++; } } @@ -191,9 +190,8 @@ u32 ReliablePacketBuffer::size() RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum) { std::list<BufferedPacket>::iterator i = m_list.begin(); - for(; i != m_list.end(); ++i) - { - u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); + for (; i != m_list.end(); ++i) { + u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); if (s == seqnum) break; } @@ -205,7 +203,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() return m_list.end(); } -bool ReliablePacketBuffer::getFirstSeqnum(u16& result) +bool ReliablePacketBuffer::getFirstSeqnum(u16 &result) { MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) @@ -237,17 +235,16 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { - LOG(dout_con<<"Sequence number: " << seqnum - << " not found in reliable buffer"<<std::endl); + LOG(dout_con << "Sequence number: " << seqnum + << " not found in reliable buffer" << std::endl); throw NotFoundException("seqnum not found in buffer"); } BufferedPacket p = *r; - RPBSearchResult next = r; ++next; if (next != notFound()) { - u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1])); + u16 s = readU16(&(next->data[BASE_HEADER_SIZE + 1])); m_oldest_non_answered_ack = s; } @@ -267,25 +264,27 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " - "reliable packet" << std::endl; + "reliable packet" + << std::endl; return; } u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]); if (type != PACKET_TYPE_RELIABLE) { errorstream << "ReliablePacketBuffer::insert(): type is not reliable" - << std::endl; + << std::endl; return; } u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) { errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of " - "expected window " << std::endl; + "expected window " + << std::endl; return; } if (seqnum == next_expected) { errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected" - << std::endl; + << std::endl; return; } @@ -293,8 +292,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) // Find the right place for the packet and insert it there // If list is empty, just add it - if (m_list.empty()) - { + if (m_list.empty()) { m_list.push_back(p); m_oldest_non_answered_ack = seqnum; // Done. @@ -304,47 +302,47 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) // Otherwise find the right place std::list<BufferedPacket>::iterator i = m_list.begin(); // Find the first packet in the list which has a higher seqnum - u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); + u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); /* case seqnum is smaller then next_expected seqnum */ /* this is true e.g. on wrap around */ if (seqnum < next_expected) { - while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { + while (((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { ++i; if (i != m_list.end()) - s = readU16(&(i->data[BASE_HEADER_SIZE+1])); + s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); } } /* non wrap around case (at least for incoming and next_expected */ - else - { - while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { + else { + while (((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { ++i; if (i != m_list.end()) - s = readU16(&(i->data[BASE_HEADER_SIZE+1])); + s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); } } if (s == seqnum) { /* nothing to do this seems to be a resent packet */ /* for paranoia reason data should be compared */ - if ( - (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) || - (i->data.getSize() != p.data.getSize()) || - (i->address != p.address) - ) - { + if ((readU16(&(i->data[BASE_HEADER_SIZE + 1])) != seqnum) || + (i->data.getSize() != p.data.getSize()) || + (i->address != p.address)) { /* if this happens your maximum transfer window may be to big */ fprintf(stderr, - "Duplicated seqnum %d non matching packet detected:\n", + "Duplicated seqnum %d non matching packet " + "detected:\n", seqnum); fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n", - readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(), + readU16(&(i->data[BASE_HEADER_SIZE + 1])), + i->data.getSize(), i->address.serializeString().c_str()); fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n", - readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(), + readU16(&(p.data[BASE_HEADER_SIZE + 1])), + p.data.getSize(), p.address.serializeString().c_str()); - throw IncomingDataCorruption("duplicated packet isn't same as original one"); + throw IncomingDataCorruption( + "duplicated packet isn't same as original one"); } } /* insert or push back */ @@ -355,7 +353,8 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) } /* update last packet number */ - m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); + m_oldest_non_answered_ack = + readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE + 1]); } void ReliablePacketBuffer::incrementTimeouts(float dtime) @@ -367,8 +366,8 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) } } -std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, - unsigned int max_packets) +std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts( + float timeout, unsigned int max_packets) { MutexAutoLock listlock(m_list_mutex); std::list<BufferedPacket> timed_outs; @@ -376,7 +375,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, if (bufferedPacket.time >= timeout) { timed_outs.push_back(bufferedPacket); - //this packet will be sent right afterwards reset timeout here + // this packet will be sent right afterwards reset timeout here bufferedPacket.time = 0.0f; if (timed_outs.size() >= max_packets) break; @@ -447,19 +446,19 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia errorstream << "Invalid data size for split packet" << std::endl; return SharedBuffer<u8>(); } - u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); - u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); - u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); + u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]); + u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); + u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE + 3]); + u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE + 5]); if (type != PACKET_TYPE_SPLIT) { errorstream << "IncomingSplitBuffer::insert(): type is not split" - << std::endl; + << std::endl; return SharedBuffer<u8>(); } if (chunk_num >= chunk_count) { errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num - << " >= chunk_count=" << chunk_count << std::endl; + << " >= chunk_count=" << chunk_count << std::endl; return SharedBuffer<u8>(); } @@ -474,14 +473,13 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia if (chunk_count != sp->chunk_count) { errorstream << "IncomingSplitBuffer::insert(): chunk_count=" - << chunk_count << " != sp->chunk_count=" << sp->chunk_count - << std::endl; + << chunk_count << " != sp->chunk_count=" << sp->chunk_count + << std::endl; return SharedBuffer<u8>(); } if (reliable != sp->reliable) - LOG(derr_con<<"Connection: WARNING: reliable="<<reliable - <<" != sp->reliable="<<sp->reliable - <<std::endl); + LOG(derr_con << "Connection: WARNING: reliable=" << reliable + << " != sp->reliable=" << sp->reliable << std::endl); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; @@ -521,7 +519,8 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) } for (u16 j : remove_queue) { MutexAutoLock listlock(m_map_mutex); - LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl); + LOG(dout_con << "NOTE: Removing timed out unreliable split packet" + << std::endl); delete m_buf[j]; m_buf.erase(j); } @@ -531,8 +530,8 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) ConnectionCommand */ -void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, - bool reliable_) +void ConnectionCommand::send( + session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_) { type = CONNCMD_SEND; peer_id = peer_id_; @@ -570,36 +569,36 @@ void Channel::setNextSplitSeqNum(u16 seqnum) next_outgoing_split_seqnum = seqnum; } -u16 Channel::getOutgoingSequenceNumber(bool& successful) +u16 Channel::getOutgoingSequenceNumber(bool &successful) { MutexAutoLock internal(m_internal_mutex); u16 retval = next_outgoing_seqnum; u16 lowest_unacked_seqnumber; /* shortcut if there ain't any packet in outgoing list */ - if (outgoing_reliables_sent.empty()) - { + if (outgoing_reliables_sent.empty()) { next_outgoing_seqnum++; return retval; } - if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) - { + if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) { if (lowest_unacked_seqnumber < next_outgoing_seqnum) { // ugly cast but this one is required in order to tell compiler we - // know about difference of two unsigned may be negative in general - // but we already made sure it won't happen in this case - if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) { + // know about difference of two unsigned may be negative in + // general but we already made sure it won't happen in this case + if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > + window_size) { successful = false; return 0; } - } - else { + } else { // ugly cast but this one is required in order to tell compiler we - // know about difference of two unsigned may be negative in general - // but we already made sure it won't happen in this case - if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) > - window_size) { + // know about difference of two unsigned may be negative in + // general but we already made sure it won't happen in this case + if ((next_outgoing_seqnum + + (u16)(SEQNUM_MAX - + lowest_unacked_seqnumber)) > + window_size) { successful = false; return 0; } @@ -618,7 +617,7 @@ u16 Channel::readOutgoingSequenceNumber() bool Channel::putBackSequenceNumber(u16 seqnum) { - if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) { + if (((seqnum + 1) % (SEQNUM_MAX + 1)) == next_outgoing_seqnum) { next_outgoing_seqnum = seqnum; return true; @@ -633,7 +632,8 @@ void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets) current_packet_successful += packets; } -void Channel::UpdateBytesReceived(unsigned int bytes) { +void Channel::UpdateBytesReceived(unsigned int bytes) +{ MutexAutoLock internal(m_internal_mutex); current_bytes_received += bytes; } @@ -644,7 +644,6 @@ void Channel::UpdateBytesLost(unsigned int bytes) current_bytes_lost += bytes; } - void Channel::UpdatePacketLossCounter(unsigned int count) { MutexAutoLock internal(m_internal_mutex); @@ -665,19 +664,21 @@ void Channel::UpdateTimers(float dtime) if (packet_loss_counter > 1.0f) { packet_loss_counter -= 1.0f; - unsigned int packet_loss = 11; /* use a neutral value for initialization */ + unsigned int packet_loss = + 11; /* use a neutral value for initialization */ unsigned int packets_successful = 0; - //unsigned int packet_too_late = 0; + // unsigned int packet_too_late = 0; bool reasonable_amount_of_data_transmitted = false; { MutexAutoLock internal(m_internal_mutex); packet_loss = current_packet_loss; - //packet_too_late = current_packet_too_late; + // packet_too_late = current_packet_too_late; packets_successful = current_packet_successful; - if (current_bytes_transfered > (unsigned int) (window_size*512/2)) { + if (current_bytes_transfered > + (unsigned int)(window_size * 512 / 2)) { reasonable_amount_of_data_transmitted = true; } current_packet_loss = 0; @@ -690,38 +691,33 @@ void Channel::UpdateTimers(float dtime) bool done = false; if (packets_successful > 0) { - successful_to_lost_ratio = packet_loss/packets_successful; + successful_to_lost_ratio = packet_loss / packets_successful; } else if (packet_loss > 0) { window_size = std::max( - (window_size - 10), - MIN_RELIABLE_WINDOW_SIZE); + (window_size - 10), MIN_RELIABLE_WINDOW_SIZE); done = true; } if (!done) { if ((successful_to_lost_ratio < 0.01f) && - (window_size < MAX_RELIABLE_WINDOW_SIZE)) { + (window_size < MAX_RELIABLE_WINDOW_SIZE)) { /* don't even think about increasing if we didn't even * use major parts of our window */ if (reasonable_amount_of_data_transmitted) - window_size = std::min( - (window_size + 100), + window_size = std::min((window_size + 100), MAX_RELIABLE_WINDOW_SIZE); } else if ((successful_to_lost_ratio < 0.05f) && (window_size < MAX_RELIABLE_WINDOW_SIZE)) { /* don't even think about increasing if we didn't even * use major parts of our window */ if (reasonable_amount_of_data_transmitted) - window_size = std::min( - (window_size + 50), + window_size = std::min((window_size + 50), MAX_RELIABLE_WINDOW_SIZE); } else if (successful_to_lost_ratio > 0.15f) { - window_size = std::max( - (window_size - 100), + window_size = std::max((window_size - 100), MIN_RELIABLE_WINDOW_SIZE); } else if (successful_to_lost_ratio > 0.1f) { - window_size = std::max( - (window_size - 50), + window_size = std::max((window_size - 50), MIN_RELIABLE_WINDOW_SIZE); } } @@ -730,16 +726,17 @@ void Channel::UpdateTimers(float dtime) if (bpm_counter > 10.0f) { { MutexAutoLock internal(m_internal_mutex); - cur_kbps = - (((float) current_bytes_transfered)/bpm_counter)/1024.0f; + cur_kbps = (((float)current_bytes_transfered) / bpm_counter) / + 1024.0f; current_bytes_transfered = 0; - cur_kbps_lost = - (((float) current_bytes_lost)/bpm_counter)/1024.0f; - current_bytes_lost = 0; - cur_incoming_kbps = - (((float) current_bytes_received)/bpm_counter)/1024.0f; - current_bytes_received = 0; - bpm_counter = 0.0f; + cur_kbps_lost = (((float)current_bytes_lost) / bpm_counter) / + 1024.0f; + current_bytes_lost = 0; + cur_incoming_kbps = + (((float)current_bytes_received) / bpm_counter) / + 1024.0f; + current_bytes_received = 0; + bpm_counter = 0.0f; } if (cur_kbps > max_kbps) { @@ -754,24 +751,21 @@ void Channel::UpdateTimers(float dtime) max_incoming_kbps = cur_incoming_kbps; } - rate_samples = MYMIN(rate_samples+1,10); - float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples); - avg_kbps = avg_kbps * old_fraction + - cur_kbps * (1.0 - old_fraction); - avg_kbps_lost = avg_kbps_lost * old_fraction + + rate_samples = MYMIN(rate_samples + 1, 10); + float old_fraction = ((float)(rate_samples - 1)) / ((float)rate_samples); + avg_kbps = avg_kbps * old_fraction + cur_kbps * (1.0 - old_fraction); + avg_kbps_lost = avg_kbps_lost * old_fraction + cur_kbps_lost * (1.0 - old_fraction); - avg_incoming_kbps = avg_incoming_kbps * old_fraction + - cur_incoming_kbps * (1.0 - old_fraction); + avg_incoming_kbps = avg_incoming_kbps * old_fraction + + cur_incoming_kbps * (1.0 - old_fraction); } } - /* Peer */ -PeerHelper::PeerHelper(Peer* peer) : - m_peer(peer) +PeerHelper::PeerHelper(Peer *peer) : m_peer(peer) { if (peer && !peer->IncUseCount()) m_peer = nullptr; @@ -785,7 +779,7 @@ PeerHelper::~PeerHelper() m_peer = nullptr; } -PeerHelper& PeerHelper::operator=(Peer* peer) +PeerHelper &PeerHelper::operator=(Peer *peer) { m_peer = peer; if (peer && !peer->IncUseCount()) @@ -793,24 +787,24 @@ PeerHelper& PeerHelper::operator=(Peer* peer) return *this; } -Peer* PeerHelper::operator->() const +Peer *PeerHelper::operator->() const { return m_peer; } -Peer* PeerHelper::operator&() const +Peer *PeerHelper::operator&() const { return m_peer; } bool PeerHelper::operator!() { - return ! m_peer; + return !m_peer; } -bool PeerHelper::operator!=(void* ptr) +bool PeerHelper::operator!=(void *ptr) { - return ((void*) m_peer != ptr); + return ((void *)m_peer != ptr); } bool Peer::IncUseCount() @@ -838,8 +832,9 @@ void Peer::DecUseCount() delete this; } -void Peer::RTTStatistics(float rtt, const std::string &profiler_id, - unsigned int num_samples) { +void Peer::RTTStatistics( + float rtt, const std::string &profiler_id, unsigned int num_samples) +{ if (m_last_rtt > 0) { /* set min max values */ @@ -850,18 +845,18 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id, /* do average calculation */ if (m_rtt.avg_rtt < 0.0) - m_rtt.avg_rtt = rtt; + m_rtt.avg_rtt = rtt; else - m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) + - rtt * (1/num_samples); + m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples / (num_samples - 1)) + + rtt * (1 / num_samples); /* do jitter calculation */ - //just use some neutral value at beginning + // just use some neutral value at beginning float jitter = m_rtt.jitter_min; if (rtt > m_last_rtt) - jitter = rtt-m_last_rtt; + jitter = rtt - m_last_rtt; if (rtt <= m_last_rtt) jitter = m_last_rtt - rtt; @@ -872,14 +867,17 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id, m_rtt.jitter_max = jitter; if (m_rtt.jitter_avg < 0.0) - m_rtt.jitter_avg = jitter; + m_rtt.jitter_avg = jitter; else - m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) + - jitter * (1/num_samples); + m_rtt.jitter_avg = + m_rtt.jitter_avg * + (num_samples / (num_samples - 1)) + + jitter * (1 / num_samples); if (!profiler_id.empty()) { g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f); - g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f); + g_profiler->graphAdd( + profiler_id + " jitter [ms]", jitter * 1000.f); } } /* save values required for next loop */ @@ -891,7 +889,7 @@ bool Peer::isTimedOut(float timeout) MutexAutoLock lock(m_exclusive_access_mutex); u64 current_time = porting::getTimeMs(); - float dtime = CALC_DTIME(m_last_timeout_check,current_time); + float dtime = CALC_DTIME(m_last_timeout_check, current_time); m_last_timeout_check = current_time; m_timeout_counter += dtime; @@ -909,28 +907,28 @@ void Peer::Drop() } PROFILE(std::stringstream peerIdentifier1); - PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() - << ";" << id << ";RELIABLE]"); + PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() << ";" << id + << ";RELIABLE]"); PROFILE(g_profiler->remove(peerIdentifier1.str())); PROFILE(std::stringstream peerIdentifier2); - PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() - << ";" << id << ";RELIABLE]"); + PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() << ";" << id + << ";RELIABLE]"); PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG)); delete this; } -UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : - Peer(a_address,a_id,connection) +UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection *connection) : + Peer(a_address, a_id, connection) { for (Channel &channel : channels) channel.setWindowSize(START_RELIABLE_WINDOW_SIZE); } -bool UDPPeer::getAddress(MTProtocols type,Address& toset) +bool UDPPeer::getAddress(MTProtocols type, Address &toset) { - if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY)) - { + if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || + (type == MTP_PRIMARY)) { toset = address; return true; } @@ -943,7 +941,7 @@ void UDPPeer::reportRTT(float rtt) if (rtt < 0.0) { return; } - RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10); + RTTStatistics(rtt, "rudp", MAX_RELIABLE_WINDOW_SIZE * 10); float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR; if (timeout < RESEND_TIMEOUT_MIN) @@ -955,11 +953,10 @@ void UDPPeer::reportRTT(float rtt) resend_timeout = timeout; } -bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data) +bool UDPPeer::Ping(float dtime, SharedBuffer<u8> &data) { m_ping_timer += dtime; - if (m_ping_timer >= PING_TIMEOUT) - { + if (m_ping_timer >= PING_TIMEOUT) { // Create and send PING packet writeU8(&data[0], PACKET_TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_PING); @@ -969,8 +966,7 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data) return false; } -void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, - unsigned int max_packet_size) +void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size) { if (m_pending_disconnect) return; @@ -980,41 +976,37 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, if (chan.queued_commands.empty() && /* don't queue more packets then window size */ (chan.queued_reliables.size() < chan.getWindowSize() / 2)) { - LOG(dout_con<<m_connection->getDesc() - <<" processing reliable command for peer id: " << c.peer_id - <<" data size: " << c.data.getSize() << std::endl); - if (!processReliableSendCommand(c,max_packet_size)) { + LOG(dout_con << m_connection->getDesc() + << " processing reliable command for peer id: " << c.peer_id + << " data size: " << c.data.getSize() << std::endl); + if (!processReliableSendCommand(c, max_packet_size)) { chan.queued_commands.push_back(c); } - } - else { - LOG(dout_con<<m_connection->getDesc() - <<" Queueing reliable command for peer id: " << c.peer_id - <<" data size: " << c.data.getSize() <<std::endl); + } else { + LOG(dout_con << m_connection->getDesc() + << " Queueing reliable command for peer id: " << c.peer_id + << " data size: " << c.data.getSize() << std::endl); chan.queued_commands.push_back(c); if (chan.queued_commands.size() >= chan.getWindowSize() / 2) { LOG(derr_con << m_connection->getDesc() - << "Possible packet stall to peer id: " << c.peer_id - << " queued_commands=" << chan.queued_commands.size() - << std::endl); + << "Possible packet stall to peer id: " << c.peer_id + << " queued_commands=" << chan.queued_commands.size() + << std::endl); } } } bool UDPPeer::processReliableSendCommand( - ConnectionCommand &c, - unsigned int max_packet_size) + ConnectionCommand &c, unsigned int max_packet_size) { if (m_pending_disconnect) return true; Channel &chan = channels[c.channelnum]; - u32 chunksize_max = max_packet_size - - BASE_HEADER_SIZE - - RELIABLE_HEADER_SIZE; + u32 chunksize_max = max_packet_size - BASE_HEADER_SIZE - RELIABLE_HEADER_SIZE; - sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512); + sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE * 512); std::list<SharedBuffer<u8>> originals; u16 split_sequence_number = chan.readNextSplitSeqNum(); @@ -1022,7 +1014,8 @@ bool UDPPeer::processReliableSendCommand( if (c.raw) { originals.emplace_back(c.data); } else { - makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals); + makeAutoSplitPacket( + c.data, chunksize_max, split_sequence_number, &originals); chan.setNextSplitSeqNum(split_sequence_number); } @@ -1038,8 +1031,7 @@ bool UDPPeer::processReliableSendCommand( if (!have_sequence_number) break; - if (!have_initial_sequence_number) - { + if (!have_initial_sequence_number) { initial_sequence_number = seqnum; have_initial_sequence_number = true; } @@ -1059,11 +1051,14 @@ bool UDPPeer::processReliableSendCommand( while (!toadd.empty()) { BufferedPacket p = toadd.front(); toadd.pop(); -// LOG(dout_con<<connection->getDesc() -// << " queuing reliable packet for peer_id: " << c.peer_id -// << " channel: " << (c.channelnum&0xFF) -// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) -// << std::endl) + // LOG(dout_con<<connection->getDesc() + // << " queuing reliable packet for + //peer_id: " << c.peer_id + // << " channel: " << + //(c.channelnum&0xFF) + // << " seqnum: " << + //readU16(&p.data[BASE_HEADER_SIZE+1]) + // << std::endl) chan.queued_reliables.push(p); pcount++; } @@ -1081,9 +1076,9 @@ bool UDPPeer::processReliableSendCommand( /* remove packet */ toadd.pop(); - bool successfully_put_back_sequence_number - = chan.putBackSequenceNumber( - (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1))); + bool successfully_put_back_sequence_number = chan.putBackSequenceNumber( + (initial_sequence_number + + toadd.size() % (SEQNUM_MAX + 1))); FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); } @@ -1092,24 +1087,20 @@ bool UDPPeer::processReliableSendCommand( // 'log_message_mutex' and 'm_list_mutex'. u32 n_queued = chan.outgoing_reliables_sent.size(); - LOG(dout_con<<m_connection->getDesc() - << " Windowsize exceeded on reliable sending " - << c.data.getSize() << " bytes" - << std::endl << "\t\tinitial_sequence_number: " - << initial_sequence_number - << std::endl << "\t\tgot at most : " - << packets_available << " packets" - << std::endl << "\t\tpackets queued : " - << n_queued - << std::endl); + LOG(dout_con << m_connection->getDesc() + << " Windowsize exceeded on reliable sending " << c.data.getSize() + << " bytes" << std::endl + << "\t\tinitial_sequence_number: " << initial_sequence_number + << std::endl + << "\t\tgot at most : " << packets_available << " packets" + << std::endl + << "\t\tpackets queued : " << n_queued << std::endl); return false; } -void UDPPeer::RunCommandQueues( - unsigned int max_packet_size, - unsigned int maxcommands, - unsigned int maxtransfer) +void UDPPeer::RunCommandQueues(unsigned int max_packet_size, unsigned int maxcommands, + unsigned int maxtransfer) { for (Channel &channel : channels) { @@ -1122,19 +1113,22 @@ void UDPPeer::RunCommandQueues( ConnectionCommand c = channel.queued_commands.front(); LOG(dout_con << m_connection->getDesc() - << " processing queued reliable command " << std::endl); + << " processing queued reliable command " + << std::endl); // Packet is processed, remove it from queue - if (processReliableSendCommand(c,max_packet_size)) { + if (processReliableSendCommand(c, max_packet_size)) { channel.queued_commands.pop_front(); } else { LOG(dout_con << m_connection->getDesc() - << " Failed to queue packets for peer_id: " << c.peer_id - << ", delaying sending of " << c.data.getSize() - << " bytes" << std::endl); + << " Failed to queue packets for " + "peer_id: " + << c.peer_id + << ", delaying sending of " + << c.data.getSize() << " bytes" + << std::endl); } - } - catch (ItemNotFoundException &e) { + } catch (ItemNotFoundException &e) { // intentionally empty } } @@ -1153,8 +1147,8 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum) channels[channel].setNextSplitSeqNum(seqnum); } -SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd, - bool reliable) +SharedBuffer<u8> UDPPeer::addSplitPacket( + u8 channel, const BufferedPacket &toadd, bool reliable) { assert(channel < CHANNEL_COUNT); // Pre-condition return channels[channel].incoming_splits.insert(toadd, reliable); @@ -1164,13 +1158,13 @@ SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd Connection */ -Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, - bool ipv6, PeerHandler *peerhandler) : - m_udpSocket(ipv6), - m_protocol_id(protocol_id), - m_sendThread(new ConnectionSendThread(max_packet_size, timeout)), - m_receiveThread(new ConnectionReceiveThread(max_packet_size)), - m_bc_peerhandler(peerhandler) +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, + PeerHandler *peerhandler) : + m_udpSocket(ipv6), + m_protocol_id(protocol_id), + m_sendThread(new ConnectionSendThread(max_packet_size, timeout)), + m_receiveThread(new ConnectionReceiveThread(max_packet_size)), + m_bc_peerhandler(peerhandler) { /* Amount of time Receive() will wait for data, this is entirely different @@ -1184,7 +1178,6 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_receiveThread->start(); } - Connection::~Connection() { m_shutting_down = true; @@ -1192,7 +1185,7 @@ Connection::~Connection() m_sendThread->stop(); m_receiveThread->stop(); - //TODO for some unkonwn reason send/receive threads do not exit as they're + // TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce // timeout to half a second. m_sendThread->setPeerTimeout(0.5); @@ -1235,20 +1228,20 @@ PeerHelper Connection::getPeerNoEx(session_t peer_id) } /* find peer_id for address */ -u16 Connection::lookupPeer(Address& sender) +u16 Connection::lookupPeer(Address &sender) { MutexAutoLock peerlock(m_peers_mutex); - std::map<u16, Peer*>::iterator j; + std::map<u16, Peer *>::iterator j; j = m_peers.begin(); - for(; j != m_peers.end(); ++j) - { + for (; j != m_peers.end(); ++j) { Peer *peer = j->second; if (peer->isPendingDeletion()) continue; Address tocheck; - if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender)) + if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && + (tocheck == sender)) return peer->id; if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender)) @@ -1273,14 +1266,13 @@ bool Connection::deletePeer(session_t peer_id, bool timeout) } Address peer_address; - //any peer has a primary address this never fails! + // any peer has a primary address this never fails! peer->getAddress(MTP_PRIMARY, peer_address); // Create event ConnectionEvent e; e.peerRemoved(peer_id, timeout, peer_address); putEvent(e); - peer->Drop(); return true; } @@ -1291,7 +1283,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms) { try { return m_event_queue.pop_front(timeout_ms); - } catch(ItemNotFoundException &ex) { + } catch (ItemNotFoundException &ex) { ConnectionEvent e; e.type = CONNEVENT_NONE; return e; @@ -1351,12 +1343,12 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout) events keep happening before the timeout expires. This is not considered to be a problem (is it?) */ - for(;;) { + for (;;) { ConnectionEvent e = waitEvent(timeout); if (e.type != CONNEVENT_NONE) LOG(dout_con << getDesc() << ": Receive: got event: " - << e.describe() << std::endl); - switch(e.type) { + << e.describe() << std::endl); + switch (e.type) { case CONNEVENT_NONE: return false; case CONNEVENT_DATA_RECEIVED: @@ -1381,7 +1373,7 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout) } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " - "(port already in use?)"); + "(port already in use?)"); } } return false; @@ -1399,8 +1391,7 @@ bool Connection::TryReceive(NetworkPacket *pkt) return Receive(pkt, 0); } -void Connection::Send(session_t peer_id, u8 channelnum, - NetworkPacket *pkt, bool reliable) +void Connection::Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable) { assert(channelnum < CHANNEL_COUNT); // Pre-condition @@ -1424,7 +1415,8 @@ Address Connection::GetPeerAddress(session_t peer_id) float Connection::getPeerStat(session_t peer_id, rtt_stat_type type) { PeerHelper peer = getPeerNoEx(peer_id); - if (!peer) return -1; + if (!peer) + return -1; return peer->getStat(type); } @@ -1432,30 +1424,31 @@ float Connection::getLocalStat(rate_stat_type type) { PeerHelper peer = getPeerNoEx(PEER_ID_SERVER); - FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???"); + FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? " + "are you serious???"); float retval = 0.0; for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) { - switch(type) { - case CUR_DL_RATE: - retval += channel.getCurrentDownloadRateKB(); - break; - case AVG_DL_RATE: - retval += channel.getAvgDownloadRateKB(); - break; - case CUR_INC_RATE: - retval += channel.getCurrentIncomingRateKB(); - break; - case AVG_INC_RATE: - retval += channel.getAvgIncomingRateKB(); - break; - case AVG_LOSS_RATE: - retval += channel.getAvgLossRateKB(); - break; - case CUR_LOSS_RATE: - retval += channel.getCurrentLossRateKB(); - break; + switch (type) { + case CUR_DL_RATE: + retval += channel.getCurrentDownloadRateKB(); + break; + case AVG_DL_RATE: + retval += channel.getAvgDownloadRateKB(); + break; + case CUR_INC_RATE: + retval += channel.getCurrentIncomingRateKB(); + break; + case AVG_INC_RATE: + retval += channel.getAvgIncomingRateKB(); + break; + case AVG_LOSS_RATE: + retval += channel.getAvgLossRateKB(); + break; + case CUR_LOSS_RATE: + retval += channel.getCurrentLossRateKB(); + break; default: FATAL_ERROR("Connection::getLocalStat Invalid stat type"); } @@ -1463,20 +1456,20 @@ float Connection::getLocalStat(rate_stat_type type) return retval; } -u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) +u16 Connection::createPeer(Address &sender, MTProtocols protocol, int fd) { // Somebody wants to make a new connection // Get a unique peer id (2 or higher) session_t peer_id_new = m_next_remote_peer_id; - u16 overflow = MAX_UDP_PEERS; + u16 overflow = MAX_UDP_PEERS; /* Find an unused peer id */ MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; - for(;;) { + for (;;) { // Check if exists if (m_peers.find(peer_id_new) == m_peers.end()) @@ -1501,17 +1494,17 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) m_peers[peer->id] = peer; m_peer_ids.push_back(peer->id); - m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS; + m_next_remote_peer_id = (peer_id_new + 1) % MAX_UDP_PEERS; - LOG(dout_con << getDesc() - << "createPeer(): giving peer_id=" << peer_id_new << std::endl); + LOG(dout_con << getDesc() << "createPeer(): giving peer_id=" << peer_id_new + << std::endl); ConnectionCommand cmd; SharedBuffer<u8> reply(4); writeU8(&reply[0], PACKET_TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU16(&reply[2], peer_id_new); - cmd.createPeer(peer_id_new,reply); + cmd.createPeer(peer_id_new, reply); putCommand(cmd); // Create peer addition event @@ -1526,14 +1519,14 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { m_info_mutex.lock(); - out<<getDesc()<<": "; + out << getDesc() << ": "; m_info_mutex.unlock(); } const std::string Connection::getDesc() { - return std::string("con(")+ - itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")"; + return std::string("con(") + itos(m_udpSocket.GetHandle()) + "/" + + itos(m_peer_id) + ")"; } void Connection::DisconnectPeer(session_t peer_id) @@ -1547,10 +1540,9 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) { assert(channelnum < CHANNEL_COUNT); // Pre-condition - LOG(dout_con<<getDesc() - <<" Queuing ACK command to peer_id: " << peer_id << - " channel: " << (channelnum & 0xFF) << - " seqnum: " << seqnum << std::endl); + LOG(dout_con << getDesc() << " Queuing ACK command to peer_id: " << peer_id + << " channel: " << (channelnum & 0xFF) << " seqnum: " << seqnum + << std::endl); ConnectionCommand c; SharedBuffer<u8> ack(4); @@ -1563,10 +1555,9 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) m_sendThread->Trigger(); } -UDPPeer* Connection::createServerPeer(Address& address) +UDPPeer *Connection::createServerPeer(Address &address) { - if (getPeerNoEx(PEER_ID_SERVER) != 0) - { + if (getPeerNoEx(PEER_ID_SERVER) != 0) { throw ConnectionException("Already connected to a server"); } |