Fix cluster group update
[l2tpns.git] / grpsess.c
index a26c489..7f897b3 100644 (file)
--- a/grpsess.c
+++ b/grpsess.c
@@ -12,6 +12,7 @@
 
 #include "l2tpns.h"
 #include "util.h"
 
 #include "l2tpns.h"
 #include "util.h"
+#include "cluster.h"
 
 #ifdef BGP
 #include "bgp.h"
 
 #ifdef BGP
 #include "bgp.h"
@@ -22,7 +23,7 @@ union grp_iphash {
        union grp_iphash *idx;
 } grp_ip_hash[256];                    // Mapping from IP address to group structures.
 
        union grp_iphash *idx;
 } grp_ip_hash[256];                    // Mapping from IP address to group structures.
 
-static groupidt gnextgrpid = 0;
+groupidt gnextgrpid = 0;
 
 // Find group by IP, < 1 for not found
 //
 
 // Find group by IP, < 1 for not found
 //
@@ -84,7 +85,7 @@ groupidt grp_groupbyip(in_addr_t ip)
 //
 // This adds it to the routing table, advertises it
 // via BGP if enabled, and stuffs it into the
 //
 // This adds it to the routing table, advertises it
 // via BGP if enabled, and stuffs it into the
-// 'sessionbyip' cache.
+// 'groupbyip' cache.
 //
 // 'ip' must be in _host_ order.
 //
 //
 // 'ip' must be in _host_ order.
 //
@@ -128,7 +129,7 @@ static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
        n_ip = htonl(ip);
        netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
 
        n_ip = htonl(ip);
        netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
 
-       LOG(1, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
+       LOG(3, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
 
        if (netlink_send(&req.nh) < 0)
                LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
 
        if (netlink_send(&req.nh) < 0)
                LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
@@ -153,7 +154,10 @@ static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
                        g = 0;  // Caching the session as '0' is the same as uncaching.
 
                for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
                        g = 0;  // Caching the session as '0' is the same as uncaching.
 
                for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
+               {
                        grp_cache_ipmap(i, g);
                        grp_cache_ipmap(i, g);
+                       if (!g) cache_ipmap(i, 0);
+               }
        }
 }
 
        }
 }
 
@@ -212,7 +216,7 @@ void grp_removesession(groupidt g, sessionidt s)
 
                                if (gnextgrpid == g)
                                {
 
                                if (gnextgrpid == g)
                                {
-                                       gnextgrpid = 0;
+                                       gnextgrpid = grpsession[g].prev;
                                }
                                else
                                {
                                }
                                else
                                {
@@ -228,6 +232,7 @@ void grp_removesession(groupidt g, sessionidt s)
                                }
 
                                memset(&grpsession[g], 0, sizeof(grpsession[0]));
                                }
 
                                memset(&grpsession[g], 0, sizeof(grpsession[0]));
+                               grpsession[g].state = GROUPEFREE;
                        }
                        else
                        {
                        }
                        else
                        {
@@ -236,6 +241,8 @@ void grp_removesession(groupidt g, sessionidt s)
                                                &grpsession[g].sesslist[i+1],
                                                (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
                        }
                                                &grpsession[g].sesslist[i+1],
                                                (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
                        }
+
+                       cluster_send_groupe(g);
                        return;
                }
        }
                        return;
                }
        }
@@ -270,6 +277,7 @@ static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
                        // it's the first session of the group, set to next group
                        grpsession[g].prev = gnextgrpid;
                        gnextgrpid = g;
                        // it's the first session of the group, set to next group
                        grpsession[g].prev = gnextgrpid;
                        gnextgrpid = g;
+                       grpsession[g].state = GROUPEOPEN;
                }
                grpsession[g].sesslist[i].sid = s;
                grpsession[g].sesslist[i].weight = weight;
                }
                grpsession[g].sesslist[i].sid = s;
                grpsession[g].sesslist[i].weight = weight;
@@ -356,6 +364,9 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
                        return;
                }
 
                        return;
                }
 
+               if (grpid > config->cluster_highest_groupeid)
+                       config->cluster_highest_groupeid = grpid;
+
                n++;
        }
 
                n++;
        }
 
@@ -420,6 +431,8 @@ void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
 // Init data structures
 void grp_initdata()
 {
 // Init data structures
 void grp_initdata()
 {
+       int i;
+
        // Set default value (10s)
        config->grp_txrate_average_time = 10;
 
        // Set default value (10s)
        config->grp_txrate_average_time = 10;
 
@@ -430,6 +443,10 @@ void grp_initdata()
        }
 
        memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
        }
 
        memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
+       for (i = 1; i < MAXGROUPE; i++)
+       {
+               grpsession[i].state = GROUPEUNDEF;
+       }
 }
 
 // Update time_changed of the group
 }
 
 // Update time_changed of the group
