// L2TPNS Clustering Stuff
-char const *cvs_id_cluster = "$Id: cluster.c,v 1.39 2005/05/26 12:17:30 bodea Exp $";
+char const *cvs_id_cluster = "$Id: cluster.c,v 1.42 2005/06/14 05:37:09 bodea Exp $";
#include <stdio.h>
#include <stdlib.h>
*/
// Module variables.
-int cluster_sockfd = 0; // The filedescriptor for the cluster communications port.
+extern int cluster_sockfd; // The filedescriptor for the cluster communications port.
in_addr_t my_address = 0; // The network address of my ethernet port.
static int walk_session_number = 0; // The next session to send when doing the slow table walk.
LOG(4, 0, 0, "Forwarding packet from %s to master (size %d)\n", fmtaddr(addr, 0), size);
STAT(c_forwarded);
- add_type(&p, C_FORWARD, addr, (char *) &port, sizeof(port));
+ add_type(&p, C_FORWARD, addr, (char *) &port, sizeof(port)); // ick. should be uint16_t
memcpy(p, data, size);
p += size;
if (config->cluster_iam_master) // Only happens on the slaves.
return;
- if (!config->cluster_master_address) // If we don't have a master, skip it for a while.
+ if (!config->cluster_master_address) // If we don't have a master, skip it for a while.
return;
- i = MAX_B_RECS * 5; // Examine max 2000 sessions;
+ // C_BYTES format changed in 2.1.0 (cluster version 5)
+ // during upgrade from previous versions, hang onto our counters
+ // for a bit until the new master comes up
+ if (config->cluster_last_hb_ver < 5)
+ return;
+
+ i = MAX_B_RECS * 5; // Examine max 3000 sessions;
if (config->cluster_highest_sessionid > i)
i = config->cluster_highest_sessionid;
walk_session_number = 1;
if (!sess_local[walk_session_number].cin && !sess_local[walk_session_number].cout)
- continue; // Unused. Skip it.
+ continue; // Unchanged. Skip it.
b[c].sid = walk_session_number;
- b[c].in = sess_local[walk_session_number].cin;
- b[c].out = sess_local[walk_session_number].cout;
-
- if (++c > MAX_B_RECS) // Send a max of 400 elements in a packet.
- break;
+ b[c].pin = sess_local[walk_session_number].pin;
+ b[c].pout = sess_local[walk_session_number].pout;
+ b[c].cin = sess_local[walk_session_number].cin;
+ b[c].cout = sess_local[walk_session_number].cout;
// Reset counters.
+ sess_local[walk_session_number].pin = sess_local[walk_session_number].pout = 0;
sess_local[walk_session_number].cin = sess_local[walk_session_number].cout = 0;
+
+ if (++c > MAX_B_RECS) // Send a max of 600 elements in a packet.
+ break;
}
if (!c) // Didn't find any that changes. Get out of here!
LOG(0, 0, 0, "Master timed out! Holding election...\n");
+ // In the process of shutting down, can't be master
+ if (main_quit)
+ return;
+
for (i = have_peers = 0; i < num_peers; i++)
{
if ((peers[i].timestamp + config->cluster_hb_timeout) < t)
// Reset die relative to our uptime rather than the old master's
if (session[i].die) session[i].die = TIME;
- // Accumulate un-sent byte counters.
- session[i].cin += sess_local[i].cin;
- session[i].cout += sess_local[i].cout;
- session[i].total_cin += sess_local[i].cin;
- session[i].total_cout += sess_local[i].cout;
+ // Accumulate un-sent byte/packet counters.
+ increment_counter(&session[i].cin, &session[i].cin_wrap, sess_local[i].cin);
+ increment_counter(&session[i].cout, &session[i].cout_wrap, sess_local[i].cout);
+ session[i].cin_delta += sess_local[i].cin;
+ session[i].cout_delta += sess_local[i].cout;
+
+ session[i].pin += sess_local[i].pin;
+ session[i].pout += sess_local[i].pout;
sess_local[i].cin = sess_local[i].cout = 0;
+ sess_local[i].pin = sess_local[i].pout = 0;
sess_local[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
config->cluster_master_address = 0;
config->cluster_last_hb = 0; // Force an election.
cluster_check_master();
- return 0;
}
if (i >= num_peers)
fmtaddr(master, 1));
config->cluster_master_address = master;
+ if (master)
+ {
+ // catchup with new master
+ peer_send_message(master, C_LASTSEEN, config->cluster_seq_number, NULL, 0);
+
+ // delay next election
+ config->cluster_last_hb = TIME;
+ }
+
+ // run election (or reset "probed" if master was set)
cluster_check_master();
return 0;
}
return -1; /* Abort processing */
}
- session[b->sid].total_cin += b->in;
- session[b->sid].total_cout += b->out;
+ session[b->sid].pin += b->pin;
+ session[b->sid].pout += b->pout;
+
+ increment_counter(&session[b->sid].cin, &session[b->sid].cin_wrap, b->cin);
+ increment_counter(&session[b->sid].cout, &session[b->sid].cout_wrap, b->cout);
- session[b->sid].cin += b->in;
- session[b->sid].cout += b->out;
+ session[b->sid].cin_delta += b->cin;
+ session[b->sid].cout_delta += b->cout;
- if (b->in)
+ if (b->cin)
session[b->sid].last_packet = time_now; // Reset idle timer!
size -= sizeof(*b);
new.nr = old->nr;
new.ns = old->ns;
new.magic = old->magic;
- new.cin = old->cin;
- new.cout = old->cout;
new.pin = old->pin;
new.pout = old->pout;
- new.total_cin = old->total_cin;
- new.total_cout = old->total_cout;
+ new.cin = old->total_cin;
+ new.cout = old->total_cout;
+ new.cin_delta = old->cin;
+ new.cout_delta = old->cout;
new.throttle_in = old->throttle_in;
new.throttle_out = old->throttle_out;
new.filter_in = old->filter_in;
config->cluster_seq_number = h->seq;
config->cluster_last_hb = TIME; // Reset to ensure that we don't become master!!
+ config->cluster_last_hb_ver = hb_ver; // remember what cluster version the master is using
if (config->cluster_seq_number != h->seq) { // Out of sequence heartbeat!
static int lastseen_seq = 0;