manage groupes in cluster mode
author fendo <fendo@bi12info.com>
Mon, 11 Mar 2013 15:30:58 +0000 (16:30 +0100)
committer fendo <fendo@bi12info.com>
Mon, 11 Mar 2013 15:30:58 +0000 (16:30 +0100)
Makefile
cluster.c
cluster.h
garden.c
grpsess.c
l2tplac.c
l2tpns.c
l2tpns.h
pppoe.c

index c0c7c4e..b19dfcd 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -127,7 +127,7 @@ tbf.o: tbf.c l2tpns.h util.h tbf.h
 util.o: util.c l2tpns.h bgp.h
 pppoe.o: pppoe.c l2tpns.h cluster.h constants.h md5.h util.h
 l2tplac.o: l2tplac.c md5.h l2tpns.h util.h cluster.h l2tplac.h pppoe.h
-grpsess.o: grpsess.c l2tpns.h util.h bgp.h
+grpsess.o: grpsess.c l2tpns.h util.h cluster.h bgp.h
 bgp.o: bgp.c l2tpns.h bgp.h util.h
 autosnoop.so: autosnoop.c l2tpns.h plugin.h
 autothrottle.so: autothrottle.c l2tpns.h plugin.h
index 5b6f208..fc4d58a 100644 (file)
--- a/cluster.c
+++ b/cluster.c
@@ -44,6 +44,7 @@ in_addr_t my_address = 0;             // The network address of my ethernet port.
 static int walk_session_number = 0;    // The next session to send when doing the slow table walk.
 static int walk_bundle_number = 0;     // The next bundle to send when doing the slow table walk.
 static int walk_tunnel_number = 0;     // The next tunnel to send when doing the slow table walk.
+static int walk_groupe_number = 0;     // The next groupe to send when doing the slow table walk.
 int forked = 0;                                // Sanity check: CLI must not diddle with heartbeat table
 
 #define MAX_HEART_SIZE (8192)  // Maximum size of heartbeat packet. Must be less than max IP packet size :)
@@ -88,6 +89,7 @@ int cluster_init()
        config->cluster_undefined_sessions = MAXSESSION-1;
        config->cluster_undefined_bundles = MAXBUNDLE-1;
        config->cluster_undefined_tunnels = MAXTUNNEL-1;
+       config->cluster_undefined_groupes = MAXGROUPE-1;
 
        if (!config->cluster_address)
                return 0;
@@ -229,7 +231,8 @@ static void cluster_uptodate(void)
        if (config->cluster_iam_uptodate)
                return;
 
-       if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles)
+       if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels ||
+               config->cluster_undefined_bundles || config->cluster_undefined_groupes)
                return;
 
        config->cluster_iam_uptodate = 1;
@@ -454,7 +457,8 @@ void cluster_send_ping(time_t basetime)
 
        x.ver = 1;
        x.addr = config->bind_address;
-       x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels + config->cluster_undefined_bundles;
+       x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels +
+                       config->cluster_undefined_groupes + config->cluster_undefined_bundles;
        x.basetime = basetime;
 
        add_type(&p, C_PING, basetime, (uint8_t *) &x, sizeof(x));
@@ -676,6 +680,20 @@ void cluster_check_master(void)
                         config->cluster_highest_bundleid = i;
         }
 
+               //
+               // Go through and mark all the groupes as defined.
+               // Count the highest used groupe number as well.
+               //
+               config->cluster_highest_groupeid = 0;
+               for (i = 0; i < MAXGROUPE; ++i)
+               {
+                       if (grpsession[i].state == GROUPEUNDEF)
+                               grpsession[i].state = GROUPEFREE;
+
+                       if (grpsession[i].state != GROUPEFREE && i > config->cluster_highest_groupeid)
+                               config->cluster_highest_groupeid = i;
+               }
+
                //
                // Go through and mark all the sessions as being defined.
                // reset the idle timeouts.
@@ -711,7 +729,6 @@ void cluster_check_master(void)
                increment_counter(&session[i].cout, &session[i].cout_wrap, sess_local[i].cout);
                session[i].cin_delta += sess_local[i].cin;
                session[i].cout_delta += sess_local[i].cout;
-               session[i].coutgrp_delta += sess_local[i].cout;
 
                session[i].pin += sess_local[i].pin;
                session[i].pout += sess_local[i].pout;
