diff options
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r-- | src/network/connectionthreads.cpp | 779 |
1 files changed, 400 insertions, 379 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 9a6617a1c..4d47b776c 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -38,10 +38,10 @@ namespace con #else /* this mutex is used to achieve log message consistency */ std::mutex log_conthread_mutex; -#define LOG(a) \ - { \ - MutexAutoLock loglock(log_conthread_mutex); \ - a; \ +#define LOG(a) \ + { \ + MutexAutoLock loglock(log_conthread_mutex); \ + a; \ } #define PROFILE(a) a //#define DEBUG_CONNECTION_KBPS @@ -66,12 +66,10 @@ static u8 readChannel(u8 *packetdata) /* Connection Threads */ /******************************************************************************/ -ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, - float timeout) : - Thread("ConnectionSend"), - m_max_packet_size(max_packet_size), - m_timeout(timeout), - m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")) +ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, float timeout) : + Thread("ConnectionSend"), m_max_packet_size(max_packet_size), + m_timeout(timeout), m_max_data_packets_per_iteration(g_settings->getU16( + "max_packets_per_iteration")) { SANITY_CHECK(m_max_data_packets_per_iteration > 1); } @@ -80,14 +78,15 @@ void *ConnectionSendThread::run() { assert(m_connection); - LOG(dout_con << m_connection->getDesc() - << "ConnectionSend thread started" << std::endl); + LOG(dout_con << m_connection->getDesc() << "ConnectionSend thread started" + << std::endl); u64 curtime = porting::getTimeMs(); u64 lasttime = curtime; PROFILE(std::stringstream ThreadIdentifier); - PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]"); + PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() + << "]"); /* if stop is requested don't stop immediately but try to send all */ /* packets first */ @@ -112,8 +111,10 @@ void *ConnectionSendThread::run() runTimeouts(dtime); if (m_iteration_packets_avaialble == 0) { LOG(warningstream << m_connection->getDesc() - << " Packet quota used up after re-sending packets, " - << "max=" << m_max_data_packets_per_iteration << std::endl); + << " Packet quota used up after re-sending " + "packets, " + << "max=" << m_max_data_packets_per_iteration + << std::endl); } /* translate commands to packets */ @@ -165,7 +166,6 @@ bool ConnectionSendThread::packetsQueued() } } - return false; } @@ -185,21 +185,20 @@ void ConnectionSendThread::runTimeouts(float dtime) continue; PROFILE(std::stringstream peerIdentifier); - PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() - << ";" << peerId << ";RELIABLE]"); - PROFILE(ScopeProfiler - peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG)); + PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() << ";" + << peerId << ";RELIABLE]"); + PROFILE(ScopeProfiler peerprofiler( + g_profiler, peerIdentifier.str(), SPT_AVG)); - SharedBuffer<u8> data(2); // data for sending ping, required here because of goto + SharedBuffer<u8> data(2); // data for sending ping, required here because + // of goto /* Check peer timeout */ if (peer->isTimedOut(m_timeout)) { - infostream << m_connection->getDesc() - << "RunTimeouts(): Peer " << peer->id - << " has timed out." - << std::endl; + infostream << m_connection->getDesc() << "RunTimeouts(): Peer " + << peer->id << " has timed out." << std::endl; // Add peer to the list timeouted_peers.push_back(peer->id); // Don't bother going through the buffers of this one @@ -212,7 +211,8 @@ void ConnectionSendThread::runTimeouts(float dtime) std::list<BufferedPacket> timed_outs; // Remove timed out incomplete unreliable split packets - channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); + channel.incoming_splits.removeUnreliableTimedOuts( + dtime, m_timeout); // Increment reliable packet times channel.outgoing_reliables_sent.incrementTimeouts(dtime); @@ -223,8 +223,9 @@ void ConnectionSendThread::runTimeouts(float dtime) return; // Re-send timed out outgoing reliables - timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout, - (m_max_data_packets_per_iteration / numpeers)); + timed_outs = channel.outgoing_reliables_sent.getTimedOuts( + resend_timeout, + (m_max_data_packets_per_iteration / numpeers)); channel.UpdatePacketLossCounter(timed_outs.size()); g_profiler->graphAdd("packets_lost", timed_outs.size()); @@ -232,7 +233,7 @@ void ConnectionSendThread::runTimeouts(float dtime) m_iteration_packets_avaialble -= timed_outs.size(); for (std::list<BufferedPacket>::iterator k = timed_outs.begin(); - k != timed_outs.end(); ++k) { + k != timed_outs.end(); ++k) { session_t peer_id = readPeerId(*(k->data)); u8 channelnum = readChannel(*(k->data)); u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1])); @@ -243,27 +244,28 @@ void ConnectionSendThread::runTimeouts(float dtime) if (k->resend_count > MAX_RELIABLE_RETRY) { retry_count_exceeded = true; timeouted_peers.push_back(peer->id); - /* no need to check additional packets if a single one did timeout*/ + /* no need to check additional packets if a single + * one did timeout*/ break; } LOG(derr_con << m_connection->getDesc() - << "RE-SENDING timed-out RELIABLE to " - << k->address.serializeString() - << "(t/o=" << resend_timeout << "): " - << "from_peer_id=" << peer_id - << ", channel=" << ((int) channelnum & 0xff) - << ", seqnum=" << seqnum - << std::endl); + << "RE-SENDING timed-out RELIABLE to " + << k->address.serializeString() + << "(t/o=" << resend_timeout << "): " + << "from_peer_id=" << peer_id + << ", channel=" << ((int)channelnum & 0xff) + << ", seqnum=" << seqnum << std::endl); rawSend(*k); - // do not handle rtt here as we can't decide if this packet was - // lost or really takes more time to transmit + // do not handle rtt here as we can't decide if this + // packet was lost or really takes more time to transmit } if (retry_count_exceeded) { - break; /* no need to check other channels if we already did timeout */ + break; /* no need to check other channels if we already + did timeout */ } channel.UpdateTimers(dtime); @@ -276,23 +278,23 @@ void ConnectionSendThread::runTimeouts(float dtime) /* send ping if necessary */ if (udpPeer->Ping(dtime, data)) { LOG(dout_con << m_connection->getDesc() - << "Sending ping for peer_id: " << udpPeer->id << std::endl); + << "Sending ping for peer_id: " << udpPeer->id + << std::endl); /* this may fail if there ain't a sequence number left */ if (!rawSendAsPacket(udpPeer->id, 0, data, true)) { - //retrigger with reduced ping interval + // retrigger with reduced ping interval udpPeer->Ping(4.0, data); } } - udpPeer->RunCommandQueues(m_max_packet_size, - m_max_commands_per_iteration, - m_max_packets_requeued); + udpPeer->RunCommandQueues(m_max_packet_size, m_max_commands_per_iteration, + m_max_packets_requeued); } // Remove timed out peers for (u16 timeouted_peer : timeouted_peers) { - LOG(dout_con << m_connection->getDesc() - << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl); + LOG(dout_con << m_connection->getDesc() << "RunTimeouts(): Removing peer " + << timeouted_peer << std::endl); m_connection->deletePeer(timeouted_peer, true); } } @@ -300,15 +302,14 @@ void ConnectionSendThread::runTimeouts(float dtime) void ConnectionSendThread::rawSend(const BufferedPacket &packet) { try { - m_connection->m_udpSocket.Send(packet.address, *packet.data, - packet.data.getSize()); - LOG(dout_con << m_connection->getDesc() - << " rawSend: " << packet.data.getSize() - << " bytes sent" << std::endl); + m_connection->m_udpSocket.Send( + packet.address, *packet.data, packet.data.getSize()); + LOG(dout_con << m_connection->getDesc() << " rawSend: " + << packet.data.getSize() << " bytes sent" << std::endl); } catch (SendFailedException &e) { LOG(derr_con << m_connection->getDesc() - << "Connection::rawSend(): SendFailedException: " - << packet.address.serializeString() << std::endl); + << "Connection::rawSend(): SendFailedException: " + << packet.address.serializeString() << std::endl); } } @@ -317,14 +318,14 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan try { p.absolute_send_time = porting::getTimeMs(); // Buffer the packet - channel->outgoing_reliables_sent.insert(p, - (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE) - % (MAX_RELIABLE_WINDOW_SIZE + 1)); - } - catch (AlreadyExistsException &e) { + channel->outgoing_reliables_sent.insert( + p, (channel->readOutgoingSequenceNumber() - + MAX_RELIABLE_WINDOW_SIZE) % + (MAX_RELIABLE_WINDOW_SIZE + 1)); + } catch (AlreadyExistsException &e) { LOG(derr_con << m_connection->getDesc() - << "WARNING: Going to send a reliable packet" - << " in outgoing buffer" << std::endl); + << "WARNING: Going to send a reliable packet" + << " in outgoing buffer" << std::endl); } // Send the packet @@ -332,21 +333,22 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan } bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, - const SharedBuffer<u8> &data, bool reliable) + const SharedBuffer<u8> &data, bool reliable) { PeerHelper peer = m_connection->getPeerNoEx(peer_id); if (!peer) { - LOG(errorstream << m_connection->getDesc() - << " dropped " << (reliable ? "reliable " : "") - << "packet for non existent peer_id: " << peer_id << std::endl); + LOG(errorstream << m_connection->getDesc() << " dropped " + << (reliable ? "reliable " : "") + << "packet for non existent peer_id: " << peer_id + << std::endl); return false; } Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]); if (reliable) { bool have_sequence_number_for_raw_packet = true; - u16 seqnum = - channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet); + u16 seqnum = channel->getOutgoingSequenceNumber( + have_sequence_number_for_raw_packet); if (!have_sequence_number_for_raw_packet) return false; @@ -357,24 +359,23 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, // Add base headers and make a packet BufferedPacket p = con::makePacket(peer_address, reliable, - m_connection->GetProtocolID(), m_connection->GetPeerID(), - channelnum); + m_connection->GetProtocolID(), m_connection->GetPeerID(), + channelnum); // first check if our send window is already maxed out - if (channel->outgoing_reliables_sent.size() - < channel->getWindowSize()) { + if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) { LOG(dout_con << m_connection->getDesc() - << " INFO: sending a reliable packet to peer_id " << peer_id - << " channel: " << (u32)channelnum - << " seqnum: " << seqnum << std::endl); + << " INFO: sending a reliable packet to peer_id " + << peer_id << " channel: " << (u32)channelnum + << " seqnum: " << seqnum << std::endl); sendAsPacketReliable(p, channel); return true; } LOG(dout_con << m_connection->getDesc() - << " INFO: queueing reliable packet for peer_id: " << peer_id - << " channel: " << (u32)channelnum - << " seqnum: " << seqnum << std::endl); + << " INFO: queueing reliable packet for peer_id: " << peer_id + << " channel: " << (u32)channelnum << " seqnum: " << seqnum + << std::endl); channel->queued_reliables.push(p); return false; } @@ -383,8 +384,8 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, if (peer->getAddress(MTP_UDP, peer_address)) { // Add base headers and make a packet BufferedPacket p = con::makePacket(peer_address, data, - m_connection->GetProtocolID(), m_connection->GetPeerID(), - channelnum); + m_connection->GetProtocolID(), m_connection->GetPeerID(), + channelnum); // Send the packet rawSend(p); @@ -392,116 +393,116 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, } LOG(dout_con << m_connection->getDesc() - << " INFO: dropped unreliable packet for peer_id: " << peer_id - << " because of (yet) missing udp address" << std::endl); + << " INFO: dropped unreliable packet for peer_id: " << peer_id + << " because of (yet) missing udp address" << std::endl); return false; } void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) { - assert(c.reliable); // Pre-condition + assert(c.reliable); // Pre-condition switch (c.type) { - case CONNCMD_NONE: - LOG(dout_con << m_connection->getDesc() - << "UDP processing reliable CONNCMD_NONE" << std::endl); - return; + case CONNCMD_NONE: + LOG(dout_con << m_connection->getDesc() + << "UDP processing reliable CONNCMD_NONE" << std::endl); + return; - case CONNCMD_SEND: - LOG(dout_con << m_connection->getDesc() - << "UDP processing reliable CONNCMD_SEND" << std::endl); - sendReliable(c); - return; + case CONNCMD_SEND: + LOG(dout_con << m_connection->getDesc() + << "UDP processing reliable CONNCMD_SEND" << std::endl); + sendReliable(c); + return; - case CONNCMD_SEND_TO_ALL: - LOG(dout_con << m_connection->getDesc() - << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl); - sendToAllReliable(c); - return; + case CONNCMD_SEND_TO_ALL: + LOG(dout_con << m_connection->getDesc() + << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl); + sendToAllReliable(c); + return; - case CONCMD_CREATE_PEER: - LOG(dout_con << m_connection->getDesc() - << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl); - if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) { - /* put to queue if we couldn't send it immediately */ - sendReliable(c); - } - return; + case CONCMD_CREATE_PEER: + LOG(dout_con << m_connection->getDesc() + << "UDP processing reliable CONCMD_CREATE_PEER" + << std::endl); + if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) { + /* put to queue if we couldn't send it immediately */ + sendReliable(c); + } + return; - case CONNCMD_SERVE: - case CONNCMD_CONNECT: - case CONNCMD_DISCONNECT: - case CONCMD_ACK: - FATAL_ERROR("Got command that shouldn't be reliable as reliable command"); - default: - LOG(dout_con << m_connection->getDesc() - << " Invalid reliable command type: " << c.type << std::endl); + case CONNCMD_SERVE: + case CONNCMD_CONNECT: + case CONNCMD_DISCONNECT: + case CONCMD_ACK: + FATAL_ERROR("Got command that shouldn't be reliable as reliable command"); + default: + LOG(dout_con << m_connection->getDesc() + << " Invalid reliable command type: " << c.type + << std::endl); } } - void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c) { assert(!c.reliable); // Pre-condition switch (c.type) { - case CONNCMD_NONE: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_NONE" << std::endl); - return; - case CONNCMD_SERVE: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_SERVE port=" - << c.address.serializeString() << std::endl); - serve(c.address); - return; - case CONNCMD_CONNECT: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_CONNECT" << std::endl); - connect(c.address); - return; - case CONNCMD_DISCONNECT: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_DISCONNECT" << std::endl); - disconnect(); - return; - case CONNCMD_DISCONNECT_PEER: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl); - disconnect_peer(c.peer_id); - return; - case CONNCMD_SEND: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_SEND" << std::endl); - send(c.peer_id, c.channelnum, c.data); - return; - case CONNCMD_SEND_TO_ALL: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl); - sendToAll(c.channelnum, c.data); - return; - case CONCMD_ACK: - LOG(dout_con << m_connection->getDesc() - << " UDP processing CONCMD_ACK" << std::endl); - sendAsPacket(c.peer_id, c.channelnum, c.data, true); - return; - case CONCMD_CREATE_PEER: - FATAL_ERROR("Got command that should be reliable as unreliable command"); - default: - LOG(dout_con << m_connection->getDesc() - << " Invalid command type: " << c.type << std::endl); + case CONNCMD_NONE: + LOG(dout_con << m_connection->getDesc() << " UDP processing CONNCMD_NONE" + << std::endl); + return; + case CONNCMD_SERVE: + LOG(dout_con << m_connection->getDesc() + << " UDP processing CONNCMD_SERVE port=" + << c.address.serializeString() << std::endl); + serve(c.address); + return; + case CONNCMD_CONNECT: + LOG(dout_con << m_connection->getDesc() + << " UDP processing CONNCMD_CONNECT" << std::endl); + connect(c.address); + return; + case CONNCMD_DISCONNECT: + LOG(dout_con << m_connection->getDesc() + << " UDP processing CONNCMD_DISCONNECT" << std::endl); + disconnect(); + return; + case CONNCMD_DISCONNECT_PEER: + LOG(dout_con << m_connection->getDesc() + << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl); + disconnect_peer(c.peer_id); + return; + case CONNCMD_SEND: + LOG(dout_con << m_connection->getDesc() << " UDP processing CONNCMD_SEND" + << std::endl); + send(c.peer_id, c.channelnum, c.data); + return; + case CONNCMD_SEND_TO_ALL: + LOG(dout_con << m_connection->getDesc() + << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl); + sendToAll(c.channelnum, c.data); + return; + case CONCMD_ACK: + LOG(dout_con << m_connection->getDesc() << " UDP processing CONCMD_ACK" + << std::endl); + sendAsPacket(c.peer_id, c.channelnum, c.data, true); + return; + case CONCMD_CREATE_PEER: + FATAL_ERROR("Got command that should be reliable as unreliable command"); + default: + LOG(dout_con << m_connection->getDesc() + << " Invalid command type: " << c.type << std::endl); } } void ConnectionSendThread::serve(Address bind_address) { - LOG(dout_con << m_connection->getDesc() - << "UDP serving at port " << bind_address.serializeString() << std::endl); + LOG(dout_con << m_connection->getDesc() << "UDP serving at port " + << bind_address.serializeString() << std::endl); try { m_connection->m_udpSocket.Bind(bind_address); m_connection->SetPeerID(PEER_ID_SERVER); - } - catch (SocketException &e) { + } catch (SocketException &e) { // Create event ConnectionEvent ce; ce.bindFailed(); @@ -512,8 +513,8 @@ void ConnectionSendThread::serve(Address bind_address) void ConnectionSendThread::connect(Address address) { LOG(dout_con << m_connection->getDesc() << " connecting to " - << address.serializeString() - << ":" << address.getPort() << std::endl); + << address.serializeString() << ":" << address.getPort() + << std::endl); UDPPeer *peer = m_connection->createServerPeer(address); @@ -525,7 +526,7 @@ void ConnectionSendThread::connect(Address address) Address bind_addr; if (address.isIPv6()) - bind_addr.setAddress((IPv6AddressBytes *) NULL); + bind_addr.setAddress((IPv6AddressBytes *)NULL); else bind_addr.setAddress(0, 0, 0, 0); @@ -546,7 +547,6 @@ void ConnectionSendThread::disconnect() writeU8(&data[0], PACKET_TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_DISCO); - // Send to all std::list<session_t> peerids = m_connection->getPeerIDs(); @@ -577,23 +577,23 @@ void ConnectionSendThread::disconnect_peer(session_t peer_id) dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true; } -void ConnectionSendThread::send(session_t peer_id, u8 channelnum, - const SharedBuffer<u8> &data) +void ConnectionSendThread::send( + session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data) { assert(channelnum < CHANNEL_COUNT); // Pre-condition PeerHelper peer = m_connection->getPeerNoEx(peer_id); if (!peer) { LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id - << ">>>NOT<<< found on sending packet" - << ", channel " << (channelnum % 0xFF) - << ", size: " << data.getSize() << std::endl); + << ">>>NOT<<< found on sending packet" + << ", channel " << (channelnum % 0xFF) + << ", size: " << data.getSize() << std::endl); return; } LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id - << ", channel " << (channelnum % 0xFF) - << ", size: " << data.getSize() << std::endl); + << ", channel " << (channelnum % 0xFF) + << ", size: " << data.getSize() << std::endl); u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum); @@ -647,16 +647,16 @@ void ConnectionSendThread::sendPackets(float dtime) std::list<session_t> pendingDisconnect; std::map<session_t, bool> pending_unreliable; - const unsigned int peer_packet_quota = m_iteration_packets_avaialble - / MYMAX(peerIds.size(), 1); + const unsigned int peer_packet_quota = + m_iteration_packets_avaialble / MYMAX(peerIds.size(), 1); for (session_t peerId : peerIds) { PeerHelper peer = m_connection->getPeerNoEx(peerId); - //peer may have been removed + // peer may have been removed if (!peer) { - LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id=" - << peerId - << std::endl); + LOG(dout_con << m_connection->getDesc() + << " Peer not found: peer_id=" << peerId + << std::endl); continue; } peer->m_increment_packets_remaining = peer_packet_quota; @@ -671,17 +671,16 @@ void ConnectionSendThread::sendPackets(float dtime) pendingDisconnect.push_back(peerId); } - PROFILE(std::stringstream - peerIdentifier); - PROFILE( - peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId - << ";RELIABLE]"); - PROFILE(ScopeProfiler - peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG)); + PROFILE(std::stringstream peerIdentifier); + PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" + << peerId << ";RELIABLE]"); + PROFILE(ScopeProfiler peerprofiler( + g_profiler, peerIdentifier.str(), SPT_AVG)); LOG(dout_con << m_connection->getDesc() - << " Handle per peer queues: peer_id=" << peerId - << " packet quota: " << peer->m_increment_packets_remaining << std::endl); + << " Handle per peer queues: peer_id=" << peerId + << " packet quota: " << peer->m_increment_packets_remaining + << std::endl); // first send queued reliable packets for all peers (if possible) for (unsigned int i = 0; i < CHANNEL_COUNT; i++) { @@ -692,38 +691,34 @@ void ConnectionSendThread::sendPackets(float dtime) u16 next_to_receive = 0; channel.incoming_reliables.getFirstSeqnum(next_to_receive); - LOG(dout_con << m_connection->getDesc() << "\t channel: " - << i << ", peer quota:" - << peer->m_increment_packets_remaining - << std::endl - << "\t\t\treliables on wire: " - << channel.outgoing_reliables_sent.size() - << ", waiting for ack for " << next_to_ack - << std::endl - << "\t\t\tincoming_reliables: " - << channel.incoming_reliables.size() - << ", next reliable packet: " - << channel.readNextIncomingSeqNum() - << ", next queued: " << next_to_receive - << std::endl - << "\t\t\treliables queued : " - << channel.queued_reliables.size() - << std::endl - << "\t\t\tqueued commands : " - << channel.queued_commands.size() - << std::endl); + LOG(dout_con << m_connection->getDesc() << "\t channel: " << i + << ", peer quota:" + << peer->m_increment_packets_remaining << std::endl + << "\t\t\treliables on wire: " + << channel.outgoing_reliables_sent.size() + << ", waiting for ack for " << next_to_ack + << std::endl + << "\t\t\tincoming_reliables: " + << channel.incoming_reliables.size() + << ", next reliable packet: " + << channel.readNextIncomingSeqNum() + << ", next queued: " << next_to_receive << std::endl + << "\t\t\treliables queued : " + << channel.queued_reliables.size() << std::endl + << "\t\t\tqueued commands : " + << channel.queued_commands.size() << std::endl); while (!channel.queued_reliables.empty() && - channel.outgoing_reliables_sent.size() - < channel.getWindowSize() && + channel.outgoing_reliables_sent.size() < + channel.getWindowSize() && peer->m_increment_packets_remaining > 0) { BufferedPacket p = channel.queued_reliables.front(); channel.queued_reliables.pop(); LOG(dout_con << m_connection->getDesc() - << " INFO: sending a queued reliable packet " - << " channel: " << i - << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1]) - << std::endl); + << " INFO: sending a queued reliable packet " + << " channel: " << i << ", seqnum: " + << readU16(&p.data[BASE_HEADER_SIZE + 1]) + << std::endl); sendAsPacketReliable(p, &channel); peer->m_increment_packets_remaining--; } @@ -731,9 +726,8 @@ void ConnectionSendThread::sendPackets(float dtime) } if (!m_outgoing_queue.empty()) { - LOG(dout_con << m_connection->getDesc() - << " Handle non reliable queue (" - << m_outgoing_queue.size() << " pkts)" << std::endl); + LOG(dout_con << m_connection->getDesc() << " Handle non reliable queue (" + << m_outgoing_queue.size() << " pkts)" << std::endl); } unsigned int initial_queuesize = m_outgoing_queue.size(); @@ -748,17 +742,18 @@ void ConnectionSendThread::sendPackets(float dtime) PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id); if (!peer) { LOG(dout_con << m_connection->getDesc() - << " Outgoing queue: peer_id=" << packet.peer_id - << ">>>NOT<<< found on sending packet" - << ", channel " << (packet.channelnum % 0xFF) - << ", size: " << packet.data.getSize() << std::endl); + << " Outgoing queue: peer_id=" << packet.peer_id + << ">>>NOT<<< found on sending packet" + << ", channel " << (packet.channelnum % 0xFF) + << ", size: " << packet.data.getSize() << std::endl); continue; } /* send acks immediately */ - if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) { - rawSendAsPacket(packet.peer_id, packet.channelnum, - packet.data, packet.reliable); + if (packet.ack || peer->m_increment_packets_remaining > 0 || + stopRequested()) { + rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, + packet.reliable); if (peer->m_increment_packets_remaining > 0) peer->m_increment_packets_remaining--; } else { @@ -774,8 +769,10 @@ void ConnectionSendThread::sendPackets(float dtime) continue; if (peer->m_increment_packets_remaining == 0) { LOG(warningstream << m_connection->getDesc() - << " Packet quota used up for peer_id=" << peerId - << ", was " << peer_packet_quota << " pkts" << std::endl); + << " Packet quota used up for peer_id=" + << peerId << ", was " + << peer_packet_quota << " pkts" + << std::endl); } } } @@ -787,15 +784,15 @@ void ConnectionSendThread::sendPackets(float dtime) } } -void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum, - const SharedBuffer<u8> &data, bool ack) +void ConnectionSendThread::sendAsPacket( + session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data, bool ack) { OutgoingPacket packet(peer_id, channelnum, data, false, ack); m_outgoing_queue.push(packet); } ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) : - Thread("ConnectionReceive") + Thread("ConnectionReceive") { } @@ -803,12 +800,12 @@ void *ConnectionReceiveThread::run() { assert(m_connection); - LOG(dout_con << m_connection->getDesc() - << "ConnectionReceive thread started" << std::endl); + LOG(dout_con << m_connection->getDesc() << "ConnectionReceive thread started" + << std::endl); - PROFILE(std::stringstream - ThreadIdentifier); - PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]"); + PROFILE(std::stringstream ThreadIdentifier); + PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() + << "]"); // use IPv6 minimum allowed MTU as receive buffer size as this is // theoretical reliable upper boundary of a udp packet for all IPv6 enabled @@ -826,13 +823,12 @@ void *ConnectionReceiveThread::run() while (!stopRequested()) { BEGIN_DEBUG_EXCEPTION_HANDLER - PROFILE(ScopeProfiler - sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); + PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); #ifdef DEBUG_CONNECTION_KBPS lasttime = curtime; curtime = porting::getTimeMs(); - float dtime = CALC_DTIME(lasttime,curtime); + float dtime = CALC_DTIME(lasttime, curtime); #endif /* receive packets */ @@ -846,9 +842,7 @@ void *ConnectionReceiveThread::run() std::list<session_t> peerids = m_connection->getPeerIDs(); for (std::list<session_t>::iterator i = peerids.begin(); - i != peerids.end(); - i++) - { + i != peerids.end(); i++) { PeerHelper peer = m_connection->getPeerNoEx(*i); if (!peer) continue; @@ -858,35 +852,52 @@ void *ConnectionReceiveThread::run() float avg_rate = 0.0; float avg_loss = 0.0; - for(u16 j=0; j<CHANNEL_COUNT; j++) - { - peer_current +=peer->channels[j].getCurrentDownloadRateKB(); - peer_loss += peer->channels[j].getCurrentLossRateKB(); - avg_rate += peer->channels[j].getAvgDownloadRateKB(); + for (u16 j = 0; j < CHANNEL_COUNT; j++) { + peer_current += peer->channels[j] + .getCurrentDownloadRateKB(); + peer_loss += peer->channels[j] + .getCurrentLossRateKB(); + avg_rate += peer->channels[j] + .getAvgDownloadRateKB(); avg_loss += peer->channels[j].getAvgLossRateKB(); } std::stringstream output; output << std::fixed << std::setprecision(1); - output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl; - output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl; - output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl; + output << "OUT to Peer " << *i << " RATES (good / loss) " + << std::endl; + output << "\tcurrent (sum): " << peer_current << "kb/s " + << peer_loss << "kb/s" << std::endl; + output << "\taverage (sum): " << avg_rate << "kb/s " + << avg_loss << "kb/s" << std::endl; output << std::setfill(' '); - for(u16 j=0; j<CHANNEL_COUNT; j++) - { + for (u16 j = 0; j < CHANNEL_COUNT; j++) { output << "\tcha " << j << ":" - << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s" - << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s" - << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s" - << " /" - << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s" - << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s" - << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s" - << " / WS: " << peer->channels[j].getWindowSize() - << std::endl; + << " CUR: " << std::setw(6) + << peer->channels[j].getCurrentDownloadRateKB() + << "kb/s" + << " AVG: " << std::setw(6) + << peer->channels[j].getAvgDownloadRateKB() + << "kb/s" + << " MAX: " << std::setw(6) + << peer->channels[j].getMaxDownloadRateKB() + << "kb/s" + << " /" + << " CUR: " << std::setw(6) + << peer->channels[j].getCurrentLossRateKB() + << "kb/s" + << " AVG: " << std::setw(6) + << peer->channels[j].getAvgLossRateKB() + << "kb/s" + << " MAX: " << std::setw(6) + << peer->channels[j].getMaxLossRateKB() + << "kb/s" + << " / WS: " + << peer->channels[j].getWindowSize() + << std::endl; } - fprintf(stderr,"%s\n",output.str().c_str()); + fprintf(stderr, "%s\n", output.str().c_str()); } } #endif @@ -898,8 +909,7 @@ void *ConnectionReceiveThread::run() } // Receive packets from the network and buffers and create ConnectionEvents -void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, - bool &packet_queued) +void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, bool &packet_queued) { try { // First, see if there any buffered packets we can process now @@ -915,8 +925,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, e.dataReceived(peer_id, resultdata); m_connection->putEvent(e); } - } - catch (ProcessedSilentlyException &e) { + } catch (ProcessedSilentlyException &e) { /* try reading again */ } } @@ -925,19 +934,20 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, // Call Receive() to wait for incoming data Address sender; - s32 received_size = m_connection->m_udpSocket.Receive(sender, - *packetdata, packetdata.getSize()); + s32 received_size = m_connection->m_udpSocket.Receive( + sender, *packetdata, packetdata.getSize()); if (received_size < 0) return; if ((received_size < BASE_HEADER_SIZE) || - (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { + (readU32(&packetdata[0]) != + m_connection->GetProtocolID())) { LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid incoming packet, " - << "size: " << received_size - << ", protocol: " - << ((received_size >= 4) ? readU32(&packetdata[0]) : -1) - << std::endl); + << "Receive(): Invalid incoming packet, " + << "size: " << received_size << ", protocol: " + << ((received_size >= 4) ? readU32(&packetdata[0]) + : -1) + << std::endl); return; } @@ -946,7 +956,8 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, if (channelnum > CHANNEL_COUNT - 1) { LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid channel " << (u32)channelnum << std::endl); + << "Receive(): Invalid channel " << (u32)channelnum + << std::endl); return; } @@ -960,14 +971,15 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, /* The peer was not found in our lists. Add it. */ if (peer_id == PEER_ID_INEXISTENT) { - peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); + peer_id = m_connection->createPeer( + sender, MTP_MINETEST_RELIABLE_UDP, 0); } PeerHelper peer = m_connection->getPeerNoEx(peer_id); if (!peer) { LOG(dout_con << m_connection->getDesc() - << " got packet from unknown peer_id: " - << peer_id << " Ignoring." << std::endl); + << " got packet from unknown peer_id: " << peer_id + << " Ignoring." << std::endl); return; } @@ -976,15 +988,18 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, Address peer_address; if (peer->getAddress(MTP_UDP, peer_address)) { if (peer_address != sender) { - LOG(derr_con << m_connection->getDesc() - << " Peer " << peer_id << " sending from different address." - " Ignoring." << std::endl); + LOG(derr_con << m_connection->getDesc() << " Peer " + << peer_id + << " sending from different address." + " Ignoring." + << std::endl); return; } } else { - LOG(derr_con << m_connection->getDesc() - << " Peer " << peer_id << " doesn't have an address?!" - " Ignoring." << std::endl); + LOG(derr_con << m_connection->getDesc() << " Peer " << peer_id + << " doesn't have an address?!" + " Ignoring." + << std::endl); return; } @@ -995,8 +1010,10 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]; } else { LOG(derr_con << m_connection->getDesc() - << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!" - " Ignoring." << std::endl); + << "Receive(): peer_id=" << peer_id + << " isn't an UDPPeer?!" + " Ignoring." + << std::endl); return; } @@ -1007,33 +1024,30 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, // Make a new SharedBuffer from the data without the base headers SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE); memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], - strippeddata.getSize()); + strippeddata.getSize()); try { // Process it (the result is some data with no headers made by us) - SharedBuffer<u8> resultdata = processPacket - (channel, strippeddata, peer_id, channelnum, false); + SharedBuffer<u8> resultdata = processPacket(channel, strippeddata, + peer_id, channelnum, false); LOG(dout_con << m_connection->getDesc() - << " ProcessPacket from peer_id: " << peer_id - << ", channel: " << (u32)channelnum << ", returned " - << resultdata.getSize() << " bytes" << std::endl); + << " ProcessPacket from peer_id: " << peer_id + << ", channel: " << (u32)channelnum << ", returned " + << resultdata.getSize() << " bytes" << std::endl); ConnectionEvent e; e.dataReceived(peer_id, resultdata); m_connection->putEvent(e); - } - catch (ProcessedSilentlyException &e) { - } - catch (ProcessedQueued &e) { + } catch (ProcessedSilentlyException &e) { + } catch (ProcessedQueued &e) { // we set it to true anyway (see below) } /* Every time we receive a packet it can happen that a previously * buffered packet is now ready to process. */ packet_queued = true; - } - catch (InvalidIncomingDataException &e) { + } catch (InvalidIncomingDataException &e) { } } @@ -1058,8 +1072,8 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8 return false; } -bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, - session_t &peer_id, SharedBuffer<u8> &dst) +bool ConnectionReceiveThread::checkIncomingBuffers( + Channel *channel, session_t &peer_id, SharedBuffer<u8> &dst) { u16 firstseqnum = 0; if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) { @@ -1070,11 +1084,10 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); LOG(dout_con << m_connection->getDesc() - << "UNBUFFERING TYPE_RELIABLE" - << " seqnum=" << seqnum - << " peer_id=" << peer_id - << " channel=" << ((int) channelnum & 0xff) - << std::endl); + << "UNBUFFERING TYPE_RELIABLE" + << " seqnum=" << seqnum << " peer_id=" << peer_id + << " channel=" << ((int)channelnum & 0xff) + << std::endl); channel->incNextIncomingSeqNum(); @@ -1091,7 +1104,8 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, } SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, - const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable) + const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, + bool reliable) { PeerHelper peer = m_connection->getPeerNoEx(peer_id); @@ -1112,8 +1126,8 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, } if (type >= PACKET_TYPE_MAX) { - derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff) - << std::endl; + derr_con << m_connection->getDesc() + << "Got invalid type=" << ((int)type & 0xff) << std::endl; throw InvalidIncomingDataException("Invalid packet type"); } @@ -1122,15 +1136,16 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, } const ConnectionReceiveThread::PacketTypeHandler - ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = { - {&ConnectionReceiveThread::handlePacketType_Control}, - {&ConnectionReceiveThread::handlePacketType_Original}, - {&ConnectionReceiveThread::handlePacketType_Split}, - {&ConnectionReceiveThread::handlePacketType_Reliable}, + ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = { + {&ConnectionReceiveThread::handlePacketType_Control}, + {&ConnectionReceiveThread::handlePacketType_Original}, + {&ConnectionReceiveThread::handlePacketType_Split}, + {&ConnectionReceiveThread::handlePacketType_Reliable}, }; SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel, - const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, + bool reliable) { if (packetdata.getSize() < 2) throw InvalidIncomingDataException("packetdata.getSize() < 2"); @@ -1142,26 +1157,29 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan if (packetdata.getSize() < 4) { throw InvalidIncomingDataException( - "packetdata.getSize() < 4 (ACK header size)"); + "packetdata.getSize() < 4 (ACK header size)"); } u16 seqnum = readU16(&packetdata[2]); - LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum=" - << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum=" - << seqnum << " ]" << std::endl); + LOG(dout_con << m_connection->getDesc() + << " [ CONTROLTYPE_ACK: channelnum=" + << ((int)channelnum & 0xff) << ", peer_id=" << peer->id + << ", seqnum=" << seqnum << " ]" << std::endl); try { - BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum); + BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum( + seqnum); // only calculate rtt from straight sent packets if (p.resend_count == 0) { // Get round trip time u64 current_time = porting::getTimeMs(); - // a overflow is quite unlikely but as it'd result in major - // rtt miscalculation we handle it here + // a overflow is quite unlikely but as it'd result in + // major rtt miscalculation we handle it here if (current_time > p.absolute_send_time) { - float rtt = (current_time - p.absolute_send_time) / 1000.0; + float rtt = (current_time - p.absolute_send_time) / + 1000.0; // Let peer calculate stuff according to it // (avg_rtt and resend_timeout) @@ -1180,8 +1198,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan m_connection->TriggerSend(); } catch (NotFoundException &e) { LOG(derr_con << m_connection->getDesc() - << "WARNING: ACKed packet not in outgoing queue" - << " seqnum=" << seqnum << std::endl); + << "WARNING: ACKed packet not in outgoing queue" + << " seqnum=" << seqnum << std::endl); channel->UpdatePacketTooLateCounter(); } @@ -1189,18 +1207,19 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan } else if (controltype == CONTROLTYPE_SET_PEER_ID) { // Got a packet to set our peer id if (packetdata.getSize() < 4) - throw InvalidIncomingDataException - ("packetdata.getSize() < 4 (SET_PEER_ID header size)"); + throw InvalidIncomingDataException("packetdata.getSize() < 4 " + "(SET_PEER_ID header size)"); session_t peer_id_new = readU16(&packetdata[2]); - LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new - << "... " << std::endl); + LOG(dout_con << m_connection->getDesc() << "Got new peer id: " + << peer_id_new << "... " << std::endl); if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) { LOG(derr_con << m_connection->getDesc() - << "WARNING: Not changing existing peer id." << std::endl); + << "WARNING: Not changing existing peer id." + << std::endl); } else { LOG(dout_con << m_connection->getDesc() << "changing own peer id" - << std::endl); + << std::endl); m_connection->SetPeerID(peer_id_new); } @@ -1214,29 +1233,31 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan // Just ignore it, the incoming data already reset // the timeout counter LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer " - << peer->id << std::endl); + << peer->id << std::endl); if (!m_connection->deletePeer(peer->id, false)) { - derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl; + derr_con << m_connection->getDesc() << "DISCO: Peer not found" + << std::endl; } throw ProcessedSilentlyException("Got a DISCO"); } else { LOG(derr_con << m_connection->getDesc() - << "INVALID TYPE_CONTROL: invalid controltype=" - << ((int) controltype & 0xff) << std::endl); + << "INVALID TYPE_CONTROL: invalid controltype=" + << ((int)controltype & 0xff) << std::endl); throw InvalidIncomingDataException("Invalid control type"); } } SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel, - const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, + bool reliable) { if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE) - throw InvalidIncomingDataException - ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE"); + throw InvalidIncomingDataException( + "packetdata.getSize() <= ORIGINAL_HEADER_SIZE"); LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user" - << std::endl); + << std::endl); // Get the inside packet out and return it SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize()); @@ -1244,29 +1265,29 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *cha } SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel, - const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, + bool reliable) { Address peer_address; if (peer->getAddress(MTP_UDP, peer_address)) { // We have to create a packet again for buffering // This isn't actually too bad an idea. - BufferedPacket packet = makePacket(peer_address, - packetdata, - m_connection->GetProtocolID(), - peer->id, - channelnum); + BufferedPacket packet = makePacket(peer_address, packetdata, + m_connection->GetProtocolID(), peer->id, channelnum); // Buffer the packet - SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable); + SharedBuffer<u8> data = + peer->addSplitPacket(channelnum, packet, reliable); if (data.getSize() != 0) { LOG(dout_con << m_connection->getDesc() - << "RETURNING TYPE_SPLIT: Constructed full data, " - << "size=" << data.getSize() << std::endl); + << "RETURNING TYPE_SPLIT: Constructed full data, " + << "size=" << data.getSize() << std::endl); return data; } - LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl); + LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" + << std::endl); throw ProcessedSilentlyException("Buffered a split packet chunk"); } @@ -1275,7 +1296,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe } SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel, - const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, + bool reliable) { assert(channel != NULL); @@ -1284,41 +1306,44 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha throw InvalidIncomingDataException("Found nested reliable packets"); if (packetdata.getSize() < RELIABLE_HEADER_SIZE) - throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE"); + throw InvalidIncomingDataException( + "packetdata.getSize() < RELIABLE_HEADER_SIZE"); u16 seqnum = readU16(&packetdata[1]); bool is_future_packet = false; bool is_old_packet = false; /* packet is within our receive window send ack */ - if (seqnum_in_window(seqnum, - channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) { + if (seqnum_in_window(seqnum, channel->readNextIncomingSeqNum(), + MAX_RELIABLE_WINDOW_SIZE)) { m_connection->sendAck(peer->id, channelnum, seqnum); } else { - is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum()); + is_future_packet = + seqnum_higher(seqnum, channel->readNextIncomingSeqNum()); is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum); /* packet is not within receive window, don't send ack. * * if this was a valid packet it's gonna be retransmitted */ if (is_future_packet) - throw ProcessedSilentlyException( - "Received packet newer then expected, not sending ack"); + throw ProcessedSilentlyException("Received packet newer then " + "expected, not sending ack"); /* seems like our ack was lost, send another one for a old packet */ if (is_old_packet) { LOG(dout_con << m_connection->getDesc() - << "RE-SENDING ACK: peer_id: " << peer->id - << ", channel: " << (channelnum & 0xFF) - << ", seqnum: " << seqnum << std::endl;) + << "RE-SENDING ACK: peer_id: " << peer->id + << ", channel: " << (channelnum & 0xFF) + << ", seqnum: " << seqnum << std::endl;) m_connection->sendAck(peer->id, channelnum, seqnum); // we already have this packet so this one was on wire at least // the current timeout - // we don't know how long this packet was on wire don't do silly guessing - // dynamic_cast<UDPPeer*>(&peer)-> + // we don't know how long this packet was on wire don't do silly + // guessing dynamic_cast<UDPPeer*>(&peer)-> // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout()); - throw ProcessedSilentlyException("Retransmitting ack for old packet"); + throw ProcessedSilentlyException( + "Retransmitting ack for old packet"); } } @@ -1330,19 +1355,16 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha // This one comes later, buffer it. // Actually we have to make a packet to buffer one. // Well, we have all the ingredients, so just do it. - BufferedPacket packet = con::makePacket( - peer_address, - packetdata, - m_connection->GetProtocolID(), - peer->id, - channelnum); + BufferedPacket packet = con::makePacket(peer_address, packetdata, + m_connection->GetProtocolID(), peer->id, channelnum); try { - channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum()); + channel->incoming_reliables.insert( + packet, channel->readNextIncomingSeqNum()); LOG(dout_con << m_connection->getDesc() - << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id - << ", channel: " << (channelnum & 0xFF) - << ", seqnum: " << seqnum << std::endl;) + << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id + << ", channel: " << (channelnum & 0xFF) + << ", seqnum: " << seqnum << std::endl;) throw ProcessedQueued("Buffered future reliable packet"); } catch (AlreadyExistsException &e) { @@ -1352,25 +1374,24 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha m_connection->putCommand(discon); LOG(derr_con << m_connection->getDesc() - << "INVALID, TYPE_RELIABLE peer_id: " << peer->id - << ", channel: " << (channelnum & 0xFF) - << ", seqnum: " << seqnum - << "DROPPING CLIENT!" << std::endl;) + << "INVALID, TYPE_RELIABLE peer_id: " << peer->id + << ", channel: " << (channelnum & 0xFF) + << ", seqnum: " << seqnum << "DROPPING CLIENT!" + << std::endl;) } } /* we got a packet to process right now */ - LOG(dout_con << m_connection->getDesc() - << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id - << ", channel: " << (channelnum & 0xFF) - << ", seqnum: " << seqnum << std::endl;) - + LOG(dout_con << m_connection->getDesc() << "RECURSIVE, TYPE_RELIABLE peer_id: " + << peer->id << ", channel: " << (channelnum & 0xFF) + << ", seqnum: " << seqnum << std::endl;) /* check for resend case */ u16 queued_seqnum = 0; if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) { if (queued_seqnum == seqnum) { - BufferedPacket queued_packet = channel->incoming_reliables.popFirst(); + BufferedPacket queued_packet = + channel->incoming_reliables.popFirst(); /** TODO find a way to verify the new against the old packet */ } } |