@@ -483,7 +500,7 @@ static void grp_uncache_ipsession(groupidt g, sessionidt s)
 // return the next session can be used on the group
 sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
 {
 // return the next session can be used on the group
 sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
 {
-       sessionidt s = 0, s2 = 0, s3 = 0, smax = 0;
+       sessionidt s = 0, s2 = 0, s3 = 0;
        int i;
        uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
        uint32_t txrate;
        int i;
        uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
        uint32_t txrate;
@@ -491,35 +508,22 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
        if (g >= MAXGROUPE)
                return 0;
 
        if (g >= MAXGROUPE)
                return 0;
 
-       if (grpsession[g].time_changed < config->grp_txrate_average_time)
-       {
-               if ((s = sessionbyip(ip)))
-               {
-                       if ( (session[s].ppp.phase > Establish) &&
-                                (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
-                       {
-                               return s;
-                       }
-                       s = 0;
-               }
-       }
-       else
+       if (grpsession[g].time_changed >= config->grp_txrate_average_time)
        {
                // recalculation txrate
                ltime_changed = grpsession[g].time_changed;
                grpsession[g].time_changed = 0;
        {
                // recalculation txrate
                ltime_changed = grpsession[g].time_changed;
                grpsession[g].time_changed = 0;
-               s = 0;
                for (i = 0; i < grpsession[g].nbsession; i++)
                {
                        if ((s2 = grpsession[g].sesslist[i].sid))
                        {
                for (i = 0; i < grpsession[g].nbsession; i++)
                {
                        if ((s2 = grpsession[g].sesslist[i].sid))
                        {
-                               s3 = s2;
+                               uint32_t coutgrp_delta = 0;
 
 
-                               grpsession[g].sesslist[i].tx_rate = session[s2].coutgrp_delta/ltime_changed;
-                               session[s2].coutgrp_delta = 0;
+                               if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
+                                       coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
+                               grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
 
 
-                               //LOG(3, s2, session[s2].tunnel, "TX Rate: %d session weight: %d\n",
-                               //      grpsession[g].sesslist[i].tx_rate, grpsession[g].sesslist[i].weight);
+                               grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed;
 
                                txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
                                if (txrate < mintxrate)
 
                                txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
                                if (txrate < mintxrate)
@@ -527,7 +531,7 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
                                        if ( session[s2].ppp.phase > Establish &&
                                                (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
                                        {
                                        if ( session[s2].ppp.phase > Establish &&
                                                (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
                                        {
-                                               s = s2;
+                                               grpsession[g].smin = s2;
                                                mintxrate = txrate;
                                        }
                                }
                                                mintxrate = txrate;
                                        }
                                }
@@ -537,16 +541,29 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
                                        if ( session[s2].ppp.phase > Establish &&
                                        (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
                                        {
                                        if ( session[s2].ppp.phase > Establish &&
                                        (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
                                        {
-                                               smax = s2;
+                                               grpsession[g].smax = s2;
                                                maxtxrate = txrate;
                                        }
                                }
                        }
                }
                                                maxtxrate = txrate;
                                        }
                                }
                        }
                }
+       }
 
 
-               if (smax && (maxtxrate != mintxrate))
+       if ((s = sessionbyip(ip)))
+       {
+               if (s == grpsession[g].smax)
                {
                {
-                       grp_uncache_ipsession(g, smax);
+                       s = grpsession[g].smin;
+                       grpsession[g].smax = 0;
+               }
+               else if ( (session[s].ppp.phase > Establish) &&
+                        (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
+               {
+                       return s;
+               }
+               else
+               {
+                       s = 0;
                }
        }
 
                }
        }
 
@@ -564,7 +581,6 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
                        (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
                {
                        s = s2;
                        (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
                {
                        s = s2;
-                       //LOG(3, s, session[s].tunnel, "New random session\n");
                }
                else
                {
                }
                else
                {
@@ -593,3 +609,66 @@ sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
 
        return s;
 }
 
        return s;
 }
+
+// Load a group received from the master into grpsession[g], refreshing its routes when sessions/routes changed. Returns 0 if g >= MAXGROUPE, 1 otherwise.
+int grp_cluster_load_groupe(groupidt g, groupsesst *new)
+{
+       int i;
+       int updategroup = 0; // becomes 1 when the local entry and 'new' differ in counts, session ids or route IPs
+
+       if (g >= MAXGROUPE) // reject ids outside the MAXGROUPE range before indexing grpsession[]
+       {
+               LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n"); // NOTE(review): the check is >=, the message says > -- confirm wording
+               return 0;
+       }
+
+       if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) || // cheap test first: a different route count...
+               (grpsession[g].nbsession != new->nbsession)) // ...or a different session count already means a change
+       {
+               updategroup = 1;
+       }
+
+       if (!updategroup)
+       {
+               // Counts are equal: compare the session ids slot by slot (only .sid is compared)
+               for (i = 0; i < grpsession[g].nbsession; i++)
+               {
+                       if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
+                       {
+                               updategroup = 1;
+                               break;
+                       }
+               }
+       }
+
+       if (!updategroup)
+       {
+               // Sessions are equal: compare the route IPs slot by slot (only .ip is compared)
+               for (i = 0; i < grpsession[g].nbroutesgrp; i++)
+               {
+                       if (grpsession[g].route[i].ip != new->route[i].ip)
+                       {
+                               updategroup = 1;
+                               break;
+                       }
+               }
+       }
+
+       // needs update: this call runs before the memcpy below, i.e. while grpsession[g] still holds the old data
+       if (updategroup)
+       {
+               // Del all routes
+               grp_setgrouproute(g, 0);
+       }
+
+       memcpy(&grpsession[g], new, sizeof(grpsession[g]));     // Always overwrite the local entry, even when unchanged. NOTE(review): counts in 'new' are not range-checked here -- confirm the caller validates them
+
+       // needs update: this call runs after the memcpy, i.e. with grpsession[g] holding the received data
+       if (updategroup)
+       {
+               // Add all routes
+               grp_setgrouproute(g, 1);
+       }
+
+       return 1; // group entry loaded
+}