@@ -744,6 +761,7 @@ void cluster_check_master(void)
        config->cluster_undefined_sessions = 0;
        config->cluster_undefined_bundles = 0;
        config->cluster_undefined_tunnels = 0;
+       config->cluster_undefined_groupes = 0;
        config->cluster_iam_uptodate = 1; // assume all peers are up-to-date
 
        // FIXME. We need to fix up the tunnel control message
@@ -760,7 +778,7 @@ void cluster_check_master(void)
 // we fix it up here, and we ensure that the 'first free session'
 // pointer is valid.
 //
-static void cluster_check_sessions(int highsession, int freesession_ptr, int highbundle, int hightunnel)
+static void cluster_check_sessions(int highsession, int freesession_ptr, int highbundle, int hightunnel, int highgroupe)
 {
        int i;
 
@@ -769,7 +787,8 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig
        if (config->cluster_iam_uptodate)
                return;
 
-       if (highsession > config->cluster_undefined_sessions && highbundle > config->cluster_undefined_bundles && hightunnel > config->cluster_undefined_tunnels)
+       if (highsession > config->cluster_undefined_sessions && highbundle > config->cluster_undefined_bundles &&
+               highgroupe > config->cluster_undefined_groupes && hightunnel > config->cluster_undefined_tunnels)
                return;
 
                // Clear out defined sessions, counting the number of
@@ -811,10 +830,23 @@ static void cluster_check_sessions(int highsession, int freesession_ptr, int hig
                        ++config->cluster_undefined_tunnels;
        }
 
+       // Clear out defined groupe, counting the number of
+       // undefs remaining.
+       config->cluster_undefined_groupes = 0;
+       for (i = 1 ; i < MAXGROUPE; ++i) {
+               if (i > highgroupe) {
+                       if (grpsession[i].state == GROUPEUNDEF) grpsession[i].state = GROUPEFREE; // Defined.
+                       continue;
+               }
+
+               if (grpsession[i].state == GROUPEUNDEF)
+                       ++config->cluster_undefined_groupes;
+       }
 
-       if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles) {
-               LOG(2, 0, 0, "Cleared undefined sessions/bundles/tunnels. %d sess (high %d), %d bund (high %d), %d tunn (high %d)\n",
-                       config->cluster_undefined_sessions, highsession, config->cluster_undefined_bundles, highbundle, config->cluster_undefined_tunnels, hightunnel);
+       if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels || config->cluster_undefined_bundles || config->cluster_undefined_groupes) {
+               LOG(2, 0, 0, "Cleared undefined sessions/bundles/tunnels. %d sess (high %d), %d bund (high %d), %d grp (high %d), %d tunn (high %d)\n",
+                       config->cluster_undefined_sessions, highsession, config->cluster_undefined_bundles, highbundle,
+                       config->cluster_undefined_groupes, highgroupe, config->cluster_undefined_tunnels, hightunnel);
                return;
        }
 
@@ -868,6 +900,27 @@ static int hb_add_type(uint8_t **p, int type, int id)
                        add_type(p, C_BUNDLE, id, (uint8_t *) &bundle[id], sizeof(bundlet));
                        break;
 
