aboutsummaryrefslogtreecommitdiff
path: root/src/network/connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/connection.cpp')
-rw-r--r--src/network/connection.cpp525
1 files changed, 258 insertions, 267 deletions
diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index 3692e45a9..dbcece99d 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -41,29 +41,29 @@ namespace con
/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
- #define LOG(a) a
- #define PROFILE(a)
+#define LOG(a) a
+#define PROFILE(a)
#else
- #if 0
+#if 0
/* this mutex is used to achieve log message consistency */
std::mutex log_message_mutex;
- #define LOG(a) \
- { \
- MutexAutoLock loglock(log_message_mutex); \
- a; \
- }
- #else
- // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
- #define LOG(a) a
- #endif
+#define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_message_mutex); \
+ a; \
+ }
+#else
+// Prevent deadlocks until a solution is found after 5.2.0 (TODO)
+#define LOG(a) a
+#endif
- #define PROFILE(a) a
+#define PROFILE(a) a
#endif
#define PING_TIMEOUT 5.0
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
- u32 protocol_id, session_t sender_peer_id, u8 channel)
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, u32 protocol_id,
+ session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
BufferedPacket p(packet_size);
@@ -123,8 +123,7 @@ void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum
start = end + 1;
chunk_num++;
- }
- while (end != data.getSize() - 1);
+ } while (end != data.getSize() - 1);
for (SharedBuffer<u8> &chunk : *chunks) {
// Write chunk_count
@@ -167,11 +166,11 @@ SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
void ReliablePacketBuffer::print()
{
MutexAutoLock listlock(m_list_mutex);
- LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
+ LOG(dout_con << "Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
for (BufferedPacket &bufferedPacket : m_list) {
- u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
- LOG(dout_con<<index<< ":" << s << std::endl);
+ u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE + 1]));
+ LOG(dout_con << index << ":" << s << std::endl);
index++;
}
}
@@ -191,9 +190,8 @@ u32 ReliablePacketBuffer::size()
RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
{
std::list<BufferedPacket>::iterator i = m_list.begin();
- for(; i != m_list.end(); ++i)
- {
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ for (; i != m_list.end(); ++i) {
+ u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1]));
if (s == seqnum)
break;
}
@@ -205,7 +203,7 @@ RPBSearchResult ReliablePacketBuffer::notFound()
return m_list.end();
}
-bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
+bool ReliablePacketBuffer::getFirstSeqnum(u16 &result)
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
@@ -237,17 +235,16 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
MutexAutoLock listlock(m_list_mutex);
RPBSearchResult r = findPacket(seqnum);
if (r == notFound()) {
- LOG(dout_con<<"Sequence number: " << seqnum
- << " not found in reliable buffer"<<std::endl);
+ LOG(dout_con << "Sequence number: " << seqnum
+ << " not found in reliable buffer" << std::endl);
throw NotFoundException("seqnum not found in buffer");
}
BufferedPacket p = *r;
-
RPBSearchResult next = r;
++next;
if (next != notFound()) {
- u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
+ u16 s = readU16(&(next->data[BASE_HEADER_SIZE + 1]));
m_oldest_non_answered_ack = s;
}
@@ -267,25 +264,27 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
- "reliable packet" << std::endl;
+ "reliable packet"
+ << std::endl;
return;
}
u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
if (type != PACKET_TYPE_RELIABLE) {
errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
- << std::endl;
+ << std::endl;
return;
}
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
- "expected window " << std::endl;
+ "expected window "
+ << std::endl;
return;
}
if (seqnum == next_expected) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
- << std::endl;
+ << std::endl;
return;
}
@@ -293,8 +292,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
// Find the right place for the packet and insert it there
// If list is empty, just add it
- if (m_list.empty())
- {
+ if (m_list.empty()) {
m_list.push_back(p);
m_oldest_non_answered_ack = seqnum;
// Done.
@@ -304,47 +302,47 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
// Otherwise find the right place
std::list<BufferedPacket>::iterator i = m_list.begin();
// Find the first packet in the list which has a higher seqnum
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ u16 s = readU16(&(i->data[BASE_HEADER_SIZE + 1]));
/* case seqnum is smaller then next_expected seqnum */
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
- while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
+ while (((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
++i;
if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ s = readU16(&(i->data[BASE_HEADER_SIZE + 1]));
}
}
/* non wrap around case (at least for incoming and next_expected */
- else
- {
- while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
+ else {
+ while (((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
++i;
if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ s = readU16(&(i->data[BASE_HEADER_SIZE + 1]));
}
}
if (s == seqnum) {
/* nothing to do this seems to be a resent packet */
/* for paranoia reason data should be compared */
- if (
- (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
- (i->data.getSize() != p.data.getSize()) ||
- (i->address != p.address)
- )
- {
+ if ((readU16(&(i->data[BASE_HEADER_SIZE + 1])) != seqnum) ||
+ (i->data.getSize() != p.data.getSize()) ||
+ (i->address != p.address)) {
/* if this happens your maximum transfer window may be to big */
fprintf(stderr,
- "Duplicated seqnum %d non matching packet detected:\n",
+ "Duplicated seqnum %d non matching packet "
+ "detected:\n",
seqnum);
fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
- readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+ readU16(&(i->data[BASE_HEADER_SIZE + 1])),
+ i->data.getSize(),
i->address.serializeString().c_str());
fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
- readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+ readU16(&(p.data[BASE_HEADER_SIZE + 1])),
+ p.data.getSize(),
p.address.serializeString().c_str());
- throw IncomingDataCorruption("duplicated packet isn't same as original one");
+ throw IncomingDataCorruption(
+ "duplicated packet isn't same as original one");
}
}
/* insert or push back */
@@ -355,7 +353,8 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
}
/* update last packet number */
- m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
+ m_oldest_non_answered_ack =
+ readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE + 1]);
}
void ReliablePacketBuffer::incrementTimeouts(float dtime)
@@ -367,8 +366,8 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
}
}
-std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
- unsigned int max_packets)
+std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(
+ float timeout, unsigned int max_packets)
{
MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs;
@@ -376,7 +375,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
if (bufferedPacket.time >= timeout) {
timed_outs.push_back(bufferedPacket);
- //this packet will be sent right afterwards reset timeout here
+ // this packet will be sent right afterwards reset timeout here
bufferedPacket.time = 0.0f;
if (timed_outs.size() >= max_packets)
break;
@@ -447,19 +446,19 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
errorstream << "Invalid data size for split packet" << std::endl;
return SharedBuffer<u8>();
}
- u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
- u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
- u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
+ u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE + 3]);
+ u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE + 5]);
if (type != PACKET_TYPE_SPLIT) {
errorstream << "IncomingSplitBuffer::insert(): type is not split"
- << std::endl;
+ << std::endl;
return SharedBuffer<u8>();
}
if (chunk_num >= chunk_count) {
errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
- << " >= chunk_count=" << chunk_count << std::endl;
+ << " >= chunk_count=" << chunk_count << std::endl;
return SharedBuffer<u8>();
}
@@ -474,14 +473,13 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
if (chunk_count != sp->chunk_count) {
errorstream << "IncomingSplitBuffer::insert(): chunk_count="
- << chunk_count << " != sp->chunk_count=" << sp->chunk_count
- << std::endl;
+ << chunk_count << " != sp->chunk_count=" << sp->chunk_count
+ << std::endl;
return SharedBuffer<u8>();
}
if (reliable != sp->reliable)
- LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
- <<" != sp->reliable="<<sp->reliable
- <<std::endl);
+ LOG(derr_con << "Connection: WARNING: reliable=" << reliable
+ << " != sp->reliable=" << sp->reliable << std::endl);
// Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize;
@@ -521,7 +519,8 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
}
for (u16 j : remove_queue) {
MutexAutoLock listlock(m_map_mutex);
- LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
+ LOG(dout_con << "NOTE: Removing timed out unreliable split packet"
+ << std::endl);
delete m_buf[j];
m_buf.erase(j);
}
@@ -531,8 +530,8 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
ConnectionCommand
*/
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
- bool reliable_)
+void ConnectionCommand::send(
+ session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
@@ -570,36 +569,36 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
next_outgoing_split_seqnum = seqnum;
}
-u16 Channel::getOutgoingSequenceNumber(bool& successful)
+u16 Channel::getOutgoingSequenceNumber(bool &successful)
{
MutexAutoLock internal(m_internal_mutex);
u16 retval = next_outgoing_seqnum;
u16 lowest_unacked_seqnumber;
/* shortcut if there ain't any packet in outgoing list */
- if (outgoing_reliables_sent.empty())
- {
+ if (outgoing_reliables_sent.empty()) {
next_outgoing_seqnum++;
return retval;
}
- if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
- {
+ if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
// ugly cast but this one is required in order to tell compiler we
- // know about difference of two unsigned may be negative in general
- // but we already made sure it won't happen in this case
- if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
+ // know about difference of two unsigned may be negative in
+ // general but we already made sure it won't happen in this case
+ if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) >
+ window_size) {
successful = false;
return 0;
}
- }
- else {
+ } else {
// ugly cast but this one is required in order to tell compiler we
- // know about difference of two unsigned may be negative in general
- // but we already made sure it won't happen in this case
- if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
- window_size) {
+ // know about difference of two unsigned may be negative in
+ // general but we already made sure it won't happen in this case
+ if ((next_outgoing_seqnum +
+ (u16)(SEQNUM_MAX -
+ lowest_unacked_seqnumber)) >
+ window_size) {
successful = false;
return 0;
}
@@ -618,7 +617,7 @@ u16 Channel::readOutgoingSequenceNumber()
bool Channel::putBackSequenceNumber(u16 seqnum)
{
- if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) {
+ if (((seqnum + 1) % (SEQNUM_MAX + 1)) == next_outgoing_seqnum) {
next_outgoing_seqnum = seqnum;
return true;
@@ -633,7 +632,8 @@ void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
current_packet_successful += packets;
}
-void Channel::UpdateBytesReceived(unsigned int bytes) {
+void Channel::UpdateBytesReceived(unsigned int bytes)
+{
MutexAutoLock internal(m_internal_mutex);
current_bytes_received += bytes;
}
@@ -644,7 +644,6 @@ void Channel::UpdateBytesLost(unsigned int bytes)
current_bytes_lost += bytes;
}
-
void Channel::UpdatePacketLossCounter(unsigned int count)
{
MutexAutoLock internal(m_internal_mutex);
@@ -665,19 +664,21 @@ void Channel::UpdateTimers(float dtime)
if (packet_loss_counter > 1.0f) {
packet_loss_counter -= 1.0f;
- unsigned int packet_loss = 11; /* use a neutral value for initialization */
+ unsigned int packet_loss =
+ 11; /* use a neutral value for initialization */
unsigned int packets_successful = 0;
- //unsigned int packet_too_late = 0;
+ // unsigned int packet_too_late = 0;
bool reasonable_amount_of_data_transmitted = false;
{
MutexAutoLock internal(m_internal_mutex);
packet_loss = current_packet_loss;
- //packet_too_late = current_packet_too_late;
+ // packet_too_late = current_packet_too_late;
packets_successful = current_packet_successful;
- if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
+ if (current_bytes_transfered >
+ (unsigned int)(window_size * 512 / 2)) {
reasonable_amount_of_data_transmitted = true;
}
current_packet_loss = 0;
@@ -690,38 +691,33 @@ void Channel::UpdateTimers(float dtime)
bool done = false;
if (packets_successful > 0) {
- successful_to_lost_ratio = packet_loss/packets_successful;
+ successful_to_lost_ratio = packet_loss / packets_successful;
} else if (packet_loss > 0) {
window_size = std::max(
- (window_size - 10),
- MIN_RELIABLE_WINDOW_SIZE);
+ (window_size - 10), MIN_RELIABLE_WINDOW_SIZE);
done = true;
}
if (!done) {
if ((successful_to_lost_ratio < 0.01f) &&
- (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+ (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 100),
+ window_size = std::min((window_size + 100),
MAX_RELIABLE_WINDOW_SIZE);
} else if ((successful_to_lost_ratio < 0.05f) &&
(window_size < MAX_RELIABLE_WINDOW_SIZE)) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 50),
+ window_size = std::min((window_size + 50),
MAX_RELIABLE_WINDOW_SIZE);
} else if (successful_to_lost_ratio > 0.15f) {
- window_size = std::max(
- (window_size - 100),
+ window_size = std::max((window_size - 100),
MIN_RELIABLE_WINDOW_SIZE);
} else if (successful_to_lost_ratio > 0.1f) {
- window_size = std::max(
- (window_size - 50),
+ window_size = std::max((window_size - 50),
MIN_RELIABLE_WINDOW_SIZE);
}
}
@@ -730,16 +726,17 @@ void Channel::UpdateTimers(float dtime)
if (bpm_counter > 10.0f) {
{
MutexAutoLock internal(m_internal_mutex);
- cur_kbps =
- (((float) current_bytes_transfered)/bpm_counter)/1024.0f;
+ cur_kbps = (((float)current_bytes_transfered) / bpm_counter) /
+ 1024.0f;
current_bytes_transfered = 0;
- cur_kbps_lost =
- (((float) current_bytes_lost)/bpm_counter)/1024.0f;
- current_bytes_lost = 0;
- cur_incoming_kbps =
- (((float) current_bytes_received)/bpm_counter)/1024.0f;
- current_bytes_received = 0;
- bpm_counter = 0.0f;
+ cur_kbps_lost = (((float)current_bytes_lost) / bpm_counter) /
+ 1024.0f;
+ current_bytes_lost = 0;
+ cur_incoming_kbps =
+ (((float)current_bytes_received) / bpm_counter) /
+ 1024.0f;
+ current_bytes_received = 0;
+ bpm_counter = 0.0f;
}
if (cur_kbps > max_kbps) {
@@ -754,24 +751,21 @@ void Channel::UpdateTimers(float dtime)
max_incoming_kbps = cur_incoming_kbps;
}
- rate_samples = MYMIN(rate_samples+1,10);
- float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples);
- avg_kbps = avg_kbps * old_fraction +
- cur_kbps * (1.0 - old_fraction);
- avg_kbps_lost = avg_kbps_lost * old_fraction +
+ rate_samples = MYMIN(rate_samples + 1, 10);
+ float old_fraction = ((float)(rate_samples - 1)) / ((float)rate_samples);
+ avg_kbps = avg_kbps * old_fraction + cur_kbps * (1.0 - old_fraction);
+ avg_kbps_lost = avg_kbps_lost * old_fraction +
cur_kbps_lost * (1.0 - old_fraction);
- avg_incoming_kbps = avg_incoming_kbps * old_fraction +
- cur_incoming_kbps * (1.0 - old_fraction);
+ avg_incoming_kbps = avg_incoming_kbps * old_fraction +
+ cur_incoming_kbps * (1.0 - old_fraction);
}
}
-
/*
Peer
*/
-PeerHelper::PeerHelper(Peer* peer) :
- m_peer(peer)
+PeerHelper::PeerHelper(Peer *peer) : m_peer(peer)
{
if (peer && !peer->IncUseCount())
m_peer = nullptr;
@@ -785,7 +779,7 @@ PeerHelper::~PeerHelper()
m_peer = nullptr;
}
-PeerHelper& PeerHelper::operator=(Peer* peer)
+PeerHelper &PeerHelper::operator=(Peer *peer)
{
m_peer = peer;
if (peer && !peer->IncUseCount())
@@ -793,24 +787,24 @@ PeerHelper& PeerHelper::operator=(Peer* peer)
return *this;
}
-Peer* PeerHelper::operator->() const
+Peer *PeerHelper::operator->() const
{
return m_peer;
}
-Peer* PeerHelper::operator&() const
+Peer *PeerHelper::operator&() const
{
return m_peer;
}
bool PeerHelper::operator!()
{
- return ! m_peer;
+ return !m_peer;
}
-bool PeerHelper::operator!=(void* ptr)
+bool PeerHelper::operator!=(void *ptr)
{
- return ((void*) m_peer != ptr);
+ return ((void *)m_peer != ptr);
}
bool Peer::IncUseCount()
@@ -838,8 +832,9 @@ void Peer::DecUseCount()
delete this;
}
-void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
- unsigned int num_samples) {
+void Peer::RTTStatistics(
+ float rtt, const std::string &profiler_id, unsigned int num_samples)
+{
if (m_last_rtt > 0) {
/* set min max values */
@@ -850,18 +845,18 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
/* do average calculation */
if (m_rtt.avg_rtt < 0.0)
- m_rtt.avg_rtt = rtt;
+ m_rtt.avg_rtt = rtt;
else
- m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
- rtt * (1/num_samples);
+ m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples / (num_samples - 1)) +
+ rtt * (1 / num_samples);
/* do jitter calculation */
- //just use some neutral value at beginning
+ // just use some neutral value at beginning
float jitter = m_rtt.jitter_min;
if (rtt > m_last_rtt)
- jitter = rtt-m_last_rtt;
+ jitter = rtt - m_last_rtt;
if (rtt <= m_last_rtt)
jitter = m_last_rtt - rtt;
@@ -872,14 +867,17 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
m_rtt.jitter_max = jitter;
if (m_rtt.jitter_avg < 0.0)
- m_rtt.jitter_avg = jitter;
+ m_rtt.jitter_avg = jitter;
else
- m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
- jitter * (1/num_samples);
+ m_rtt.jitter_avg =
+ m_rtt.jitter_avg *
+ (num_samples / (num_samples - 1)) +
+ jitter * (1 / num_samples);
if (!profiler_id.empty()) {
g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
- g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
+ g_profiler->graphAdd(
+ profiler_id + " jitter [ms]", jitter * 1000.f);
}
}
/* save values required for next loop */
@@ -891,7 +889,7 @@ bool Peer::isTimedOut(float timeout)
MutexAutoLock lock(m_exclusive_access_mutex);
u64 current_time = porting::getTimeMs();
- float dtime = CALC_DTIME(m_last_timeout_check,current_time);
+ float dtime = CALC_DTIME(m_last_timeout_check, current_time);
m_last_timeout_check = current_time;
m_timeout_counter += dtime;
@@ -909,28 +907,28 @@ void Peer::Drop()
}
PROFILE(std::stringstream peerIdentifier1);
- PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc()
- << ";" << id << ";RELIABLE]");
+ PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() << ";" << id
+ << ";RELIABLE]");
PROFILE(g_profiler->remove(peerIdentifier1.str()));
PROFILE(std::stringstream peerIdentifier2);
- PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc()
- << ";" << id << ";RELIABLE]");
+ PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() << ";" << id
+ << ";RELIABLE]");
PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));
delete this;
}
-UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
- Peer(a_address,a_id,connection)
+UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection *connection) :
+ Peer(a_address, a_id, connection)
{
for (Channel &channel : channels)
channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
}
-bool UDPPeer::getAddress(MTProtocols type,Address& toset)
+bool UDPPeer::getAddress(MTProtocols type, Address &toset)
{
- if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY))
- {
+ if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) ||
+ (type == MTP_PRIMARY)) {
toset = address;
return true;
}
@@ -943,7 +941,7 @@ void UDPPeer::reportRTT(float rtt)
if (rtt < 0.0) {
return;
}
- RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
+ RTTStatistics(rtt, "rudp", MAX_RELIABLE_WINDOW_SIZE * 10);
float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
if (timeout < RESEND_TIMEOUT_MIN)
@@ -955,11 +953,10 @@ void UDPPeer::reportRTT(float rtt)
resend_timeout = timeout;
}
-bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
+bool UDPPeer::Ping(float dtime, SharedBuffer<u8> &data)
{
m_ping_timer += dtime;
- if (m_ping_timer >= PING_TIMEOUT)
- {
+ if (m_ping_timer >= PING_TIMEOUT) {
// Create and send PING packet
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_PING);
@@ -969,8 +966,7 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
return false;
}
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
- unsigned int max_packet_size)
+void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size)
{
if (m_pending_disconnect)
return;
@@ -980,41 +976,37 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
(chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
- LOG(dout_con<<m_connection->getDesc()
- <<" processing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() << std::endl);
- if (!processReliableSendCommand(c,max_packet_size)) {
+ LOG(dout_con << m_connection->getDesc()
+ << " processing reliable command for peer id: " << c.peer_id
+ << " data size: " << c.data.getSize() << std::endl);
+ if (!processReliableSendCommand(c, max_packet_size)) {
chan.queued_commands.push_back(c);
}
- }
- else {
- LOG(dout_con<<m_connection->getDesc()
- <<" Queueing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() <<std::endl);
+ } else {
+ LOG(dout_con << m_connection->getDesc()
+ << " Queueing reliable command for peer id: " << c.peer_id
+ << " data size: " << c.data.getSize() << std::endl);
chan.queued_commands.push_back(c);
if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
LOG(derr_con << m_connection->getDesc()
- << "Possible packet stall to peer id: " << c.peer_id
- << " queued_commands=" << chan.queued_commands.size()
- << std::endl);
+ << "Possible packet stall to peer id: " << c.peer_id
+ << " queued_commands=" << chan.queued_commands.size()
+ << std::endl);
}
}
}
bool UDPPeer::processReliableSendCommand(
- ConnectionCommand &c,
- unsigned int max_packet_size)
+ ConnectionCommand &c, unsigned int max_packet_size)
{
if (m_pending_disconnect)
return true;
Channel &chan = channels[c.channelnum];
- u32 chunksize_max = max_packet_size
- - BASE_HEADER_SIZE
- - RELIABLE_HEADER_SIZE;
+ u32 chunksize_max = max_packet_size - BASE_HEADER_SIZE - RELIABLE_HEADER_SIZE;
- sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
+ sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE * 512);
std::list<SharedBuffer<u8>> originals;
u16 split_sequence_number = chan.readNextSplitSeqNum();
@@ -1022,7 +1014,8 @@ bool UDPPeer::processReliableSendCommand(
if (c.raw) {
originals.emplace_back(c.data);
} else {
- makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
+ makeAutoSplitPacket(
+ c.data, chunksize_max, split_sequence_number, &originals);
chan.setNextSplitSeqNum(split_sequence_number);
}
@@ -1038,8 +1031,7 @@ bool UDPPeer::processReliableSendCommand(
if (!have_sequence_number)
break;
- if (!have_initial_sequence_number)
- {
+ if (!have_initial_sequence_number) {
initial_sequence_number = seqnum;
have_initial_sequence_number = true;
}
@@ -1059,11 +1051,14 @@ bool UDPPeer::processReliableSendCommand(
while (!toadd.empty()) {
BufferedPacket p = toadd.front();
toadd.pop();
-// LOG(dout_con<<connection->getDesc()
-// << " queuing reliable packet for peer_id: " << c.peer_id
-// << " channel: " << (c.channelnum&0xFF)
-// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
-// << std::endl)
+ // LOG(dout_con<<connection->getDesc()
+ // << " queuing reliable packet for
+ //peer_id: " << c.peer_id
+ // << " channel: " <<
+ //(c.channelnum&0xFF)
+ // << " seqnum: " <<
+ //readU16(&p.data[BASE_HEADER_SIZE+1])
+ // << std::endl)
chan.queued_reliables.push(p);
pcount++;
}
@@ -1081,9 +1076,9 @@ bool UDPPeer::processReliableSendCommand(
/* remove packet */
toadd.pop();
- bool successfully_put_back_sequence_number
- = chan.putBackSequenceNumber(
- (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
+ bool successfully_put_back_sequence_number = chan.putBackSequenceNumber(
+ (initial_sequence_number +
+ toadd.size() % (SEQNUM_MAX + 1)));
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
}
@@ -1092,24 +1087,20 @@ bool UDPPeer::processReliableSendCommand(
// 'log_message_mutex' and 'm_list_mutex'.
u32 n_queued = chan.outgoing_reliables_sent.size();
- LOG(dout_con<<m_connection->getDesc()
- << " Windowsize exceeded on reliable sending "
- << c.data.getSize() << " bytes"
- << std::endl << "\t\tinitial_sequence_number: "
- << initial_sequence_number
- << std::endl << "\t\tgot at most : "
- << packets_available << " packets"
- << std::endl << "\t\tpackets queued : "
- << n_queued
- << std::endl);
+ LOG(dout_con << m_connection->getDesc()
+ << " Windowsize exceeded on reliable sending " << c.data.getSize()
+ << " bytes" << std::endl
+ << "\t\tinitial_sequence_number: " << initial_sequence_number
+ << std::endl
+ << "\t\tgot at most : " << packets_available << " packets"
+ << std::endl
+ << "\t\tpackets queued : " << n_queued << std::endl);
return false;
}
-void UDPPeer::RunCommandQueues(
- unsigned int max_packet_size,
- unsigned int maxcommands,
- unsigned int maxtransfer)
+void UDPPeer::RunCommandQueues(unsigned int max_packet_size, unsigned int maxcommands,
+ unsigned int maxtransfer)
{
for (Channel &channel : channels) {
@@ -1122,19 +1113,22 @@ void UDPPeer::RunCommandQueues(
ConnectionCommand c = channel.queued_commands.front();
LOG(dout_con << m_connection->getDesc()
- << " processing queued reliable command " << std::endl);
+ << " processing queued reliable command "
+ << std::endl);
// Packet is processed, remove it from queue
- if (processReliableSendCommand(c,max_packet_size)) {
+ if (processReliableSendCommand(c, max_packet_size)) {
channel.queued_commands.pop_front();
} else {
LOG(dout_con << m_connection->getDesc()
- << " Failed to queue packets for peer_id: " << c.peer_id
- << ", delaying sending of " << c.data.getSize()
- << " bytes" << std::endl);
+ << " Failed to queue packets for "
+ "peer_id: "
+ << c.peer_id
+ << ", delaying sending of "
+ << c.data.getSize() << " bytes"
+ << std::endl);
}
- }
- catch (ItemNotFoundException &e) {
+ } catch (ItemNotFoundException &e) {
// intentionally empty
}
}
@@ -1153,8 +1147,8 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
channels[channel].setNextSplitSeqNum(seqnum);
}
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
- bool reliable)
+SharedBuffer<u8> UDPPeer::addSplitPacket(
+ u8 channel, const BufferedPacket &toadd, bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
return channels[channel].incoming_splits.insert(toadd, reliable);
@@ -1164,13 +1158,13 @@ SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd
Connection
*/
-Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
- bool ipv6, PeerHandler *peerhandler) :
- m_udpSocket(ipv6),
- m_protocol_id(protocol_id),
- m_sendThread(new ConnectionSendThread(max_packet_size, timeout)),
- m_receiveThread(new ConnectionReceiveThread(max_packet_size)),
- m_bc_peerhandler(peerhandler)
+Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
+ PeerHandler *peerhandler) :
+ m_udpSocket(ipv6),
+ m_protocol_id(protocol_id),
+ m_sendThread(new ConnectionSendThread(max_packet_size, timeout)),
+ m_receiveThread(new ConnectionReceiveThread(max_packet_size)),
+ m_bc_peerhandler(peerhandler)
{
/* Amount of time Receive() will wait for data, this is entirely different
@@ -1184,7 +1178,6 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_receiveThread->start();
}
-
Connection::~Connection()
{
m_shutting_down = true;
@@ -1192,7 +1185,7 @@ Connection::~Connection()
m_sendThread->stop();
m_receiveThread->stop();
- //TODO for some unkonwn reason send/receive threads do not exit as they're
+ // TODO for some unkonwn reason send/receive threads do not exit as they're
// supposed to be but wait on peer timeout. To speed up shutdown we reduce
// timeout to half a second.
m_sendThread->setPeerTimeout(0.5);
@@ -1235,20 +1228,20 @@ PeerHelper Connection::getPeerNoEx(session_t peer_id)
}
/* find peer_id for address */
-u16 Connection::lookupPeer(Address& sender)
+u16 Connection::lookupPeer(Address &sender)
{
MutexAutoLock peerlock(m_peers_mutex);
- std::map<u16, Peer*>::iterator j;
+ std::map<u16, Peer *>::iterator j;
j = m_peers.begin();
- for(; j != m_peers.end(); ++j)
- {
+ for (; j != m_peers.end(); ++j) {
Peer *peer = j->second;
if (peer->isPendingDeletion())
continue;
Address tocheck;
- if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender))
+ if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) &&
+ (tocheck == sender))
return peer->id;
if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender))
@@ -1273,14 +1266,13 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
}
Address peer_address;
- //any peer has a primary address this never fails!
+ // any peer has a primary address this never fails!
peer->getAddress(MTP_PRIMARY, peer_address);
// Create event
ConnectionEvent e;
e.peerRemoved(peer_id, timeout, peer_address);
putEvent(e);
-
peer->Drop();
return true;
}
@@ -1291,7 +1283,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms)
{
try {
return m_event_queue.pop_front(timeout_ms);
- } catch(ItemNotFoundException &ex) {
+ } catch (ItemNotFoundException &ex) {
ConnectionEvent e;
e.type = CONNEVENT_NONE;
return e;
@@ -1351,12 +1343,12 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
events keep happening before the timeout expires.
This is not considered to be a problem (is it?)
*/
- for(;;) {
+ for (;;) {
ConnectionEvent e = waitEvent(timeout);
if (e.type != CONNEVENT_NONE)
LOG(dout_con << getDesc() << ": Receive: got event: "
- << e.describe() << std::endl);
- switch(e.type) {
+ << e.describe() << std::endl);
+ switch (e.type) {
case CONNEVENT_NONE:
return false;
case CONNEVENT_DATA_RECEIVED:
@@ -1381,7 +1373,7 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
}
case CONNEVENT_BIND_FAILED:
throw ConnectionBindFailed("Failed to bind socket "
- "(port already in use?)");
+ "(port already in use?)");
}
}
return false;
@@ -1399,8 +1391,7 @@ bool Connection::TryReceive(NetworkPacket *pkt)
return Receive(pkt, 0);
}
-void Connection::Send(session_t peer_id, u8 channelnum,
- NetworkPacket *pkt, bool reliable)
+void Connection::Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
@@ -1424,7 +1415,8 @@ Address Connection::GetPeerAddress(session_t peer_id)
float Connection::getPeerStat(session_t peer_id, rtt_stat_type type)
{
PeerHelper peer = getPeerNoEx(peer_id);
- if (!peer) return -1;
+ if (!peer)
+ return -1;
return peer->getStat(type);
}
@@ -1432,30 +1424,31 @@ float Connection::getLocalStat(rate_stat_type type)
{
PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
- FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
+ FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? "
+ "are you serious???");
float retval = 0.0;
for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) {
- switch(type) {
- case CUR_DL_RATE:
- retval += channel.getCurrentDownloadRateKB();
- break;
- case AVG_DL_RATE:
- retval += channel.getAvgDownloadRateKB();
- break;
- case CUR_INC_RATE:
- retval += channel.getCurrentIncomingRateKB();
- break;
- case AVG_INC_RATE:
- retval += channel.getAvgIncomingRateKB();
- break;
- case AVG_LOSS_RATE:
- retval += channel.getAvgLossRateKB();
- break;
- case CUR_LOSS_RATE:
- retval += channel.getCurrentLossRateKB();
- break;
+ switch (type) {
+ case CUR_DL_RATE:
+ retval += channel.getCurrentDownloadRateKB();
+ break;
+ case AVG_DL_RATE:
+ retval += channel.getAvgDownloadRateKB();
+ break;
+ case CUR_INC_RATE:
+ retval += channel.getCurrentIncomingRateKB();
+ break;
+ case AVG_INC_RATE:
+ retval += channel.getAvgIncomingRateKB();
+ break;
+ case AVG_LOSS_RATE:
+ retval += channel.getAvgLossRateKB();
+ break;
+ case CUR_LOSS_RATE:
+ retval += channel.getCurrentLossRateKB();
+ break;
default:
FATAL_ERROR("Connection::getLocalStat Invalid stat type");
}
@@ -1463,20 +1456,20 @@ float Connection::getLocalStat(rate_stat_type type)
return retval;
}
-u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
+u16 Connection::createPeer(Address &sender, MTProtocols protocol, int fd)
{
// Somebody wants to make a new connection
// Get a unique peer id (2 or higher)
session_t peer_id_new = m_next_remote_peer_id;
- u16 overflow = MAX_UDP_PEERS;
+ u16 overflow = MAX_UDP_PEERS;
/*
Find an unused peer id
*/
MutexAutoLock lock(m_peers_mutex);
bool out_of_ids = false;
- for(;;) {
+ for (;;) {
// Check if exists
if (m_peers.find(peer_id_new) == m_peers.end())
@@ -1501,17 +1494,17 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
m_peers[peer->id] = peer;
m_peer_ids.push_back(peer->id);
- m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS;
+ m_next_remote_peer_id = (peer_id_new + 1) % MAX_UDP_PEERS;
- LOG(dout_con << getDesc()
- << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
+ LOG(dout_con << getDesc() << "createPeer(): giving peer_id=" << peer_id_new
+ << std::endl);
ConnectionCommand cmd;
SharedBuffer<u8> reply(4);
writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
- cmd.createPeer(peer_id_new,reply);
+ cmd.createPeer(peer_id_new, reply);
putCommand(cmd);
// Create peer addition event
@@ -1526,14 +1519,14 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
void Connection::PrintInfo(std::ostream &out)
{
m_info_mutex.lock();
- out<<getDesc()<<": ";
+ out << getDesc() << ": ";
m_info_mutex.unlock();
}
const std::string Connection::getDesc()
{
- return std::string("con(")+
- itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
+ return std::string("con(") + itos(m_udpSocket.GetHandle()) + "/" +
+ itos(m_peer_id) + ")";
}
void Connection::DisconnectPeer(session_t peer_id)
@@ -1547,10 +1540,9 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
- LOG(dout_con<<getDesc()
- <<" Queuing ACK command to peer_id: " << peer_id <<
- " channel: " << (channelnum & 0xFF) <<
- " seqnum: " << seqnum << std::endl);
+ LOG(dout_con << getDesc() << " Queuing ACK command to peer_id: " << peer_id
+ << " channel: " << (channelnum & 0xFF) << " seqnum: " << seqnum
+ << std::endl);
ConnectionCommand c;
SharedBuffer<u8> ack(4);
@@ -1563,10 +1555,9 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
m_sendThread->Trigger();
}
-UDPPeer* Connection::createServerPeer(Address& address)
+UDPPeer *Connection::createServerPeer(Address &address)
{
- if (getPeerNoEx(PEER_ID_SERVER) != 0)
- {
+ if (getPeerNoEx(PEER_ID_SERVER) != 0) {
throw ConnectionException("Already connected to a server");
}