#include "l2tpns.h"
#include "util.h"
+#include "cluster.h"
#ifdef BGP
#include "bgp.h"
union grp_iphash *idx;
} grp_ip_hash[256]; // Mapping from IP address to group structures.
-static groupidt gnextgrpid = 0;
+groupidt gnextgrpid = 0;
// Find gruop by IP, < 1 for not found
//
//
// This adds it to the routing table, advertises it
// via BGP if enabled, and stuffs it into the
-// 'sessionbyip' cache.
+// 'groupbyip' cache.
//
// 'ip' must be in _host_ order.
//
n_ip = htonl(ip);
netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
- LOG(1, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
+ LOG(3, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
if (netlink_send(&req.nh) < 0)
LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
g = 0; // Caching the session as '0' is the same as uncaching.
for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
+ {
grp_cache_ipmap(i, g);
+ if (!g) cache_ipmap(i, 0);
+ }
}
}
if (gnextgrpid == g)
{
- gnextgrpid = 0;
+ gnextgrpid = grpsession[g].prev;
}
else
{
}
memset(&grpsession[g], 0, sizeof(grpsession[0]));
+ grpsession[g].state = GROUPEFREE;
}
else
{
&grpsession[g].sesslist[i+1],
(grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
}
+
+ cluster_send_groupe(g);
return;
}
}
// it's the first session of the group, set to next group
grpsession[g].prev = gnextgrpid;
gnextgrpid = g;
+ grpsession[g].state = GROUPEOPEN;
}
grpsession[g].sesslist[i].sid = s;
grpsession[g].sesslist[i].weight = weight;
return;
}
+ if (grpid > config->cluster_highest_groupeid)
+ config->cluster_highest_groupeid = grpid;
+
n++;
}
// Init data structures
void grp_initdata()
{
+ int i;
+
// Set default value (10s)
config->grp_txrate_average_time = 10;
}
memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
+ for (i = 1; i < MAXGROUPE; i++)
+ {
+ grpsession[i].state = GROUPEUNDEF;
+ }
}
// Update time_changed of the group
}
}
+// Uncache all IP of a session
+static void grp_uncache_ipsession(groupidt g, sessionidt s)
+{
+ int i;
+ uint8_t *a;
+ in_addr_t ip;
+ in_addr_t n_ip, j;
+ int prefixlen;
+ union iphash *h;
+
+ for (i = 0; i < grpsession[g].nbroutesgrp; i++)
+ {
+ if (grpsession[g].route[i].ip != 0)
+ {
+ prefixlen = grpsession[g].route[i].prefixlen;
+ ip = grpsession[g].route[i].ip & (0xffffffff << (32 - prefixlen)); // Force the ip to be the first one in the route.
+
+ for (j = ip; j < ip+(1<<(32-prefixlen)) ; ++j)
+ {
+ n_ip = htonl(j); // To network order
+ a = (uint8_t *) &n_ip;
+ h = ip_hash;
+
+ if (!(h = h[*a++].idx)) continue;
+ if (!(h = h[*a++].idx)) continue;
+ if (!(h = h[*a++].idx)) continue;
+
+ if (s == h[*a].sess)
+ {
+ h[*a].sess = 0;
+ //LOG(3, s, session[s].tunnel, "UnCaching ip address %s\n", fmtaddr(n_ip, 0));
+ }
+ }
+ }
+ }
+}
+
// return the next session can be used on the group
sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
{
sessionidt s = 0, s2 = 0, s3 = 0;
int i;
- uint32_t ltime_changed = 0;
- uint32_t mintxrate = 0xFFFFFFFF;
+ uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
+ uint32_t txrate;
if (g >= MAXGROUPE)
return 0;
- if ((s = sessionbyip(ip)))
+ if (grpsession[g].time_changed >= config->grp_txrate_average_time)
{
- if ( (session[s].ppp.phase > Establish) &&
- (time_now - session[s].last_packet <= (config->echo_timeout + 2)) )
+ // recalculation txrate
+ ltime_changed = grpsession[g].time_changed;
+ grpsession[g].time_changed = 0;
+ for (i = 0; i < grpsession[g].nbsession; i++)
{
- int recaltxrate = 0;
-
- for (i = 0; i < grpsession[g].nbsession; i++)
+ if ((s2 = grpsession[g].sesslist[i].sid))
{
- if (s == grpsession[g].sesslist[i].sid)
+ uint32_t coutgrp_delta = 0;
+
+ if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
+ coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
+ grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
+
+ grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed;
+
+ txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
+ if (txrate < mintxrate)
{
- if ((time_now - grpsession[g].sesslist[i].mark_time) > config->grp_txrate_average_time)
+ if ( session[s2].ppp.phase > Establish &&
+ (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
{
- grpsession[g].sesslist[i].mark_time = time_now;
- recaltxrate = 1;
- break;
+ grpsession[g].smin = s2;
+ mintxrate = txrate;
}
}
- }
- if (!recaltxrate)
- return s;
+ if (txrate > maxtxrate)
+ {
+ if ( session[s2].ppp.phase > Establish &&
+ (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
+ {
+ grpsession[g].smax = s2;
+ maxtxrate = txrate;
+ }
+ }
+ }
}
}
- if (grpsession[g].time_changed > config->grp_txrate_average_time)
+ if ((s = sessionbyip(ip)))
{
- ltime_changed = grpsession[g].time_changed;
- grpsession[g].time_changed = 1;
+ if (g != grp_groupbysession(s))
+ {
+ // This session does not belong to this group
+ LOG(2, s, session[s].tunnel, "Warning, the session does not belong to group %d\n", g);
+ s = 0;
+ }
+ else if (s == grpsession[g].smax)
+ {
+ s = grpsession[g].smin;
+ grpsession[g].smax = 0;
+ }
+ else if ( (session[s].ppp.phase > Establish) &&
+ (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
+ {
+ return s;
+ }
+ else
+ {
+ s = 0;
+ }
}
- for (i = 0; i < grpsession[g].nbsession; i++)
+ if (!s)
{
- if ((s2 = grpsession[g].sesslist[i].sid))
- {
- s3 = s2;
- if (ltime_changed)
- {
- grpsession[g].sesslist[i].tx_rate = session[s2].coutgrp_delta/ltime_changed;
- session[s2].coutgrp_delta = grpsession[g].sesslist[i].tx_rate;
- LOG(3, s2, session[s2].tunnel, "TX Rate: %d session weight: %d\n", grpsession[g].sesslist[i].tx_rate, grpsession[g].sesslist[i].weight);
- }
+ // random between 0 and nbsession-1
+ uint indexsess = (rand() % grpsession[g].nbsession);
+
+ if (indexsess >= grpsession[g].nbsession)
+ indexsess = 0; //Sanity checks.
- if ( session[s2].ppp.phase > Establish &&
- (time_now - session[s2].last_packet <= (config->echo_timeout + 2)) )
+ s2 = grpsession[g].sesslist[indexsess].sid;
+ if (s2 &&
+ (session[s2].ppp.phase > Establish) &&
+ (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
+ {
+ s = s2;
+ }
+ else
+ {
+ for (i = 0; i < grpsession[g].nbsession; i++)
{
- uint32_t txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
- if (txrate < mintxrate)
+ if ((s2 = grpsession[g].sesslist[i].sid))
{
- s = s2;
- mintxrate = txrate;
+ s3 = s2;
+
+ if ( session[s2].ppp.phase > Establish &&
+ (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
+ {
+ s = s2;
+ break;
+ }
}
}
}
return s;
}
+
+// load a groupe receive from master
+int grp_cluster_load_groupe(groupidt g, groupsesst *new)
+{
+ int i;
+ int updategroup = 0;
+
+ if (g >= MAXGROUPE)
+ {
+ LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n");
+ return 0;
+ }
+
+ if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) ||
+ (grpsession[g].nbsession != new->nbsession))
+ {
+ updategroup = 1;
+ }
+
+ if (!updategroup)
+ {
+ // Check session list
+ for (i = 0; i < grpsession[g].nbsession; i++)
+ {
+ if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
+ {
+ updategroup = 1;
+ break;
+ }
+ }
+ }
+
+ if (!updategroup)
+ {
+ // Check routes list
+ for (i = 0; i < grpsession[g].nbroutesgrp; i++)
+ {
+ if (grpsession[g].route[i].ip != new->route[i].ip)
+ {
+ updategroup = 1;
+ break;
+ }
+ }
+ }
+
+ // needs update
+ if (updategroup)
+ {
+ // Del all routes
+ grp_setgrouproute(g, 0);
+ }
+
+ memcpy(&grpsession[g], new, sizeof(grpsession[g])); // Copy over..
+
+ // needs update
+ if (updategroup)
+ {
+ // Add all routes
+ grp_setgrouproute(g, 1);
+ }
+
+ return 1;
+}