+               case C_CGROUPE: { // Compressed C_GROUPE
+                       uint8_t c[sizeof(groupsesst) * 2]; // Bigger than worst case.
+                       uint8_t *d = (uint8_t *) &grpsession[id];
+                       uint8_t *orig = d;
+                       int size;
+
+                       size = rle_compress( &d,  sizeof(groupsesst), c, sizeof(c) );
+
+                       // Did we compress the full structure, and is the size actually
+                       // reduced??
+                       if ( (d - orig) == sizeof(groupsesst) && size < sizeof(groupsesst) )
+                       {
+                               add_type(p, C_CGROUPE, id, c, size);
+                               break;
+                       }
+                       // Failed to compress : Fall through.
+               }
+               case C_GROUPE:
+                       add_type(p, C_GROUPE, id, (uint8_t *) &grpsession[id], sizeof(groupsesst));
+                       break;
+
                case C_CTUNNEL: { // Compressed C_TUNNEL
                        uint8_t c[sizeof(tunnelt) * 2]; // Bigger than worst case.
                        uint8_t *d = (uint8_t *) &tunnel[id];
@@ -900,7 +953,7 @@ static int hb_add_type(uint8_t **p, int type, int id)
 //
 void cluster_heartbeat()
 {
-       int i, count = 0, tcount = 0, bcount = 0;
+       int i, count = 0, tcount = 0, bcount = 0, gcount = 0;
        uint8_t buff[MAX_HEART_SIZE + sizeof(heartt) + sizeof(int) ];
        heartt h;
        uint8_t *p = buff;
@@ -922,9 +975,11 @@ void cluster_heartbeat()
        h.freesession = sessionfree;
        h.hightunnel = config->cluster_highest_tunnelid;
        h.highbundle = config->cluster_highest_bundleid;
+       h.highgroupe = config->cluster_highest_groupeid;
        h.size_sess = sizeof(sessiont);         // Just in case.
        h.size_bund = sizeof(bundlet);
        h.size_tunn = sizeof(tunnelt);
+       h.nextgrpid = gnextgrpid;
        h.interval = config->cluster_hb_interval;
        h.timeout  = config->cluster_hb_timeout;
        h.table_version = config->cluster_table_version;
@@ -943,7 +998,7 @@ void cluster_heartbeat()
 
                //
                // Fill out the packet with sessions from the session table...
-               // (not forgetting to leave space so we can get some tunnels in too )
+               // (not forgetting to leave space so we can get some tunnels, bundles and groupes in too)
        while ( (p + sizeof(uint32_t) * 2 + sizeof(sessiont) * 2 ) < (buff + MAX_HEART_SIZE) ) {
 
                if (!walk_session_number)       // session #0 isn't valid.
@@ -958,40 +1013,59 @@ void cluster_heartbeat()
                ++count;                        // Count the number of extra sessions we're sending.
        }
 
-               //
-               // Fill out the packet with tunnels from the tunnel table...
-               // This effectively means we walk the tunnel table more quickly
-               // than the session table. This is good because stuffing up a 
-               // tunnel is a much bigger deal than stuffing up a session.
-               //
-       while ( (p + sizeof(uint32_t) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE) ) {
+       //
+       // Fill out the packet with tunnels from the tunnel table...
+       // This effectively means we walk the tunnel table more quickly
+       // than the session table. This is good because stuffing up a
+       // tunnel is a much bigger deal than stuffing up a session.
+       //
+       int maxsize = (sizeof(tunnelt) < sizeof(bundlet)) ? sizeof(bundlet):sizeof(tunnelt);
+       maxsize = (sizeof(groupsesst) < maxsize) ? maxsize:sizeof(groupsesst);
+       maxsize += (sizeof(uint32_t) * 2);
+
+       // Fill out the packet with tunnels, bundles and groupes from the tables...
+       while ( (p + maxsize) < (buff + MAX_HEART_SIZE) )
+       {
+               if ((tcount >= config->cluster_highest_tunnelid) &&
+                       (bcount >= config->cluster_highest_bundleid) &&
+                       (gcount >= config->cluster_highest_groupeid))
+                               break;
 
-               if (!walk_tunnel_number)        // tunnel #0 isn't valid.
-                       ++walk_tunnel_number;
+               if ( ((p + sizeof(uint32_t) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE)) &&
+                        (tcount < config->cluster_highest_tunnelid))
+               {
+                       if (!walk_tunnel_number)        // tunnel #0 isn't valid.
+                               ++walk_tunnel_number;
 
-               if (tcount >= config->cluster_highest_tunnelid)
-                       break;
+                       hb_add_type(&p, C_CTUNNEL, walk_tunnel_number);
+                       walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1);       // +1 avoids divide by zero.
 
-               hb_add_type(&p, C_CTUNNEL, walk_tunnel_number);
-               walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1);       // +1 avoids divide by zero.
+                       ++tcount;
+               }
 
-               ++tcount;
-       }
+               if ( ((p + sizeof(uint32_t) * 2 + sizeof(bundlet) ) < (buff + MAX_HEART_SIZE)) &&
+                        (bcount < config->cluster_highest_bundleid))
+               {
+                       if (!walk_bundle_number)        // bundle #0 isn't valid.
+                               ++walk_bundle_number;
 
-               //
-               // Fill out the packet with bundles from the bundle table...
-       while ( (p + sizeof(uint32_t) * 2 + sizeof(bundlet) ) < (buff + MAX_HEART_SIZE) ) {
+                       hb_add_type(&p, C_CBUNDLE, walk_bundle_number);
+                       walk_bundle_number = (1+walk_bundle_number)%(config->cluster_highest_bundleid+1);       // +1 avoids divide by zero.
 
-               if (!walk_bundle_number)        // bundle #0 isn't valid.
-                       ++walk_bundle_number;
+                       ++bcount;
+               }
 
-               if (bcount >= config->cluster_highest_bundleid)
-                       break;
+               if ( ((p + sizeof(uint32_t) * 2 + sizeof(groupsesst) ) < (buff + MAX_HEART_SIZE)) &&
+                        (gcount < config->cluster_highest_groupeid))
+               {
+                       if (!walk_groupe_number)        // groupe #0 isn't valid.
+                               ++walk_groupe_number;
 
-               hb_add_type(&p, C_CBUNDLE, walk_bundle_number);
-               walk_bundle_number = (1+walk_bundle_number)%(config->cluster_highest_bundleid+1);       // +1 avoids divide by zero.
-               ++bcount;
-        }
+                       hb_add_type(&p, C_CGROUPE, walk_groupe_number);
+                       walk_groupe_number = (1+walk_groupe_number)%(config->cluster_highest_groupeid+1);       // +1 avoids divide by zero.
+                       ++gcount;
+               }
+       }
 
                //
                // Did we do something wrong?
