X-Git-Url: http://git.sameswireless.fr/l2tpns.git/blobdiff_plain/7825a266917816e443bcc28d63c6c365997bd4ed..924f6f57c1d2844e150d0ee19fbab924f172cfec:/cluster.c?ds=sidebyside diff --git a/cluster.c b/cluster.c index 1dc5c53..d83ae89 100644 --- a/cluster.c +++ b/cluster.c @@ -1,6 +1,6 @@ // L2TPNS Clustering Stuff -char const *cvs_id_cluster = "$Id: cluster.c,v 1.31 2005/02/14 06:58:38 bodea Exp $"; +char const *cvs_id_cluster = "$Id: cluster.c,v 1.37 2005/05/08 08:00:49 bodea Exp $"; #include #include @@ -580,9 +580,12 @@ void cluster_check_master(void) continue; } - // Reset all the idle timeouts.. + // Reset idle timeouts.. session[i].last_packet = time_now; + // Reset die relative to our uptime rather than the old master's + if (session[i].die) session[i].die = TIME; + // Accumulate un-sent byte counters. session[i].cin += sess_local[i].cin; session[i].cout += sess_local[i].cout; @@ -591,7 +594,7 @@ void cluster_check_master(void) sess_local[i].cin = sess_local[i].cout = 0; - session[i].radius = 0; // Reset authentication as the radius blocks aren't up to date. + sess_local[i].radius = 0; // Reset authentication as the radius blocks aren't up to date. if (session[i].unique_id >= high_unique_id) // This is different to the index into the session table!!! high_unique_id = session[i].unique_id+1; @@ -654,8 +657,6 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig if (session[i].tunnel == T_UNDEF) session[i].tunnel = T_FREE; // Defined. continue; } - if (session[i].tunnel != T_UNDEF) - continue; if (session[i].tunnel == T_UNDEF) ++config->cluster_undefined_sessions; @@ -1036,7 +1037,9 @@ static int cluster_handle_bytes(char *data, int size) session[b->sid].cin += b->in; session[b->sid].cout += b->out; - session[b->sid].last_packet = time_now; // Reset idle timer! + + if (b->in) + session[b->sid].last_packet = time_now; // Reset idle timer! size -= sizeof(*b); ++b; @@ -1109,24 +1112,127 @@ static int cluster_recv_tunnel(int more, uint8_t *p) } +// pre v5 heartbeat session structure +struct oldsession { + sessionidt next; + sessionidt far; + tunnelidt tunnel; + in_addr_t ip; + int ip_pool_index; + unsigned long unique_id; + uint16_t nr; + uint16_t ns; + uint32_t magic; + uint32_t cin, cout; + uint32_t pin, pout; + uint32_t total_cin; + uint32_t total_cout; + uint32_t id; + uint16_t throttle_in; + uint16_t throttle_out; + clockt opened; + clockt die; + time_t last_packet; + in_addr_t dns1, dns2; + routet route[MAXROUTE]; + uint16_t radius; + uint16_t mru; + uint16_t tbf_in; + uint16_t tbf_out; + uint8_t l2tp_flags; + uint8_t reserved_old_snoop; + uint8_t walled_garden; + uint8_t flags1; + char random_vector[MAXTEL]; + int random_vector_length; + char user[129]; + char called[MAXTEL]; + char calling[MAXTEL]; + uint32_t tx_connect_speed; + uint32_t rx_connect_speed; + uint32_t flags; + in_addr_t snoop_ip; + uint16_t snoop_port; + uint16_t sid; + uint8_t filter_in; + uint8_t filter_out; + char reserved[18]; +}; + +static uint8_t *convert_session(struct oldsession *old) +{ + static sessiont new; + int i; + + memset(&new, 0, sizeof(new)); + + new.next = old->next; + new.far = old->far; + new.tunnel = old->tunnel; + new.l2tp_flags = old->l2tp_flags; + new.flags = old->flags; + new.ip = old->ip; + new.ip_pool_index = old->ip_pool_index; + new.unique_id = old->unique_id; + new.nr = old->nr; + new.ns = old->ns; + new.magic = old->magic; + new.cin = old->cin; + new.cout = old->cout; + new.pin = old->pin; + new.pout = old->pout; + new.total_cin = old->total_cin; + new.total_cout = old->total_cout; + new.throttle_in = old->throttle_in; + new.throttle_out = old->throttle_out; + new.filter_in = old->filter_in; + new.filter_out = old->filter_out; + new.mru = old->mru; + new.opened = old->opened; + new.die = old->die; + new.last_packet = old->last_packet; + new.dns1 = old->dns1; + new.dns2 = old->dns2; + new.tbf_in = old->tbf_in; + new.tbf_out = old->tbf_out; + new.random_vector_length = old->random_vector_length; + new.tx_connect_speed = old->tx_connect_speed; + new.rx_connect_speed = old->rx_connect_speed; + new.snoop_ip = old->snoop_ip; + new.snoop_port = old->snoop_port; + new.walled_garden = old->walled_garden; + + memcpy(new.random_vector, old->random_vector, sizeof(new.random_vector)); + memcpy(new.user, old->user, sizeof(new.user)); + memcpy(new.called, old->called, sizeof(new.called)); + memcpy(new.calling, old->calling, sizeof(new.calling)); + + for (i = 0; i < MAXROUTE; i++) + memcpy(&new.route[i], &old->route[i], sizeof(new.route[i])); + + return (uint8_t *) &new; +} + // // Process a heartbeat.. // // v3: added interval, timeout // v4: added table_version +// v5: added ipv6, re-ordered session structure static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t *p, in_addr_t addr) { heartt *h; int s = size - (p-data); int i, type; + int hb_ver = more; -#if HB_VERSION != 4 +#if HB_VERSION != 5 # error "need to update cluster_process_heartbeat()" #endif - // we handle versions 3 through 4 - if (more < 3 || more > HB_VERSION) { - LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", more); + // we handle versions 3 through 5 + if (hb_ver < 3 || hb_ver > HB_VERSION) { + LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", hb_ver); return -1; // Ignore it?? } @@ -1150,7 +1256,7 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t return -1; // Skip it. } - if (more >= 4) { + if (hb_ver >= 4) { if (h->table_version > config->cluster_table_version) { LOG(0, 0, 0, "They've seen more state changes (%" PRIu64 " vs my %" PRIu64 ") so I'm gone!\n", h->table_version, config->cluster_table_version); @@ -1258,6 +1364,18 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t size = rle_decompress((uint8_t **) &p, s, c, sizeof(c) ); s -= (p - orig_p); + // session struct changed with v5 + if (hb_ver < 5) + { + if (size != sizeof(struct oldsession)) { + LOG(0, 0, 0, "DANGER: Received a v%d CSESSION that didn't decompress correctly!\n", hb_ver); + // Now what? Should exit! No-longer up to date! + break; + } + cluster_recv_session(more, convert_session((struct oldsession *) c)); + break; + } + if (size != sizeof(sessiont) ) { // Ouch! Very very bad! LOG(0, 0, 0, "DANGER: Received a CSESSION that didn't decompress correctly!\n"); // Now what? Should exit! No-longer up to date! @@ -1268,6 +1386,18 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t break; } case C_SESSION: + if (hb_ver < 5) + { + if (s < sizeof(struct oldsession)) + goto shortpacket; + + cluster_recv_session(more, convert_session((struct oldsession *) p)); + + p += sizeof(struct oldsession); + s -= sizeof(struct oldsession); + break; + } + if ( s < sizeof(session[more])) goto shortpacket; @@ -1361,6 +1491,11 @@ int processcluster(char *data, int size, in_addr_t addr) return cluster_add_peer(addr, more, (pingt *) p, s); case C_LASTSEEN: // Catch up a slave (slave missed a packet). + if (!config->cluster_iam_master) { // huh? + LOG(0, 0, 0, "I'm not the master, but I got a C_LASTSEEN from %s?\n", fmtaddr(addr, 0)); + return -1; + } + return cluster_catchup_slave(more, addr); case C_FORWARD: { // Forwarded control packet. pass off to processudp.