Add grouping session functionality for load balancing and failover
[l2tpns.git] / grpsess.c
diff --git a/grpsess.c b/grpsess.c
new file mode 100644 (file)
index 0000000..13c6505
--- /dev/null
+++ b/grpsess.c
@@ -0,0 +1,520 @@
+/*
+ * Fernando ALVES 2013
+ * Grouped session for load balancing and fail-over
+ * GPL licenced
+ */
+
+#include <errno.h>
+#include <ctype.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <linux/rtnetlink.h>
+
+#include "l2tpns.h"
+#include "util.h"
+
+#ifdef BGP
+#include "bgp.h"
+#endif
+
+union grp_iphash {
+       groupidt grp;
+       union grp_iphash *idx;
+} grp_ip_hash[256];                    // Mapping from IP address to group structures.
+
+static groupidt gnextgrpid = 0;
+
+// Find gruop by IP, < 1 for not found
+//
+// Confusingly enough, this 'ip' must be
+// in _network_ order. This being the common
+// case when looking it up from IP packet headers.
+static groupidt grp_lookup_ipmap(in_addr_t ip)
+{
+       uint8_t *a = (uint8_t *) &ip;
+       union grp_iphash *h = grp_ip_hash;
+
+       if (!(h = h[*a++].idx)) return 0;
+       if (!(h = h[*a++].idx)) return 0;
+       if (!(h = h[*a++].idx)) return 0;
+
+       return h[*a].grp;
+}
+
+//
+// Take an IP address in HOST byte order and
+// add it to the grouid by IP cache.
+//
+// (It's actually cached in network order)
+//
+static void grp_cache_ipmap(in_addr_t ip, groupidt g)
+{
+       in_addr_t nip = htonl(ip);      // MUST be in network order. I.e. MSB must in be ((char *) (&ip))[0]
+       uint8_t *a = (uint8_t *) &nip;
+       union grp_iphash *h = grp_ip_hash;
+       int i;
+
+       for (i = 0; i < 3; i++)
+       {
+               if (!(h[a[i]].idx || (h[a[i]].idx = calloc(256, sizeof(union grp_iphash)))))
+                       return;
+
+               h = h[a[i]].idx;
+       }
+
+       h[a[3]].grp = g;
+
+       if (g > 0)
+               LOG(4, 0, 0, "Caching Group:%d ip address %s\n", g, fmtaddr(nip, 0));
+       else if (g == 0)
+               LOG(4, 0, 0, "Un-caching Group ip address %s\n", fmtaddr(nip, 0));
+}
+
+groupidt grp_groupbyip(in_addr_t ip)
+{
+       groupidt g = grp_lookup_ipmap(ip);
+
+       if (g > 0 && g < MAXGROUPE)
+               return g;
+
+       return 0;
+}
+
+// Add a route
+//
+// This adds it to the routing table, advertises it
+// via BGP if enabled, and stuffs it into the
+// 'sessionbyip' cache.
+//
+// 'ip' must be in _host_ order.
+//
+static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
+{
+       struct {
+               struct nlmsghdr nh;
+               struct rtmsg rt;
+               char buf[32];
+       } req;
+       int i;
+       in_addr_t n_ip;
+
+       if (!prefixlen) prefixlen = 32;
+
+       ip &= 0xffffffff << (32 - prefixlen);;  // Force the ip to be the first one in the route.
+
+       memset(&req, 0, sizeof(req));
+
+       if (add)
+       {
+               req.nh.nlmsg_type = RTM_NEWROUTE;
+               req.nh.nlmsg_flags = NLM_F_REQUEST | NLM_F_CREATE | NLM_F_REPLACE;
+       }
+       else
+       {
+               req.nh.nlmsg_type = RTM_DELROUTE;
+               req.nh.nlmsg_flags = NLM_F_REQUEST;
+       }
+
+       req.nh.nlmsg_len = NLMSG_LENGTH(sizeof(req.rt));
+
+       req.rt.rtm_family = AF_INET;
+       req.rt.rtm_dst_len = prefixlen;
+       req.rt.rtm_table = RT_TABLE_MAIN;
+       req.rt.rtm_protocol = 42;
+       req.rt.rtm_scope = RT_SCOPE_LINK;
+       req.rt.rtm_type = RTN_UNICAST;
+
+       netlink_addattr(&req.nh, RTA_OIF, &tunidx, sizeof(int));
+       n_ip = htonl(ip);
+       netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
+
+       LOG(1, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
+
+       if (netlink_send(&req.nh) < 0)
+               LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
+
+#ifdef BGP
+       if (add)
+               bgp_add_route(htonl(ip), prefixlen);
+       else
+               bgp_del_route(htonl(ip), prefixlen);
+#endif /* BGP */
+
+       // Add/Remove the IPs to the 'groupbyip' cache.
+       // Note that we add the zero address in the case of
+       // a network route. Roll on CIDR.
+
+       // Note that 'g == 0' implies this is the address pool.
+       // We still cache it here, because it will pre-fill
+       // the malloc'ed tree.
+       if (g)
+       {
+               if (!add)       // Are we deleting a route?
+                       g = 0;  // Caching the session as '0' is the same as uncaching.
+
+               for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
+                       grp_cache_ipmap(i, g);
+       }
+}
+
+// Set all route of a group
+void grp_setgrouproute(groupidt g, int add)
+{
+       int i;
+       for (i = 0; i < grpsession[g].nbroutesgrp; i++)
+       {
+               if (grpsession[g].route[i].ip != 0)
+               {
+                       grp_routeset(g, grpsession[g].route[i].ip, grpsession[g].route[i].prefixlen, add);
+               }
+       }
+}
+
+// return group id by session
+groupidt grp_groupbysession(sessionidt s)
+{
+       groupidt g = 0;
+       int i;
+       for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
+       {
+               for (i = 0; i < grpsession[g].nbsession; i++)
+               {
+                       if (grpsession[g].sesslist[i].sid == s)
+                       {       // session found in group
+                               return g;
+                       }
+               }
+       }
+
+       return 0;
+}
+
+// Remove a session to a group
+// return 1 if OK
+void grp_removesession(groupidt g, sessionidt s)
+{
+       int i;
+
+       if (grpsession[g].nbsession <= 0)
+               return;
+
+       for (i = 0; i < grpsession[g].nbsession; i++)
+       {
+               if (grpsession[g].sesslist[i].sid == s)
+               {       // session found on group
+                       --grpsession[g].nbsession;
+                       if (grpsession[g].nbsession == 0)
+                       {
+                               // Group is empty, remove it
+
+                               // Del all routes
+                               grp_setgrouproute(g, 0);
+
+                               if (gnextgrpid == g)
+                               {
+                                       gnextgrpid = 0;
+                               }
+                               else
+                               {
+                                       groupidt g2;
+                                       for (g2 = gnextgrpid; g2 != 0; g2 = grpsession[g2].prev)
+                                       {
+                                               if (grpsession[g2].prev == g)
+                                               {
+                                                       grpsession[g2].prev = grpsession[g].prev;
+                                                       break;
+                                               }
+                                       }
+                               }
+
+                               memset(&grpsession[g], 0, sizeof(grpsession[0]));
+                       }
+                       else
+                       {
+                               // remove the session
+                               memmove(&grpsession[g].sesslist[i],
+                                               &grpsession[g].sesslist[i+1],
+                                               (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
+                       }
+                       return;
+               }
+       }
+}
+
+// Add a session to a group
+// return 1 if OK
+static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
+{
+       int i;
+
+       for (i = 0; i < grpsession[g].nbsession; i++)
+       {
+               if (grpsession[g].sesslist[i].sid == s)
+               {       // already in group
+                       if ((!grpsession[g].sesslist[i].weight) || (weight > 1))
+                               grpsession[g].sesslist[i].weight = weight; // update Weight session (for load-balancing)
+
+                       return 1;
+               }
+       }
+
+       if (i >= MAXSESSINGRP)
+       {
+               LOG(1, s, session[s].tunnel, "   Too many session for Group %d\n", g);
+               return 0;
+       }
+       else
+       {       // add session id to group
+               if (i == 0)
+               {
+                       // it's the first session of the group, set to next group
+                       grpsession[g].prev = gnextgrpid;
+                       gnextgrpid = g;
+               }
+               grpsession[g].sesslist[i].sid = s;
+               grpsession[g].sesslist[i].weight = weight;
+               grpsession[g].nbsession++;
+
+               return 1;
+       }
+       return 0;
+}
+
+// Add a route to a group
+// return 1 if OK
+static int grp_addroute(groupidt g, sessionidt s, in_addr_t ip, int prefixlen)
+{
+       int i;
+
+       for (i = 0; i < MAXROUTEINGRP; i++)
+       {
+               if ((i >= grpsession[g].nbroutesgrp))
+               {
+                       LOG(3, s, session[s].tunnel, "   Radius reply Group %d contains route for %s/%d\n",
+                               g, fmtaddr(htonl(ip), 0), prefixlen);
+
+                       grpsession[g].route[i].ip = ip;
+                       grpsession[g].route[i].prefixlen = prefixlen;
+                       grpsession[g].nbroutesgrp++;
+                       return 1;
+               }
+               else if ((grpsession[g].route[i].ip == ip) && (grpsession[g].route[i].prefixlen == prefixlen))
+               {
+                       // route already defined in group
+                       LOG(3, s, session[s].tunnel,
+                               "   Radius reply Group %d contains route for %s/%d (this already defined)\n",
+                               g, fmtaddr(htonl(ip), 0), prefixlen);
+
+                       return 1;
+               }
+               else if (grpsession[g].route[i].ip == 0)
+               {
+                       LOG(3, s, session[s].tunnel, "   Radius reply Group %d contains route for %s/%d (find empty on list!!!)\n",
+                               g, fmtaddr(htonl(ip), 0), prefixlen);
+
+                       grpsession[g].route[i].ip = ip;
+                       grpsession[g].route[i].prefixlen = prefixlen;
+                       return 1;
+               }
+       }
+
+       if (i >= MAXROUTEINGRP)
+       {
+               LOG(1, s, session[s].tunnel, "   Too many routes for Group %d\n", g);
+       }
+       return 0;
+}
+
+// Process Sames vendor specific attribut radius
+void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
+{
+       uint8_t attrib = *pvs;
+       groupidt grpid = 0;
+       uint8_t *n = pvs + 2;
+       uint8_t *e = pvs + pvs[1];
+
+       if ((attrib >= 22) && (attrib <= 23))
+       {
+               while (n < e && isdigit(*n))
+               {
+                       grpid = grpid * 10 + *n - '0';
+                       n++;
+               }
+               if ((grpid == 0) || (grpid >= MAXGROUPE))
+               {
+                       LOG(1, s, session[s].tunnel, "   Group Attribute Id %d not allowed\n", grpid);
+                       return;
+               }
+               else if (*n != ',')
+               {
+                       LOG(1, s, session[s].tunnel, "   Group Attribute Id not defined\n");
+                       return;
+               }
+
+               if (!grp_addsession(grpid, s, 1))
+               {
+                       return;
+               }
+
+               n++;
+       }
+
+       //process, Sames vendor-specific 64520
+       if (attrib == 22) //SAMES-Group-Framed-Route
+       {
+               in_addr_t ip = 0;
+               uint8_t u = 0;
+               uint8_t bits = 0;
+
+               while (n < e && (isdigit(*n) || *n == '.'))
+               {
+                       if (*n == '.')
+                       {
+                               ip = (ip << 8) + u;
+                               u = 0;
+                       }
+                       else
+                               u = u * 10 + *n - '0';
+                       n++;
+               }
+               ip = (ip << 8) + u;
+               if (*n == '/')
+               {
+                       n++;
+                       while (n < e && isdigit(*n))
+                               bits = bits * 10 + *n++ - '0';
+               }
+               else if ((ip >> 24) < 128)
+                       bits = 8;
+               else if ((ip >> 24) < 192)
+                       bits = 16;
+               else
+                       bits = 24;
+
+               if (!grp_addroute(grpid, s, ip, bits))
+                       return;
+       }
+       else if (attrib == 23) //SAMES-Group-Session-Weight
+       {
+               uint8_t weight = 0;
+
+               while (n < e && isdigit(*n))
+                       weight = weight * 10 + *n++ - '0';
+
+               if (!weight)
+               {
+                       LOG(1, s, session[s].tunnel, "   Group-Session-Weight 0 GroupId %d not allowed\n", grpid);
+                       return;
+               }
+               if (!grp_addsession(grpid, s, weight))
+               {
+                       return;
+               }
+       }
+       else
+       {
+               LOG(3, s, session[s].tunnel, "   Unknown vendor-specific: 64520, Attrib: %d\n", attrib);
+       }
+}
+
+// Init data structures
+void grp_initdata()
+{
+       // Set default value (10s)
+       config->grp_txrate_average_time = 10;
+
+       if (!(grpsession = shared_malloc(sizeof(groupsesst) * MAXGROUPE)))
+       {
+               LOG(0, 0, 0, "Error doing malloc for grouped session: %s\n", strerror(errno));
+               exit(1);
+       }
+
+       memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
+}
+
+// Update time_changed of the group
+void grp_time_changed()
+{
+       groupidt g;
+
+       for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
+       {
+               grpsession[g].time_changed++;
+       }
+}
+
+// return the next session can be used on the group
+sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
+{
+       sessionidt s = 0, s2 = 0, s3 = 0;
+       int i;
+       uint32_t ltime_changed = 0;
+       uint32_t mintxrate = 0xFFFFFFFF;
+
+       if (g >= MAXGROUPE)
+               return 0;
+
+       if ((s = sessionbyip(ip)))
+       {
+               if ( (session[s].ppp.phase > Establish) &&
+                        (time_now - session[s].last_packet <= (config->echo_timeout + 2)) )
+               {
+                       int recaltxrate = 0;
+
+                       for (i = 0; i < grpsession[g].nbsession; i++)
+                       {
+                               if (s == grpsession[g].sesslist[i].sid)
+                               {
+                                       if ((time_now - grpsession[g].sesslist[i].mark_time) > config->grp_txrate_average_time)
+                                       {
+                                               grpsession[g].sesslist[i].mark_time = time_now;
+                                               recaltxrate = 1;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if (!recaltxrate)
+                               return s;
+               }
+       }
+
+       if (grpsession[g].time_changed > config->grp_txrate_average_time)
+       {
+               ltime_changed = grpsession[g].time_changed;
+               grpsession[g].time_changed = 1;
+       }
+
+       for (i = 0; i < grpsession[g].nbsession; i++)
+       {
+               if ((s2 = grpsession[g].sesslist[i].sid))
+               {
+                       s3 = s2;
+                       if (ltime_changed)
+                       {
+                               grpsession[g].sesslist[i].tx_rate = session[s2].coutgrp_delta/ltime_changed;
+                               session[s2].coutgrp_delta = grpsession[g].sesslist[i].tx_rate;
+                               LOG(3, s2, session[s2].tunnel, "TX Rate: %d session weight: %d\n", grpsession[g].sesslist[i].tx_rate, grpsession[g].sesslist[i].weight);
+                       }
+
+                       if ( session[s2].ppp.phase > Establish &&
+                               (time_now - session[s2].last_packet <= (config->echo_timeout + 2)) )
+                       {
+                               uint32_t txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
+                               if (txrate < mintxrate)
+                               {
+                                       s = s2;
+                                       mintxrate = txrate;
+                               }
+                       }
+               }
+       }
+
+       if (!s)
+               s = s3;
+
+       if (s)
+               cache_ipmap(ntohl(ip), s);
+
+       return s;
+}