@@ -1002,10 +1076,10 @@ void cluster_heartbeat()
        }
 
        LOG(4, 0, 0, "Sending v%d heartbeat #%d, change #%" PRIu64 " with %d changes "
-                    "(%d x-sess, %d x-bundles, %d x-tunnels, %d highsess, %d highbund, %d hightun, size %d)\n",
+                    "(%d x-sess, %d x-bundles, %d x-tunnels, %d x-groupes, %d highsess, %d highbund, %d hightun, %d highgrp, size %d)\n",
            HB_VERSION, h.seq, h.table_version, config->cluster_num_changes,
-           count, bcount, tcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid,
-           config->cluster_highest_tunnelid, (int) (p - buff));
+           count, bcount, tcount, gcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid,
+           config->cluster_highest_tunnelid, config->cluster_highest_groupeid, (int) (p - buff));
 
        config->cluster_num_changes = 0;
 
@@ -1071,6 +1145,18 @@ int cluster_send_bundle(int bid)
        return type_changed(C_CBUNDLE, bid);
 }
 
+// A particular groupe has been changed! Only the master may report it; returns -1 otherwise.
+int cluster_send_groupe(int gid)
+{
+       if (!config->cluster_iam_master)        // a non-master must never originate groupe changes
+       {
+               LOG(0, 0, gid, "I'm not a master, but I just tried to change a groupe!\n");
+               return -1;
+       }
+
+       return type_changed(C_CGROUPE, gid);    // report as a compressed groupe; result is whatever type_changed() returns
+}
+
 // A particular tunnel has been changed!
 int cluster_send_tunnel(int tid)
 {
@@ -1266,7 +1352,6 @@ static int cluster_handle_bytes(uint8_t *data, int size)
 
                session[b->sid].cin_delta += b->cin;
                session[b->sid].cout_delta += b->cout;
-               session[b->sid].coutgrp_delta += b->cout;
 
                if (b->cin)
                        session[b->sid].last_packet = session[b->sid].last_data = time_now;
@@ -1336,6 +1421,31 @@ static int cluster_recv_bundle(int more, uint8_t *p)
         return 0;
 }
 
