Optimisations
[l2tpns.git] / cluster_master.c
1 // L2TPNS Cluster Master
2 // $Id: cluster_master.c,v 1.3 2004-05-24 04:12:34 fred_nerk Exp $
3
4 #include <stdio.h>
5 #include <netinet/in.h>
6 #include <stdlib.h>
7 #include <unistd.h>
8 #include <time.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <stdarg.h>
12 #include <malloc.h>
13 #include <sys/socket.h>
14 #include <sys/types.h>
15 #include <signal.h>
16 #include <sys/ioctl.h>
17 #include <arpa/inet.h>
18 #include <sys/wait.h>
19 #include "cluster.h"
20 #include "ll.h"
21 #include "util.h"
22 #include "config.h"
23
// Path of the l2tpns binary that backup_up() execs when taking over for a
// slave. BINDIR is not defined in this file; presumably it comes from the
// build or config.h -- confirm.
#define L2TPNS BINDIR "/l2tpns"
25
// State the master keeps for every slave that has announced itself.
typedef struct
{
	char *hostname;			// heap copy of the name sent in the slave's hello
	unsigned long last_message;	// time() at which the last message was handled
	uint32_t ip_address;		// address carried in the message header; lookup key for find_slave()
	uint32_t slave_address;		// source address of the packet that carried the hello
	int remove_from_cluster;	// when set, a timed-out slave is dropped instead of backed up
					// (never set anywhere in this file)
	int down;			// set by backup_up(), cleared by backup_down()
	int tunnel_len;			// byte length of the records stored in tunnels[]
	int session_len;		// byte length of the records stored in sessions[]
	pid_t pid;			// pid of the backup process forked by backup_up()

	int num_tunnels;		// number of tunnels[] slots that have been allocated
	char *tunnels[1000];		// tunnel records, indexed by tunnel id; NULL when unknown
	int num_sessions;		// number of sessions[] slots that have been allocated
	char *sessions[13000];		// session records, indexed by session id; NULL when unknown
} slave;
43
44 uint32_t master_address;
45 linked_list *slaves;
46 extern int cluster_sockfd;
47 int debug = 4;
48
49 int processmsg(char *buf, int l, struct sockaddr_in *src_addr);
50 int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr);
51 int handle_tunnel(char *buf, int l, uint32_t addr);
52 int handle_session(char *buf, int l, uint32_t addr);
53 int handle_ping(char *buf, int l, uint32_t addr);
54 int handle_goodbye(char *buf, int l, uint32_t addr);
55 int backup_up(slave *s);
56 int backup_down(slave *s);
57 int return_state(slave *s);
58 slave *find_slave(uint32_t address);
59 #define log _log
60 void _log(int level, const char *format, ...) __attribute__((format (printf, 2, 3)));
61 void log_hex(int level, const char *title, const char *data, int maxsize);
62
63 /* Catch our forked processes exiting */
/*
 * SIGCHLD handler: reap forked processes that have exited.
 *
 * Several children can exit before the handler gets to run and pending
 * SIGCHLDs are not queued, so keep reaping until nothing is left.  WNOHANG
 * keeps the handler from ever blocking, and errno is saved and restored so
 * the interrupted code never sees a value set in here.
 */
