diff options
Diffstat (limited to 'src/network/connection.cpp')
-rw-r--r-- | src/network/connection.cpp | 525 |
1 files changed, 267 insertions, 258 deletions
diff --git a/src/network/connection.cpp b/src/network/connection.cpp index dbcece99d..3692e45a9 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -41,29 +41,29 @@ namespace con /* defines used for debugging and profiling */ /******************************************************************************/ #ifdef NDEBUG -#define LOG(a) a -#define PROFILE(a) + #define LOG(a) a + #define PROFILE(a) #else -#if 0 + #if 0 /* this mutex is used to achieve log message consistency */ std::mutex log_message_mutex; -#define LOG(a) \ - { \ - MutexAutoLock loglock(log_message_mutex); \ - a; \ - } -#else -// Prevent deadlocks until a solution is found after 5.2.0 (TODO) -#define LOG(a) a -#endif + #define LOG(a) \ + { \ + MutexAutoLock loglock(log_message_mutex); \ + a; \ + } + #else + // Prevent deadlocks until a solution is found after 5.2.0 (TODO) + #define LOG(a) a + #endif -#define PROFILE(a) a + #define PROFILE(a) a #endif #define PING_TIMEOUT 5.0 -BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, u32 protocol_id, - session_t sender_peer_id, u8 channel) +BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, + u32 protocol_id, session_t sender_peer_id, u8 channel) { u32 packet_size = data.getSize() + BASE_HEADER_SIZE; BufferedPacket p(packet_size); @@ -123,7 +123,8 @@ void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum start = end + 1; chunk_num++; - } while (end != data.getSize() - 1); + } + while (end != data.getSize() - 1); for (SharedBuffer<u8> &chunk : *chunks) { // Write chunk_count @@ -166,11 +167,11 @@ SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum) void ReliablePacketBuffer::print() { MutexAutoLock listlock(m_list_mutex); - LOG(dout_con << "Dump of ReliablePacketBuffer:" << std::endl); + LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for (BufferedPacket &bufferedPacket : m_list) { - u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE + 1])); - LOG(dout_con << index << ":" << s << std::endl); + u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1])); + LOG(dout_con<<index<< ":" << s << std::endl); index++; } } @@ -190,8 +191,9 @@ u32 ReliablePacketBuffer::size() RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum) { std::list<BufferedPacket>::iterator i = m_list.begin(); - for (; i != m_list.end(); ++i) { - u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); + for(; i != m_list.end(); ++i) + { + u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if (s == seqnum) break; } @@ -203,7 +205,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() return m_list.end(); } -bool ReliablePacketBuffer::getFirstSeqnum(u16 &result) +bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) @@ -235,16 +237,17 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { - LOG(dout_con << "Sequence number: " << seqnum - << " not found in reliable buffer" << std::endl); + LOG(dout_con<<"Sequence number: " << seqnum + << " not found in reliable buffer"<<std::endl); throw NotFoundException("seqnum not found in buffer"); } BufferedPacket p = *r; + RPBSearchResult next = r; ++next; if (next != notFound()) { - u16 s = readU16(&(next->data[BASE_HEADER_SIZE + 1])); + u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1])); m_oldest_non_answered_ack = s; } @@ -264,27 +267,25 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " - "reliable packet" - << std::endl; + "reliable packet" << std::endl; return; } u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]); if (type != PACKET_TYPE_RELIABLE) { errorstream << "ReliablePacketBuffer::insert(): type is not reliable" - << std::endl; + << std::endl; return; } u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) { errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of " - "expected window " - << std::endl; + "expected window " << std::endl; return; } if (seqnum == next_expected) { errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected" - << std::endl; + << std::endl; return; } @@ -292,7 +293,8 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) // Find the right place for the packet and insert it there // If list is empty, just add it - if (m_list.empty()) { + if (m_list.empty()) + { m_list.push_back(p); m_oldest_non_answered_ack = seqnum; // Done. @@ -302,47 +304,47 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) // Otherwise find the right place std::list<BufferedPacket>::iterator i = m_list.begin(); // Find the first packet in the list which has a higher seqnum - u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); + u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /* case seqnum is smaller then next_expected seqnum */ /* this is true e.g. on wrap around */ if (seqnum < next_expected) { - while (((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { + while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { ++i; if (i != m_list.end()) - s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); + s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } } /* non wrap around case (at least for incoming and next_expected */ - else { - while (((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { + else + { + while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { ++i; if (i != m_list.end()) - s = readU16(&(i->data[BASE_HEADER_SIZE + 1])); + s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } } if (s == seqnum) { /* nothing to do this seems to be a resent packet */ /* for paranoia reason data should be compared */ - if ((readU16(&(i->data[BASE_HEADER_SIZE + 1])) != seqnum) || - (i->data.getSize() != p.data.getSize()) || - (i->address != p.address)) { + if ( + (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) || + (i->data.getSize() != p.data.getSize()) || + (i->address != p.address) + ) + { /* if this happens your maximum transfer window may be to big */ fprintf(stderr, - "Duplicated seqnum %d non matching packet " - "detected:\n", + "Duplicated seqnum %d non matching packet detected:\n", seqnum); fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n", - readU16(&(i->data[BASE_HEADER_SIZE + 1])), - i->data.getSize(), + readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(), i->address.serializeString().c_str()); fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n", - readU16(&(p.data[BASE_HEADER_SIZE + 1])), - p.data.getSize(), + readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(), p.address.serializeString().c_str()); - throw IncomingDataCorruption( - "duplicated packet isn't same as original one"); + throw IncomingDataCorruption("duplicated packet isn't same as original one"); } } /* insert or push back */ @@ -353,8 +355,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) } /* update last packet number */ - m_oldest_non_answered_ack = - readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE + 1]); + m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); } void ReliablePacketBuffer::incrementTimeouts(float dtime) @@ -366,8 +367,8 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) } } -std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts( - float timeout, unsigned int max_packets) +std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, + unsigned int max_packets) { MutexAutoLock listlock(m_list_mutex); std::list<BufferedPacket> timed_outs; @@ -375,7 +376,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts( if (bufferedPacket.time >= timeout) { timed_outs.push_back(bufferedPacket); - // this packet will be sent right afterwards reset timeout here + //this packet will be sent right afterwards reset timeout here bufferedPacket.time = 0.0f; if (timed_outs.size() >= max_packets) break; @@ -446,19 +447,19 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia errorstream << "Invalid data size for split packet" << std::endl; return SharedBuffer<u8>(); } - u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); - u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE + 3]); - u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE + 5]); + u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); + u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); + u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); + u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); if (type != PACKET_TYPE_SPLIT) { errorstream << "IncomingSplitBuffer::insert(): type is not split" - << std::endl; + << std::endl; return SharedBuffer<u8>(); } if (chunk_num >= chunk_count) { errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num - << " >= chunk_count=" << chunk_count << std::endl; + << " >= chunk_count=" << chunk_count << std::endl; return SharedBuffer<u8>(); } @@ -473,13 +474,14 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia if (chunk_count != sp->chunk_count) { errorstream << "IncomingSplitBuffer::insert(): chunk_count=" - << chunk_count << " != sp->chunk_count=" << sp->chunk_count - << std::endl; + << chunk_count << " != sp->chunk_count=" << sp->chunk_count + << std::endl; return SharedBuffer<u8>(); } if (reliable != sp->reliable) - LOG(derr_con << "Connection: WARNING: reliable=" << reliable - << " != sp->reliable=" << sp->reliable << std::endl); + LOG(derr_con<<"Connection: WARNING: reliable="<<reliable + <<" != sp->reliable="<<sp->reliable + <<std::endl); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; @@ -519,8 +521,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) } for (u16 j : remove_queue) { MutexAutoLock listlock(m_map_mutex); - LOG(dout_con << "NOTE: Removing timed out unreliable split packet" - << std::endl); + LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl); delete m_buf[j]; m_buf.erase(j); } @@ -530,8 +531,8 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) ConnectionCommand */ -void ConnectionCommand::send( - session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_) +void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, + bool reliable_) { type = CONNCMD_SEND; peer_id = peer_id_; @@ -569,36 +570,36 @@ void Channel::setNextSplitSeqNum(u16 seqnum) next_outgoing_split_seqnum = seqnum; } -u16 Channel::getOutgoingSequenceNumber(bool &successful) +u16 Channel::getOutgoingSequenceNumber(bool& successful) { MutexAutoLock internal(m_internal_mutex); u16 retval = next_outgoing_seqnum; u16 lowest_unacked_seqnumber; /* shortcut if there ain't any packet in outgoing list */ - if (outgoing_reliables_sent.empty()) { + if (outgoing_reliables_sent.empty()) + { next_outgoing_seqnum++; return retval; } - if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) { + if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) + { if (lowest_unacked_seqnumber < next_outgoing_seqnum) { // ugly cast but this one is required in order to tell compiler we - // know about difference of two unsigned may be negative in - // general but we already made sure it won't happen in this case - if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > - window_size) { + // know about difference of two unsigned may be negative in general + // but we already made sure it won't happen in this case + if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) { successful = false; return 0; } - } else { + } + else { // ugly cast but this one is required in order to tell compiler we - // know about difference of two unsigned may be negative in - // general but we already made sure it won't happen in this case - if ((next_outgoing_seqnum + - (u16)(SEQNUM_MAX - - lowest_unacked_seqnumber)) > - window_size) { + // know about difference of two unsigned may be negative in general + // but we already made sure it won't happen in this case + if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) > + window_size) { successful = false; return 0; } @@ -617,7 +618,7 @@ u16 Channel::readOutgoingSequenceNumber() bool Channel::putBackSequenceNumber(u16 seqnum) { - if (((seqnum + 1) % (SEQNUM_MAX + 1)) == next_outgoing_seqnum) { + if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) { next_outgoing_seqnum = seqnum; return true; @@ -632,8 +633,7 @@ void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets) current_packet_successful += packets; } -void Channel::UpdateBytesReceived(unsigned int bytes) -{ +void Channel::UpdateBytesReceived(unsigned int bytes) { MutexAutoLock internal(m_internal_mutex); current_bytes_received += bytes; } @@ -644,6 +644,7 @@ void Channel::UpdateBytesLost(unsigned int bytes) current_bytes_lost += bytes; } + void Channel::UpdatePacketLossCounter(unsigned int count) { MutexAutoLock internal(m_internal_mutex); @@ -664,21 +665,19 @@ void Channel::UpdateTimers(float dtime) if (packet_loss_counter > 1.0f) { packet_loss_counter -= 1.0f; - unsigned int packet_loss = - 11; /* use a neutral value for initialization */ + unsigned int packet_loss = 11; /* use a neutral value for initialization */ unsigned int packets_successful = 0; - // unsigned int packet_too_late = 0; + //unsigned int packet_too_late = 0; bool reasonable_amount_of_data_transmitted = false; { MutexAutoLock internal(m_internal_mutex); packet_loss = current_packet_loss; - // packet_too_late = current_packet_too_late; + //packet_too_late = current_packet_too_late; packets_successful = current_packet_successful; - if (current_bytes_transfered > - (unsigned int)(window_size * 512 / 2)) { + if (current_bytes_transfered > (unsigned int) (window_size*512/2)) { reasonable_amount_of_data_transmitted = true; } current_packet_loss = 0; @@ -691,33 +690,38 @@ void Channel::UpdateTimers(float dtime) bool done = false; if (packets_successful > 0) { - successful_to_lost_ratio = packet_loss / packets_successful; + successful_to_lost_ratio = packet_loss/packets_successful; } else if (packet_loss > 0) { window_size = std::max( - (window_size - 10), MIN_RELIABLE_WINDOW_SIZE); + (window_size - 10), + MIN_RELIABLE_WINDOW_SIZE); done = true; } if (!done) { if ((successful_to_lost_ratio < 0.01f) && - (window_size < MAX_RELIABLE_WINDOW_SIZE)) { + (window_size < MAX_RELIABLE_WINDOW_SIZE)) { /* don't even think about increasing if we didn't even * use major parts of our window */ if (reasonable_amount_of_data_transmitted) - window_size = std::min((window_size + 100), + window_size = std::min( + (window_size + 100), MAX_RELIABLE_WINDOW_SIZE); } else if ((successful_to_lost_ratio < 0.05f) && (window_size < MAX_RELIABLE_WINDOW_SIZE)) { /* don't even think about increasing if we didn't even * use major parts of our window */ if (reasonable_amount_of_data_transmitted) - window_size = std::min((window_size + 50), + window_size = std::min( + (window_size + 50), MAX_RELIABLE_WINDOW_SIZE); } else if (successful_to_lost_ratio > 0.15f) { - window_size = std::max((window_size - 100), + window_size = std::max( + (window_size - 100), MIN_RELIABLE_WINDOW_SIZE); } else if (successful_to_lost_ratio > 0.1f) { - window_size = std::max((window_size - 50), + window_size = std::max( + (window_size - 50), MIN_RELIABLE_WINDOW_SIZE); } } @@ -726,17 +730,16 @@ void Channel::UpdateTimers(float dtime) if (bpm_counter > 10.0f) { { MutexAutoLock internal(m_internal_mutex); - cur_kbps = (((float)current_bytes_transfered) / bpm_counter) / - 1024.0f; + cur_kbps = + (((float) current_bytes_transfered)/bpm_counter)/1024.0f; current_bytes_transfered = 0; - cur_kbps_lost = (((float)current_bytes_lost) / bpm_counter) / - 1024.0f; - current_bytes_lost = 0; - cur_incoming_kbps = - (((float)current_bytes_received) / bpm_counter) / - 1024.0f; - current_bytes_received = 0; - bpm_counter = 0.0f; + cur_kbps_lost = + (((float) current_bytes_lost)/bpm_counter)/1024.0f; + current_bytes_lost = 0; + cur_incoming_kbps = + (((float) current_bytes_received)/bpm_counter)/1024.0f; + current_bytes_received = 0; + bpm_counter = 0.0f; } if (cur_kbps > max_kbps) { @@ -751,21 +754,24 @@ void Channel::UpdateTimers(float dtime) max_incoming_kbps = cur_incoming_kbps; } - rate_samples = MYMIN(rate_samples + 1, 10); - float old_fraction = ((float)(rate_samples - 1)) / ((float)rate_samples); - avg_kbps = avg_kbps * old_fraction + cur_kbps * (1.0 - old_fraction); - avg_kbps_lost = avg_kbps_lost * old_fraction + + rate_samples = MYMIN(rate_samples+1,10); + float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples); + avg_kbps = avg_kbps * old_fraction + + cur_kbps * (1.0 - old_fraction); + avg_kbps_lost = avg_kbps_lost * old_fraction + cur_kbps_lost * (1.0 - old_fraction); - avg_incoming_kbps = avg_incoming_kbps * old_fraction + - cur_incoming_kbps * (1.0 - old_fraction); + avg_incoming_kbps = avg_incoming_kbps * old_fraction + + cur_incoming_kbps * (1.0 - old_fraction); } } + /* Peer */ -PeerHelper::PeerHelper(Peer *peer) : m_peer(peer) +PeerHelper::PeerHelper(Peer* peer) : + m_peer(peer) { if (peer && !peer->IncUseCount()) m_peer = nullptr; @@ -779,7 +785,7 @@ PeerHelper::~PeerHelper() m_peer = nullptr; } -PeerHelper &PeerHelper::operator=(Peer *peer) +PeerHelper& PeerHelper::operator=(Peer* peer) { m_peer = peer; if (peer && !peer->IncUseCount()) @@ -787,24 +793,24 @@ PeerHelper &PeerHelper::operator=(Peer *peer) return *this; } -Peer *PeerHelper::operator->() const +Peer* PeerHelper::operator->() const { return m_peer; } -Peer *PeerHelper::operator&() const +Peer* PeerHelper::operator&() const { return m_peer; } bool PeerHelper::operator!() { - return !m_peer; + return ! m_peer; } -bool PeerHelper::operator!=(void *ptr) +bool PeerHelper::operator!=(void* ptr) { - return ((void *)m_peer != ptr); + return ((void*) m_peer != ptr); } bool Peer::IncUseCount() @@ -832,9 +838,8 @@ void Peer::DecUseCount() delete this; } -void Peer::RTTStatistics( - float rtt, const std::string &profiler_id, unsigned int num_samples) -{ +void Peer::RTTStatistics(float rtt, const std::string &profiler_id, + unsigned int num_samples) { if (m_last_rtt > 0) { /* set min max values */ @@ -845,18 +850,18 @@ void Peer::RTTStatistics( /* do average calculation */ if (m_rtt.avg_rtt < 0.0) - m_rtt.avg_rtt = rtt; + m_rtt.avg_rtt = rtt; else - m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples / (num_samples - 1)) + - rtt * (1 / num_samples); + m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) + + rtt * (1/num_samples); /* do jitter calculation */ - // just use some neutral value at beginning + //just use some neutral value at beginning float jitter = m_rtt.jitter_min; if (rtt > m_last_rtt) - jitter = rtt - m_last_rtt; + jitter = rtt-m_last_rtt; if (rtt <= m_last_rtt) jitter = m_last_rtt - rtt; @@ -867,17 +872,14 @@ void Peer::RTTStatistics( m_rtt.jitter_max = jitter; if (m_rtt.jitter_avg < 0.0) - m_rtt.jitter_avg = jitter; + m_rtt.jitter_avg = jitter; else - m_rtt.jitter_avg = - m_rtt.jitter_avg * - (num_samples / (num_samples - 1)) + - jitter * (1 / num_samples); + m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) + + jitter * (1/num_samples); if (!profiler_id.empty()) { g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f); - g_profiler->graphAdd( - profiler_id + " jitter [ms]", jitter * 1000.f); + g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f); } } /* save values required for next loop */ @@ -889,7 +891,7 @@ bool Peer::isTimedOut(float timeout) MutexAutoLock lock(m_exclusive_access_mutex); u64 current_time = porting::getTimeMs(); - float dtime = CALC_DTIME(m_last_timeout_check, current_time); + float dtime = CALC_DTIME(m_last_timeout_check,current_time); m_last_timeout_check = current_time; m_timeout_counter += dtime; @@ -907,28 +909,28 @@ void Peer::Drop() } PROFILE(std::stringstream peerIdentifier1); - PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() << ";" << id - << ";RELIABLE]"); + PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() + << ";" << id << ";RELIABLE]"); PROFILE(g_profiler->remove(peerIdentifier1.str())); PROFILE(std::stringstream peerIdentifier2); - PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() << ";" << id - << ";RELIABLE]"); + PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() + << ";" << id << ";RELIABLE]"); PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG)); delete this; } -UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection *connection) : - Peer(a_address, a_id, connection) +UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : + Peer(a_address,a_id,connection) { for (Channel &channel : channels) channel.setWindowSize(START_RELIABLE_WINDOW_SIZE); } -bool UDPPeer::getAddress(MTProtocols type, Address &toset) +bool UDPPeer::getAddress(MTProtocols type,Address& toset) { - if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || - (type == MTP_PRIMARY)) { + if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY)) + { toset = address; return true; } @@ -941,7 +943,7 @@ void UDPPeer::reportRTT(float rtt) if (rtt < 0.0) { return; } - RTTStatistics(rtt, "rudp", MAX_RELIABLE_WINDOW_SIZE * 10); + RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10); float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR; if (timeout < RESEND_TIMEOUT_MIN) @@ -953,10 +955,11 @@ void UDPPeer::reportRTT(float rtt) resend_timeout = timeout; } -bool UDPPeer::Ping(float dtime, SharedBuffer<u8> &data) +bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data) { m_ping_timer += dtime; - if (m_ping_timer >= PING_TIMEOUT) { + if (m_ping_timer >= PING_TIMEOUT) + { // Create and send PING packet writeU8(&data[0], PACKET_TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_PING); @@ -966,7 +969,8 @@ bool UDPPeer::Ping(float dtime, SharedBuffer<u8> &data) return false; } -void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size) +void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size) { if (m_pending_disconnect) return; @@ -976,37 +980,41 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, unsigned int max_pack if (chan.queued_commands.empty() && /* don't queue more packets then window size */ (chan.queued_reliables.size() < chan.getWindowSize() / 2)) { - LOG(dout_con << m_connection->getDesc() - << " processing reliable command for peer id: " << c.peer_id - << " data size: " << c.data.getSize() << std::endl); - if (!processReliableSendCommand(c, max_packet_size)) { + LOG(dout_con<<m_connection->getDesc() + <<" processing reliable command for peer id: " << c.peer_id + <<" data size: " << c.data.getSize() << std::endl); + if (!processReliableSendCommand(c,max_packet_size)) { chan.queued_commands.push_back(c); } - } else { - LOG(dout_con << m_connection->getDesc() - << " Queueing reliable command for peer id: " << c.peer_id - << " data size: " << c.data.getSize() << std::endl); + } + else { + LOG(dout_con<<m_connection->getDesc() + <<" Queueing reliable command for peer id: " << c.peer_id + <<" data size: " << c.data.getSize() <<std::endl); chan.queued_commands.push_back(c); if (chan.queued_commands.size() >= chan.getWindowSize() / 2) { LOG(derr_con << m_connection->getDesc() - << "Possible packet stall to peer id: " << c.peer_id - << " queued_commands=" << chan.queued_commands.size() - << std::endl); + << "Possible packet stall to peer id: " << c.peer_id + << " queued_commands=" << chan.queued_commands.size() + << std::endl); } } } bool UDPPeer::processReliableSendCommand( - ConnectionCommand &c, unsigned int max_packet_size) + ConnectionCommand &c, + unsigned int max_packet_size) { if (m_pending_disconnect) return true; Channel &chan = channels[c.channelnum]; - u32 chunksize_max = max_packet_size - BASE_HEADER_SIZE - RELIABLE_HEADER_SIZE; + u32 chunksize_max = max_packet_size + - BASE_HEADER_SIZE + - RELIABLE_HEADER_SIZE; - sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE * 512); + sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512); std::list<SharedBuffer<u8>> originals; u16 split_sequence_number = chan.readNextSplitSeqNum(); @@ -1014,8 +1022,7 @@ bool UDPPeer::processReliableSendCommand( if (c.raw) { originals.emplace_back(c.data); } else { - makeAutoSplitPacket( - c.data, chunksize_max, split_sequence_number, &originals); + makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals); chan.setNextSplitSeqNum(split_sequence_number); } @@ -1031,7 +1038,8 @@ bool UDPPeer::processReliableSendCommand( if (!have_sequence_number) break; - if (!have_initial_sequence_number) { + if (!have_initial_sequence_number) + { initial_sequence_number = seqnum; have_initial_sequence_number = true; } @@ -1051,14 +1059,11 @@ bool UDPPeer::processReliableSendCommand( while (!toadd.empty()) { BufferedPacket p = toadd.front(); toadd.pop(); - // LOG(dout_con<<connection->getDesc() - // << " queuing reliable packet for - //peer_id: " << c.peer_id - // << " channel: " << - //(c.channelnum&0xFF) - // << " seqnum: " << - //readU16(&p.data[BASE_HEADER_SIZE+1]) - // << std::endl) +// LOG(dout_con<<connection->getDesc() +// << " queuing reliable packet for peer_id: " << c.peer_id +// << " channel: " << (c.channelnum&0xFF) +// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) +// << std::endl) chan.queued_reliables.push(p); pcount++; } @@ -1076,9 +1081,9 @@ bool UDPPeer::processReliableSendCommand( /* remove packet */ toadd.pop(); - bool successfully_put_back_sequence_number = chan.putBackSequenceNumber( - (initial_sequence_number + - toadd.size() % (SEQNUM_MAX + 1))); + bool successfully_put_back_sequence_number + = chan.putBackSequenceNumber( + (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1))); FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); } @@ -1087,20 +1092,24 @@ bool UDPPeer::processReliableSendCommand( // 'log_message_mutex' and 'm_list_mutex'. u32 n_queued = chan.outgoing_reliables_sent.size(); - LOG(dout_con << m_connection->getDesc() - << " Windowsize exceeded on reliable sending " << c.data.getSize() - << " bytes" << std::endl - << "\t\tinitial_sequence_number: " << initial_sequence_number - << std::endl - << "\t\tgot at most : " << packets_available << " packets" - << std::endl - << "\t\tpackets queued : " << n_queued << std::endl); + LOG(dout_con<<m_connection->getDesc() + << " Windowsize exceeded on reliable sending " + << c.data.getSize() << " bytes" + << std::endl << "\t\tinitial_sequence_number: " + << initial_sequence_number + << std::endl << "\t\tgot at most : " + << packets_available << " packets" + << std::endl << "\t\tpackets queued : " + << n_queued + << std::endl); return false; } -void UDPPeer::RunCommandQueues(unsigned int max_packet_size, unsigned int maxcommands, - unsigned int maxtransfer) +void UDPPeer::RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxcommands, + unsigned int maxtransfer) { for (Channel &channel : channels) { @@ -1113,22 +1122,19 @@ void UDPPeer::RunCommandQueues(unsigned int max_packet_size, unsigned int maxcom ConnectionCommand c = channel.queued_commands.front(); LOG(dout_con << m_connection->getDesc() - << " processing queued reliable command " - << std::endl); + << " processing queued reliable command " << std::endl); // Packet is processed, remove it from queue - if (processReliableSendCommand(c, max_packet_size)) { + if (processReliableSendCommand(c,max_packet_size)) { channel.queued_commands.pop_front(); } else { LOG(dout_con << m_connection->getDesc() - << " Failed to queue packets for " - "peer_id: " - << c.peer_id - << ", delaying sending of " - << c.data.getSize() << " bytes" - << std::endl); + << " Failed to queue packets for peer_id: " << c.peer_id + << ", delaying sending of " << c.data.getSize() + << " bytes" << std::endl); } - } catch (ItemNotFoundException &e) { + } + catch (ItemNotFoundException &e) { // intentionally empty } } @@ -1147,8 +1153,8 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum) channels[channel].setNextSplitSeqNum(seqnum); } -SharedBuffer<u8> UDPPeer::addSplitPacket( - u8 channel, const BufferedPacket &toadd, bool reliable) +SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd, + bool reliable) { assert(channel < CHANNEL_COUNT); // Pre-condition return channels[channel].incoming_splits.insert(toadd, reliable); @@ -1158,13 +1164,13 @@ SharedBuffer<u8> UDPPeer::addSplitPacket( Connection */ -Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, - PeerHandler *peerhandler) : - m_udpSocket(ipv6), - m_protocol_id(protocol_id), - m_sendThread(new ConnectionSendThread(max_packet_size, timeout)), - m_receiveThread(new ConnectionReceiveThread(max_packet_size)), - m_bc_peerhandler(peerhandler) +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, + bool ipv6, PeerHandler *peerhandler) : + m_udpSocket(ipv6), + m_protocol_id(protocol_id), + m_sendThread(new ConnectionSendThread(max_packet_size, timeout)), + m_receiveThread(new ConnectionReceiveThread(max_packet_size)), + m_bc_peerhandler(peerhandler) { /* Amount of time Receive() will wait for data, this is entirely different @@ -1178,6 +1184,7 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool m_receiveThread->start(); } + Connection::~Connection() { m_shutting_down = true; @@ -1185,7 +1192,7 @@ Connection::~Connection() m_sendThread->stop(); m_receiveThread->stop(); - // TODO for some unkonwn reason send/receive threads do not exit as they're + //TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce // timeout to half a second. m_sendThread->setPeerTimeout(0.5); @@ -1228,20 +1235,20 @@ PeerHelper Connection::getPeerNoEx(session_t peer_id) } /* find peer_id for address */ -u16 Connection::lookupPeer(Address &sender) +u16 Connection::lookupPeer(Address& sender) { MutexAutoLock peerlock(m_peers_mutex); - std::map<u16, Peer *>::iterator j; + std::map<u16, Peer*>::iterator j; j = m_peers.begin(); - for (; j != m_peers.end(); ++j) { + for(; j != m_peers.end(); ++j) + { Peer *peer = j->second; if (peer->isPendingDeletion()) continue; Address tocheck; - if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && - (tocheck == sender)) + if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender)) return peer->id; if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender)) @@ -1266,13 +1273,14 @@ bool Connection::deletePeer(session_t peer_id, bool timeout) } Address peer_address; - // any peer has a primary address this never fails! + //any peer has a primary address this never fails! peer->getAddress(MTP_PRIMARY, peer_address); // Create event ConnectionEvent e; e.peerRemoved(peer_id, timeout, peer_address); putEvent(e); + peer->Drop(); return true; } @@ -1283,7 +1291,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms) { try { return m_event_queue.pop_front(timeout_ms); - } catch (ItemNotFoundException &ex) { + } catch(ItemNotFoundException &ex) { ConnectionEvent e; e.type = CONNEVENT_NONE; return e; @@ -1343,12 +1351,12 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout) events keep happening before the timeout expires. This is not considered to be a problem (is it?) */ - for (;;) { + for(;;) { ConnectionEvent e = waitEvent(timeout); if (e.type != CONNEVENT_NONE) LOG(dout_con << getDesc() << ": Receive: got event: " - << e.describe() << std::endl); - switch (e.type) { + << e.describe() << std::endl); + switch(e.type) { case CONNEVENT_NONE: return false; case CONNEVENT_DATA_RECEIVED: @@ -1373,7 +1381,7 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout) } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " - "(port already in use?)"); + "(port already in use?)"); } } return false; @@ -1391,7 +1399,8 @@ bool Connection::TryReceive(NetworkPacket *pkt) return Receive(pkt, 0); } -void Connection::Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable) +void Connection::Send(session_t peer_id, u8 channelnum, + NetworkPacket *pkt, bool reliable) { assert(channelnum < CHANNEL_COUNT); // Pre-condition @@ -1415,8 +1424,7 @@ Address Connection::GetPeerAddress(session_t peer_id) float Connection::getPeerStat(session_t peer_id, rtt_stat_type type) { PeerHelper peer = getPeerNoEx(peer_id); - if (!peer) - return -1; + if (!peer) return -1; return peer->getStat(type); } @@ -1424,31 +1432,30 @@ float Connection::getLocalStat(rate_stat_type type) { PeerHelper peer = getPeerNoEx(PEER_ID_SERVER); - FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? " - "are you serious???"); + FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???"); float retval = 0.0; for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) { - switch (type) { - case CUR_DL_RATE: - retval += channel.getCurrentDownloadRateKB(); - break; - case AVG_DL_RATE: - retval += channel.getAvgDownloadRateKB(); - break; - case CUR_INC_RATE: - retval += channel.getCurrentIncomingRateKB(); - break; - case AVG_INC_RATE: - retval += channel.getAvgIncomingRateKB(); - break; - case AVG_LOSS_RATE: - retval += channel.getAvgLossRateKB(); - break; - case CUR_LOSS_RATE: - retval += channel.getCurrentLossRateKB(); - break; + switch(type) { + case CUR_DL_RATE: + retval += channel.getCurrentDownloadRateKB(); + break; + case AVG_DL_RATE: + retval += channel.getAvgDownloadRateKB(); + break; + case CUR_INC_RATE: + retval += channel.getCurrentIncomingRateKB(); + break; + case AVG_INC_RATE: + retval += channel.getAvgIncomingRateKB(); + break; + case AVG_LOSS_RATE: + retval += channel.getAvgLossRateKB(); + break; + case CUR_LOSS_RATE: + retval += channel.getCurrentLossRateKB(); + break; default: FATAL_ERROR("Connection::getLocalStat Invalid stat type"); } @@ -1456,20 +1463,20 @@ float Connection::getLocalStat(rate_stat_type type) return retval; } -u16 Connection::createPeer(Address &sender, MTProtocols protocol, int fd) +u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) { // Somebody wants to make a new connection // Get a unique peer id (2 or higher) session_t peer_id_new = m_next_remote_peer_id; - u16 overflow = MAX_UDP_PEERS; + u16 overflow = MAX_UDP_PEERS; /* Find an unused peer id */ MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; - for (;;) { + for(;;) { // Check if exists if (m_peers.find(peer_id_new) == m_peers.end()) @@ -1494,17 +1501,17 @@ u16 Connection::createPeer(Address &sender, MTProtocols protocol, int fd) m_peers[peer->id] = peer; m_peer_ids.push_back(peer->id); - m_next_remote_peer_id = (peer_id_new + 1) % MAX_UDP_PEERS; + m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS; - LOG(dout_con << getDesc() << "createPeer(): giving peer_id=" << peer_id_new - << std::endl); + LOG(dout_con << getDesc() + << "createPeer(): giving peer_id=" << peer_id_new << std::endl); ConnectionCommand cmd; SharedBuffer<u8> reply(4); writeU8(&reply[0], PACKET_TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU16(&reply[2], peer_id_new); - cmd.createPeer(peer_id_new, reply); + cmd.createPeer(peer_id_new,reply); putCommand(cmd); // Create peer addition event @@ -1519,14 +1526,14 @@ u16 Connection::createPeer(Address &sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { m_info_mutex.lock(); - out << getDesc() << ": "; + out<<getDesc()<<": "; m_info_mutex.unlock(); } const std::string Connection::getDesc() { - return std::string("con(") + itos(m_udpSocket.GetHandle()) + "/" + - itos(m_peer_id) + ")"; + return std::string("con(")+ + itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")"; } void Connection::DisconnectPeer(session_t peer_id) @@ -1540,9 +1547,10 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) { assert(channelnum < CHANNEL_COUNT); // Pre-condition - LOG(dout_con << getDesc() << " Queuing ACK command to peer_id: " << peer_id - << " channel: " << (channelnum & 0xFF) << " seqnum: " << seqnum - << std::endl); + LOG(dout_con<<getDesc() + <<" Queuing ACK command to peer_id: " << peer_id << + " channel: " << (channelnum & 0xFF) << + " seqnum: " << seqnum << std::endl); ConnectionCommand c; SharedBuffer<u8> ack(4); @@ -1555,9 +1563,10 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) m_sendThread->Trigger(); } -UDPPeer *Connection::createServerPeer(Address &address) +UDPPeer* Connection::createServerPeer(Address& address) { - if (getPeerNoEx(PEER_ID_SERVER) != 0) { + if (getPeerNoEx(PEER_ID_SERVER) != 0) + { throw ConnectionException("Already connected to a server"); } |