+static int cluster_recv_groupe(int more, uint8_t *p)   // more = groupe id, p = groupsesst image; returns 0, or -1 on a bad id
+{
+       if (more < 0 || more >= MAXGROUPE) {    // 'more' is signed: reject negative ids before indexing grpsession[]
+               LOG(0, 0, 0, "DANGER: Received a group id > MAXGROUPE!\n");
+               return -1;
+       }
+
+       if (grpsession[more].state == GROUPEUNDEF) {
+               if (config->cluster_iam_uptodate) { // Sanity.
+                       LOG(0, 0, 0, "I thought I was uptodate but I just found an undefined group!\n");
+               } else {
+                       --config->cluster_undefined_groupes;    // One fewer groupe still to be received.
+               }
+       }
+
+       grp_cluster_load_groupe(more, (groupsesst *) p);        // Copy groupe into groupe table..
+
+       LOG(5, 0, more, "Received group update (%d undef)\n", config->cluster_undefined_groupes);
+
+       if (!config->cluster_iam_uptodate)
+               cluster_uptodate();     // Check to see if we're up to date.
+
+        return 0;
+}
+
 static int cluster_recv_tunnel(int more, uint8_t *p)
 {
        if (more >= MAXTUNNEL) {
@@ -1632,9 +1742,10 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t
        memcpy(&past_hearts[i].data, data, size);       // Save it.
 
 
-                       // Check that we don't have too many undefined sessions, and
-                       // that the free session pointer is correct.
-       cluster_check_sessions(h->highsession, h->freesession, h->highbundle, h->hightunnel);
+       // Check that we don't have too many undefined sessions, and
+       // that the free session pointer is correct.
+       gnextgrpid = h->nextgrpid;
+       cluster_check_sessions(h->highsession, h->freesession, h->highbundle, h->hightunnel, h->highgroupe);
 
        if (h->interval != config->cluster_hb_interval)
        {
@@ -1772,6 +1883,36 @@ static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t
                                 p += sizeof(bundle[more]);
                                 s -= sizeof(bundle[more]);
                                 break;
+
+                       case C_CGROUPE:
+                       { // Compressed Groupe structure.
+                               uint8_t c[ sizeof(groupsesst) + 2];
+                               int size;
+                               uint8_t *orig_p = p;
+
+                               size = rle_decompress((uint8_t **) &p, s, c, sizeof(c));
+                               s -= (p - orig_p);
+
+                               if (size != sizeof(groupsesst) )
+                               { // Ouch! Very very bad!
+                                       LOG(0, 0, 0, "DANGER: Received a C_CGROUPE that didn't decompress correctly!\n");
+                                       // Now what? Should exit! No-longer up to date!
+                                       break;
+                               }
+
+                               cluster_recv_groupe(more, c);
+                               break;
+                       }
+                       case C_GROUPE:
+                               if ( s < sizeof(grpsession[more]))
+                                       goto shortpacket;
+
+                               cluster_recv_groupe(more, p);
+
+                               p += sizeof(grpsession[more]);
+                               s -= sizeof(grpsession[more]);
+                       break;
+
                        default:
                                LOG(0, 0, 0, "DANGER: I received a heartbeat element where I didn't understand the type! (%d)\n", type);
                                return -1; // can't process any more of the packet!!
@@ -1981,12 +2122,14 @@ int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc)
                cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number);
                cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid);
                cli_print(cli, "%d bundles undefined of %d", config->cluster_undefined_bundles, config->cluster_highest_bundleid);
+               cli_print(cli, "%d groupes undefined of %d", config->cluster_undefined_groupes, config->cluster_highest_groupeid);
                cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid);
        } else {
                cli_print(cli, "Table version #  : %" PRIu64, config->cluster_table_version);
                cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number);
                cli_print(cli, "Highest session  : %d", config->cluster_highest_sessionid);
                cli_print(cli, "Highest bundle   : %d", config->cluster_highest_bundleid);
+               cli_print(cli, "Highest groupe   : %d", config->cluster_highest_groupeid);
                cli_print(cli, "Highest tunnel   : %d", config->cluster_highest_tunnelid);
                cli_print(cli, "%d changes queued for sending", config->cluster_num_changes);
        }
index 660b0c8..794cc9d 100644 (file)
--- a/cluster.h
+++ b/cluster.h
@@ -25,6 +25,9 @@
 #define C_CBUNDLE              18      // Compressed bundle structure.
 #define C_MPPP_FORWARD 19      // MPPP Forwarded packet..
 #define C_PPPOE_FORWARD        20      // PPPOE Forwarded packet..
+#define C_GROUPE               21      // Groupe structure.
+#define C_CGROUPE              22      // Compressed groupe structure.
+
 
 #define HB_VERSION             7       // Protocol version number..
 #define HB_MAX_SEQ             (1<<30) // Maximum sequence number. (MUST BE A POWER OF 2!)