void sigchild_handler(int signal)
{
	int saved_errno = errno;
	int status;

	(void) signal;

	while (waitpid(-1, &status, WNOHANG) > 0)
		;
	/* TODO: catch errors and respawn? */

	errno = saved_errno;
}
72
73 int main(int argc, char *argv[])
74 {
75 slave *s;
76 char buf[4096];
77 struct timeval to;
78
79 if (argc != 2) {
80 log(0, "Usage: %s <address>\n", argv[0]);
81 exit(-1);
82 }
83
84 master_address = inet_addr(argv[1]);
85 if (master_address == INADDR_NONE) {
86 log(0, "Invalid ip %s\n", argv[1]);
87 exit(-1);
88 }
89
90 cluster_init(master_address, 1);
91 slaves = ll_init();
92
93 signal(SIGCHLD, sigchild_handler);
94
95 log(0, "Cluster Manager $Id: cluster_master.c,v 1.3 2004-05-24 04:12:34 fred_nerk Exp $ starting\n");
96
97 to.tv_sec = 1;
98 to.tv_usec = 0;
99 while (1)
100 {
101 fd_set r;
102 int n;
103
104 FD_ZERO(&r);
105 FD_SET(cluster_sockfd, &r);
106 n = select(cluster_sockfd + 1, &r, 0, 0, &to);
107 if (n < 0)
108 {
109 if (errno != EINTR)
110 {
111 perror("select");
112 exit(-1);
113 }
114 continue;
115 }
116 else if (n)
117 {
118 struct sockaddr_in addr;
119 int alen = sizeof(addr);
120
121 memset(buf, 0, sizeof(buf));
122 if (FD_ISSET(cluster_sockfd, &r))
123 processmsg(buf, recvfrom(cluster_sockfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr);
124 continue;
125 }
126
127 // Handle slaves timing out
128 {
129 time_t now = time(NULL);
130 ll_reset(slaves);
131 while ((s = ll_next(slaves)))
132 {
133 if (s->down) continue;
134 if (s->last_message < (now - TIMEOUT))
135 {
136 log(4, "Slave \"%s\" s->last_message is %lu (timeout is %lu)\n", s->hostname, s->last_message, (now - TIMEOUT));
137 if (s->remove_from_cluster)
138 {
139 // Remove them from the cluster
140 ll_delete(slaves, s);
141 if (s->hostname) free(s->hostname);
142 free(s);
143 ll_reset(slaves);
144 continue;
145 }
146 backup_up(s);
147 }
148 }
149 }
150
151 to.tv_sec = 1;
152 to.tv_usec = 0;
153 }
154
155 return 0;
156 }
157
158 int processmsg(char *buf, int l, struct sockaddr_in *src_addr)
159 {
160 slave *s;
161 char mtype;
162 uint32_t addr;
163
164 log_hex(4, "Received", buf, l);
165 if (!buf || l <= sizeof(uint32_t)) return 0;
166
167 addr = ntohl(*(uint32_t*)buf);
168 buf += sizeof(uint32_t);
169 l -= sizeof(uint32_t);
170
171 mtype = *buf; buf++; l--;
172
173 if (mtype != C_GOODBYE && (s = find_slave(addr)) && s->down)
174 {
175 char *hostname;
176 hostname = calloc(l + 1, 1);
177 memcpy(hostname, buf, l);
178 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname, inet_toa(s->ip_address));
179 backup_down(s);
180 free(hostname);
181 }
182
183 switch (mtype)
184 {
185 case C_HELLO:
186 handle_hello(buf, l, src_addr, addr);
187 break;
188 case C_PING:
189 if (!find_slave(addr))
190 handle_hello(buf, l, src_addr, addr);
191 else
192 handle_ping(buf, l, addr);
193 break;
194 case C_TUNNEL:
195 if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr);
196 handle_tunnel(buf, l, addr);
197 break;
198 case C_SESSION:
199 if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr);
200 handle_session(buf, l, addr);
201 break;
202 case C_GOODBYE:
203 if (!find_slave(addr)) break;
204 handle_goodbye(buf, l, addr);
205 break;
206 }
207 return mtype;
208 }
209
210 int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr)
211 {
212 slave *s;
213 char *hostname;
214
215 hostname = calloc(l + 1, 1);
216 memcpy(hostname, buf, l);
217
218 // Is this a slave we have state information for?
219 if ((s = find_slave(addr)))
220 {
221 if (src_addr->sin_addr.s_addr == master_address)
222 {
223 log(1, "Got hello from \"%s\", local backup for %s.\n", hostname, inet_toa(s->ip_address));
224 }
225 else if (s->down)
226 {
227 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname, inet_toa(s->ip_address));
228 backup_down(s);
229 }
230 else
231 {
232 log(1, "Slave \"%s\" said hello and we didn't know it was down.\n", s->hostname);
233 }
234
235 /* Reset the hostname if needed */
236 free(s->hostname);
237 s->hostname = hostname;
238 } else {
239 // No state information, it's a new slave
240 s = calloc(sizeof(slave), 1);
241 s->ip_address = addr;
242 ll_push(slaves, s);
243 s->hostname = hostname;
244 log(1, "New slave added to cluster \"%s\"\n", s->hostname);
245 }
246
247 s->slave_address = src_addr->sin_addr.s_addr;
248
249 // Send state information back
250 return_state(s);
251
252 s->last_message = time(NULL);
253
254 return 0;
255 }
256
257 int handle_tunnel(char *buf, int l, uint32_t addr)
258 {
259 int tid;
260 slave *s;
261 if (!(s = find_slave(addr)))
262 {
263 log(0, "handle_tunnel() called with no valid slave\n");
264 return 0;
265 }
266 s->last_message = time(NULL);
267
268 // Skip hostname
269 tid = *(char *)buf;
270 buf += (tid + 1);
271 l -= (tid + 1);
272
273 // Grab tunnel ID
274 tid = *(int *)buf;
275 buf += sizeof(int);
276 l -= sizeof(int);
277
278 log(3, "Received tunnel %d from \"%s\" (%d bytes long)\n", tid, s->hostname, l);
279
280 // Allocate memory for it if it's not already
281 if (!s->tunnels[tid])
282 {
283 s->tunnels[tid] = malloc(l);
284 s->num_tunnels++;
285 s->tunnel_len = l;
286 }
287
288 memcpy(s->tunnels[tid], buf, l);
289 return l;
290 }
291
292 int handle_session(char *buf, int l, uint32_t addr)
293 {
294 slave *s;
295 int sid;
296 char hostname[4096] = {0};
297 if (!(s = find_slave(addr)))
298 {
299 log(0, "handle_session() called with no valid slave\n");
300 return 0;
301 }
302 s->last_message = time(NULL);
303
304 // Skip hostname
305 sid = *(char *)buf;
306 memcpy(hostname, (char *)(buf + 1), sid);
307 buf += (sid + 1);
308 l -= (sid + 1);
309 log(0, "Hostname is %s\n", hostname);
310
311 // Grab session ID
312 sid = *(int *)buf;
313 buf += sizeof(int);
314 l -= sizeof(int);
315
316 log(3, "Received session %d from \"%s\" (%d bytes long)\n", sid, s->hostname, l);
317
318 // Allocate memory for it if it's not already
319 if (!s->sessions[sid])
320 {
321 s->sessions[sid] = malloc(l);
322 s->num_sessions++;
323 s->session_len = l;
324 }
325
326 memcpy(s->sessions[sid], buf, l);
327 return l;
328 }
329
330 int handle_ping(char *buf, int l, uint32_t addr)
331 {
332 slave *s;
333 if (!(s = find_slave(addr)))
334 {
335 log(0, "handle_ping() called with no valid slave\n");
336 return 0;
337 }
338 s->last_message = time(NULL);
339
340 return 0;
341 }
342
343 int return_state(slave *s)
344 {
345 char *packet;
346 int i;
347 int num_tunnels = 0, num_sessions = 0;
348 int pktlen;
349
350 log(3, "Sending state information to \"%s\"\n", s->hostname);
351
352 for (i = 0; i < 1000; i++)
353 if (s->tunnels[i]) num_tunnels++;
354
355 for (i = 0; i < 13000; i++)
356 if (s->sessions[i]) num_sessions++;
357
358 if (!num_sessions && !num_tunnels) return 0;
359
360 packet = calloc(IL * 4, 1);
361 *(int *)(packet + IL * 0) = num_tunnels;
362 *(int *)(packet + IL * 1) = num_sessions;
363 *(int *)(packet + IL * 2) = s->tunnel_len;
364 *(int *)(packet + IL * 3) = s->session_len;
365 cluster_send_message(s->slave_address, s->ip_address, C_HELLO_RESPONSE, packet, IL * 4);
366 free(packet);
367
368 // Send tunnels one-by-one, in order
369 log(0, "Sending %d tunnels of %d bytes each\n", num_tunnels, s->tunnel_len);
370 pktlen = s->tunnel_len + sizeof(int);
371 packet = malloc(pktlen);
372 for (i = 0; i < 1000; i++)
373 {
374 if (s->tunnels[i])
375 {
376 *(int *)packet = i;
377 memcpy((char *)(packet + sizeof(int)), s->tunnels[i], s->tunnel_len);
378 log(0, "Sending tunnel %d\n", i);
379 cluster_send_message(s->slave_address, s->ip_address, C_TUNNEL, packet, pktlen);
380 }
381 }
382 free(packet);
383
384 // Send sessions one-by-one, in order
385 log(0, "Sending %d sessions of %d bytes each\n", num_sessions, s->session_len);
386 pktlen = s->session_len + sizeof(int);
387 packet = malloc(pktlen);
388 for (i = 0; i < 13000; i++)
389 {
390 if (s->sessions[i])
391 {
392 *(int *)packet = i;
393 memcpy((char *)(packet + sizeof(int)), s->sessions[i], s->session_len);
394 log(0, "Sending session %d\n", i);
395 cluster_send_message(s->slave_address, s->ip_address, C_SESSION, packet, pktlen);
396 }
397 }
398 free(packet);
399
400 return 0;
401 }
402
403 slave *find_slave(uint32_t address)
404 {
405 slave *s;
406
407 ll_reset(slaves);
408 while ((s = ll_next(slaves)))
409 {
410 if (s->ip_address == address)
411 {
412 return s;
413 }
414 }
415 return NULL;
416 }
417
418 void _log(int level, const char *format, ...)
419 {
420 va_list ap;
421 if (debug < level) return;
422
423 va_start(ap, format);
424 vfprintf(stderr, format, ap);
425 }
426
427 void log_hex(int level, const char *title, const char *data, int maxsize)
428 {
429 unsigned int i, j;
430 unsigned const char *d = (unsigned const char *)data;
431
432 if (debug < level) return;
433 log(level, "%s (%d bytes):\n", title, maxsize);
434 setvbuf(stderr, NULL, _IOFBF, 16384);
435 for (i = 0; i < maxsize; )
436 {
437 fprintf(stderr, "%4X: ", i);
438 for (j = i; j < maxsize && j < (i + 16); j++)
439 {
440 fprintf(stderr, "%02X ", d[j]);
441 if (j == i + 7)
442 fputs(": ", stderr);
443 }
444
445 for (; j < i + 16; j++)
446 {
447 fputs(" ", stderr);
448 if (j == i + 7)
449 fputs(": ", stderr);
450 }
451
452 fputs(" ", stderr);
453 for (j = i; j < maxsize && j < (i + 16); j++)
454 {
455 if (d[j] >= 0x20 && d[j] < 0x7f && d[j] != 0x20)
456 fputc(d[j], stderr);
457 else
458 fputc('.', stderr);
459
460 if (j == i + 7)
461 fputs(" ", stderr);
462 }
463
464 i = j;
465 fputs("\n", stderr);
466 }
467 fflush(stderr);
468 setbuf(stderr, NULL);
469 }
470
471 int backup_up(slave *s)
472 {
473 log(2, "Becoming backup for \"%s\" (%s).\n", s->hostname, inet_toa(s->ip_address));
474 s->pid = fork();
475 if (!s->pid)
476 {
477 if (execl(L2TPNS, L2TPNS, "-a", inet_toa(s->ip_address), NULL) < 0)
478 log(0, "Error execing backup " L2TPNS ": %s\n", strerror(errno));
479 exit(0);
480 }
481 s->down = 1;
482 return 0;
483 }
484
485 int backup_down(slave *s)
486 {
487 log(2, "Not being backup for \"%s\" (%s) anymore.\n", s->hostname, inet_toa(s->ip_address));
488 s->down = 0;
489 if (s->pid) {
490 kill(s->pid, SIGTERM);
491 sleep(2);
492 kill(s->pid, SIGKILL);
493 }
494 return 0;
495 }
496
497 int handle_goodbye(char *buf, int l, uint32_t addr)
498 {
499 int i;
500 slave *s;
501
502 // Is this a slave we have state information for?
503 if ((s = find_slave(addr)))
504 {
505 log(0, "Received goodbye for slave %s\n", s->hostname);
506 ll_delete(slaves, s);
507 for (i = 0; i < s->num_tunnels; i++)
508 if (s->tunnels[i]) free(s->tunnels[i]);
509 for (i = 0; i < s->num_sessions; i++)
510 if (s->sessions[i]) free(s->sessions[i]);
511 if (s->hostname) free(s->hostname);
512 free(s);
513 }
514
515 return 0;
516 }
517