From 916c13c6039bd189873ee094e9c0ebd010899792 Mon Sep 17 00:00:00 2001 From: fendo Date: Mon, 11 Mar 2013 16:30:58 +0100 Subject: [PATCH] manage groupes in cluster mode --- Makefile | 2 +- cluster.c | 229 ++++++++++++++++++++++++++++++++++++++++++++---------- cluster.h | 9 ++- garden.c | 2 +- grpsess.c | 93 +++++++++++++++++++++- l2tplac.c | 1 - l2tpns.c | 7 +- l2tpns.h | 21 +++-- pppoe.c | 2 - 9 files changed, 305 insertions(+), 61 deletions(-) diff --git a/Makefile b/Makefile index c0c7c4e..b19dfcd 100644 --- a/Makefile +++ b/Makefile @@ -127,7 +127,7 @@ tbf.o: tbf.c l2tpns.h util.h tbf.h util.o: util.c l2tpns.h bgp.h pppoe.o: pppoe.c l2tpns.h cluster.h constants.h md5.h util.h l2tplac.o: l2tplac.c md5.h l2tpns.h util.h cluster.h l2tplac.h pppoe.h -grpsess.o: grpsess.c l2tpns.h util.h bgp.h +grpsess.o: grpsess.c l2tpns.h util.h cluster.h bgp.h bgp.o: bgp.c l2tpns.h bgp.h util.h autosnoop.so: autosnoop.c l2tpns.h plugin.h autothrottle.so: autothrottle.c l2tpns.h plugin.h diff --git a/cluster.c b/cluster.c index 5b6f208..fc4d58a 100644 --- a/cluster.c +++ b/cluster.c @@ -44,6 +44,7 @@ in_addr_t my_address = 0; // The network address of my ethernet port. static int walk_session_number = 0; // The next session to send when doing the slow table walk. static int walk_bundle_number = 0; // The next bundle to send when doing the slow table walk. static int walk_tunnel_number = 0; // The next tunnel to send when doing the slow table walk. +static int walk_groupe_number = 0; // The next groupe to send when doing the slow table walk. int forked = 0; // Sanity check: CLI must not diddle with heartbeat table #define MAX_HEART_SIZE (8192) // Maximum size of heartbeat packet. Must be less than max IP packet size :) @@ -88,6 +89,7 @@ int cluster_init() config->cluster_undefined_sessions = MAXSESSION-1; config->cluster_undefined_bundles = MAXBUNDLE-1; config->cluster_undefined_tunnels = MAXTUNNEL-1; + config->cluster_undefined_groupes = MAXGROUPE-1; if (!config->cluster_address) return 0; @@ -229,7 +231,8 @@ static void cluster_uptodate(void) if (config->cluster_iam_uptodate) return; - if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles) + if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || + config->cluster_undefined_bundles || config->cluster_undefined_groupes) return; config->cluster_iam_uptodate = 1; @@ -454,7 +457,8 @@ void cluster_send_ping(time_t basetime) x.ver = 1; x.addr = config->bind_address; - x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels + config->cluster_undefined_bundles; + x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels + + config->cluster_undefined_groupes + config->cluster_undefined_bundles; x.basetime = basetime; add_type(&p, C_PING, basetime, (uint8_t *) &x, sizeof(x)); @@ -676,6 +680,20 @@ void cluster_check_master(void) config->cluster_highest_bundleid = i; } + // + // Go through and mark all the groupes as defined. + // Count the highest used groupe number as well. + // + config->cluster_highest_groupeid = 0; + for (i = 0; i < MAXGROUPE; ++i) + { + if (grpsession[i].state == GROUPEUNDEF) + grpsession[i].state = GROUPEFREE; + + if (grpsession[i].state != GROUPEFREE && i > config->cluster_highest_groupeid) + config->cluster_highest_groupeid = i; + } + // // Go through and mark all the sessions as being defined. // reset the idle timeouts. @@ -711,7 +729,6 @@ void cluster_check_master(void) increment_counter(&session[i].cout, &session[i].cout_wrap, sess_local[i].cout); session[i].cin_delta += sess_local[i].cin; session[i].cout_delta += sess_local[i].cout; - session[i].coutgrp_delta += sess_local[i].cout; session[i].pin += sess_local[i].pin; session[i].pout += sess_local[i].pout; @@ -744,6 +761,7 @@ void cluster_check_master(void) config->cluster_undefined_sessions = 0; config->cluster_undefined_bundles = 0; config->cluster_undefined_tunnels = 0; + config->cluster_undefined_groupes = 0; config->cluster_iam_uptodate = 1; // assume all peers are up-to-date // FIXME. We need to fix up the tunnel control message @@ -760,7 +778,7 @@ void cluster_check_master(void) // we fix it up here, and we ensure that the 'first free session' // pointer is valid. // -static void cluster_check_sessions(int highsession, int freesession_ptr, int highbundle, int hightunnel) +static void cluster_check_sessions(int highsession, int freesession_ptr, int highbundle, int hightunnel, int highgroupe) { int i; @@ -769,7 +787,8 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig if (config->cluster_iam_uptodate) return; - if (highsession > config->cluster_undefined_sessions && highbundle > config->cluster_undefined_bundles && hightunnel > config->cluster_undefined_tunnels) + if (highsession > config->cluster_undefined_sessions && highbundle > config->cluster_undefined_bundles && + highgroupe > config->cluster_undefined_groupes && hightunnel > config->cluster_undefined_tunnels) return; // Clear out defined sessions, counting the number of @@ -811,10 +830,23 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig ++config->cluster_undefined_tunnels; } + // Clear out defined groupe, counting the number of + // undefs remaining. + config->cluster_undefined_groupes = 0; + for (i = 1 ; i < MAXGROUPE; ++i) { + if (i > highgroupe) { + if (grpsession[i].state == GROUPEUNDEF) grpsession[i].state = GROUPEFREE; // Defined. + continue; + } + + if (grpsession[i].state == GROUPEUNDEF) + ++config->cluster_undefined_groupes; + } - if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles) { - LOG(2, 0, 0, "Cleared undefined sessions/bundles/tunnels. %d sess (high %d), %d bund (high %d), %d tunn (high %d)\n", - config->cluster_undefined_sessions, highsession, config->cluster_undefined_bundles, highbundle, config->cluster_undefined_tunnels, hightunnel); + if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles || config->cluster_undefined_groupes) { + LOG(2, 0, 0, "Cleared undefined sessions/bundles/tunnels. %d sess (high %d), %d bund (high %d), %d grp (high %d), %d tunn (high %d)\n", + config->cluster_undefined_sessions, highsession, config->cluster_undefined_bundles, highbundle, + config->cluster_undefined_groupes, highgroupe, config->cluster_undefined_tunnels, hightunnel); return; } @@ -868,6 +900,27 @@ static int hb_add_type(uint8_t **p, int type, int id) add_type(p, C_BUNDLE, id, (uint8_t *) &bundle[id], sizeof(bundlet)); break; + case C_CGROUPE: { // Compressed C_GROUPE + uint8_t c[sizeof(groupsesst) * 2]; // Bigger than worst case. + uint8_t *d = (uint8_t *) &grpsession[id]; + uint8_t *orig = d; + int size; + + size = rle_compress( &d, sizeof(groupsesst), c, sizeof(c) ); + + // Did we compress the full structure, and is the size actually + // reduced?? + if ( (d - orig) == sizeof(groupsesst) && size < sizeof(groupsesst) ) + { + add_type(p, C_CGROUPE, id, c, size); + break; + } + // Failed to compress : Fall through. + } + case C_GROUPE: + add_type(p, C_GROUPE, id, (uint8_t *) &grpsession[id], sizeof(groupsesst)); + break; + case C_CTUNNEL: { // Compressed C_TUNNEL uint8_t c[sizeof(tunnelt) * 2]; // Bigger than worst case. uint8_t *d = (uint8_t *) &tunnel[id]; @@ -900,7 +953,7 @@ static int hb_add_type(uint8_t **p, int type, int id) // void cluster_heartbeat() { - int i, count = 0, tcount = 0, bcount = 0; + int i, count = 0, tcount = 0, bcount = 0, gcount = 0; uint8_t buff[MAX_HEART_SIZE + sizeof(heartt) + sizeof(int) ]; heartt h; uint8_t *p = buff; @@ -922,9 +975,11 @@ void cluster_heartbeat() h.freesession = sessionfree; h.hightunnel = config->cluster_highest_tunnelid; h.highbundle = config->cluster_highest_bundleid; + h.highgroupe = config->cluster_highest_groupeid; h.size_sess = sizeof(sessiont); // Just in case. h.size_bund = sizeof(bundlet); h.size_tunn = sizeof(tunnelt); + h.nextgrpid = gnextgrpid; h.interval = config->cluster_hb_interval; h.timeout = config->cluster_hb_timeout; h.table_version = config->cluster_table_version; @@ -943,7 +998,7 @@ void cluster_heartbeat() // // Fill out the packet with sessions from the session table... - // (not forgetting to leave space so we can get some tunnels in too ) + // (not forgetting to leave space so we can get some tunnels,bundle,groupe in too ) while ( (p + sizeof(uint32_t) * 2 + sizeof(sessiont) * 2 ) < (buff + MAX_HEART_SIZE) ) { if (!walk_session_number) // session #0 isn't valid. @@ -958,40 +1013,59 @@ void cluster_heartbeat() ++count; // Count the number of extra sessions we're sending. } - // - // Fill out the packet with tunnels from the tunnel table... - // This effectively means we walk the tunnel table more quickly - // than the session table. This is good because stuffing up a - // tunnel is a much bigger deal than stuffing up a session. - // - while ( (p + sizeof(uint32_t) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE) ) { + // + // Fill out the packet with tunnels from the tunnel table... + // This effectively means we walk the tunnel table more quickly + // than the session table. This is good because stuffing up a + // tunnel is a much bigger deal than stuffing up a session. + // + int maxsize = (sizeof(tunnelt) < sizeof(bundlet)) ? sizeof(bundlet):sizeof(tunnelt); + maxsize = (sizeof(groupsesst) < maxsize) ? maxsize:sizeof(groupsesst); + maxsize += (sizeof(uint32_t) * 2); + + // Fill out the packet with tunnels,bundlets, groupes from the tables... + while ( (p + maxsize) < (buff + MAX_HEART_SIZE) ) + { + if ((tcount >= config->cluster_highest_tunnelid) && + (bcount >= config->cluster_highest_bundleid) && + (gcount >= config->cluster_highest_groupeid)) + break; - if (!walk_tunnel_number) // tunnel #0 isn't valid. - ++walk_tunnel_number; + if ( ((p + sizeof(uint32_t) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE)) && + (tcount < config->cluster_highest_tunnelid)) + { + if (!walk_tunnel_number) // tunnel #0 isn't valid. + ++walk_tunnel_number; - if (tcount >= config->cluster_highest_tunnelid) - break; + hb_add_type(&p, C_CTUNNEL, walk_tunnel_number); + walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1); // +1 avoids divide by zero. - hb_add_type(&p, C_CTUNNEL, walk_tunnel_number); - walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1); // +1 avoids divide by zero. + ++tcount; + } - ++tcount; - } + if ( ((p + sizeof(uint32_t) * 2 + sizeof(bundlet) ) < (buff + MAX_HEART_SIZE)) && + (bcount < config->cluster_highest_bundleid)) + { + if (!walk_bundle_number) // bundle #0 isn't valid. + ++walk_bundle_number; - // - // Fill out the packet with bundles from the bundle table... - while ( (p + sizeof(uint32_t) * 2 + sizeof(bundlet) ) < (buff + MAX_HEART_SIZE) ) { + hb_add_type(&p, C_CBUNDLE, walk_bundle_number); + walk_bundle_number = (1+walk_bundle_number)%(config->cluster_highest_bundleid+1); // +1 avoids divide by zero. - if (!walk_bundle_number) // bundle #0 isn't valid. - ++walk_bundle_number; + ++bcount; + } - if (bcount >= config->cluster_highest_bundleid) - break; + if ( ((p + sizeof(uint32_t) * 2 + sizeof(groupsesst) ) < (buff + MAX_HEART_SIZE)) && + (gcount < config->cluster_highest_groupeid)) + { + if (!walk_groupe_number) // groupe #0 isn't valid. + ++walk_groupe_number; - hb_add_type(&p, C_CBUNDLE, walk_bundle_number); - walk_bundle_number = (1+walk_bundle_number)%(config->cluster_highest_bundleid+1); // +1 avoids divide by zero. - ++bcount; - } + hb_add_type(&p, C_CGROUPE, walk_groupe_number); + walk_groupe_number = (1+walk_groupe_number)%(config->cluster_highest_groupeid+1); // +1 avoids divide by zero. + ++gcount; + } + } // // Did we do something wrong? @@ -1002,10 +1076,10 @@ void cluster_heartbeat() } LOG(4, 0, 0, "Sending v%d heartbeat #%d, change #%" PRIu64 " with %d changes " - "(%d x-sess, %d x-bundles, %d x-tunnels, %d highsess, %d highbund, %d hightun, size %d)\n", + "(%d x-sess, %d x-bundles, %d x-tunnels, %d x-groupes, %d highsess, %d highbund, %d hightun, %d highgrp, size %d)\n", HB_VERSION, h.seq, h.table_version, config->cluster_num_changes, - count, bcount, tcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid, - config->cluster_highest_tunnelid, (int) (p - buff)); + count, bcount, tcount, gcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid, + config->cluster_highest_tunnelid, config->cluster_highest_groupeid, (int) (p - buff)); config->cluster_num_changes = 0; @@ -1071,6 +1145,18 @@ int cluster_send_bundle(int bid) return type_changed(C_CBUNDLE, bid); } +// A particular groupe has been changed! +int cluster_send_groupe(int gid) +{ + if (!config->cluster_iam_master) + { + LOG(0, 0, gid, "I'm not a master, but I just tried to change a groupe!\n"); + return -1; + } + + return type_changed(C_CGROUPE, gid); +} + // A particular tunnel has been changed! int cluster_send_tunnel(int tid) { @@ -1266,7 +1352,6 @@ static int cluster_handle_bytes(uint8_t *data, int size) session[b->sid].cin_delta += b->cin; session[b->sid].cout_delta += b->cout; - session[b->sid].coutgrp_delta += b->cout; if (b->cin) session[b->sid].last_packet = session[b->sid].last_data = time_now; @@ -1336,6 +1421,31 @@ static int cluster_recv_bundle(int more, uint8_t *p) return 0; } +static int cluster_recv_groupe(int more, uint8_t *p) +{ + if (more >= MAXGROUPE) { + LOG(0, 0, 0, "DANGER: Received a group id > MAXGROUPE!\n"); + return -1; + } + + if (grpsession[more].state == GROUPEUNDEF) { + if (config->cluster_iam_uptodate) { // Sanity. + LOG(0, 0, 0, "I thought I was uptodate but I just found an undefined group!\n"); + } else { + --config->cluster_undefined_groupes; + } + } + + grp_cluster_load_groupe(more, (groupsesst *) p); // Copy groupe into groupe table.. + + LOG(5, 0, more, "Received group update (%d undef)\n", config->cluster_undefined_groupes); + + if (!config->cluster_iam_uptodate) + cluster_uptodate(); // Check to see if we're up to date. + + return 0; +} + static int cluster_recv_tunnel(int more, uint8_t *p) { if (more >= MAXTUNNEL) { @@ -1632,9 +1742,10 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t memcpy(&past_hearts[i].data, data, size); // Save it. - // Check that we don't have too many undefined sessions, and - // that the free session pointer is correct. - cluster_check_sessions(h->highsession, h->freesession, h->highbundle, h->hightunnel); + // Check that we don't have too many undefined sessions, and + // that the free session pointer is correct. + gnextgrpid = h->nextgrpid; + cluster_check_sessions(h->highsession, h->freesession, h->highbundle, h->hightunnel, h->highgroupe); if (h->interval != config->cluster_hb_interval) { @@ -1772,6 +1883,36 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t p += sizeof(bundle[more]); s -= sizeof(bundle[more]); break; + + case C_CGROUPE: + { // Compressed Groupe structure. + uint8_t c[ sizeof(groupsesst) + 2]; + int size; + uint8_t *orig_p = p; + + size = rle_decompress((uint8_t **) &p, s, c, sizeof(c)); + s -= (p - orig_p); + + if (size != sizeof(groupsesst) ) + { // Ouch! Very very bad! + LOG(0, 0, 0, "DANGER: Received a C_CGROUPE that didn't decompress correctly!\n"); + // Now what? Should exit! No-longer up to date! + break; + } + + cluster_recv_groupe(more, c); + break; + } + case C_GROUPE: + if ( s < sizeof(grpsession[more])) + goto shortpacket; + + cluster_recv_groupe(more, p); + + p += sizeof(grpsession[more]); + s -= sizeof(grpsession[more]); + break; + default: LOG(0, 0, 0, "DANGER: I received a heartbeat element where I didn't understand the type! (%d)\n", type); return -1; // can't process any more of the packet!! @@ -1981,12 +2122,14 @@ int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number); cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid); cli_print(cli, "%d bundles undefined of %d", config->cluster_undefined_bundles, config->cluster_highest_bundleid); + cli_print(cli, "%d groupes undefined of %d", config->cluster_undefined_groupes, config->cluster_highest_groupeid); cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid); } else { cli_print(cli, "Table version # : %" PRIu64, config->cluster_table_version); cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number); cli_print(cli, "Highest session : %d", config->cluster_highest_sessionid); cli_print(cli, "Highest bundle : %d", config->cluster_highest_bundleid); + cli_print(cli, "Highest groupe : %d", config->cluster_highest_groupeid); cli_print(cli, "Highest tunnel : %d", config->cluster_highest_tunnelid); cli_print(cli, "%d changes queued for sending", config->cluster_num_changes); } diff --git a/cluster.h b/cluster.h index 660b0c8..794cc9d 100644 --- a/cluster.h +++ b/cluster.h @@ -25,6 +25,9 @@ #define C_CBUNDLE 18 // Compressed bundle structure. #define C_MPPP_FORWARD 19 // MPPP Forwarded packet.. #define C_PPPOE_FORWARD 20 // PPPOE Forwarded packet.. +#define C_GROUPE 21 // Groupe structure. +#define C_CGROUPE 22 // Compressed groupe structure. + #define HB_VERSION 7 // Protocol version number.. #define HB_MAX_SEQ (1<<30) // Maximum sequence number. (MUST BE A POWER OF 2!) @@ -58,7 +61,10 @@ typedef struct { uint64_t table_version; // # state changes processed by cluster - char reserved[128 - 13*sizeof(uint32_t) - sizeof(uint64_t)]; // Pad out to 128 bytes. + uint32_t highgroupe; // Id of the highest used groupe. + uint32_t nextgrpid; // nextgrpid to set gnextgrpid on slave + + char reserved[128 - 15*sizeof(uint32_t) - sizeof(uint64_t)]; // Pad out to 128 bytes. } heartt; typedef struct { /* Used to update byte counters on the */ @@ -81,6 +87,7 @@ int cluster_init(void); int processcluster(uint8_t *buf, int size, in_addr_t addr); int cluster_send_session(int sid); int cluster_send_bundle(int bid); +int cluster_send_groupe(int gid); int cluster_send_tunnel(int tid); int master_forward_packet(uint8_t *data, int size, in_addr_t addr, uint16_t port, uint16_t indexudp); int master_forward_dae_packet(uint8_t *data, int size, in_addr_t addr, int port); diff --git a/garden.c b/garden.c index 0b7d763..63135ea 100644 --- a/garden.c +++ b/garden.c @@ -222,7 +222,7 @@ int garden_session(sessiont *s, int flag, char *newuser) /* Clean up counters */ s->pin = s->pout = 0; s->cin = s->cout = 0; - s->cin_delta = s->cout_delta = s->coutgrp_delta = 0; + s->cin_delta = s->cout_delta = 0; s->cin_wrap = s->cout_wrap = 0; snprintf(cmd, sizeof(cmd), diff --git a/grpsess.c b/grpsess.c index bfc7d8d..73c729a 100644 --- a/grpsess.c +++ b/grpsess.c @@ -12,6 +12,7 @@ #include "l2tpns.h" #include "util.h" +#include "cluster.h" #ifdef BGP #include "bgp.h" @@ -84,7 +85,7 @@ groupidt grp_groupbyip(in_addr_t ip) // // This adds it to the routing table, advertises it // via BGP if enabled, and stuffs it into the -// 'sessionbyip' cache. +// 'groupbyip' cache. // // 'ip' must be in _host_ order. // @@ -153,7 +154,10 @@ static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add) g = 0; // Caching the session as '0' is the same as uncaching. for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i) + { grp_cache_ipmap(i, g); + if (!g) cache_ipmap(i, 0); + } } } @@ -212,7 +216,7 @@ void grp_removesession(groupidt g, sessionidt s) if (gnextgrpid == g) { - gnextgrpid = 0; + gnextgrpid = grpsession[g].prev; } else { @@ -228,6 +232,7 @@ void grp_removesession(groupidt g, sessionidt s) } memset(&grpsession[g], 0, sizeof(grpsession[0])); + grpsession[g].state = GROUPEFREE; } else { @@ -236,6 +241,8 @@ void grp_removesession(groupidt g, sessionidt s) &grpsession[g].sesslist[i+1], (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i])); } + + cluster_send_groupe(g); return; } } @@ -270,6 +277,7 @@ static int grp_addsession(groupidt g, sessionidt s, uint8_t weight) // it's the first session of the group, set to next group grpsession[g].prev = gnextgrpid; gnextgrpid = g; + grpsession[g].state = GROUPEOPEN; } grpsession[g].sesslist[i].sid = s; grpsession[g].sesslist[i].weight = weight; @@ -356,6 +364,9 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs) return; } + if (grpid > config->cluster_highest_groupeid) + config->cluster_highest_groupeid = grpid; + n++; } @@ -420,6 +431,8 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs) // Init data structures void grp_initdata() { + int i; + // Set default value (10s) config->grp_txrate_average_time = 10; @@ -430,6 +443,10 @@ void grp_initdata() } memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE); + for (i = 1; i < MAXGROUPE; i++) + { + grpsession[i].state = GROUPEUNDEF; + } } // Update time_changed of the group @@ -500,8 +517,13 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip) { if ((s2 = grpsession[g].sesslist[i].sid)) { - grpsession[g].sesslist[i].tx_rate = session[s2].coutgrp_delta/ltime_changed; - session[s2].coutgrp_delta = 0; + uint32_t coutgrp_delta = 0; + + if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp) + coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp; + grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout; + + grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed; txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight; if (txrate < mintxrate) @@ -587,3 +609,66 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip) return s; } + +// load a groupe receive from master +int grp_cluster_load_groupe(groupidt g, groupsesst *new) +{ + int i; + int updategroup = 0; + + if (g >= MAXGROUPE) + { + LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n"); + return 0; + } + + if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) || + (grpsession[g].nbsession != new->nbsession)) + { + updategroup = 1; + } + + if (!updategroup) + { + // Check session list + for (i = 0; i < grpsession[g].nbsession; i++) + { + if (grpsession[g].sesslist[i].sid == new->sesslist[i].sid) + { + updategroup = 1; + break; + } + } + } + + if (!updategroup) + { + // Check routes list + for (i = 0; i < grpsession[g].nbroutesgrp; i++) + { + if (grpsession[g].route[i].ip != new->route[i].ip) + { + updategroup = 1; + break; + } + } + } + + // needs update + if (updategroup) + { + // Del all routes + grp_setgrouproute(g, 0); + } + + memcpy(&grpsession[g], new, sizeof(grpsession[g])); // Copy over.. + + // needs update + if (updategroup) + { + // Add all routes + grp_setgrouproute(g, 1); + } + + return 1; +} diff --git a/l2tplac.c b/l2tplac.c index d41a26d..fd3a7e0 100644 --- a/l2tplac.c +++ b/l2tplac.c @@ -528,7 +528,6 @@ int lac_session_forward(uint8_t *buf, int len, sessionidt sess, uint16_t proto, // Update STAT OUT increment_counter(&session[s].cout, &session[s].cout_wrap, len); // byte count session[s].cout_delta += len; - session[s].coutgrp_delta += len; session[s].pout++; sess_local[s].cout += len; sess_local[s].pout++; diff --git a/l2tpns.c b/l2tpns.c index 08c8bed..17c440b 100644 --- a/l2tpns.c +++ b/l2tpns.c @@ -1394,7 +1394,6 @@ static void update_session_out_stat(sessionidt s, sessiont *sp, int len) { increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count sp->cout_delta += len; - sp->coutgrp_delta += len; sp->pout++; sp->last_data = time_now; @@ -1804,7 +1803,6 @@ static void processipv6out(uint8_t * buf, int len) increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count sp->cout_delta += len; - sp->coutgrp_delta += len; sp->pout++; udp_tx += len; @@ -1854,7 +1852,6 @@ static void send_ipout(sessionidt s, uint8_t *buf, int len) increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count sp->cout_delta += len; - sp->coutgrp_delta += len; sp->pout++; udp_tx += len; @@ -5179,6 +5176,9 @@ int main(int argc, char *argv[]) LOG(0, 0, 0, "Can't lock pages: %s\n", strerror(errno)); } + //LOG(3, 0, 0, "Debug sizeof struct: sessiont %lu, tunnelt %lu, bundlet %lu, groupsesst %lu\n", + // sizeof(sessiont), sizeof(tunnelt), sizeof(bundlet), sizeof(groupsesst)); + mainloop(); /* remove plugins (so cleanup code gets run) */ @@ -5679,6 +5679,7 @@ int sessionsetup(sessionidt s, tunnelidt t) if ((g = grp_groupbysession(s))) { grp_setgrouproute(g, 1); + cluster_send_groupe(g); } } diff --git a/l2tpns.h b/l2tpns.h index 4fe9a5c..bce434f 100644 --- a/l2tpns.h +++ b/l2tpns.h @@ -25,7 +25,6 @@ #define MAXSESSION 60000 // could be up to 65535 #define MAXTBFS 6000 // Maximum token bucket filters. Might need up to 2 * session. #define MAXSESSINGRP 12 // Maximum number of member links in grouped session -#define MAXGRPINSESS 12 // Maximum number of member links in session group #define MAXGROUPE 300 // could be up to 65535, Maximum number of grouped session #define MAXROUTEINGRP 15 // max static routes per group @@ -336,25 +335,27 @@ typedef struct struct in6_addr ipv6route; // Static IPv6 route sessionidt forwardtosession; // LNS id_session to forward uint8_t src_hwaddr[ETH_ALEN]; // MAC addr source (for pppoe sessions 6 bytes) - uint32_t coutgrp_delta; + char reserved[4]; // Space to expand structure without changing HB_VERSION } sessiont; typedef struct { uint32_t tx_rate; + uint32_t prev_coutgrp; sessionidt sid; uint8_t weight; } -groupsessionidt; +groupsesslistt; typedef struct { + int state; // current state (groupestate enum) uint32_t time_changed; groupidt prev; sessionidt smax; sessionidt smin; - groupsessionidt sesslist[MAXSESSINGRP]; + groupsesslistt sesslist[MAXSESSINGRP]; routet route[MAXROUTEINGRP]; // static routes uint8_t nbroutesgrp; uint8_t nbsession; @@ -549,6 +550,13 @@ enum BUNDLEUNDEF, // Undefined }; +enum +{ + GROUPEFREE, // Not in use + GROUPEOPEN, // Active bundle + GROUPEUNDEF // Undefined +}; + enum { NULLCLASS = 0, //End Point Discriminator classes @@ -756,8 +764,10 @@ typedef struct int cluster_undefined_sessions; // How many sessions we're yet to receive from the master. int cluster_undefined_bundles; // How many bundles we're yet to receive from the master. int cluster_undefined_tunnels; // How many tunnels we're yet to receive from the master. + int cluster_undefined_groupes; // How many groupes we're yet to receive from the master. int cluster_highest_sessionid; int cluster_highest_bundleid; + int cluster_highest_groupeid; int cluster_highest_tunnelid; clockt cluster_last_hb; // Last time we saw a heartbeat from the master. int cluster_last_hb_ver; // Heartbeat version last seen from master @@ -996,6 +1006,7 @@ groupidt grp_groupbyip(in_addr_t ip); void grp_setgrouproute(groupidt g, int add); void grp_time_changed(void); void grp_removesession(groupidt g, sessionidt s); +int grp_cluster_load_groupe(groupidt g, groupsesst *new); #undef LOG #undef LOG_HEX @@ -1032,7 +1043,7 @@ extern sessiont *session; extern sessionlocalt *sess_local; extern ippoolt *ip_address_pool; extern groupsesst *grpsession; -groupidt gnextgrpid; +extern groupidt gnextgrpid; #define sessionfree (session[0].next) diff --git a/pppoe.c b/pppoe.c index f804b0e..61b881b 100644 --- a/pppoe.c +++ b/pppoe.c @@ -956,7 +956,6 @@ static void pppoe_forwardto_session_rmlns(uint8_t *pack, int size, sessionidt se // Update STAT OUT increment_counter(&session[s].cout, &session[s].cout_wrap, ll2tp); // byte count session[s].cout_delta += ll2tp; - session[s].coutgrp_delta += ll2tp; session[s].pout++; sess_local[s].cout += ll2tp; sess_local[s].pout++; @@ -1028,7 +1027,6 @@ void pppoe_forwardto_session_pppoe(uint8_t *pack, int size, sessionidt sess, uin // Update STAT OUT increment_counter(&session[s].cout, &session[s].cout_wrap, lpppoe); // byte count session[s].cout_delta += lpppoe; - session[s].coutgrp_delta += lpppoe; session[s].pout++; sess_local[s].cout += lpppoe; sess_local[s].pout++; -- 2.20.1