@@ -58,7 +61,10 @@ typedef struct {
 
        uint64_t table_version; // # state changes processed by cluster
 
-       char reserved[128 - 13*sizeof(uint32_t) - sizeof(uint64_t)];    // Pad out to 128 bytes.
+       uint32_t highgroupe;    // Id of the highest used groupe.
+       uint32_t nextgrpid;     // nextgrpid to set gnextgrpid on slave
+
+       char reserved[128 - 15*sizeof(uint32_t) - sizeof(uint64_t)];    // Pad out to 128 bytes.
 } heartt;
 
 typedef struct {               /* Used to update byte counters on the */
@@ -81,6 +87,7 @@ int cluster_init(void);
 int processcluster(uint8_t *buf, int size, in_addr_t addr);
 int cluster_send_session(int sid);
 int cluster_send_bundle(int bid);
+int cluster_send_groupe(int gid);
 int cluster_send_tunnel(int tid);
 int master_forward_packet(uint8_t *data, int size, in_addr_t addr, uint16_t port, uint16_t indexudp);
 int master_forward_dae_packet(uint8_t *data, int size, in_addr_t addr, int port);
index 0b7d763..63135ea 100644 (file)
--- a/garden.c
+++ b/garden.c
@@ -222,7 +222,7 @@ int garden_session(sessiont *s, int flag, char *newuser)
        /* Clean up counters */
        s->pin = s->pout = 0;
        s->cin = s->cout = 0;
-       s->cin_delta = s->cout_delta = s->coutgrp_delta = 0;
+       s->cin_delta = s->cout_delta = 0;
        s->cin_wrap = s->cout_wrap = 0;
 
        snprintf(cmd, sizeof(cmd),
index bfc7d8d..73c729a 100644 (file)
--- a/grpsess.c
+++ b/grpsess.c
@@ -12,6 +12,7 @@
 
 #include "l2tpns.h"
 #include "util.h"
+#include "cluster.h"
 
 #ifdef BGP
 #include "bgp.h"
@@ -84,7 +85,7 @@ groupidt grp_groupbyip(in_addr_t ip)
 //
 // This adds it to the routing table, advertises it
 // via BGP if enabled, and stuffs it into the
-// 'sessionbyip' cache.
+// 'groupbyip' cache.
 //
 // 'ip' must be in _host_ order.
 //
@@ -153,7 +154,10 @@ static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
                        g = 0;  // Caching the session as '0' is the same as uncaching.
 
                for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
+               {
                        grp_cache_ipmap(i, g);
+                       if (!g) cache_ipmap(i, 0);
+               }
        }
 }
 
@@ -212,7 +216,7 @@ void grp_removesession(groupidt g, sessionidt s)
 
                                if (gnextgrpid == g)
                                {
-                                       gnextgrpid = 0;
+                                       gnextgrpid = grpsession[g].prev;
                                }
                                else
                                {
@@ -228,6 +232,7 @@ void grp_removesession(groupidt g, sessionidt s)
                                }
 
                                memset(&grpsession[g], 0, sizeof(grpsession[0]));
+                               grpsession[g].state = GROUPEFREE;
                        }
                        else
                        {
@@ -236,6 +241,8 @@ void grp_removesession(groupidt g, sessionidt s)
                                                &grpsession[g].sesslist[i+1],
                                                (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
                        }
+
+                       cluster_send_groupe(g);
                        return;
                }
        }
@@ -270,6 +277,7 @@ static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
                        // it's the first session of the group, set to next group
                        grpsession[g].prev = gnextgrpid;
                        gnextgrpid = g;
+                       grpsession[g].state = GROUPEOPEN;
                }
                grpsession[g].sesslist[i].sid = s;
                grpsession[g].sesslist[i].weight = weight;
@@ -356,6 +364,9 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
                        return;
                }
 
+               if (grpid > config->cluster_highest_groupeid)
+                       config->cluster_highest_groupeid = grpid;
+
                n++;
        }
 
@@ -420,6 +431,8 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
 // Init data structures
 void grp_initdata()
 {
+       int i;
+
        // Set default value (10s)
        config->grp_txrate_average_time = 10;
 
@@ -430,6 +443,10 @@ void grp_initdata()
        }
 
        memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
+       for (i = 1; i < MAXGROUPE; i++)
+       {
+               grpsession[i].state = GROUPEUNDEF;
+       }
 }
 
 // Update time_changed of the group
