diff options
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r-- | src/network/connectionthreads.cpp | 223 |
1 files changed, 97 insertions, 126 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 47678dac5..90936b43d 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -32,32 +32,25 @@ namespace con /* defines used for debugging and profiling */ /******************************************************************************/ #ifdef NDEBUG -#define LOG(a) a #define PROFILE(a) #undef DEBUG_CONNECTION_KBPS #else /* this mutex is used to achieve log message consistency */ -std::mutex log_conthread_mutex; -#define LOG(a) \ - { \ - MutexAutoLock loglock(log_conthread_mutex); \ - a; \ - } #define PROFILE(a) a //#define DEBUG_CONNECTION_KBPS #undef DEBUG_CONNECTION_KBPS #endif -/* maximum number of retries for reliable packets */ -#define MAX_RELIABLE_RETRY 5 +// TODO: Clean this up. +#define LOG(a) a #define WINDOW_SIZE 5 -static session_t readPeerId(u8 *packetdata) +static session_t readPeerId(const u8 *packetdata) { return readU16(&packetdata[4]); } -static u8 readChannel(u8 *packetdata) +static u8 readChannel(const u8 *packetdata) { return readU8(&packetdata[6]); } @@ -117,9 +110,9 @@ void *ConnectionSendThread::run() } /* translate commands to packets */ - ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0); - while (c.type != CONNCMD_NONE) { - if (c.reliable) + auto c = m_connection->m_command_queue.pop_frontNoEx(0); + while (c && c->type != CONNCMD_NONE) { + if (c->reliable) processReliableCommand(c); else processNonReliableCommand(c); @@ -212,7 +205,6 @@ void ConnectionSendThread::runTimeouts(float dtime) } float resend_timeout = udpPeer->getResendTimeout(); - bool retry_count_exceeded = false; for (Channel &channel : udpPeer->channels) { // Remove timed out incomplete unreliable split packets @@ -231,45 +223,29 @@ void ConnectionSendThread::runTimeouts(float dtime) m_iteration_packets_avaialble -= timed_outs.size(); for (const auto &k : timed_outs) { - session_t peer_id = readPeerId(*k.data); - u8 channelnum = readChannel(*k.data); - u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1])); + u8 channelnum = readChannel(k->data); + u16 seqnum = k->getSeqnum(); - channel.UpdateBytesLost(k.data.getSize()); - - if (k.resend_count > MAX_RELIABLE_RETRY) { - retry_count_exceeded = true; - timeouted_peers.push_back(peer->id); - /* no need to check additional packets if a single one did timeout*/ - break; - } + channel.UpdateBytesLost(k->size()); LOG(derr_con << m_connection->getDesc() << "RE-SENDING timed-out RELIABLE to " - << k.address.serializeString() + << k->address.serializeString() << "(t/o=" << resend_timeout << "): " - << "from_peer_id=" << peer_id + << "count=" << k->resend_count << ", channel=" << ((int) channelnum & 0xff) << ", seqnum=" << seqnum << std::endl); - rawSend(k); + rawSend(k.get()); // do not handle rtt here as we can't decide if this packet was // lost or really takes more time to transmit } - if (retry_count_exceeded) { - break; /* no need to check other channels if we already did timeout */ - } - channel.UpdateTimers(dtime); } - /* skip to next peer if we did timeout */ - if (retry_count_exceeded) - continue; - /* send ping if necessary */ if (udpPeer->Ping(dtime, data)) { LOG(dout_con << m_connection->getDesc() @@ -294,25 +270,24 @@ void ConnectionSendThread::runTimeouts(float dtime) } } -void ConnectionSendThread::rawSend(const BufferedPacket &packet) +void ConnectionSendThread::rawSend(const BufferedPacket *p) { try { - m_connection->m_udpSocket.Send(packet.address, *packet.data, - packet.data.getSize()); + m_connection->m_udpSocket.Send(p->address, p->data, p->size()); LOG(dout_con << m_connection->getDesc() - << " rawSend: " << packet.data.getSize() + << " rawSend: " << p->size() << " bytes sent" << std::endl); } catch (SendFailedException &e) { LOG(derr_con << m_connection->getDesc() << "Connection::rawSend(): SendFailedException: " - << packet.address.serializeString() << std::endl); + << p->address.serializeString() << std::endl); } } -void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel) +void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel) { try { - p.absolute_send_time = porting::getTimeMs(); + p->absolute_send_time = porting::getTimeMs(); // Buffer the packet channel->outgoing_reliables_sent.insert(p, (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE) @@ -325,7 +300,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan } // Send the packet - rawSend(p); + rawSend(p.get()); } bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, @@ -341,11 +316,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]); if (reliable) { - bool have_sequence_number_for_raw_packet = true; - u16 seqnum = - channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet); + bool have_seqnum = false; + const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum); - if (!have_sequence_number_for_raw_packet) + if (!have_seqnum) return false; SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum); @@ -353,13 +327,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address); // Add base headers and make a packet - BufferedPacket p = con::makePacket(peer_address, reliable, + BufferedPacketPtr p = con::makePacket(peer_address, reliable, m_connection->GetProtocolID(), m_connection->GetPeerID(), channelnum); // first check if our send window is already maxed out - if (channel->outgoing_reliables_sent.size() - < channel->getWindowSize()) { + if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) { LOG(dout_con << m_connection->getDesc() << " INFO: sending a reliable packet to peer_id " << peer_id << " channel: " << (u32)channelnum @@ -372,19 +345,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, << " INFO: queueing reliable packet for peer_id: " << peer_id << " channel: " << (u32)channelnum << " seqnum: " << seqnum << std::endl); - channel->queued_reliables.push(std::move(p)); + channel->queued_reliables.push(p); return false; } Address peer_address; if (peer->getAddress(MTP_UDP, peer_address)) { // Add base headers and make a packet - BufferedPacket p = con::makePacket(peer_address, data, + BufferedPacketPtr p = con::makePacket(peer_address, data, m_connection->GetProtocolID(), m_connection->GetPeerID(), channelnum); // Send the packet - rawSend(p); + rawSend(p.get()); return true; } @@ -394,11 +367,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, return false; } -void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) +void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c) { - assert(c.reliable); // Pre-condition + assert(c->reliable); // Pre-condition - switch (c.type) { + switch (c->type) { case CONNCMD_NONE: LOG(dout_con << m_connection->getDesc() << "UDP processing reliable CONNCMD_NONE" << std::endl); @@ -419,7 +392,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) case CONCMD_CREATE_PEER: LOG(dout_con << m_connection->getDesc() << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl); - if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) { + if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) { /* put to queue if we couldn't send it immediately */ sendReliable(c); } @@ -432,13 +405,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) FATAL_ERROR("Got command that shouldn't be reliable as reliable command"); default: LOG(dout_con << m_connection->getDesc() - << " Invalid reliable command type: " << c.type << std::endl); + << " Invalid reliable command type: " << c->type << std::endl); } } -void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c) +void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr) { + const ConnectionCommand &c = *c_ptr; assert(!c.reliable); // Pre-condition switch (c.type) { @@ -500,9 +474,7 @@ void ConnectionSendThread::serve(Address bind_address) } catch (SocketException &e) { // Create event - ConnectionEvent ce; - ce.bindFailed(); - m_connection->putEvent(ce); + m_connection->putEvent(ConnectionEvent::bindFailed()); } } @@ -515,9 +487,7 @@ void ConnectionSendThread::connect(Address address) UDPPeer *peer = m_connection->createServerPeer(address); // Create event - ConnectionEvent e; - e.peerAdded(peer->id, peer->address); - m_connection->putEvent(e); + m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address)); Address bind_addr; @@ -606,9 +576,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum, } } -void ConnectionSendThread::sendReliable(ConnectionCommand &c) +void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c) { - PeerHelper peer = m_connection->getPeerNoEx(c.peer_id); + PeerHelper peer = m_connection->getPeerNoEx(c->peer_id); if (!peer) return; @@ -624,7 +594,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data } } -void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c) +void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c) { std::vector<session_t> peerids = m_connection->getPeerIDs(); @@ -683,8 +653,12 @@ void ConnectionSendThread::sendPackets(float dtime) // first send queued reliable packets for all peers (if possible) for (unsigned int i = 0; i < CHANNEL_COUNT; i++) { Channel &channel = udpPeer->channels[i]; - u16 next_to_ack = 0; + // Reduces logging verbosity + if (channel.queued_reliables.empty()) + continue; + + u16 next_to_ack = 0; channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack); u16 next_to_receive = 0; channel.incoming_reliables.getFirstSeqnum(next_to_receive); @@ -714,13 +688,13 @@ void ConnectionSendThread::sendPackets(float dtime) channel.outgoing_reliables_sent.size() < channel.getWindowSize() && peer->m_increment_packets_remaining > 0) { - BufferedPacket p = std::move(channel.queued_reliables.front()); + BufferedPacketPtr p = channel.queued_reliables.front(); channel.queued_reliables.pop(); LOG(dout_con << m_connection->getDesc() << " INFO: sending a queued reliable packet " << " channel: " << i - << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1]) + << ", seqnum: " << p->getSeqnum() << std::endl); sendAsPacketReliable(p, &channel); @@ -901,17 +875,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, try { // First, see if there any buffered packets we can process now if (packet_queued) { - bool data_left = true; session_t peer_id; SharedBuffer<u8> resultdata; - while (data_left) { + while (true) { try { - data_left = getFromBuffers(peer_id, resultdata); - if (data_left) { - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(std::move(e)); - } + if (!getFromBuffers(peer_id, resultdata)) + break; + + m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata)); } catch (ProcessedSilentlyException &e) { /* try reading again */ @@ -928,7 +899,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, return; if ((received_size < BASE_HEADER_SIZE) || - (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { + (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { LOG(derr_con << m_connection->getDesc() << "Receive(): Invalid incoming packet, " << "size: " << received_size @@ -1019,9 +990,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, << ", channel: " << (u32)channelnum << ", returned " << resultdata.getSize() << " bytes" << std::endl); - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(std::move(e)); + m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata)); } catch (ProcessedSilentlyException &e) { } @@ -1046,10 +1015,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8 if (!peer) continue; - if (dynamic_cast<UDPPeer *>(&peer) == 0) + UDPPeer *p = dynamic_cast<UDPPeer *>(&peer); + if (!p) continue; - for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) { + for (Channel &channel : p->channels) { if (checkIncomingBuffers(&channel, peer_id, dst)) { return true; } @@ -1062,32 +1032,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, session_t &peer_id, SharedBuffer<u8> &dst) { u16 firstseqnum = 0; - if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) { - if (firstseqnum == channel->readNextIncomingSeqNum()) { - BufferedPacket p = channel->incoming_reliables.popFirst(); - peer_id = readPeerId(*p.data); - u8 channelnum = readChannel(*p.data); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); + if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum)) + return false; - LOG(dout_con << m_connection->getDesc() - << "UNBUFFERING TYPE_RELIABLE" - << " seqnum=" << seqnum - << " peer_id=" << peer_id - << " channel=" << ((int) channelnum & 0xff) - << std::endl); + if (firstseqnum != channel->readNextIncomingSeqNum()) + return false; - channel->incNextIncomingSeqNum(); + BufferedPacketPtr p = channel->incoming_reliables.popFirst(); - u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; - // Get out the inside packet and re-process it - SharedBuffer<u8> payload(p.data.getSize() - headers_size); - memcpy(*payload, &p.data[headers_size], payload.getSize()); + peer_id = readPeerId(p->data); // Carried over to caller function + u8 channelnum = readChannel(p->data); + u16 seqnum = p->getSeqnum(); - dst = processPacket(channel, payload, peer_id, channelnum, true); - return true; - } - } - return false; + LOG(dout_con << m_connection->getDesc() + << "UNBUFFERING TYPE_RELIABLE" + << " seqnum=" << seqnum + << " peer_id=" << peer_id + << " channel=" << ((int) channelnum & 0xff) + << std::endl); + + channel->incNextIncomingSeqNum(); + + u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; + // Get out the inside packet and re-process it + SharedBuffer<u8> payload(p->size() - headers_size); + memcpy(*payload, &p->data[headers_size], payload.getSize()); + + dst = processPacket(channel, payload, peer_id, channelnum, true); + return true; } SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, @@ -1135,7 +1107,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan if (packetdata.getSize() < 2) throw InvalidIncomingDataException("packetdata.getSize() < 2"); - u8 controltype = readU8(&(packetdata[1])); + ControlType controltype = (ControlType)readU8(&(packetdata[1])); if (controltype == CONTROLTYPE_ACK) { assert(channel != NULL); @@ -1151,31 +1123,32 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan << seqnum << " ]" << std::endl); try { - BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum); + BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum); - // only calculate rtt from straight sent packets - if (p.resend_count == 0) { + // the rtt calculation will be a bit off for re-sent packets but that's okay + { // Get round trip time u64 current_time = porting::getTimeMs(); // a overflow is quite unlikely but as it'd result in major // rtt miscalculation we handle it here - if (current_time > p.absolute_send_time) { - float rtt = (current_time - p.absolute_send_time) / 1000.0; + if (current_time > p->absolute_send_time) { + float rtt = (current_time - p->absolute_send_time) / 1000.0; // Let peer calculate stuff according to it // (avg_rtt and resend_timeout) dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt); - } else if (p.totaltime > 0) { - float rtt = p.totaltime; + } else if (p->totaltime > 0) { + float rtt = p->totaltime; // Let peer calculate stuff according to it // (avg_rtt and resend_timeout) dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt); } } + // put bytes for max bandwidth calculation - channel->UpdateBytesSent(p.data.getSize(), 1); + channel->UpdateBytesSent(p->size(), 1); if (channel->outgoing_reliables_sent.size() == 0) m_connection->TriggerSend(); } catch (NotFoundException &e) { @@ -1223,7 +1196,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan throw ProcessedSilentlyException("Got a DISCO"); } else { LOG(derr_con << m_connection->getDesc() - << "INVALID TYPE_CONTROL: invalid controltype=" + << "INVALID controltype=" << ((int) controltype & 0xff) << std::endl); throw InvalidIncomingDataException("Invalid control type"); } @@ -1251,7 +1224,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe if (peer->getAddress(MTP_UDP, peer_address)) { // We have to create a packet again for buffering // This isn't actually too bad an idea. - BufferedPacket packet = makePacket(peer_address, + BufferedPacketPtr packet = con::makePacket(peer_address, packetdata, m_connection->GetProtocolID(), peer->id, @@ -1286,7 +1259,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha if (packetdata.getSize() < RELIABLE_HEADER_SIZE) throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE"); - u16 seqnum = readU16(&packetdata[1]); + const u16 seqnum = readU16(&packetdata[1]); bool is_future_packet = false; bool is_old_packet = false; @@ -1330,7 +1303,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha // This one comes later, buffer it. // Actually we have to make a packet to buffer one. // Well, we have all the ingredients, so just do it. - BufferedPacket packet = con::makePacket( + BufferedPacketPtr packet = con::makePacket( peer_address, packetdata, m_connection->GetProtocolID(), @@ -1347,9 +1320,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha throw ProcessedQueued("Buffered future reliable packet"); } catch (AlreadyExistsException &e) { } catch (IncomingDataCorruption &e) { - ConnectionCommand discon; - discon.disconnect_peer(peer->id); - m_connection->putCommand(discon); + m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id)); LOG(derr_con << m_connection->getDesc() << "INVALID, TYPE_RELIABLE peer_id: " << peer->id @@ -1370,7 +1341,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha u16 queued_seqnum = 0; if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) { if (queued_seqnum == seqnum) { - BufferedPacket queued_packet = channel->incoming_reliables.popFirst(); + BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst(); /** TODO find a way to verify the new against the old packet */ } } |