aboutsummaryrefslogtreecommitdiff
path: root/src/network/connectionthreads.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r--src/network/connectionthreads.cpp779
1 files changed, 400 insertions, 379 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
index 9a6617a1c..4d47b776c 100644
--- a/src/network/connectionthreads.cpp
+++ b/src/network/connectionthreads.cpp
@@ -38,10 +38,10 @@ namespace con
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_conthread_mutex;
-#define LOG(a) \
- { \
- MutexAutoLock loglock(log_conthread_mutex); \
- a; \
+#define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_conthread_mutex); \
+ a; \
}
#define PROFILE(a) a
//#define DEBUG_CONNECTION_KBPS
@@ -66,12 +66,10 @@ static u8 readChannel(u8 *packetdata)
/* Connection Threads */
/******************************************************************************/
-ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
- float timeout) :
- Thread("ConnectionSend"),
- m_max_packet_size(max_packet_size),
- m_timeout(timeout),
- m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, float timeout) :
+ Thread("ConnectionSend"), m_max_packet_size(max_packet_size),
+ m_timeout(timeout), m_max_data_packets_per_iteration(g_settings->getU16(
+ "max_packets_per_iteration"))
{
SANITY_CHECK(m_max_data_packets_per_iteration > 1);
}
@@ -80,14 +78,15 @@ void *ConnectionSendThread::run()
{
assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionSend thread started" << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "ConnectionSend thread started"
+ << std::endl);
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
PROFILE(std::stringstream ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
+ PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc()
+ << "]");
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
@@ -112,8 +111,10 @@ void *ConnectionSendThread::run()
runTimeouts(dtime);
if (m_iteration_packets_avaialble == 0) {
LOG(warningstream << m_connection->getDesc()
- << " Packet quota used up after re-sending packets, "
- << "max=" << m_max_data_packets_per_iteration << std::endl);
+ << " Packet quota used up after re-sending "
+ "packets, "
+ << "max=" << m_max_data_packets_per_iteration
+ << std::endl);
}
/* translate commands to packets */
@@ -165,7 +166,6 @@ bool ConnectionSendThread::packetsQueued()
}
}
-
return false;
}
@@ -185,21 +185,20 @@ void ConnectionSendThread::runTimeouts(float dtime)
continue;
PROFILE(std::stringstream peerIdentifier);
- PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
- << ";" << peerId << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+ PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() << ";"
+ << peerId << ";RELIABLE]");
+ PROFILE(ScopeProfiler peerprofiler(
+ g_profiler, peerIdentifier.str(), SPT_AVG));
- SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
+ SharedBuffer<u8> data(2); // data for sending ping, required here because
+ // of goto
/*
Check peer timeout
*/
if (peer->isTimedOut(m_timeout)) {
- infostream << m_connection->getDesc()
- << "RunTimeouts(): Peer " << peer->id
- << " has timed out."
- << std::endl;
+ infostream << m_connection->getDesc() << "RunTimeouts(): Peer "
+ << peer->id << " has timed out." << std::endl;
// Add peer to the list
timeouted_peers.push_back(peer->id);
// Don't bother going through the buffers of this one
@@ -212,7 +211,8 @@ void ConnectionSendThread::runTimeouts(float dtime)
std::list<BufferedPacket> timed_outs;
// Remove timed out incomplete unreliable split packets
- channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
+ channel.incoming_splits.removeUnreliableTimedOuts(
+ dtime, m_timeout);
// Increment reliable packet times
channel.outgoing_reliables_sent.incrementTimeouts(dtime);
@@ -223,8 +223,9 @@ void ConnectionSendThread::runTimeouts(float dtime)
return;
// Re-send timed out outgoing reliables
- timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
- (m_max_data_packets_per_iteration / numpeers));
+ timed_outs = channel.outgoing_reliables_sent.getTimedOuts(
+ resend_timeout,
+ (m_max_data_packets_per_iteration / numpeers));
channel.UpdatePacketLossCounter(timed_outs.size());
g_profiler->graphAdd("packets_lost", timed_outs.size());
@@ -232,7 +233,7 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size();
for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
- k != timed_outs.end(); ++k) {
+ k != timed_outs.end(); ++k) {
session_t peer_id = readPeerId(*(k->data));
u8 channelnum = readChannel(*(k->data));
u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
@@ -243,27 +244,28 @@ void ConnectionSendThread::runTimeouts(float dtime)
if (k->resend_count > MAX_RELIABLE_RETRY) {
retry_count_exceeded = true;
timeouted_peers.push_back(peer->id);
- /* no need to check additional packets if a single one did timeout*/
+ /* no need to check additional packets if a single
+ * one did timeout*/
break;
}
LOG(derr_con << m_connection->getDesc()
- << "RE-SENDING timed-out RELIABLE to "
- << k->address.serializeString()
- << "(t/o=" << resend_timeout << "): "
- << "from_peer_id=" << peer_id
- << ", channel=" << ((int) channelnum & 0xff)
- << ", seqnum=" << seqnum
- << std::endl);
+ << "RE-SENDING timed-out RELIABLE to "
+ << k->address.serializeString()
+ << "(t/o=" << resend_timeout << "): "
+ << "from_peer_id=" << peer_id
+ << ", channel=" << ((int)channelnum & 0xff)
+ << ", seqnum=" << seqnum << std::endl);
rawSend(*k);
- // do not handle rtt here as we can't decide if this packet was
- // lost or really takes more time to transmit
+ // do not handle rtt here as we can't decide if this
+ // packet was lost or really takes more time to transmit
}
if (retry_count_exceeded) {
- break; /* no need to check other channels if we already did timeout */
+ break; /* no need to check other channels if we already
+ did timeout */
}
channel.UpdateTimers(dtime);
@@ -276,23 +278,23 @@ void ConnectionSendThread::runTimeouts(float dtime)
/* send ping if necessary */
if (udpPeer->Ping(dtime, data)) {
LOG(dout_con << m_connection->getDesc()
- << "Sending ping for peer_id: " << udpPeer->id << std::endl);
+ << "Sending ping for peer_id: " << udpPeer->id
+ << std::endl);
/* this may fail if there ain't a sequence number left */
if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
- //retrigger with reduced ping interval
+ // retrigger with reduced ping interval
udpPeer->Ping(4.0, data);
}
}
- udpPeer->RunCommandQueues(m_max_packet_size,
- m_max_commands_per_iteration,
- m_max_packets_requeued);
+ udpPeer->RunCommandQueues(m_max_packet_size, m_max_commands_per_iteration,
+ m_max_packets_requeued);
}
// Remove timed out peers
for (u16 timeouted_peer : timeouted_peers) {
- LOG(dout_con << m_connection->getDesc()
- << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "RunTimeouts(): Removing peer "
+ << timeouted_peer << std::endl);
m_connection->deletePeer(timeouted_peer, true);
}
}
@@ -300,15 +302,14 @@ void ConnectionSendThread::runTimeouts(float dtime)
void ConnectionSendThread::rawSend(const BufferedPacket &packet)
{
try {
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
- LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
- << " bytes sent" << std::endl);
+ m_connection->m_udpSocket.Send(
+ packet.address, *packet.data, packet.data.getSize());
+ LOG(dout_con << m_connection->getDesc() << " rawSend: "
+ << packet.data.getSize() << " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
- << "Connection::rawSend(): SendFailedException: "
- << packet.address.serializeString() << std::endl);
+ << "Connection::rawSend(): SendFailedException: "
+ << packet.address.serializeString() << std::endl);
}
}
@@ -317,14 +318,14 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
try {
p.absolute_send_time = porting::getTimeMs();
// Buffer the packet
- channel->outgoing_reliables_sent.insert(p,
- (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
- % (MAX_RELIABLE_WINDOW_SIZE + 1));
- }
- catch (AlreadyExistsException &e) {
+ channel->outgoing_reliables_sent.insert(
+ p, (channel->readOutgoingSequenceNumber() -
+ MAX_RELIABLE_WINDOW_SIZE) %
+ (MAX_RELIABLE_WINDOW_SIZE + 1));
+ } catch (AlreadyExistsException &e) {
LOG(derr_con << m_connection->getDesc()
- << "WARNING: Going to send a reliable packet"
- << " in outgoing buffer" << std::endl);
+ << "WARNING: Going to send a reliable packet"
+ << " in outgoing buffer" << std::endl);
}
// Send the packet
@@ -332,21 +333,22 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data, bool reliable)
+ const SharedBuffer<u8> &data, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
- LOG(errorstream << m_connection->getDesc()
- << " dropped " << (reliable ? "reliable " : "")
- << "packet for non existent peer_id: " << peer_id << std::endl);
+ LOG(errorstream << m_connection->getDesc() << " dropped "
+ << (reliable ? "reliable " : "")
+ << "packet for non existent peer_id: " << peer_id
+ << std::endl);
return false;
}
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+ u16 seqnum = channel->getOutgoingSequenceNumber(
+ have_sequence_number_for_raw_packet);
if (!have_sequence_number_for_raw_packet)
return false;
@@ -357,24 +359,23 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, reliable,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
// first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
+ if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a reliable packet to peer_id " << peer_id
- << " channel: " << (u32)channelnum
- << " seqnum: " << seqnum << std::endl);
+ << " INFO: sending a reliable packet to peer_id "
+ << peer_id << " channel: " << (u32)channelnum
+ << " seqnum: " << seqnum << std::endl);
sendAsPacketReliable(p, channel);
return true;
}
LOG(dout_con << m_connection->getDesc()
- << " INFO: queueing reliable packet for peer_id: " << peer_id
- << " channel: " << (u32)channelnum
- << " seqnum: " << seqnum << std::endl);
+ << " INFO: queueing reliable packet for peer_id: " << peer_id
+ << " channel: " << (u32)channelnum << " seqnum: " << seqnum
+ << std::endl);
channel->queued_reliables.push(p);
return false;
}
@@ -383,8 +384,8 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, data,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
// Send the packet
rawSend(p);
@@ -392,116 +393,116 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
}
LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped unreliable packet for peer_id: " << peer_id
- << " because of (yet) missing udp address" << std::endl);
+ << " INFO: dropped unreliable packet for peer_id: " << peer_id
+ << " because of (yet) missing udp address" << std::endl);
return false;
}
void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
{
- assert(c.reliable); // Pre-condition
+ assert(c.reliable); // Pre-condition
switch (c.type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_NONE" << std::endl);
- return;
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_NONE" << std::endl);
+ return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONNCMD_SEND" << std::endl);
- sendReliable(c);
- return;
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_SEND" << std::endl);
+ sendReliable(c);
+ return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAllReliable(c);
- return;
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAllReliable(c);
+ return;
- case CONCMD_CREATE_PEER:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
+ case CONCMD_CREATE_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONCMD_CREATE_PEER"
+ << std::endl);
+ if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ /* put to queue if we couldn't send it immediately */
+ sendReliable(c);
+ }
+ return;
- case CONNCMD_SERVE:
- case CONNCMD_CONNECT:
- case CONNCMD_DISCONNECT:
- case CONCMD_ACK:
- FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c.type << std::endl);
+ case CONNCMD_SERVE:
+ case CONNCMD_CONNECT:
+ case CONNCMD_DISCONNECT:
+ case CONCMD_ACK:
+ FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid reliable command type: " << c.type
+ << std::endl);
}
}
-
void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
{
assert(!c.reliable); // Pre-condition
switch (c.type) {
- case CONNCMD_NONE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_NONE" << std::endl);
- return;
- case CONNCMD_SERVE:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SERVE port="
- << c.address.serializeString() << std::endl);
- serve(c.address);
- return;
- case CONNCMD_CONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_CONNECT" << std::endl);
- connect(c.address);
- return;
- case CONNCMD_DISCONNECT:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT" << std::endl);
- disconnect();
- return;
- case CONNCMD_DISCONNECT_PEER:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
- disconnect_peer(c.peer_id);
- return;
- case CONNCMD_SEND:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND" << std::endl);
- send(c.peer_id, c.channelnum, c.data);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
- sendToAll(c.channelnum, c.data);
- return;
- case CONCMD_ACK:
- LOG(dout_con << m_connection->getDesc()
- << " UDP processing CONCMD_ACK" << std::endl);
- sendAsPacket(c.peer_id, c.channelnum, c.data, true);
- return;
- case CONCMD_CREATE_PEER:
- FATAL_ERROR("Got command that should be reliable as unreliable command");
- default:
- LOG(dout_con << m_connection->getDesc()
- << " Invalid command type: " << c.type << std::endl);
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc() << " UDP processing CONNCMD_NONE"
+ << std::endl);
+ return;
+ case CONNCMD_SERVE:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SERVE port="
+ << c.address.serializeString() << std::endl);
+ serve(c.address);
+ return;
+ case CONNCMD_CONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_CONNECT" << std::endl);
+ connect(c.address);
+ return;
+ case CONNCMD_DISCONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT" << std::endl);
+ disconnect();
+ return;
+ case CONNCMD_DISCONNECT_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
+ disconnect_peer(c.peer_id);
+ return;
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc() << " UDP processing CONNCMD_SEND"
+ << std::endl);
+ send(c.peer_id, c.channelnum, c.data);
+ return;
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAll(c.channelnum, c.data);
+ return;
+ case CONCMD_ACK:
+ LOG(dout_con << m_connection->getDesc() << " UDP processing CONCMD_ACK"
+ << std::endl);
+ sendAsPacket(c.peer_id, c.channelnum, c.data, true);
+ return;
+ case CONCMD_CREATE_PEER:
+ FATAL_ERROR("Got command that should be reliable as unreliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid command type: " << c.type << std::endl);
}
}
void ConnectionSendThread::serve(Address bind_address)
{
- LOG(dout_con << m_connection->getDesc()
- << "UDP serving at port " << bind_address.serializeString() << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "UDP serving at port "
+ << bind_address.serializeString() << std::endl);
try {
m_connection->m_udpSocket.Bind(bind_address);
m_connection->SetPeerID(PEER_ID_SERVER);
- }
- catch (SocketException &e) {
+ } catch (SocketException &e) {
// Create event
ConnectionEvent ce;
ce.bindFailed();
@@ -512,8 +513,8 @@ void ConnectionSendThread::serve(Address bind_address)
void ConnectionSendThread::connect(Address address)
{
LOG(dout_con << m_connection->getDesc() << " connecting to "
- << address.serializeString()
- << ":" << address.getPort() << std::endl);
+ << address.serializeString() << ":" << address.getPort()
+ << std::endl);
UDPPeer *peer = m_connection->createServerPeer(address);
@@ -525,7 +526,7 @@ void ConnectionSendThread::connect(Address address)
Address bind_addr;
if (address.isIPv6())
- bind_addr.setAddress((IPv6AddressBytes *) NULL);
+ bind_addr.setAddress((IPv6AddressBytes *)NULL);
else
bind_addr.setAddress(0, 0, 0, 0);
@@ -546,7 +547,6 @@ void ConnectionSendThread::disconnect()
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
-
// Send to all
std::list<session_t> peerids = m_connection->getPeerIDs();
@@ -577,23 +577,23 @@ void ConnectionSendThread::disconnect_peer(session_t peer_id)
dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
}
-void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data)
+void ConnectionSendThread::send(
+ session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
return;
}
LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() << std::endl);
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
@@ -647,16 +647,16 @@ void ConnectionSendThread::sendPackets(float dtime)
std::list<session_t> pendingDisconnect;
std::map<session_t, bool> pending_unreliable;
- const unsigned int peer_packet_quota = m_iteration_packets_avaialble
- / MYMAX(peerIds.size(), 1);
+ const unsigned int peer_packet_quota =
+ m_iteration_packets_avaialble / MYMAX(peerIds.size(), 1);
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
- //peer may have been removed
+ // peer may have been removed
if (!peer) {
- LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
- << peerId
- << std::endl);
+ LOG(dout_con << m_connection->getDesc()
+ << " Peer not found: peer_id=" << peerId
+ << std::endl);
continue;
}
peer->m_increment_packets_remaining = peer_packet_quota;
@@ -671,17 +671,16 @@ void ConnectionSendThread::sendPackets(float dtime)
pendingDisconnect.push_back(peerId);
}
- PROFILE(std::stringstream
- peerIdentifier);
- PROFILE(
- peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
- << ";RELIABLE]");
- PROFILE(ScopeProfiler
- peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+ PROFILE(std::stringstream peerIdentifier);
+ PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";"
+ << peerId << ";RELIABLE]");
+ PROFILE(ScopeProfiler peerprofiler(
+ g_profiler, peerIdentifier.str(), SPT_AVG));
LOG(dout_con << m_connection->getDesc()
- << " Handle per peer queues: peer_id=" << peerId
- << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
+ << " Handle per peer queues: peer_id=" << peerId
+ << " packet quota: " << peer->m_increment_packets_remaining
+ << std::endl);
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
@@ -692,38 +691,34 @@ void ConnectionSendThread::sendPackets(float dtime)
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
- LOG(dout_con << m_connection->getDesc() << "\t channel: "
- << i << ", peer quota:"
- << peer->m_increment_packets_remaining
- << std::endl
- << "\t\t\treliables on wire: "
- << channel.outgoing_reliables_sent.size()
- << ", waiting for ack for " << next_to_ack
- << std::endl
- << "\t\t\tincoming_reliables: "
- << channel.incoming_reliables.size()
- << ", next reliable packet: "
- << channel.readNextIncomingSeqNum()
- << ", next queued: " << next_to_receive
- << std::endl
- << "\t\t\treliables queued : "
- << channel.queued_reliables.size()
- << std::endl
- << "\t\t\tqueued commands : "
- << channel.queued_commands.size()
- << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "\t channel: " << i
+ << ", peer quota:"
+ << peer->m_increment_packets_remaining << std::endl
+ << "\t\t\treliables on wire: "
+ << channel.outgoing_reliables_sent.size()
+ << ", waiting for ack for " << next_to_ack
+ << std::endl
+ << "\t\t\tincoming_reliables: "
+ << channel.incoming_reliables.size()
+ << ", next reliable packet: "
+ << channel.readNextIncomingSeqNum()
+ << ", next queued: " << next_to_receive << std::endl
+ << "\t\t\treliables queued : "
+ << channel.queued_reliables.size() << std::endl
+ << "\t\t\tqueued commands : "
+ << channel.queued_commands.size() << std::endl);
while (!channel.queued_reliables.empty() &&
- channel.outgoing_reliables_sent.size()
- < channel.getWindowSize() &&
+ channel.outgoing_reliables_sent.size() <
+ channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
BufferedPacket p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
- << " INFO: sending a queued reliable packet "
- << " channel: " << i
- << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
- << std::endl);
+ << " INFO: sending a queued reliable packet "
+ << " channel: " << i << ", seqnum: "
+ << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << std::endl);
sendAsPacketReliable(p, &channel);
peer->m_increment_packets_remaining--;
}
@@ -731,9 +726,8 @@ void ConnectionSendThread::sendPackets(float dtime)
}
if (!m_outgoing_queue.empty()) {
- LOG(dout_con << m_connection->getDesc()
- << " Handle non reliable queue ("
- << m_outgoing_queue.size() << " pkts)" << std::endl);
+ LOG(dout_con << m_connection->getDesc() << " Handle non reliable queue ("
+ << m_outgoing_queue.size() << " pkts)" << std::endl);
}
unsigned int initial_queuesize = m_outgoing_queue.size();
@@ -748,17 +742,18 @@ void ConnectionSendThread::sendPackets(float dtime)
PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
- << " Outgoing queue: peer_id=" << packet.peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (packet.channelnum % 0xFF)
- << ", size: " << packet.data.getSize() << std::endl);
+ << " Outgoing queue: peer_id=" << packet.peer_id
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (packet.channelnum % 0xFF)
+ << ", size: " << packet.data.getSize() << std::endl);
continue;
}
/* send acks immediately */
- if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
+ if (packet.ack || peer->m_increment_packets_remaining > 0 ||
+ stopRequested()) {
+ rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data,
+ packet.reliable);
if (peer->m_increment_packets_remaining > 0)
peer->m_increment_packets_remaining--;
} else {
@@ -774,8 +769,10 @@ void ConnectionSendThread::sendPackets(float dtime)
continue;
if (peer->m_increment_packets_remaining == 0) {
LOG(warningstream << m_connection->getDesc()
- << " Packet quota used up for peer_id=" << peerId
- << ", was " << peer_packet_quota << " pkts" << std::endl);
+ << " Packet quota used up for peer_id="
+ << peerId << ", was "
+ << peer_packet_quota << " pkts"
+ << std::endl);
}
}
}
@@ -787,15 +784,15 @@ void ConnectionSendThread::sendPackets(float dtime)
}
}
-void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
- const SharedBuffer<u8> &data, bool ack)
+void ConnectionSendThread::sendAsPacket(
+ session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data, bool ack)
{
OutgoingPacket packet(peer_id, channelnum, data, false, ack);
m_outgoing_queue.push(packet);
}
ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
- Thread("ConnectionReceive")
+ Thread("ConnectionReceive")
{
}
@@ -803,12 +800,12 @@ void *ConnectionReceiveThread::run()
{
assert(m_connection);
- LOG(dout_con << m_connection->getDesc()
- << "ConnectionReceive thread started" << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "ConnectionReceive thread started"
+ << std::endl);
- PROFILE(std::stringstream
- ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
+ PROFILE(std::stringstream ThreadIdentifier);
+ PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc()
+ << "]");
// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
@@ -826,13 +823,12 @@ void *ConnectionReceiveThread::run()
while (!stopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler
- sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
+ PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
#ifdef DEBUG_CONNECTION_KBPS
lasttime = curtime;
curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime,curtime);
+ float dtime = CALC_DTIME(lasttime, curtime);
#endif
/* receive packets */
@@ -846,9 +842,7 @@ void *ConnectionReceiveThread::run()
std::list<session_t> peerids = m_connection->getPeerIDs();
for (std::list<session_t>::iterator i = peerids.begin();
- i != peerids.end();
- i++)
- {
+ i != peerids.end(); i++) {
PeerHelper peer = m_connection->getPeerNoEx(*i);
if (!peer)
continue;
@@ -858,35 +852,52 @@ void *ConnectionReceiveThread::run()
float avg_rate = 0.0;
float avg_loss = 0.0;
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- peer_current +=peer->channels[j].getCurrentDownloadRateKB();
- peer_loss += peer->channels[j].getCurrentLossRateKB();
- avg_rate += peer->channels[j].getAvgDownloadRateKB();
+ for (u16 j = 0; j < CHANNEL_COUNT; j++) {
+ peer_current += peer->channels[j]
+ .getCurrentDownloadRateKB();
+ peer_loss += peer->channels[j]
+ .getCurrentLossRateKB();
+ avg_rate += peer->channels[j]
+ .getAvgDownloadRateKB();
avg_loss += peer->channels[j].getAvgLossRateKB();
}
std::stringstream output;
output << std::fixed << std::setprecision(1);
- output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
- output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
- output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
+ output << "OUT to Peer " << *i << " RATES (good / loss) "
+ << std::endl;
+ output << "\tcurrent (sum): " << peer_current << "kb/s "
+ << peer_loss << "kb/s" << std::endl;
+ output << "\taverage (sum): " << avg_rate << "kb/s "
+ << avg_loss << "kb/s" << std::endl;
output << std::setfill(' ');
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
+ for (u16 j = 0; j < CHANNEL_COUNT; j++) {
output << "\tcha " << j << ":"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
- << " /"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
- << " / WS: " << peer->channels[j].getWindowSize()
- << std::endl;
+ << " CUR: " << std::setw(6)
+ << peer->channels[j].getCurrentDownloadRateKB()
+ << "kb/s"
+ << " AVG: " << std::setw(6)
+ << peer->channels[j].getAvgDownloadRateKB()
+ << "kb/s"
+ << " MAX: " << std::setw(6)
+ << peer->channels[j].getMaxDownloadRateKB()
+ << "kb/s"
+ << " /"
+ << " CUR: " << std::setw(6)
+ << peer->channels[j].getCurrentLossRateKB()
+ << "kb/s"
+ << " AVG: " << std::setw(6)
+ << peer->channels[j].getAvgLossRateKB()
+ << "kb/s"
+ << " MAX: " << std::setw(6)
+ << peer->channels[j].getMaxLossRateKB()
+ << "kb/s"
+ << " / WS: "
+ << peer->channels[j].getWindowSize()
+ << std::endl;
}
- fprintf(stderr,"%s\n",output.str().c_str());
+ fprintf(stderr, "%s\n", output.str().c_str());
}
}
#endif
@@ -898,8 +909,7 @@ void *ConnectionReceiveThread::run()
}
// Receive packets from the network and buffers and create ConnectionEvents
-void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
- bool &packet_queued)
+void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, bool &packet_queued)
{
try {
// First, see if there any buffered packets we can process now
@@ -915,8 +925,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
- }
- catch (ProcessedSilentlyException &e) {
+ } catch (ProcessedSilentlyException &e) {
/* try reading again */
}
}
@@ -925,19 +934,20 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
// Call Receive() to wait for incoming data
Address sender;
- s32 received_size = m_connection->m_udpSocket.Receive(sender,
- *packetdata, packetdata.getSize());
+ s32 received_size = m_connection->m_udpSocket.Receive(
+ sender, *packetdata, packetdata.getSize());
if (received_size < 0)
return;
if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ (readU32(&packetdata[0]) !=
+ m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid incoming packet, "
- << "size: " << received_size
- << ", protocol: "
- << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
- << std::endl);
+ << "Receive(): Invalid incoming packet, "
+ << "size: " << received_size << ", protocol: "
+ << ((received_size >= 4) ? readU32(&packetdata[0])
+ : -1)
+ << std::endl);
return;
}
@@ -946,7 +956,8 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
+ << "Receive(): Invalid channel " << (u32)channelnum
+ << std::endl);
return;
}
@@ -960,14 +971,15 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
+ peer_id = m_connection->createPeer(
+ sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
- << " got packet from unknown peer_id: "
- << peer_id << " Ignoring." << std::endl);
+ << " got packet from unknown peer_id: " << peer_id
+ << " Ignoring." << std::endl);
return;
}
@@ -976,15 +988,18 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
- LOG(derr_con << m_connection->getDesc()
- << " Peer " << peer_id << " sending from different address."
- " Ignoring." << std::endl);
+ LOG(derr_con << m_connection->getDesc() << " Peer "
+ << peer_id
+ << " sending from different address."
+ " Ignoring."
+ << std::endl);
return;
}
} else {
- LOG(derr_con << m_connection->getDesc()
- << " Peer " << peer_id << " doesn't have an address?!"
- " Ignoring." << std::endl);
+ LOG(derr_con << m_connection->getDesc() << " Peer " << peer_id
+ << " doesn't have an address?!"
+ " Ignoring."
+ << std::endl);
return;
}
@@ -995,8 +1010,10 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
} else {
LOG(derr_con << m_connection->getDesc()
- << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
- " Ignoring." << std::endl);
+ << "Receive(): peer_id=" << peer_id
+ << " isn't an UDPPeer?!"
+ " Ignoring."
+ << std::endl);
return;
}
@@ -1007,33 +1024,30 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
- strippeddata.getSize());
+ strippeddata.getSize());
try {
// Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = processPacket
- (channel, strippeddata, peer_id, channelnum, false);
+ SharedBuffer<u8> resultdata = processPacket(channel, strippeddata,
+ peer_id, channelnum, false);
LOG(dout_con << m_connection->getDesc()
- << " ProcessPacket from peer_id: " << peer_id
- << ", channel: " << (u32)channelnum << ", returned "
- << resultdata.getSize() << " bytes" << std::endl);
+ << " ProcessPacket from peer_id: " << peer_id
+ << ", channel: " << (u32)channelnum << ", returned "
+ << resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
- }
- catch (ProcessedSilentlyException &e) {
- }
- catch (ProcessedQueued &e) {
+ } catch (ProcessedSilentlyException &e) {
+ } catch (ProcessedQueued &e) {
// we set it to true anyway (see below)
}
/* Every time we receive a packet it can happen that a previously
* buffered packet is now ready to process. */
packet_queued = true;
- }
- catch (InvalidIncomingDataException &e) {
+ } catch (InvalidIncomingDataException &e) {
}
}
@@ -1058,8 +1072,8 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
return false;
}
-bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
- session_t &peer_id, SharedBuffer<u8> &dst)
+bool ConnectionReceiveThread::checkIncomingBuffers(
+ Channel *channel, session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
@@ -1070,11 +1084,10 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum << " peer_id=" << peer_id
+ << " channel=" << ((int)channelnum & 0xff)
+ << std::endl);
channel->incNextIncomingSeqNum();
@@ -1091,7 +1104,8 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
- const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum,
+ bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
@@ -1112,8 +1126,8 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
}
if (type >= PACKET_TYPE_MAX) {
- derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
- << std::endl;
+ derr_con << m_connection->getDesc()
+ << "Got invalid type=" << ((int)type & 0xff) << std::endl;
throw InvalidIncomingDataException("Invalid packet type");
}
@@ -1122,15 +1136,16 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
}
const ConnectionReceiveThread::PacketTypeHandler
- ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
- {&ConnectionReceiveThread::handlePacketType_Control},
- {&ConnectionReceiveThread::handlePacketType_Original},
- {&ConnectionReceiveThread::handlePacketType_Split},
- {&ConnectionReceiveThread::handlePacketType_Reliable},
+ ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
+ {&ConnectionReceiveThread::handlePacketType_Control},
+ {&ConnectionReceiveThread::handlePacketType_Original},
+ {&ConnectionReceiveThread::handlePacketType_Split},
+ {&ConnectionReceiveThread::handlePacketType_Reliable},
};
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable)
{
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
@@ -1142,26 +1157,29 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
if (packetdata.getSize() < 4) {
throw InvalidIncomingDataException(
- "packetdata.getSize() < 4 (ACK header size)");
+ "packetdata.getSize() < 4 (ACK header size)");
}
u16 seqnum = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
- << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
- << seqnum << " ]" << std::endl);
+ LOG(dout_con << m_connection->getDesc()
+ << " [ CONTROLTYPE_ACK: channelnum="
+ << ((int)channelnum & 0xff) << ", peer_id=" << peer->id
+ << ", seqnum=" << seqnum << " ]" << std::endl);
try {
- BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+ BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(
+ seqnum);
// only calculate rtt from straight sent packets
if (p.resend_count == 0) {
// Get round trip time
u64 current_time = porting::getTimeMs();
- // a overflow is quite unlikely but as it'd result in major
- // rtt miscalculation we handle it here
+ // a overflow is quite unlikely but as it'd result in
+ // major rtt miscalculation we handle it here
if (current_time > p.absolute_send_time) {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
+ float rtt = (current_time - p.absolute_send_time) /
+ 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
@@ -1180,8 +1198,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
m_connection->TriggerSend();
} catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc()
- << "WARNING: ACKed packet not in outgoing queue"
- << " seqnum=" << seqnum << std::endl);
+ << "WARNING: ACKed packet not in outgoing queue"
+ << " seqnum=" << seqnum << std::endl);
channel->UpdatePacketTooLateCounter();
}
@@ -1189,18 +1207,19 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
} else if (controltype == CONTROLTYPE_SET_PEER_ID) {
// Got a packet to set our peer id
if (packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+ throw InvalidIncomingDataException("packetdata.getSize() < 4 "
+ "(SET_PEER_ID header size)");
session_t peer_id_new = readU16(&packetdata[2]);
- LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
- << "... " << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "Got new peer id: "
+ << peer_id_new << "... " << std::endl);
if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
LOG(derr_con << m_connection->getDesc()
- << "WARNING: Not changing existing peer id." << std::endl);
+ << "WARNING: Not changing existing peer id."
+ << std::endl);
} else {
LOG(dout_con << m_connection->getDesc() << "changing own peer id"
- << std::endl);
+ << std::endl);
m_connection->SetPeerID(peer_id_new);
}
@@ -1214,29 +1233,31 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
// Just ignore it, the incoming data already reset
// the timeout counter
LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
- << peer->id << std::endl);
+ << peer->id << std::endl);
if (!m_connection->deletePeer(peer->id, false)) {
- derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
+ derr_con << m_connection->getDesc() << "DISCO: Peer not found"
+ << std::endl;
}
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
- << "INVALID TYPE_CONTROL: invalid controltype="
- << ((int) controltype & 0xff) << std::endl);
+ << "INVALID TYPE_CONTROL: invalid controltype="
+ << ((int)controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable)
{
if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
+ throw InvalidIncomingDataException(
+ "packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
- << std::endl);
+ << std::endl);
// Get the inside packet out and return it
SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
@@ -1244,29 +1265,29 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *cha
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable)
{
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
+ BufferedPacket packet = makePacket(peer_address, packetdata,
+ m_connection->GetProtocolID(), peer->id, channelnum);
// Buffer the packet
- SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
+ SharedBuffer<u8> data =
+ peer->addSplitPacket(channelnum, packet, reliable);
if (data.getSize() != 0) {
LOG(dout_con << m_connection->getDesc()
- << "RETURNING TYPE_SPLIT: Constructed full data, "
- << "size=" << data.getSize() << std::endl);
+ << "RETURNING TYPE_SPLIT: Constructed full data, "
+ << "size=" << data.getSize() << std::endl);
return data;
}
- LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
+ LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT"
+ << std::endl);
throw ProcessedSilentlyException("Buffered a split packet chunk");
}
@@ -1275,7 +1296,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
- const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable)
{
assert(channel != NULL);
@@ -1284,41 +1306,44 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
throw InvalidIncomingDataException("Found nested reliable packets");
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
- throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
+ throw InvalidIncomingDataException(
+ "packetdata.getSize() < RELIABLE_HEADER_SIZE");
u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
/* packet is within our receive window send ack */
- if (seqnum_in_window(seqnum,
- channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
+ if (seqnum_in_window(seqnum, channel->readNextIncomingSeqNum(),
+ MAX_RELIABLE_WINDOW_SIZE)) {
m_connection->sendAck(peer->id, channelnum, seqnum);
} else {
- is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
+ is_future_packet =
+ seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
/* packet is not within receive window, don't send ack. *
* if this was a valid packet it's gonna be retransmitted */
if (is_future_packet)
- throw ProcessedSilentlyException(
- "Received packet newer then expected, not sending ack");
+ throw ProcessedSilentlyException("Received packet newer then "
+ "expected, not sending ack");
/* seems like our ack was lost, send another one for a old packet */
if (is_old_packet) {
LOG(dout_con << m_connection->getDesc()
- << "RE-SENDING ACK: peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
+ << "RE-SENDING ACK: peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
m_connection->sendAck(peer->id, channelnum, seqnum);
// we already have this packet so this one was on wire at least
// the current timeout
- // we don't know how long this packet was on wire don't do silly guessing
- // dynamic_cast<UDPPeer*>(&peer)->
+ // we don't know how long this packet was on wire don't do silly
+ // guessing dynamic_cast<UDPPeer*>(&peer)->
// reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
- throw ProcessedSilentlyException("Retransmitting ack for old packet");
+ throw ProcessedSilentlyException(
+ "Retransmitting ack for old packet");
}
}
@@ -1330,19 +1355,16 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
- peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer->id,
- channelnum);
+ BufferedPacket packet = con::makePacket(peer_address, packetdata,
+ m_connection->GetProtocolID(), peer->id, channelnum);
try {
- channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
+ channel->incoming_reliables.insert(
+ packet, channel->readNextIncomingSeqNum());
LOG(dout_con << m_connection->getDesc()
- << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
+ << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
@@ -1352,25 +1374,24 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
m_connection->putCommand(discon);
LOG(derr_con << m_connection->getDesc()
- << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum
- << "DROPPING CLIENT!" << std::endl;)
+ << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << "DROPPING CLIENT!"
+ << std::endl;)
}
}
/* we got a packet to process right now */
- LOG(dout_con << m_connection->getDesc()
- << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
- << ", channel: " << (channelnum & 0xFF)
- << ", seqnum: " << seqnum << std::endl;)
-
+ LOG(dout_con << m_connection->getDesc() << "RECURSIVE, TYPE_RELIABLE peer_id: "
+ << peer->id << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
/* check for resend case */
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ BufferedPacket queued_packet =
+ channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}