improved load balancing algorithm.
[l2tpns.git] / grpsess.c
1 /*
2 * Fernando ALVES 2013
3 * Grouped session for load balancing and fail-over
4 * GPL licenced
5 */
6
7 #include <errno.h>
8 #include <ctype.h>
9 #include <string.h>
10 #include <sys/socket.h>
11 #include <linux/rtnetlink.h>
12
13 #include "l2tpns.h"
14 #include "util.h"
15 #include "cluster.h"
16
17 #ifdef BGP
18 #include "bgp.h"
19 #endif
20
21 union grp_iphash {
22 groupidt grp;
23 union grp_iphash *idx;
24 } grp_ip_hash[256]; // Mapping from IP address to group structures.
25
26 groupidt gnextgrpid = 0;
27
28 typedef struct
29 {
30 sessionidt sid_loaddist[0x10000];
31 }
32 local_group;
33
34 local_group *grp_local = NULL; // Array of local_group structures.
35
36 // Find gruop by IP, < 1 for not found
37 //
38 // Confusingly enough, this 'ip' must be
39 // in _network_ order. This being the common
40 // case when looking it up from IP packet headers.
41 static groupidt grp_lookup_ipmap(in_addr_t ip)
42 {
43 uint8_t *a = (uint8_t *) &ip;
44 union grp_iphash *h = grp_ip_hash;
45
46 if (!(h = h[*a++].idx)) return 0;
47 if (!(h = h[*a++].idx)) return 0;
48 if (!(h = h[*a++].idx)) return 0;
49
50 return h[*a].grp;
51 }
52
53 //
54 // Take an IP address in HOST byte order and
55 // add it to the grouid by IP cache.
56 //
57 // (It's actually cached in network order)
58 //
59 static void grp_cache_ipmap(in_addr_t ip, groupidt g)
60 {
61 in_addr_t nip = htonl(ip); // MUST be in network order. I.e. MSB must in be ((char *) (&ip))[0]
62 uint8_t *a = (uint8_t *) &nip;
63 union grp_iphash *h = grp_ip_hash;
64 int i;
65
66 for (i = 0; i < 3; i++)
67 {
68 if (!(h[a[i]].idx || (h[a[i]].idx = calloc(256, sizeof(union grp_iphash)))))
69 return;
70
71 h = h[a[i]].idx;
72 }
73
74 h[a[3]].grp = g;
75
76 if (g > 0)
77 LOG(4, 0, 0, "Caching Group:%d ip address %s\n", g, fmtaddr(nip, 0));
78 else if (g == 0)
79 LOG(4, 0, 0, "Un-caching Group ip address %s\n", fmtaddr(nip, 0));
80 }
81
82 groupidt grp_groupbyip(in_addr_t ip)
83 {
84 groupidt g = grp_lookup_ipmap(ip);
85
86 if (g > 0 && g < MAXGROUPE)
87 return g;
88
89 return 0;
90 }
91
92 // Add a route
93 //
94 // This adds it to the routing table, advertises it
95 // via BGP if enabled, and stuffs it into the
96 // 'groupbyip' cache.
97 //
98 // 'ip' must be in _host_ order.
99 //
100 static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
101 {
102 struct {
103 struct nlmsghdr nh;
104 struct rtmsg rt;
105 char buf[32];
106 } req;
107 int i;
108 in_addr_t n_ip;
109
110 if (!prefixlen) prefixlen = 32;
111
112 ip &= 0xffffffff << (32 - prefixlen);; // Force the ip to be the first one in the route.
113
114 memset(&req, 0, sizeof(req));
115
116 if (add)
117 {
118 req.nh.nlmsg_type = RTM_NEWROUTE;
119 req.nh.nlmsg_flags = NLM_F_REQUEST | NLM_F_CREATE | NLM_F_REPLACE;
120 }
121 else
122 {
123 req.nh.nlmsg_type = RTM_DELROUTE;
124 req.nh.nlmsg_flags = NLM_F_REQUEST;
125 }
126
127 req.nh.nlmsg_len = NLMSG_LENGTH(sizeof(req.rt));
128
129 req.rt.rtm_family = AF_INET;
130 req.rt.rtm_dst_len = prefixlen;
131 req.rt.rtm_table = RT_TABLE_MAIN;
132 req.rt.rtm_protocol = 42;
133 req.rt.rtm_scope = RT_SCOPE_LINK;
134 req.rt.rtm_type = RTN_UNICAST;
135
136 netlink_addattr(&req.nh, RTA_OIF, &tunidx, sizeof(int));
137 n_ip = htonl(ip);
138 netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
139
140 LOG(3, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
141
142 if (netlink_send(&req.nh) < 0)
143 LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
144
145 #ifdef BGP
146 if (add)
147 bgp_add_route(htonl(ip), prefixlen);
148 else
149 bgp_del_route(htonl(ip), prefixlen);
150 #endif /* BGP */
151
152 // Add/Remove the IPs to the 'groupbyip' cache.
153 // Note that we add the zero address in the case of
154 // a network route. Roll on CIDR.
155
156 // Note that 'g == 0' implies this is the address pool.
157 // We still cache it here, because it will pre-fill
158 // the malloc'ed tree.
159 if (g)
160 {
161 if (!add) // Are we deleting a route?
162 g = 0; // Caching the session as '0' is the same as uncaching.
163
164 for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
165 {
166 grp_cache_ipmap(i, g);
167 if (!g) cache_ipmap(i, 0);
168 }
169 }
170 }
171
172 // Set all route of a group
173 void grp_setgrouproute(groupidt g, int add)
174 {
175 int i;
176 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
177 {
178 if (grpsession[g].route[i].ip != 0)
179 {
180 grp_routeset(g, grpsession[g].route[i].ip, grpsession[g].route[i].prefixlen, add);
181 }
182 }
183 }
184
185 // return group id by session
186 groupidt grp_groupbysession(sessionidt s)
187 {
188 groupidt g = 0;
189 int i;
190 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
191 {
192 for (i = 0; i < grpsession[g].nbsession; i++)
193 {
194 if (grpsession[g].sesslist[i].sid == s)
195 { // session found in group
196 return g;
197 }
198 }
199 }
200
201 return 0;
202 }
203
204 // Remove a session to a group
205 // return 1 if OK
206 void grp_removesession(groupidt g, sessionidt s)
207 {
208 int i;
209
210 if (grpsession[g].nbsession <= 0)
211 return;
212
213 for (i = 0; i < grpsession[g].nbsession; i++)
214 {
215 if (grpsession[g].sesslist[i].sid == s)
216 { // session found on group
217 --grpsession[g].nbsession;
218 if (grpsession[g].nbsession == 0)
219 {
220 // Group is empty, remove it
221
222 // Del all routes
223 grp_setgrouproute(g, 0);
224
225 if (gnextgrpid == g)
226 {
227 gnextgrpid = grpsession[g].prev;
228 }
229 else
230 {
231 groupidt g2;
232 for (g2 = gnextgrpid; g2 != 0; g2 = grpsession[g2].prev)
233 {
234 if (grpsession[g2].prev == g)
235 {
236 grpsession[g2].prev = grpsession[g].prev;
237 break;
238 }
239 }
240 }
241
242 memset(&grpsession[g], 0, sizeof(grpsession[0]));
243 grpsession[g].state = GROUPEFREE;
244 }
245 else
246 {
247 // remove the session
248 memmove(&grpsession[g].sesslist[i],
249 &grpsession[g].sesslist[i+1],
250 (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
251 }
252
253 cluster_send_groupe(g);
254 return;
255 }
256 }
257 }
258
259 // Add a session to a group
260 // return 1 if OK
261 static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
262 {
263 int i;
264
265 for (i = 0; i < grpsession[g].nbsession; i++)
266 {
267 if (grpsession[g].sesslist[i].sid == s)
268 { // already in group
269 if ((!grpsession[g].sesslist[i].weight) || (weight > 1))
270 grpsession[g].sesslist[i].weight = weight; // update Weight session (for load-balancing)
271
272 return 1;
273 }
274 }
275
276 if (i >= MAXSESSINGRP)
277 {
278 LOG(1, s, session[s].tunnel, " Too many session for Group %d\n", g);
279 return 0;
280 }
281 else
282 { // add session id to group
283 if (i == 0)
284 {
285 // it's the first session of the group, set to next group
286 grpsession[g].prev = gnextgrpid;
287 gnextgrpid = g;
288 grpsession[g].state = GROUPEOPEN;
289 }
290 grpsession[g].sesslist[i].sid = s;
291 grpsession[g].sesslist[i].weight = weight;
292 grpsession[g].nbsession++;
293
294 return 1;
295 }
296 return 0;
297 }
298
299 // Add a route to a group
300 // return 1 if OK
301 static int grp_addroute(groupidt g, sessionidt s, in_addr_t ip, int prefixlen)
302 {
303 int i;
304
305 for (i = 0; i < MAXROUTEINGRP; i++)
306 {
307 if ((i >= grpsession[g].nbroutesgrp))
308 {
309 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d\n",
310 g, fmtaddr(htonl(ip), 0), prefixlen);
311
312 grpsession[g].route[i].ip = ip;
313 grpsession[g].route[i].prefixlen = prefixlen;
314 grpsession[g].nbroutesgrp++;
315 return 1;
316 }
317 else if ((grpsession[g].route[i].ip == ip) && (grpsession[g].route[i].prefixlen == prefixlen))
318 {
319 // route already defined in group
320 LOG(3, s, session[s].tunnel,
321 " Radius reply Group %d contains route for %s/%d (this already defined)\n",
322 g, fmtaddr(htonl(ip), 0), prefixlen);
323
324 return 1;
325 }
326 else if (grpsession[g].route[i].ip == 0)
327 {
328 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d (find empty on list!!!)\n",
329 g, fmtaddr(htonl(ip), 0), prefixlen);
330
331 grpsession[g].route[i].ip = ip;
332 grpsession[g].route[i].prefixlen = prefixlen;
333 return 1;
334 }
335 }
336
337 if (i >= MAXROUTEINGRP)
338 {
339 LOG(1, s, session[s].tunnel, " Too many routes for Group %d\n", g);
340 }
341 return 0;
342 }
343
344 // Process Sames vendor specific attribut radius
345 void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
346 {
347 uint8_t attrib = *pvs;
348 groupidt grpid = 0;
349 uint8_t *n = pvs + 2;
350 uint8_t *e = pvs + pvs[1];
351
352 if ((attrib >= 22) && (attrib <= 23))
353 {
354 while (n < e && isdigit(*n))
355 {
356 grpid = grpid * 10 + *n - '0';
357 n++;
358 }
359 if ((grpid == 0) || (grpid >= MAXGROUPE))
360 {
361 LOG(1, s, session[s].tunnel, " Group Attribute Id %d not allowed\n", grpid);
362 return;
363 }
364 else if (*n != ',')
365 {
366 LOG(1, s, session[s].tunnel, " Group Attribute Id not defined\n");
367 return;
368 }
369
370 if (!grp_addsession(grpid, s, 1))
371 {
372 return;
373 }
374
375 if (grpid > config->cluster_highest_groupeid)
376 config->cluster_highest_groupeid = grpid;
377
378 n++;
379 }
380
381 //process, Sames vendor-specific 64520
382 if (attrib == 22) //SAMES-Group-Framed-Route
383 {
384 in_addr_t ip = 0;
385 uint8_t u = 0;
386 uint8_t bits = 0;
387
388 while (n < e && (isdigit(*n) || *n == '.'))
389 {
390 if (*n == '.')
391 {
392 ip = (ip << 8) + u;
393 u = 0;
394 }
395 else
396 u = u * 10 + *n - '0';
397 n++;
398 }
399 ip = (ip << 8) + u;
400 if (*n == '/')
401 {
402 n++;
403 while (n < e && isdigit(*n))
404 bits = bits * 10 + *n++ - '0';
405 }
406 else if ((ip >> 24) < 128)
407 bits = 8;
408 else if ((ip >> 24) < 192)
409 bits = 16;
410 else
411 bits = 24;
412
413 if (!grp_addroute(grpid, s, ip, bits))
414 return;
415 }
416 else if (attrib == 23) //SAMES-Group-Session-Weight
417 {
418 uint8_t weight = 0;
419
420 while (n < e && isdigit(*n))
421 weight = weight * 10 + *n++ - '0';
422
423 if (!weight)
424 {
425 LOG(1, s, session[s].tunnel, " Group-Session-Weight 0 GroupId %d not allowed\n", grpid);
426 return;
427 }
428 if (!grp_addsession(grpid, s, weight))
429 {
430 return;
431 }
432 }
433 else
434 {
435 LOG(3, s, session[s].tunnel, " Unknown vendor-specific: 64520, Attrib: %d\n", attrib);
436 }
437 }
438
439 // Init data structures
440 void grp_initdata()
441 {
442 int i;
443
444 // Set default value (10s)
445 config->grp_txrate_average_time = 10;
446
447 if (!(grpsession = shared_malloc(sizeof(groupsesst) * MAXGROUPE)))
448 {
449 LOG(0, 0, 0, "Error doing malloc for grouped session: %s\n", strerror(errno));
450 exit(1);
451 }
452
453 memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
454 for (i = 1; i < MAXGROUPE; i++)
455 {
456 grpsession[i].state = GROUPEUNDEF;
457 }
458
459 if (!(grp_local = shared_malloc(sizeof(local_group) * MAXGROUPE)))
460 {
461 LOG(0, 0, 0, "Error doing malloc for grp_local: %s\n", strerror(errno));
462 exit(1);
463 }
464 memset(grp_local, 0, sizeof(grp_local[0]) * MAXGROUPE);
465
466 }
467
468 // Update time_changed of the group
469 void grp_time_changed()
470 {
471 groupidt g;
472
473 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
474 {
475 grpsession[g].time_changed++;
476 }
477 }
478
479 // Uncache all IP of a session
480 static void grp_uncache_ipsession(groupidt g, sessionidt s)
481 {
482 int i;
483 uint8_t *a;
484 in_addr_t ip;
485 in_addr_t n_ip, j;
486 int prefixlen;
487 union iphash *h;
488
489 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
490 {
491 if (grpsession[g].route[i].ip != 0)
492 {
493 prefixlen = grpsession[g].route[i].prefixlen;
494 ip = grpsession[g].route[i].ip & (0xffffffff << (32 - prefixlen)); // Force the ip to be the first one in the route.
495
496 for (j = ip; j < ip+(1<<(32-prefixlen)) ; ++j)
497 {
498 n_ip = htonl(j); // To network order
499 a = (uint8_t *) &n_ip;
500 h = ip_hash;
501
502 if (!(h = h[*a++].idx)) continue;
503 if (!(h = h[*a++].idx)) continue;
504 if (!(h = h[*a++].idx)) continue;
505
506 if (s == h[*a].sess)
507 {
508 h[*a].sess = 0;
509 //LOG(3, s, session[s].tunnel, "UnCaching ip address %s\n", fmtaddr(n_ip, 0));
510 }
511 }
512 }
513 }
514 }
515
516 uint16_t guint16_index_loadlist;
517 // return the next session can be used on the group
518 sessionidt grp_getnextsession(groupidt g, in_addr_t ip, in_addr_t ip_src)
519 {
520 sessionidt s = 0, s2 = 0, s3 = 0;
521 int i;
522 uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
523 uint32_t txrate;
524
525 if (g >= MAXGROUPE)
526 return 0;
527
528 if (grpsession[g].time_changed >= config->grp_txrate_average_time)
529 {
530 // recalculation txrate
531 ltime_changed = grpsession[g].time_changed;
532 grpsession[g].time_changed = 0;
533 for (i = 0; i < grpsession[g].nbsession; i++)
534 {
535 if ((s2 = grpsession[g].sesslist[i].sid))
536 {
537 uint32_t coutgrp_delta = 0;
538
539 if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
540 coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
541 grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
542
543 txrate = (txrate + (coutgrp_delta/ltime_changed)) >> 1;
544 grpsession[g].sesslist[i].tx_rate = txrate;
545
546 txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
547 if (txrate < mintxrate)
548 {
549 if ( session[s2].ppp.phase > Establish &&
550 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
551 {
552 grpsession[g].smin = s2;
553 mintxrate = txrate;
554 }
555 }
556
557 if (txrate > maxtxrate)
558 {
559 if ( session[s2].ppp.phase > Establish &&
560 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
561 {
562 grpsession[g].smax = s2;
563 maxtxrate = txrate;
564 }
565 }
566 }
567 }
568 }
569
570 if ((s = sessionbyip(ip)))
571 {
572 uint8_t *as = (uint8_t *) &ip_src;
573 uint8_t *ad = (uint8_t *) &ip;
574 uint16_t ai = ad[3];
575 ai <<= 8;
576 ai |= as[3];
577
578 s = grp_local[g].sid_loaddist[ai];
579 if (!s)
580 {
581 s = grpsession[g].smin;
582 grp_local[g].sid_loaddist[ai] = s;
583 }
584
585 if (g != grp_groupbysession(s))
586 {
587 // This session does not belong to this group
588 LOG(3, s, session[s].tunnel, "Warning, the session does not belong to group %d\n", g);
589 s = 0;
590 grp_local[g].sid_loaddist[ai] = 0;
591 }
592 else if ( (session[s].ppp.phase > Establish) &&
593 (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
594 {
595 grp_local[g].sid_loaddist[guint16_index_loadlist++] = 0;
596 return s;
597 }
598 else
599 {
600 s = 0;
601 grp_local[g].sid_loaddist[ai] = 0;
602 }
603 }
604
605 if (!s)
606 {
607 // random between 0 and nbsession-1
608 uint indexsess = (rand() % grpsession[g].nbsession);
609
610 if (indexsess >= grpsession[g].nbsession)
611 indexsess = 0; //Sanity checks.
612
613 s2 = grpsession[g].sesslist[indexsess].sid;
614 if (s2 &&
615 (session[s2].ppp.phase > Establish) &&
616 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
617 {
618 s = s2;
619 }
620 else
621 {
622 for (i = 0; i < grpsession[g].nbsession; i++)
623 {
624 if ((s2 = grpsession[g].sesslist[i].sid))
625 {
626 s3 = s2;
627
628 if ( session[s2].ppp.phase > Establish &&
629 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
630 {
631 s = s2;
632 break;
633 }
634 }
635 }
636 }
637 }
638
639 if (!s)
640 s = s3;
641
642 if (s)
643 cache_ipmap(ntohl(ip), s);
644
645 return s;
646 }
647
648 // load a groupe receive from master
649 int grp_cluster_load_groupe(groupidt g, groupsesst *new)
650 {
651 int i;
652 int updategroup = 0;
653
654 if (g >= MAXGROUPE)
655 {
656 LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n");
657 return 0;
658 }
659
660 if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) ||
661 (grpsession[g].nbsession != new->nbsession))
662 {
663 updategroup = 1;
664 }
665
666 if (!updategroup)
667 {
668 // Check session list
669 for (i = 0; i < grpsession[g].nbsession; i++)
670 {
671 if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
672 {
673 updategroup = 1;
674 break;
675 }
676 }
677 }
678
679 if (!updategroup)
680 {
681 // Check routes list
682 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
683 {
684 if (grpsession[g].route[i].ip != new->route[i].ip)
685 {
686 updategroup = 1;
687 break;
688 }
689 }
690 }
691
692 // needs update
693 if (updategroup)
694 {
695 // Del all routes
696 grp_setgrouproute(g, 0);
697 }
698
699 memcpy(&grpsession[g], new, sizeof(grpsession[g])); // Copy over..
700
701 // needs update
702 if (updategroup)
703 {
704 // Add all routes
705 grp_setgrouproute(g, 1);
706 }
707
708 return 1;
709 }