// L2TPNS Clustering Stuff
-char const *cvs_id_cluster = "$Id: cluster.c,v 1.33 2005/04/01 08:55:29 bodea Exp $";
+char const *cvs_id_cluster = "$Id: cluster.c,v 1.37 2005/05/08 08:00:49 bodea Exp $";
#include <stdio.h>
#include <stdlib.h>
sess_local[i].cin = sess_local[i].cout = 0;
- session[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
+ sess_local[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
if (session[i].unique_id >= high_unique_id) // This is different to the index into the session table!!!
high_unique_id = session[i].unique_id+1;
session[b->sid].cin += b->in;
session[b->sid].cout += b->out;
- session[b->sid].last_packet = time_now; // Reset idle timer!
+
+ if (b->in)
+ session[b->sid].last_packet = time_now; // Reset idle timer!
size -= sizeof(*b);
++b;
}
+// pre v5 heartbeat session structure
+struct oldsession {
+ sessionidt next;
+ sessionidt far;
+ tunnelidt tunnel;
+ in_addr_t ip;
+ int ip_pool_index;
+ unsigned long unique_id;
+ uint16_t nr;
+ uint16_t ns;
+ uint32_t magic;
+ uint32_t cin, cout;
+ uint32_t pin, pout;
+ uint32_t total_cin;
+ uint32_t total_cout;
+ uint32_t id;
+ uint16_t throttle_in;
+ uint16_t throttle_out;
+ clockt opened;
+ clockt die;
+ time_t last_packet;
+ in_addr_t dns1, dns2;
+ routet route[MAXROUTE];
+ uint16_t radius;
+ uint16_t mru;
+ uint16_t tbf_in;
+ uint16_t tbf_out;
+ uint8_t l2tp_flags;
+ uint8_t reserved_old_snoop;
+ uint8_t walled_garden;
+ uint8_t flags1;
+ char random_vector[MAXTEL];
+ int random_vector_length;
+ char user[129];
+ char called[MAXTEL];
+ char calling[MAXTEL];
+ uint32_t tx_connect_speed;
+ uint32_t rx_connect_speed;
+ uint32_t flags;
+ in_addr_t snoop_ip;
+ uint16_t snoop_port;
+ uint16_t sid;
+ uint8_t filter_in;
+ uint8_t filter_out;
+ char reserved[18];
+};
+
+static uint8_t *convert_session(struct oldsession *old)
+{
+ static sessiont new;
+ int i;
+
+ memset(&new, 0, sizeof(new));
+
+ new.next = old->next;
+ new.far = old->far;
+ new.tunnel = old->tunnel;
+ new.l2tp_flags = old->l2tp_flags;
+ new.flags = old->flags;
+ new.ip = old->ip;
+ new.ip_pool_index = old->ip_pool_index;
+ new.unique_id = old->unique_id;
+ new.nr = old->nr;
+ new.ns = old->ns;
+ new.magic = old->magic;
+ new.cin = old->cin;
+ new.cout = old->cout;
+ new.pin = old->pin;
+ new.pout = old->pout;
+ new.total_cin = old->total_cin;
+ new.total_cout = old->total_cout;
+ new.throttle_in = old->throttle_in;
+ new.throttle_out = old->throttle_out;
+ new.filter_in = old->filter_in;
+ new.filter_out = old->filter_out;
+ new.mru = old->mru;
+ new.opened = old->opened;
+ new.die = old->die;
+ new.last_packet = old->last_packet;
+ new.dns1 = old->dns1;
+ new.dns2 = old->dns2;
+ new.tbf_in = old->tbf_in;
+ new.tbf_out = old->tbf_out;
+ new.random_vector_length = old->random_vector_length;
+ new.tx_connect_speed = old->tx_connect_speed;
+ new.rx_connect_speed = old->rx_connect_speed;
+ new.snoop_ip = old->snoop_ip;
+ new.snoop_port = old->snoop_port;
+ new.walled_garden = old->walled_garden;
+
+ memcpy(new.random_vector, old->random_vector, sizeof(new.random_vector));
+ memcpy(new.user, old->user, sizeof(new.user));
+ memcpy(new.called, old->called, sizeof(new.called));
+ memcpy(new.calling, old->calling, sizeof(new.calling));
+
+ for (i = 0; i < MAXROUTE; i++)
+ memcpy(&new.route[i], &old->route[i], sizeof(new.route[i]));
+
+ return (uint8_t *) &new;
+}
+
//
// Process a heartbeat..
//
// v3: added interval, timeout
// v4: added table_version
+// v5: added ipv6, re-ordered session structure
static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t *p, in_addr_t addr)
{
heartt *h;
int s = size - (p-data);
int i, type;
+ int hb_ver = more;
-#if HB_VERSION != 4
+#if HB_VERSION != 5
# error "need to update cluster_process_heartbeat()"
#endif
- // we handle versions 3 through 4
- if (more < 3 || more > HB_VERSION) {
- LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", more);
+ // we handle versions 3 through 5
+ if (hb_ver < 3 || hb_ver > HB_VERSION) {
+ LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", hb_ver);
return -1; // Ignore it??
}
return -1; // Skip it.
}
- if (more >= 4) {
+ if (hb_ver >= 4) {
if (h->table_version > config->cluster_table_version) {
LOG(0, 0, 0, "They've seen more state changes (%" PRIu64 " vs my %" PRIu64 ") so I'm gone!\n",
h->table_version, config->cluster_table_version);
size = rle_decompress((uint8_t **) &p, s, c, sizeof(c) );
s -= (p - orig_p);
+ // session struct changed with v5
+ if (hb_ver < 5)
+ {
+ if (size != sizeof(struct oldsession)) {
+ LOG(0, 0, 0, "DANGER: Received a v%d CSESSION that didn't decompress correctly!\n", hb_ver);
+ // Now what? Should exit! No-longer up to date!
+ break;
+ }
+ cluster_recv_session(more, convert_session((struct oldsession *) c));
+ break;
+ }
+
if (size != sizeof(sessiont) ) { // Ouch! Very very bad!
LOG(0, 0, 0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
// Now what? Should exit! No-longer up to date!
break;
}
case C_SESSION:
+ if (hb_ver < 5)
+ {
+ if (s < sizeof(struct oldsession))
+ goto shortpacket;
+
+ cluster_recv_session(more, convert_session((struct oldsession *) p));
+
+ p += sizeof(struct oldsession);
+ s -= sizeof(struct oldsession);
+ break;
+ }
+
if ( s < sizeof(session[more]))
goto shortpacket;
return cluster_add_peer(addr, more, (pingt *) p, s);
case C_LASTSEEN: // Catch up a slave (slave missed a packet).
+ if (!config->cluster_iam_master) { // huh?
+ LOG(0, 0, 0, "I'm not the master, but I got a C_LASTSEEN from %s?\n", fmtaddr(addr, 0));
+ return -1;
+ }
+
return cluster_catchup_slave(more, addr);
case C_FORWARD: { // Forwarded control packet. pass off to processudp.