@@ -500,8 +517,13 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
                {
                        if ((s2 = grpsession[g].sesslist[i].sid))
                        {
-                               grpsession[g].sesslist[i].tx_rate = session[s2].coutgrp_delta/ltime_changed;
-                               session[s2].coutgrp_delta = 0;
+                               uint32_t coutgrp_delta = 0;
+
+                               if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
+                                       coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
+                               grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
+
+                               grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed;
 
                                txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
                                if (txrate < mintxrate)
@@ -587,3 +609,66 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
 
        return s;
 }
+
+// Load a groupe received from the master into the local table. Returns 1 on success, 0 if g is out of range.
+int grp_cluster_load_groupe(groupidt g, groupsesst *new)
+{
+       int i;
+       int updategroup = 0;    // set when the groupe routes must be removed and re-added
+
+       if (g >= MAXGROUPE)
+       {
+               LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n");
+               return 0;
+       }
+
+       if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) ||
+               (grpsession[g].nbsession != new->nbsession))
+       {
+               updategroup = 1;        // route or session count changed
+       }
+
+       if (!updategroup)
+       {
+               // Check session list: any differing member means the groupe changed
+               for (i = 0; i < grpsession[g].nbsession; i++)
+               {
+                       if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
+                       {
+                               updategroup = 1;
+                               break;
+                       }
+               }
+       }
+
+       if (!updategroup)
+       {
+               // Check routes list: any differing route address means the groupe changed
+               for (i = 0; i < grpsession[g].nbroutesgrp; i++)
+               {
+                       if (grpsession[g].route[i].ip != new->route[i].ip)
+                       {
+                               updategroup = 1;
+                               break;
+                       }
+               }
+       }
+
+       // needs update
+       if (updategroup)
+       {
+               // Del all routes (must happen while the old groupe data is still in the table)
+               grp_setgrouproute(g, 0);
+       }
+
+       memcpy(&grpsession[g], new, sizeof(grpsession[g]));     // Copy over..
+
+       // needs update
+       if (updategroup)
+       {
+               // Add all routes (now described by the freshly copied groupe data)
+               grp_setgrouproute(g, 1);
+       }
+
+       return 1;
+}
index d41a26d..fd3a7e0 100644 (file)
--- a/l2tplac.c
+++ b/l2tplac.c
@@ -528,7 +528,6 @@ int lac_session_forward(uint8_t *buf, int len, sessionidt sess, uint16_t proto,
                // Update STAT OUT
                increment_counter(&session[s].cout, &session[s].cout_wrap, len); // byte count
                session[s].cout_delta += len;
-               session[s].coutgrp_delta += len;
                session[s].pout++;
                sess_local[s].cout += len;
                sess_local[s].pout++;
index 08c8bed..17c440b 100644 (file)
--- a/l2tpns.c
+++ b/l2tpns.c
@@ -1394,7 +1394,6 @@ static void update_session_out_stat(sessionidt s, sessiont *sp, int len)
 {
        increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count
        sp->cout_delta += len;
-       sp->coutgrp_delta += len;
        sp->pout++;
        sp->last_data = time_now;
 
@@ -1804,7 +1803,6 @@ static void processipv6out(uint8_t * buf, int len)
 
        increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count
        sp->cout_delta += len;
-       sp->coutgrp_delta += len;
        sp->pout++;
        udp_tx += len;
 
@@ -1854,7 +1852,6 @@ static void send_ipout(sessionidt s, uint8_t *buf, int len)
 
        increment_counter(&sp->cout, &sp->cout_wrap, len); // byte count
        sp->cout_delta += len;
-       sp->coutgrp_delta += len;
        sp->pout++;
        udp_tx += len;
 
@@ -5179,6 +5176,9 @@ int main(int argc, char *argv[])
                        LOG(0, 0, 0, "Can't lock pages: %s\n", strerror(errno));
        }
 
+       //LOG(3, 0, 0, "Debug sizeof struct: sessiont %lu, tunnelt %lu, bundlet %lu, groupsesst %lu\n",
+       //      sizeof(sessiont), sizeof(tunnelt), sizeof(bundlet), sizeof(groupsesst));
+
        mainloop();
 
        /* remove plugins (so cleanup code gets run) */
@@ -5679,6 +5679,7 @@ int sessionsetup(sessionidt s, tunnelidt t)
                if ((g = grp_groupbysession(s)))
                {
                        grp_setgrouproute(g, 1);
+                       cluster_send_groupe(g);
                }
        }
 
index 4fe9a5c..bce434f 100644 (file)
--- a/l2tpns.h
+++ b/l2tpns.h
@@ -25,7 +25,6 @@
 #define MAXSESSION     60000           // could be up to 65535
 #define MAXTBFS                6000            // Maximum token bucket filters. Might need up to 2 * session.
 #define MAXSESSINGRP   12              // Maximum number of member links in grouped session
