// L2TPNS Clustering Stuff
-char const *cvs_id_cluster = "$Id: cluster.c,v 1.26 2004-12-16 23:40:31 bodea Exp $";
+char const *cvs_id_cluster = "$Id: cluster.c,v 1.37 2005-05-08 08:00:49 bodea Exp $";
#include <stdio.h>
#include <stdlib.h>
*/
// Module variables.
-int cluster_sockfd = 0; // The filedescriptor for the cluster communications port.
+int cluster_sockfd = 0; // The filedescriptor for the cluster communications port.
in_addr_t my_address = 0; // The network address of my ethernet port.
static int walk_session_number = 0; // The next session to send when doing the slow table walk.
static int walk_tunnel_number = 0; // The next tunnel to send when doing the slow table walk.
+int forked = 0; // Sanity check: CLI must not diddle with heartbeat table
#define MAX_HEART_SIZE (8192) // Maximum size of heartbeat packet. Must be less than max IP packet size :)
#define MAX_CHANGES (MAX_HEART_SIZE/(sizeof(sessiont) + sizeof(int) ) - 2) // Assumes a session is the biggest type!
if ( walk_session_number > config->cluster_highest_sessionid)
walk_session_number = 1;
- if (!sess_count[walk_session_number].cin && !sess_count[walk_session_number].cout)
+ if (!sess_local[walk_session_number].cin && !sess_local[walk_session_number].cout)
continue; // Unused. Skip it.
b[c].sid = walk_session_number;
- b[c].in = sess_count[walk_session_number].cin;
- b[c].out = sess_count[walk_session_number].cout;
+ b[c].in = sess_local[walk_session_number].cin;
+ b[c].out = sess_local[walk_session_number].cout;
if (++c > MAX_B_RECS) // Send a max of 400 elements in a packet.
break;
// Reset counters.
- sess_count[walk_session_number].cin = sess_count[walk_session_number].cout = 0;
+ sess_local[walk_session_number].cin = sess_local[walk_session_number].cout = 0;
}
if (!c) // Didn't find any that changes. Get out of here!
++count;
}
- if (session[i].tunnel == T_FREE) { // Unused session. Add to free list.
+ if (!session[i].opened) { // Unused session. Add to free list.
+ memset(&session[i], 0, sizeof(session[i]));
+ session[i].tunnel = T_FREE;
session[last_free].next = i;
session[i].next = 0;
last_free = i;
+ continue;
}
- // Reset all the idle timeouts..
+ // Reset idle timeouts..
session[i].last_packet = time_now;
+ // Reset die relative to our uptime rather than the old master's
+ if (session[i].die) session[i].die = TIME;
+
// Accumulate un-sent byte counters.
- session[i].cin += sess_count[i].cin;
- session[i].cout += sess_count[i].cout;
- session[i].total_cin += sess_count[i].cin;
- session[i].total_cout += sess_count[i].cout;
+ session[i].cin += sess_local[i].cin;
+ session[i].cout += sess_local[i].cout;
+ session[i].total_cin += sess_local[i].cin;
+ session[i].total_cout += sess_local[i].cout;
- sess_count[i].cin = sess_count[i].cout = 0;
+ sess_local[i].cin = sess_local[i].cout = 0;
- session[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
+ sess_local[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
if (session[i].unique_id >= high_unique_id) // This is different to the index into the session table!!!
high_unique_id = session[i].unique_id+1;
-
session[i].tbf_in = session[i].tbf_out = 0; // Remove stale pointers from old master.
throttle_session(i, session[i].throttle_in, session[i].throttle_out);
- if (session[i].tunnel != T_FREE && i > config->cluster_highest_sessionid)
- config->cluster_highest_sessionid = i;
+ config->cluster_highest_sessionid = i;
}
session[last_free].next = 0; // End of chain.
- last_id = high_unique_id; // Keep track of the highest used session ID.
+ last_id = high_unique_id; // Keep track of the highest used session ID.
become_master();
config->cluster_undefined_sessions = 0;
for (i = 1 ; i < MAXSESSION; ++i) {
if (i > highsession) {
- session[i].tunnel = 0; // Defined.
+ if (session[i].tunnel == T_UNDEF) session[i].tunnel = T_FREE; // Defined.
continue;
}
- if (session[i].tunnel != T_UNDEF)
- continue;
- ++config->cluster_undefined_sessions;
+
+ if (session[i].tunnel == T_UNDEF)
+ ++config->cluster_undefined_sessions;
}
// Clear out defined tunnels, counting the number of
config->cluster_undefined_tunnels = 0;
for (i = 1 ; i < MAXTUNNEL; ++i) {
if (i > hightunnel) {
- tunnel[i].state = TUNNELFREE; // Defined.
+ if (tunnel[i].state == TUNNELUNDEF) tunnel[i].state = TUNNELFREE; // Defined.
continue;
}
- if (tunnel[i].state != TUNNELUNDEF)
- continue;
- ++config->cluster_undefined_tunnels;
+
+ if (tunnel[i].state == TUNNELUNDEF)
+ ++config->cluster_undefined_tunnels;
}
return -1;
}
+ if (forked) {
+ LOG(0, sid, 0, "cluster_send_session called from child process!\n");
+ return -1;
+ }
+
return type_changed(C_CSESSION, sid);
}
session[b->sid].cin += b->in;
session[b->sid].cout += b->out;
- session[b->sid].last_packet = time_now; // Reset idle timer!
+
+ if (b->in)
+ session[b->sid].last_packet = time_now; // Reset idle timer!
size -= sizeof(*b);
++b;
}
+// pre v5 heartbeat session structure
+struct oldsession {
+ sessionidt next;
+ sessionidt far;
+ tunnelidt tunnel;
+ in_addr_t ip;
+ int ip_pool_index;
+ unsigned long unique_id;
+ uint16_t nr;
+ uint16_t ns;
+ uint32_t magic;
+ uint32_t cin, cout;
+ uint32_t pin, pout;
+ uint32_t total_cin;
+ uint32_t total_cout;
+ uint32_t id;
+ uint16_t throttle_in;
+ uint16_t throttle_out;
+ clockt opened;
+ clockt die;
+ time_t last_packet;
+ in_addr_t dns1, dns2;
+ routet route[MAXROUTE];
+ uint16_t radius;
+ uint16_t mru;
+ uint16_t tbf_in;
+ uint16_t tbf_out;
+ uint8_t l2tp_flags;
+ uint8_t reserved_old_snoop;
+ uint8_t walled_garden;
+ uint8_t flags1;
+ char random_vector[MAXTEL];
+ int random_vector_length;
+ char user[129];
+ char called[MAXTEL];
+ char calling[MAXTEL];
+ uint32_t tx_connect_speed;
+ uint32_t rx_connect_speed;
+ uint32_t flags;
+ in_addr_t snoop_ip;
+ uint16_t snoop_port;
+ uint16_t sid;
+ uint8_t filter_in;
+ uint8_t filter_out;
+ char reserved[18];
+};
+
+static uint8_t *convert_session(struct oldsession *old)
+{
+ static sessiont new;
+ int i;
+
+ memset(&new, 0, sizeof(new));
+
+ new.next = old->next;
+ new.far = old->far;
+ new.tunnel = old->tunnel;
+ new.l2tp_flags = old->l2tp_flags;
+ new.flags = old->flags;
+ new.ip = old->ip;
+ new.ip_pool_index = old->ip_pool_index;
+ new.unique_id = old->unique_id;
+ new.nr = old->nr;
+ new.ns = old->ns;
+ new.magic = old->magic;
+ new.cin = old->cin;
+ new.cout = old->cout;
+ new.pin = old->pin;
+ new.pout = old->pout;
+ new.total_cin = old->total_cin;
+ new.total_cout = old->total_cout;
+ new.throttle_in = old->throttle_in;
+ new.throttle_out = old->throttle_out;
+ new.filter_in = old->filter_in;
+ new.filter_out = old->filter_out;
+ new.mru = old->mru;
+ new.opened = old->opened;
+ new.die = old->die;
+ new.last_packet = old->last_packet;
+ new.dns1 = old->dns1;
+ new.dns2 = old->dns2;
+ new.tbf_in = old->tbf_in;
+ new.tbf_out = old->tbf_out;
+ new.random_vector_length = old->random_vector_length;
+ new.tx_connect_speed = old->tx_connect_speed;
+ new.rx_connect_speed = old->rx_connect_speed;
+ new.snoop_ip = old->snoop_ip;
+ new.snoop_port = old->snoop_port;
+ new.walled_garden = old->walled_garden;
+
+ memcpy(new.random_vector, old->random_vector, sizeof(new.random_vector));
+ memcpy(new.user, old->user, sizeof(new.user));
+ memcpy(new.called, old->called, sizeof(new.called));
+ memcpy(new.calling, old->calling, sizeof(new.calling));
+
+ for (i = 0; i < MAXROUTE; i++)
+ memcpy(&new.route[i], &old->route[i], sizeof(new.route[i]));
+
+ return (uint8_t *) &new;
+}
+
//
// Process a heartbeat..
//
// v3: added interval, timeout
// v4: added table_version
+// v5: added ipv6, re-ordered session structure
static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t *p, in_addr_t addr)
{
heartt *h;
int s = size - (p-data);
int i, type;
+ int hb_ver = more;
-#if HB_VERSION != 4
+#if HB_VERSION != 5
# error "need to update cluster_process_heartbeat()"
#endif
- // we handle versions 3 through 4
- if (more < 3 || more > HB_VERSION) {
- LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", more);
+ // we handle versions 3 through 5
+ if (hb_ver < 3 || hb_ver > HB_VERSION) {
+ LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", hb_ver);
return -1; // Ignore it??
}
return -1; // Skip it.
}
- if (more >= 4) {
+ if (hb_ver >= 4) {
if (h->table_version > config->cluster_table_version) {
LOG(0, 0, 0, "They've seen more state changes (%" PRIu64 " vs my %" PRIu64 ") so I'm gone!\n",
h->table_version, config->cluster_table_version);
config->cluster_last_hb = TIME; // Reset to ensure that we don't become master!!
if (config->cluster_seq_number != h->seq) { // Out of sequence heartbeat!
- LOG(1, 0, 0, "HB: Got seq# %d but was expecting %d. asking for resend.\n", h->seq, config->cluster_seq_number);
+ static int lastseen_seq = 0;
+ static time_t lastseen_time = 0;
+
+ // limit to once per second for a particular seq#
+ int ask = (config->cluster_seq_number != lastseen_seq || time_now != lastseen_time);
+
+ LOG(1, 0, 0, "HB: Got seq# %d but was expecting %d. %s.\n",
+ h->seq, config->cluster_seq_number,
+ ask ? "Asking for resend" : "Ignoring");
- peer_send_message(addr, C_LASTSEEN, config->cluster_seq_number, NULL, 0);
+ if (ask)
+ {
+ lastseen_seq = config->cluster_seq_number;
+ lastseen_time = time_now;
+ peer_send_message(addr, C_LASTSEEN, config->cluster_seq_number, NULL, 0);
+ }
config->cluster_last_hb = TIME; // Reset to ensure that we don't become master!!
size = rle_decompress((uint8_t **) &p, s, c, sizeof(c) );
s -= (p - orig_p);
+ // session struct changed with v5
+ if (hb_ver < 5)
+ {
+ if (size != sizeof(struct oldsession)) {
+ LOG(0, 0, 0, "DANGER: Received a v%d CSESSION that didn't decompress correctly!\n", hb_ver);
+ // Now what? Should exit! No-longer up to date!
+ break;
+ }
+ cluster_recv_session(more, convert_session((struct oldsession *) c));
+ break;
+ }
+
if (size != sizeof(sessiont) ) { // Ouch! Very very bad!
LOG(0, 0, 0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
// Now what? Should exit! No-longer up to date!
break;
}
case C_SESSION:
+ if (hb_ver < 5)
+ {
+ if (s < sizeof(struct oldsession))
+ goto shortpacket;
+
+ cluster_recv_session(more, convert_session((struct oldsession *) p));
+
+ p += sizeof(struct oldsession);
+ s -= sizeof(struct oldsession);
+ break;
+ }
+
if ( s < sizeof(session[more]))
goto shortpacket;
return cluster_add_peer(addr, more, (pingt *) p, s);
case C_LASTSEEN: // Catch up a slave (slave missed a packet).
+ if (!config->cluster_iam_master) { // huh?
+ LOG(0, 0, 0, "I'm not the master, but I got a C_LASTSEEN from %s?\n", fmtaddr(addr, 0));
+ return -1;
+ }
+
return cluster_catchup_slave(more, addr);
case C_FORWARD: { // Forwarded control packet. pass off to processudp.