Fix cluster group update
[l2tpns.git] / grpsess.c
/*
 * Fernando ALVES 2013
 * Grouped sessions for load balancing and fail-over
 * Licensed under the GPL
 */
6
7 #include <errno.h>
8 #include <ctype.h>
9 #include <string.h>
10 #include <sys/socket.h>
11 #include <linux/rtnetlink.h>
12
13 #include "l2tpns.h"
14 #include "util.h"
15 #include "cluster.h"
16
17 #ifdef BGP
18 #include "bgp.h"
19 #endif
20
21 union grp_iphash {
22 groupidt grp;
23 union grp_iphash *idx;
24 } grp_ip_hash[256]; // Mapping from IP address to group structures.
25
26 groupidt gnextgrpid = 0;
27
28 // Find gruop by IP, < 1 for not found
29 //
30 // Confusingly enough, this 'ip' must be
31 // in _network_ order. This being the common
32 // case when looking it up from IP packet headers.
33 static groupidt grp_lookup_ipmap(in_addr_t ip)
34 {
35 uint8_t *a = (uint8_t *) &ip;
36 union grp_iphash *h = grp_ip_hash;
37
38 if (!(h = h[*a++].idx)) return 0;
39 if (!(h = h[*a++].idx)) return 0;
40 if (!(h = h[*a++].idx)) return 0;
41
42 return h[*a].grp;
43 }
44
45 //
46 // Take an IP address in HOST byte order and
47 // add it to the grouid by IP cache.
48 //
49 // (It's actually cached in network order)
50 //
51 static void grp_cache_ipmap(in_addr_t ip, groupidt g)
52 {
53 in_addr_t nip = htonl(ip); // MUST be in network order. I.e. MSB must in be ((char *) (&ip))[0]
54 uint8_t *a = (uint8_t *) &nip;
55 union grp_iphash *h = grp_ip_hash;
56 int i;
57
58 for (i = 0; i < 3; i++)
59 {
60 if (!(h[a[i]].idx || (h[a[i]].idx = calloc(256, sizeof(union grp_iphash)))))
61 return;
62
63 h = h[a[i]].idx;
64 }
65
66 h[a[3]].grp = g;
67
68 if (g > 0)
69 LOG(4, 0, 0, "Caching Group:%d ip address %s\n", g, fmtaddr(nip, 0));
70 else if (g == 0)
71 LOG(4, 0, 0, "Un-caching Group ip address %s\n", fmtaddr(nip, 0));
72 }
73
74 groupidt grp_groupbyip(in_addr_t ip)
75 {
76 groupidt g = grp_lookup_ipmap(ip);
77
78 if (g > 0 && g < MAXGROUPE)
79 return g;
80
81 return 0;
82 }
83
84 // Add a route
85 //
86 // This adds it to the routing table, advertises it
87 // via BGP if enabled, and stuffs it into the
88 // 'groupbyip' cache.
89 //
90 // 'ip' must be in _host_ order.
91 //
92 static void grp_routeset(groupidt g, in_addr_t ip, int prefixlen, int add)
93 {
94 struct {
95 struct nlmsghdr nh;
96 struct rtmsg rt;
97 char buf[32];
98 } req;
99 int i;
100 in_addr_t n_ip;
101
102 if (!prefixlen) prefixlen = 32;
103
104 ip &= 0xffffffff << (32 - prefixlen);; // Force the ip to be the first one in the route.
105
106 memset(&req, 0, sizeof(req));
107
108 if (add)
109 {
110 req.nh.nlmsg_type = RTM_NEWROUTE;
111 req.nh.nlmsg_flags = NLM_F_REQUEST | NLM_F_CREATE | NLM_F_REPLACE;
112 }
113 else
114 {
115 req.nh.nlmsg_type = RTM_DELROUTE;
116 req.nh.nlmsg_flags = NLM_F_REQUEST;
117 }
118
119 req.nh.nlmsg_len = NLMSG_LENGTH(sizeof(req.rt));
120
121 req.rt.rtm_family = AF_INET;
122 req.rt.rtm_dst_len = prefixlen;
123 req.rt.rtm_table = RT_TABLE_MAIN;
124 req.rt.rtm_protocol = 42;
125 req.rt.rtm_scope = RT_SCOPE_LINK;
126 req.rt.rtm_type = RTN_UNICAST;
127
128 netlink_addattr(&req.nh, RTA_OIF, &tunidx, sizeof(int));
129 n_ip = htonl(ip);
130 netlink_addattr(&req.nh, RTA_DST, &n_ip, sizeof(n_ip));
131
132 LOG(3, 0, 0, "Route (Group) %s %s/%d\n", add ? "add" : "del", fmtaddr(htonl(ip), 0), prefixlen);
133
134 if (netlink_send(&req.nh) < 0)
135 LOG(0, 0, 0, "grp_routeset() error in sending netlink message: %s\n", strerror(errno));
136
137 #ifdef BGP
138 if (add)
139 bgp_add_route(htonl(ip), prefixlen);
140 else
141 bgp_del_route(htonl(ip), prefixlen);
142 #endif /* BGP */
143
144 // Add/Remove the IPs to the 'groupbyip' cache.
145 // Note that we add the zero address in the case of
146 // a network route. Roll on CIDR.
147
148 // Note that 'g == 0' implies this is the address pool.
149 // We still cache it here, because it will pre-fill
150 // the malloc'ed tree.
151 if (g)
152 {
153 if (!add) // Are we deleting a route?
154 g = 0; // Caching the session as '0' is the same as uncaching.
155
156 for (i = ip; i < ip+(1<<(32-prefixlen)) ; ++i)
157 {
158 grp_cache_ipmap(i, g);
159 if (!g) cache_ipmap(i, 0);
160 }
161 }
162 }
163
164 // Set all route of a group
165 void grp_setgrouproute(groupidt g, int add)
166 {
167 int i;
168 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
169 {
170 if (grpsession[g].route[i].ip != 0)
171 {
172 grp_routeset(g, grpsession[g].route[i].ip, grpsession[g].route[i].prefixlen, add);
173 }
174 }
175 }
176
177 // return group id by session
178 groupidt grp_groupbysession(sessionidt s)
179 {
180 groupidt g = 0;
181 int i;
182 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
183 {
184 for (i = 0; i < grpsession[g].nbsession; i++)
185 {
186 if (grpsession[g].sesslist[i].sid == s)
187 { // session found in group
188 return g;
189 }
190 }
191 }
192
193 return 0;
194 }
195
196 // Remove a session to a group
197 // return 1 if OK
198 void grp_removesession(groupidt g, sessionidt s)
199 {
200 int i;
201
202 if (grpsession[g].nbsession <= 0)
203 return;
204
205 for (i = 0; i < grpsession[g].nbsession; i++)
206 {
207 if (grpsession[g].sesslist[i].sid == s)
208 { // session found on group
209 --grpsession[g].nbsession;
210 if (grpsession[g].nbsession == 0)
211 {
212 // Group is empty, remove it
213
214 // Del all routes
215 grp_setgrouproute(g, 0);
216
217 if (gnextgrpid == g)
218 {
219 gnextgrpid = grpsession[g].prev;
220 }
221 else
222 {
223 groupidt g2;
224 for (g2 = gnextgrpid; g2 != 0; g2 = grpsession[g2].prev)
225 {
226 if (grpsession[g2].prev == g)
227 {
228 grpsession[g2].prev = grpsession[g].prev;
229 break;
230 }
231 }
232 }
233
234 memset(&grpsession[g], 0, sizeof(grpsession[0]));
235 grpsession[g].state = GROUPEFREE;
236 }
237 else
238 {
239 // remove the session
240 memmove(&grpsession[g].sesslist[i],
241 &grpsession[g].sesslist[i+1],
242 (grpsession[g].nbsession - i) * sizeof(grpsession[g].sesslist[i]));
243 }
244
245 cluster_send_groupe(g);
246 return;
247 }
248 }
249 }
250
251 // Add a session to a group
252 // return 1 if OK
253 static int grp_addsession(groupidt g, sessionidt s, uint8_t weight)
254 {
255 int i;
256
257 for (i = 0; i < grpsession[g].nbsession; i++)
258 {
259 if (grpsession[g].sesslist[i].sid == s)
260 { // already in group
261 if ((!grpsession[g].sesslist[i].weight) || (weight > 1))
262 grpsession[g].sesslist[i].weight = weight; // update Weight session (for load-balancing)
263
264 return 1;
265 }
266 }
267
268 if (i >= MAXSESSINGRP)
269 {
270 LOG(1, s, session[s].tunnel, " Too many session for Group %d\n", g);
271 return 0;
272 }
273 else
274 { // add session id to group
275 if (i == 0)
276 {
277 // it's the first session of the group, set to next group
278 grpsession[g].prev = gnextgrpid;
279 gnextgrpid = g;
280 grpsession[g].state = GROUPEOPEN;
281 }
282 grpsession[g].sesslist[i].sid = s;
283 grpsession[g].sesslist[i].weight = weight;
284 grpsession[g].nbsession++;
285
286 return 1;
287 }
288 return 0;
289 }
290
291 // Add a route to a group
292 // return 1 if OK
293 static int grp_addroute(groupidt g, sessionidt s, in_addr_t ip, int prefixlen)
294 {
295 int i;
296
297 for (i = 0; i < MAXROUTEINGRP; i++)
298 {
299 if ((i >= grpsession[g].nbroutesgrp))
300 {
301 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d\n",
302 g, fmtaddr(htonl(ip), 0), prefixlen);
303
304 grpsession[g].route[i].ip = ip;
305 grpsession[g].route[i].prefixlen = prefixlen;
306 grpsession[g].nbroutesgrp++;
307 return 1;
308 }
309 else if ((grpsession[g].route[i].ip == ip) && (grpsession[g].route[i].prefixlen == prefixlen))
310 {
311 // route already defined in group
312 LOG(3, s, session[s].tunnel,
313 " Radius reply Group %d contains route for %s/%d (this already defined)\n",
314 g, fmtaddr(htonl(ip), 0), prefixlen);
315
316 return 1;
317 }
318 else if (grpsession[g].route[i].ip == 0)
319 {
320 LOG(3, s, session[s].tunnel, " Radius reply Group %d contains route for %s/%d (find empty on list!!!)\n",
321 g, fmtaddr(htonl(ip), 0), prefixlen);
322
323 grpsession[g].route[i].ip = ip;
324 grpsession[g].route[i].prefixlen = prefixlen;
325 return 1;
326 }
327 }
328
329 if (i >= MAXROUTEINGRP)
330 {
331 LOG(1, s, session[s].tunnel, " Too many routes for Group %d\n", g);
332 }
333 return 0;
334 }
335
336 // Process Sames vendor specific attribut radius
337 void grp_processvendorspecific(sessionidt s, uint8_t *pvs)
338 {
339 uint8_t attrib = *pvs;
340 groupidt grpid = 0;
341 uint8_t *n = pvs + 2;
342 uint8_t *e = pvs + pvs[1];
343
344 if ((attrib >= 22) && (attrib <= 23))
345 {
346 while (n < e && isdigit(*n))
347 {
348 grpid = grpid * 10 + *n - '0';
349 n++;
350 }
351 if ((grpid == 0) || (grpid >= MAXGROUPE))
352 {
353 LOG(1, s, session[s].tunnel, " Group Attribute Id %d not allowed\n", grpid);
354 return;
355 }
356 else if (*n != ',')
357 {
358 LOG(1, s, session[s].tunnel, " Group Attribute Id not defined\n");
359 return;
360 }
361
362 if (!grp_addsession(grpid, s, 1))
363 {
364 return;
365 }
366
367 if (grpid > config->cluster_highest_groupeid)
368 config->cluster_highest_groupeid = grpid;
369
370 n++;
371 }
372
373 //process, Sames vendor-specific 64520
374 if (attrib == 22) //SAMES-Group-Framed-Route
375 {
376 in_addr_t ip = 0;
377 uint8_t u = 0;
378 uint8_t bits = 0;
379
380 while (n < e && (isdigit(*n) || *n == '.'))
381 {
382 if (*n == '.')
383 {
384 ip = (ip << 8) + u;
385 u = 0;
386 }
387 else
388 u = u * 10 + *n - '0';
389 n++;
390 }
391 ip = (ip << 8) + u;
392 if (*n == '/')
393 {
394 n++;
395 while (n < e && isdigit(*n))
396 bits = bits * 10 + *n++ - '0';
397 }
398 else if ((ip >> 24) < 128)
399 bits = 8;
400 else if ((ip >> 24) < 192)
401 bits = 16;
402 else
403 bits = 24;
404
405 if (!grp_addroute(grpid, s, ip, bits))
406 return;
407 }
408 else if (attrib == 23) //SAMES-Group-Session-Weight
409 {
410 uint8_t weight = 0;
411
412 while (n < e && isdigit(*n))
413 weight = weight * 10 + *n++ - '0';
414
415 if (!weight)
416 {
417 LOG(1, s, session[s].tunnel, " Group-Session-Weight 0 GroupId %d not allowed\n", grpid);
418 return;
419 }
420 if (!grp_addsession(grpid, s, weight))
421 {
422 return;
423 }
424 }
425 else
426 {
427 LOG(3, s, session[s].tunnel, " Unknown vendor-specific: 64520, Attrib: %d\n", attrib);
428 }
429 }
430
431 // Init data structures
432 void grp_initdata()
433 {
434 int i;
435
436 // Set default value (10s)
437 config->grp_txrate_average_time = 10;
438
439 if (!(grpsession = shared_malloc(sizeof(groupsesst) * MAXGROUPE)))
440 {
441 LOG(0, 0, 0, "Error doing malloc for grouped session: %s\n", strerror(errno));
442 exit(1);
443 }
444
445 memset(grpsession, 0, sizeof(grpsession[0]) * MAXGROUPE);
446 for (i = 1; i < MAXGROUPE; i++)
447 {
448 grpsession[i].state = GROUPEUNDEF;
449 }
450 }
451
452 // Update time_changed of the group
453 void grp_time_changed()
454 {
455 groupidt g;
456
457 for (g = gnextgrpid; g != 0; g = grpsession[g].prev)
458 {
459 grpsession[g].time_changed++;
460 }
461 }
462
463 // Uncache all IP of a session
464 static void grp_uncache_ipsession(groupidt g, sessionidt s)
465 {
466 int i;
467 uint8_t *a;
468 in_addr_t ip;
469 in_addr_t n_ip, j;
470 int prefixlen;
471 union iphash *h;
472
473 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
474 {
475 if (grpsession[g].route[i].ip != 0)
476 {
477 prefixlen = grpsession[g].route[i].prefixlen;
478 ip = grpsession[g].route[i].ip & (0xffffffff << (32 - prefixlen)); // Force the ip to be the first one in the route.
479
480 for (j = ip; j < ip+(1<<(32-prefixlen)) ; ++j)
481 {
482 n_ip = htonl(j); // To network order
483 a = (uint8_t *) &n_ip;
484 h = ip_hash;
485
486 if (!(h = h[*a++].idx)) continue;
487 if (!(h = h[*a++].idx)) continue;
488 if (!(h = h[*a++].idx)) continue;
489
490 if (s == h[*a].sess)
491 {
492 h[*a].sess = 0;
493 //LOG(3, s, session[s].tunnel, "UnCaching ip address %s\n", fmtaddr(n_ip, 0));
494 }
495 }
496 }
497 }
498 }
499
500 // return the next session can be used on the group
501 sessionidt grp_getnextsession(groupidt g, in_addr_t ip)
502 {
503 sessionidt s = 0, s2 = 0, s3 = 0;
504 int i;
505 uint32_t ltime_changed = 0, mintxrate = 0xFFFFFFFF, maxtxrate = 0;
506 uint32_t txrate;
507
508 if (g >= MAXGROUPE)
509 return 0;
510
511 if (grpsession[g].time_changed >= config->grp_txrate_average_time)
512 {
513 // recalculation txrate
514 ltime_changed = grpsession[g].time_changed;
515 grpsession[g].time_changed = 0;
516 for (i = 0; i < grpsession[g].nbsession; i++)
517 {
518 if ((s2 = grpsession[g].sesslist[i].sid))
519 {
520 uint32_t coutgrp_delta = 0;
521
522 if (session[s2].cout >= grpsession[g].sesslist[i].prev_coutgrp)
523 coutgrp_delta = session[s2].cout - grpsession[g].sesslist[i].prev_coutgrp;
524 grpsession[g].sesslist[i].prev_coutgrp = session[s2].cout;
525
526 grpsession[g].sesslist[i].tx_rate = coutgrp_delta/ltime_changed;
527
528 txrate = grpsession[g].sesslist[i].tx_rate/grpsession[g].sesslist[i].weight;
529 if (txrate < mintxrate)
530 {
531 if ( session[s2].ppp.phase > Establish &&
532 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
533 {
534 grpsession[g].smin = s2;
535 mintxrate = txrate;
536 }
537 }
538
539 if (txrate > maxtxrate)
540 {
541 if ( session[s2].ppp.phase > Establish &&
542 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
543 {
544 grpsession[g].smax = s2;
545 maxtxrate = txrate;
546 }
547 }
548 }
549 }
550 }
551
552 if ((s = sessionbyip(ip)))
553 {
554 if (s == grpsession[g].smax)
555 {
556 s = grpsession[g].smin;
557 grpsession[g].smax = 0;
558 }
559 else if ( (session[s].ppp.phase > Establish) &&
560 (time_now - session[s].last_packet <= (config->echo_timeout + 1)) )
561 {
562 return s;
563 }
564 else
565 {
566 s = 0;
567 }
568 }
569
570 if (!s)
571 {
572 // random between 0 and nbsession-1
573 uint indexsess = (rand() % grpsession[g].nbsession);
574
575 if (indexsess >= grpsession[g].nbsession)
576 indexsess = 0; //Sanity checks.
577
578 s2 = grpsession[g].sesslist[indexsess].sid;
579 if (s2 &&
580 (session[s2].ppp.phase > Establish) &&
581 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)))
582 {
583 s = s2;
584 }
585 else
586 {
587 for (i = 0; i < grpsession[g].nbsession; i++)
588 {
589 if ((s2 = grpsession[g].sesslist[i].sid))
590 {
591 s3 = s2;
592
593 if ( session[s2].ppp.phase > Establish &&
594 (time_now - session[s2].last_packet <= (config->echo_timeout + 1)) )
595 {
596 s = s2;
597 break;
598 }
599 }
600 }
601 }
602 }
603
604 if (!s)
605 s = s3;
606
607 if (s)
608 cache_ipmap(ntohl(ip), s);
609
610 return s;
611 }
612
613 // load a groupe receive from master
614 int grp_cluster_load_groupe(groupidt g, groupsesst *new)
615 {
616 int i;
617 int updategroup = 0;
618
619 if (g >= MAXGROUPE)
620 {
621 LOG(0, 0, 0, "ERROR: Received a group id > MAXGROUPE!\n");
622 return 0;
623 }
624
625 if ((grpsession[g].nbroutesgrp != new->nbroutesgrp) ||
626 (grpsession[g].nbsession != new->nbsession))
627 {
628 updategroup = 1;
629 }
630
631 if (!updategroup)
632 {
633 // Check session list
634 for (i = 0; i < grpsession[g].nbsession; i++)
635 {
636 if (grpsession[g].sesslist[i].sid != new->sesslist[i].sid)
637 {
638 updategroup = 1;
639 break;
640 }
641 }
642 }
643
644 if (!updategroup)
645 {
646 // Check routes list
647 for (i = 0; i < grpsession[g].nbroutesgrp; i++)
648 {
649 if (grpsession[g].route[i].ip != new->route[i].ip)
650 {
651 updategroup = 1;
652 break;
653 }
654 }
655 }
656
657 // needs update
658 if (updategroup)
659 {
660 // Del all routes
661 grp_setgrouproute(g, 0);
662 }
663
664 memcpy(&grpsession[g], new, sizeof(grpsession[g])); // Copy over..
665
666 // needs update
667 if (updategroup)
668 {
669 // Add all routes
670 grp_setgrouproute(g, 1);
671 }
672
673 return 1;
674 }