-#define MAXGRPINSESS   12              // Maximum number of member links in session group
 #define MAXGROUPE              300     // could be up to 65535, Maximum number of grouped session
 #define MAXROUTEINGRP  15              // max static routes per group
 
@@ -336,25 +335,27 @@ typedef struct
        struct in6_addr ipv6route;      // Static IPv6 route
        sessionidt forwardtosession;    // LNS id_session to forward
        uint8_t src_hwaddr[ETH_ALEN];   // MAC addr source (for pppoe sessions 6 bytes)
-       uint32_t coutgrp_delta;
+       char reserved[4];                               // Space to expand structure without changing HB_VERSION
 }
 sessiont;
 
 typedef struct
 {
        uint32_t tx_rate;
+       uint32_t prev_coutgrp;
        sessionidt sid;
        uint8_t weight;
 }
-groupsessionidt;
+groupsesslistt;
 
 typedef struct
 {
+       int state;                              // current state (groupestate enum)
        uint32_t time_changed;
        groupidt prev;
        sessionidt smax;
        sessionidt smin;
-       groupsessionidt sesslist[MAXSESSINGRP];
+       groupsesslistt sesslist[MAXSESSINGRP];
        routet route[MAXROUTEINGRP];            // static routes
        uint8_t nbroutesgrp;
        uint8_t nbsession;
@@ -549,6 +550,13 @@ enum
        BUNDLEUNDEF,            // Undefined
 };
 
+enum
+{
+       GROUPEFREE,             // Not in use
+       GROUPEOPEN,             // Active groupe
+       GROUPEUNDEF             // Undefined (not yet received from the master)
+};
+
 enum
 {
        NULLCLASS = 0,          //End Point Discriminator classes
@@ -756,8 +764,10 @@ typedef struct
        int             cluster_undefined_sessions;     // How many sessions we're yet to receive from the master.
        int             cluster_undefined_bundles;      // How many bundles we're yet to receive from the master.
        int             cluster_undefined_tunnels;      // How many tunnels we're yet to receive from the master.
+       int             cluster_undefined_groupes;      // How many groupes we're yet to receive from the master.
        int             cluster_highest_sessionid;
        int             cluster_highest_bundleid;
+       int             cluster_highest_groupeid;
        int             cluster_highest_tunnelid;
        clockt          cluster_last_hb;                // Last time we saw a heartbeat from the master.
        int             cluster_last_hb_ver;            // Heartbeat version last seen from master
@@ -996,6 +1006,7 @@ groupidt grp_groupbyip(in_addr_t ip);
 void grp_setgrouproute(groupidt g, int add);
 void grp_time_changed(void);
 void grp_removesession(groupidt g, sessionidt s);
+int grp_cluster_load_groupe(groupidt g, groupsesst *new);
 
 #undef LOG
 #undef LOG_HEX
@@ -1032,7 +1043,7 @@ extern sessiont *session;
 extern sessionlocalt *sess_local;
 extern ippoolt *ip_address_pool;
 extern groupsesst *grpsession;
-groupidt gnextgrpid;
+extern groupidt gnextgrpid;
 #define sessionfree (session[0].next)
 
 
diff --git a/pppoe.c b/pppoe.c
index f804b0e..61b881b 100644 (file)
--- a/pppoe.c
+++ b/pppoe.c
@@ -956,7 +956,6 @@ static void pppoe_forwardto_session_rmlns(uint8_t *pack, int size, sessionidt se
                // Update STAT OUT
                increment_counter(&session[s].cout, &session[s].cout_wrap, ll2tp); // byte count
                session[s].cout_delta += ll2tp;
-               session[s].coutgrp_delta += ll2tp;
                session[s].pout++;
                sess_local[s].cout += ll2tp;
                sess_local[s].pout++;
@@ -1028,7 +1027,6 @@ void pppoe_forwardto_session_pppoe(uint8_t *pack, int size, sessionidt sess, uin
                // Update STAT OUT
                increment_counter(&session[s].cout, &session[s].cout_wrap, lpppoe); // byte count
                session[s].cout_delta += lpppoe;
-               session[s].coutgrp_delta += lpppoe;
                session[s].pout++;
                sess_local[s].cout += lpppoe;
                sess_local[s].pout++;