Fix: improved load balancing algorithm
[l2tpns.git] / grpsess.c
1 /*
2 * Fernando ALVES 2013
3 * Grouped sessions for load balancing and fail-over
4 * GPL licensed
5 */
6
7 #include <errno.h>
8 #include <ctype.h>
9 #include <string.h>
10 #include <sys/socket.h>
11 #include <linux/rtnetlink.h>
12
13 #include "l2tpns.h"
14 #include "util.h"
15 #include "cluster.h"
16
17 #ifdef BGP
18 #include "bgp.h"
19 #endif
20
21 union grp_iphash {
22 groupidt grp;
23 union grp_iphash *idx;
24 } grp_ip_hash[256]; // Mapping from IP address to group structures.
25
26 groupidt gnextgrpid = 0;
27
28 // Find gruop by IP, < 1 for not found
29 //
30 // Confusingly enough, this 'ip' must be
31 // in _network_ order. This being the common
32 // case when looking it up from IP packet headers.
33 static groupidt grp_lookup_ipmap(in_addr_t ip)
34 {
35 uint8_t *a = (uint8_t *) &ip;
36 union grp_iphash *h = grp_ip_hash;
37
38 if (!(h = h[*a++].idx)) return 0;
39 if (!(h = h[*a++].idx)) return 0;
40 if (!(h = h[*a++].idx)) return 0;
41
42 return h[*a].grp;
43 }
44
45 typedef struct
46 {
47 sessionidt sid_loaddist[0x10000];
48 }
49 local_group;
50
51 local_group *grp_local = NULL; // Array of local_group structures.
52
53 //
54 // Take an IP address in HOST byte order and
55 // add it to the grouid by IP cache.
56 //
57 // (It's actually cached in network order)
58 //
59 static void grp_cache_ipmap(in_addr_t ip, groupidt g)
60 {
61 in_addr_t nip = htonl(ip); // MUST be in network order. I.e. MSB must in be ((char *) (&ip))[0]
62 uint8_t *a = (uint8_t *) &nip;
63 union grp_iphash *h = grp_ip_hash;
64 int i;
65
66 for (i = 0; i < 3; i++)
67 {
68 if (!(h[a[i]].idx || (h[a[i]].idx = calloc(256, sizeof(union grp_iphash)))))
69 return;
70
71 h = h[a[i]].idx;
72 }
73
74 h[a[3]].grp = g;
75
76 if (g > 0)
77 LOG(4, 0, 0, "Caching Group:%d ip address %s\n", g, fmtaddr(nip, 0));
78 else if (g == 0)
79 LOG(4, 0, 0, "Un-caching Group ip address %s\n", fmtaddr(nip, 0));
80 }
81
82 groupidt grp_groupbyip(in_addr_t ip)
83 {
84 groupidt g = grp_lookup_ipmap(ip);
85
86 if (g > 0 && g < MAXGROUPE)
87 return g;
88
89 return 0;
90 }
91
92 // Add a route
93 //
94 // This adds it to the routing table, advertises it
95 // via BGP if enabled, and stuffs it into the
96 // 'groupbyip' cache.
97 //
98 // 'ip' must be in _host_ order.
99 //
100 static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
101 {
102 struct {
103 struct nlmsghdr nh;
104 struct rtmsg rt;
105 char buf[32];
106 } req;
107 int i;
108 in_addr_t n_ip;
109
110 if (!prefixlen) prefixlen = 32;
111
112 ip &= 0xffffffff << (32 - prefixlen);; // Force the ip to be the first one in the route.
113
114 memset(&req, 0, sizeof(req));
115
116 if (add)
117 {
118 req.nh.nlmsg_type = RTM_NEWROUTE;
119 req.nh.nlmsg_flags = NLM_F_REQUEST | NLM_F_CREATE | NLM_F_REPLACE;
120 }
121 else
122 {
123 req.nh.nlmsg_type = RTM_DELROUTE;
124 req.nh.nlmsg_flags = NLM_F_REQUEST;
125 }
126
127 req.nh.nlmsg_len = NLMSG_LENGTH(sizeof(req.rt));
128
129 req.rt.rtm_family = AF_INET;
130 req.rt.rtm_dst_len = prefixlen;
131 req.rt.rtm_table = RT_TABLE_MAIN;
132 req.rt.rtm_protocol = 42;
133 req.rt.rtm_scope = RT_SCOPE_LINK;
134 req.rt.rtm_type = RTN_UNICAST;
135
136 netlink_addattr(&req.nh, RTA_OIF, &tunidx, sizeof(int));
137 n_ip = htonl(ip);
138 netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
139
140 LOG(3, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
141
142 if (netlink_send(&req.nh) < 0)
143 LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
144
145 #ifdef BGP
146 if (add)
147 bgp_add_route(htonl(ip), prefixlen);
148 else
149 bgp_del_route(htonl(ip), prefixlen);
150 #endif /* BGP */
151
152 // Add/Remove the IPs to the 'groupbyip' cache.
153 // Note that we add the zero address in the case of
154 // a network route. Roll on CIDR.
155
156 // Note that 'g == 0' implies this is the address pool.
157 // We still cache it here, because it will pre-fill
158 // the malloc'ed tree.
159 if (g)
160 {
161 if (!add) // Are we deleting a route?
162 g = 0; // Caching the session as '0' is the same as uncaching.
163
164 for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
165 {
166 grp_cache_ipmap(i, g);
167 if (!g) cache_ipmap(i, 0);
168 }
169 }
170 }
171
172 // Set all route of a group
173 void grp_setgrouproute(groupidt g, int add)
174 {
175 int i;
176 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
177 {
178 if (grpsession[g].route[i].ip != 0)
179 {
180 grp_routeset(g, grpsession[g].route[i].ip, grpsession[g].route[i].prefixlen, add);
181 }
182 }
183 }
184
185 // return group id by session
186 groupidt grp_groupbysession(sessionidt s)
187 {
188 groupidt g = 0;
189 int i;
190 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
191 {
192 for (i = 0; i < grpsession[g].nbsession; i++)
193 {
194 if (grpsession[g].sesslist[i].sid == s)
195 { // session found in group
196 return g;
197 }
198 }
199 }
200
201 return 0;
202 }
203
204 // Remove a session to a group
205 // return 1 if OK
206 void grp_removesession(groupidt g, sessionidt s)
207 {
208 int i;
209
210 if (grpsession[g].nbsession <= 0)
211 return;
212
213 for (i = 0; i < grpsession[g].nbsession; i++)
214 {
215 if (grpsession[g].sesslist[i].sid == s)
216 { // session found on group
217 --grpsession[g].nbsession;
218 if (grpsession[g].nbsession == 0)
219 {
220 // Group is empty, remove it
221
222 // Del all routes
223 grp_setgrouproute(g, 0);
224
225 if (gnextgrpid == g)
226 {
227 gnextgrpid = grpsession[g].prev;
228 }
229 else
230 {
231 groupidt g2;
232 for (g2 = gnextgrpid; g2 != 0; g2 = grpsession[g2].prev)
233 {
234 if (grpsession[g2].prev == g)
235 {
236 grpsession[g2].prev = grpsession[g].prev;
237 break;
238 }
239 }
240 }
241
242 memset(&grpsession[g], 0, sizeof(grpsession[0]));
243 grpsession[g].state = GROUPEFREE;
244 }
245 else
246 {
247 // remove the session
248 memmove(&grpsession[g].sesslist[i],
249 &grpsession[g].sesslist[i+1],
250 (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
251 }
252
253 cluster_send_groupe(g);
254 return;
255 }
256 }
257 }
258
259 // Add a session to a group
260 // return 1 if OK
261 static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
262 {
263 int i;
264
265 for (i = 0; i < grpsession[g].nbsession; i++)
266 {
267 if (grpsession[g].sesslist[i].sid == s)
268 { // already in group
269 if ((!grpsession[g].sesslist[i].weight) || (weight > 1))
270 grpsession[g].sesslist[i].weight = weight; // update Weight session (for load-balancing)
271
272 return 1;
273 }
274 }
275
276 if (i >= MAXSESSINGRP)
277 {
278 LOG(1, s, session[s].tunnel, " Too many session for Group %d\n", g);
279 return 0;
280 }
281 else
282 { // add session id to group
283 if (i == 0)
284 {
285 // it's the first session of the group, set to next group
286 grpsession[g].prev = gnextgrpid;
287 gnextgrpid = g;
288 grpsession[g].state = GROUPEOPEN;
289 }
290 grpsession[g].sesslist[i].sid = s;
291 grpsession[g].sesslist[i].weight = weight;
292 grpsession[g].nbsession++;
293
294 return 1;
295 }
296 return 0;
297 }
298
299 // Add a route to a group
300 // return 1 if OK
301 static int grp_addroute(groupidt g, sessionidt s, in_addr_t ip, int prefixlen)
302 {
303 int i;
304
305 for (i = 0; i < MAXROUTEINGRP; i++)
306 {
307 if ((i >= grpsession[g].nbroutesgrp))
308 {
309 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d\n",
310 g, fmtaddr(htonl(ip), 0), prefixlen);
311
312 grpsession[g].route[i].ip = ip;
313 grpsession[g].route[i].prefixlen = prefixlen;
314 grpsession[g].nbroutesgrp++;
315 return 1;
316 }
317 else if ((grpsession[g].route[i].ip == ip) && (grpsession[g].route[i].prefixlen == prefixlen))
318 {
319 // route already defined in group
320 LOG(3, s, session[s].tunnel,
321 " Radius reply Group %d contains route for %s/%d (this already defined)\n",
322 g, fmtaddr(htonl(ip), 0), prefixlen);
323
324 return 1;
325 }
326 else if (grpsession[g].route[i].ip == 0)
327 {
328 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d (find empty on list!!!)\n",
329 g, fmtaddr(htonl(ip), 0), prefixlen);
330
331 grpsession[g].route[i].ip = ip;
332 grpsession[g].route[i].prefixlen = prefixlen;
333 return 1;
334 }
335 }
336
337 if (i >= MAXROUTEINGRP)
338 {
339 LOG(1, s, session[s].tunnel, " Too many routes for Group %d\n", g);
340 }
341 return 0;
342 }
343
344 // Process Sames vendor specific attribut radius
345 void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
346 {
347 uint8_t attrib = *pvs;
348 groupidt grpid = 0;
349 uint8_t *n = pvs + 2;
350 uint8_t *e = pvs + pvs[1];
351
352 if ((attrib >= 22) && (attrib <= 23))
353 {
354 while (n < e && isdigit(*n))
355 {
356 grpid = grpid * 10 + *n - '0';
357 n++;
358 }
359 if ((grpid == 0) || (grpid >= MAXGROUPE))
360 {
361 LOG(1, s, session[s].tunnel, " Group Attribute Id %d not allowed\n", grpid);
362 return;
363 }
364 else if (*n != ',')
365 {
366 LOG(1, s, session[s].tunnel, " Group Attribute Id not defined\n");
367 return;
368 }
369
370 if (!grp_addsession(grpid, s, 1))
371 {
372 return;
373 }
374
375 if (grpid > config->cluster_highest_groupeid)
376 config->cluster_highest_groupeid = grpid;
377
378 n++;
379 }
380
381 //process, Sames vendor-specific 64520
382 if (attrib == 22) //SAMES-Group-Framed-Route
383 {
384 in_addr_t ip = 0;
385 uint8_t u = 0;
386 uint8_t bits = 0;
387
388 while (n < e && (isdigit(*n) || *n == '.'))
389 {
390 if (*n == '.')
391 {
392 ip = (ip << 8) + u;
393 u = 0;
394 }
395 else
396 u = u * 10 + *n - '0';
397 n++;
398 }
399 ip = (ip << 8) + u;
400 if (*n == '/')
401 {
402 n++;
403 while (n < e && isdigit(*n))
404 bits = bits * 10 + *n++ - '0';
405 }
406 else if ((ip >> 24) < 128)
407 bits = 8;
408 else if ((ip >> 24) < 192)
409 bits = 16;
410 else
411 bits = 24;
412
413 if (!grp_addroute(grpid, s, ip, bits))
414 return;
415 }
416 else if (attrib == 23) //SAMES-Group-Session-Weight
417 {
418 uint8_t weight = 0;
419
420 while (n < e && isdigit(*n))
421 weight = weight * 10 + *n++ - '0';
422
423 if (!weight)
424 {
425 LOG(1, s, session[s].tunnel, " Group-Session-Weight 0 GroupId %d not allowed\n", grpid);
426 return;
427 }
428 if (!grp_addsession(grpid, s, weight))
429 {
430 return;
431 }
432 }
433 else
434 {
435 LOG(3, s, session[s].tunnel, " Unknown vendor-specific: 64520, Attrib: %d\n", attrib);
436 }
437 }
438
439 // Init data structures
440 void grp_initdata()
441 {
442 int i;
443
444 // Set default value (10s)
445 config->grp_txrate_average_time = 10;
446
447 if (!(grpsession = shared_malloc(sizeof(groupsesst) * MAXGROUPE)))
448 {
449 LOG(0, 0, 0, "Error doing malloc for grouped session: %s\n", strerror(errno));
450 exit(1);
451 }
452
453 memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
454 for (i = 1; i < MAXGROUPE; i++)
455 {
456 grpsession[i].state = GROUPEUNDEF;
457 }
458
459 if (!(grp_local = shared_malloc(sizeof(local_group) * MAXGROUPE)))
460 {
461 LOG(0, 0, 0, "Error doing malloc for grp_local: %s\n", strerror(errno));
462 exit(1);
463 }
464 memset(grp_local, 0, sizeof(grp_local[0]) * MAXGROUPE);
465
466 }
467
468 // Update time_changed of the group
469 void grp_time_changed()
470 {
471 groupidt g;
472
473 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
474 {
475 grpsession[g].time_changed++;
476 }
477 }
478
479 // Uncache all IP of a session
480 static void grp_uncache_ipsession(groupidt g, sessionidt s)
481 {
482 int i;
483 uint8_t *a;
484 in_addr_t ip;
485 in_addr_t n_ip, j;
486 int prefixlen;
487 union iphash *h;
488
489 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
490 {
491 if (grpsession[g].route[i].ip != 0)
492 {
493 prefixlen = grpsession[g].route[i].prefixlen;
494 ip = grpsession[g].route[i].ip & (0xffffffff << (32 - prefixlen)); // Force the ip to be the first one in the route.
495
496 for (j = ip; j < ip+(1<<(32-prefixlen)) ; ++j)
497 {
498 n_ip = htonl(j); // To network order
499 a = (uint8_t *) &n_ip;
500 h = ip_hash;
501
502 if (!(h = h[*a++].idx)) continue;
503 if (!(h = h[*a++].idx)) continue;
504 if (!(h = h[*a++].idx)) continue;
505
506 if (s == h[*a].sess)
507 {
508 h[*a].sess = 0;
509 //LOG(3, s, session[s].tunnel, "UnCaching ip address %s\n", fmtaddr(n_ip, 0));
510 }
511 }
512 }
513 }
514 }
515
516 // return the next session can be used on the group
517 sessionidt grp_getnextsession(groupidt g, in_addr_t ip, in_addr_t ip_src)
518 {
519 sessionidt s = 0, s2 = 0, s3 = 0;
520 int i;
521 uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
522 uint32_t txrate;
523
524 if (g >= MAXGROUPE)
525 return 0;
526
527 if (grpsession[g].time_changed >= config->grp_txrate_average_time)
528 {
529 // recalculation txrate
530 ltime_changed = grpsession[g].time_changed;
531 grpsession[g].time_changed = 0;
532 for (i = 0; i < grpsession[g].nbsession; i++)
533 {
534 if ((s2 = grpsession[g].sesslist[i].sid))
535 {
536 uint32_t coutgrp_delta = 0;
537
538 if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
539 coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
540 grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
541
542 grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed;
543
544 txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
545 if (txrate < mintxrate)
546 {
547 if ( session[s2].ppp.phase > Establish &&
548 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
549 {
550 grpsession[g].smin = s2;
551 mintxrate = txrate;
552 }
553 }
554
555 if (txrate > maxtxrate)
556 {
557 if ( session[s2].ppp.phase > Establish &&
558 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
559 {
560 grpsession[g].smax = s2;
561 maxtxrate = txrate;
562 }
563 }
564 }
565 }
566 }
567
568 if ((s = sessionbyip(ip)))
569 {
570 uint8_t *as = (uint8_t *) &ip_src;
571 uint8_t *ad = (uint8_t *) &ip;
572 uint16_t ai = ad[3];
573 ai <<= 8;
574 ai |= as[3];
575
576 s = grp_local[g].sid_loaddist[ai];
577 if (!s)
578 {
579 s = grpsession[g].smin;
580 grp_local[g].sid_loaddist[ai] = s;
581 }
582
583 if (g != grp_groupbysession(s))
584 {
585 // This session does not belong to this group
586 LOG(2, s, session[s].tunnel, "Warning, the session does not belong to group %d\n", g);
587 s = 0;
588 grp_local[g].sid_loaddist[ai] = 0;
589 }
590 else if (s == grpsession[g].smax)
591 {
592 s = grpsession[g].smin;
593 grp_local[g].sid_loaddist[ai] = s;
594 grpsession[g].smax = 0;
595 }
596 else if ( (session[s].ppp.phase > Establish) &&
597 (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
598 {
599 return s;
600 }
601 else
602 {
603 s = 0;
604 grp_local[g].sid_loaddist[ai] = 0;
605 }
606 }
607
608 if (!s)
609 {
610 // random between 0 and nbsession-1
611 uint indexsess = (rand() % grpsession[g].nbsession);
612
613 if (indexsess >= grpsession[g].nbsession)
614 indexsess = 0; //Sanity checks.
615
616 s2 = grpsession[g].sesslist[indexsess].sid;
617 if (s2 &&
618 (session[s2].ppp.phase > Establish) &&
619 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
620 {
621 s = s2;
622 }
623 else
624 {
625 for (i = 0; i < grpsession[g].nbsession; i++)
626 {
627 if ((s2 = grpsession[g].sesslist[i].sid))
628 {
629 s3 = s2;
630
631 if ( session[s2].ppp.phase > Establish &&
632 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
633 {
634 s = s2;
635 break;
636 }
637 }
638 }
639 }
640 }
641
642 if (!s)
643 s = s3;
644
645 if (s)
646 cache_ipmap(ntohl(ip), s);
647
648 return s;
649 }
650
651 // load a groupe receive from master
652 int grp_cluster_load_groupe(groupidt g, groupsesst *new)
653 {
654 int i;
655 int updategroup = 0;
656
657 if (g >= MAXGROUPE)
658 {
659 LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n");
660 return 0;
661 }
662
663 if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) ||
664 (grpsession[g].nbsession != new->nbsession))
665 {
666 updategroup = 1;
667 }
668
669 if (!updategroup)
670 {
671 // Check session list
672 for (i = 0; i < grpsession[g].nbsession; i++)
673 {
674 if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
675 {
676 updategroup = 1;
677 break;
678 }
679 }
680 }
681
682 if (!updategroup)
683 {
684 // Check routes list
685 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
686 {
687 if (grpsession[g].route[i].ip != new->route[i].ip)
688 {
689 updategroup = 1;
690 break;
691 }
692 }
693 }
694
695 // needs update
696 if (updategroup)
697 {
698 // Del all routes
699 grp_setgrouproute(g, 0);
700 }
701
702 memcpy(&grpsession[g], new, sizeof(grpsession[g])); // Copy over..
703
704 // needs update
705 if (updategroup)
706 {
707 // Add all routes
708 grp_setgrouproute(g, 1);
709 }
710
711 return 1;
712 }