0cc43ee586dca0471617705dc47e78245a5e16c3
[l2tpns.git] / cluster_master.c
1 // L2TPNS Cluster Master
2 // $Id: cluster_master.c,v 1.1 2003-12-16 07:07:39 fred_nerk Exp $
3
4 #include <stdio.h>
5 #include <netinet/in.h>
6 #include <stdlib.h>
7 #include <unistd.h>
8 #include <time.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <stdarg.h>
12 #include <malloc.h>
13 #include <sys/socket.h>
14 #include <sys/types.h>
15 #include <signal.h>
16 #include <sys/ioctl.h>
17 #include <arpa/inet.h>
18 #include <sys/wait.h>
19 #include "cluster.h"
20 #include "ll.h"
21 #include "util.h"
22 #include "config.h"
23
// Path of the l2tpns binary that is started as a local backup for a slave
// that stops talking to us (BINDIR presumably comes from config.h).
#define L2TPNS BINDIR "/l2tpns"

// Everything the master remembers about one slave of the cluster.
typedef struct
{
	char *hostname;			// name carried in the slave's most recent hello
	unsigned long last_message;	// time() of the last message received from this slave
	uint32_t ip_address;		// cluster address the slave announces (first 4 bytes of each datagram, after ntohl)
	uint32_t slave_address;		// source address of the last hello; return_state() sends state there
	int remove_from_cluster;	// on timeout, drop the slave from `slaves` instead of starting a backup
	int down;			// set by backup_up(), cleared by backup_down()
	int tunnel_len;			// length of the last newly-allocated tunnel record; return_state() sends this many bytes per tunnel
	int session_len;		// same as tunnel_len, for session records
	pid_t pid;			// pid returned by fork() in backup_up()

	int num_tunnels;		// count of tunnels[] entries allocated so far
	char *tunnels[1000];		// copies of tunnel records received from the slave, indexed by tunnel id
	int num_sessions;		// count of sessions[] entries allocated so far
	char *sessions[13000];		// copies of session records received from the slave, indexed by session id
} slave;

uint32_t master_address;	// our own address, from inet_addr(argv[1])
linked_list *slaves;		// list of all known slave records (slave *)
extern int cluster_sockfd;	// cluster socket, defined elsewhere (presumably set up by cluster_init())
int debug = 4;			// log threshold: _log()/log_hex() print only when level <= debug

int processmsg(char *buf, int l, struct sockaddr_in *src_addr);
int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr);
int handle_tunnel(char *buf, int l, uint32_t addr);
int handle_session(char *buf, int l, uint32_t addr);
int handle_ping(char *buf, int l, uint32_t addr);
int backup_up(slave *s);
int backup_down(slave *s);
int return_state(slave *s);
slave *find_slave(uint32_t address);
// Short alias used for all diagnostics in this file.
#define log _log
void _log(int level, const char *format, ...);
void log_hex(int level, const char *title, const char *data, int maxsize);
61
/*
 * SIGCHLD handler: reap every forked backup process that has exited.
 *
 * Standard signals do not queue, so one SIGCHLD may stand for several dead
 * children; loop with WNOHANG until nothing more can be reaped.
 *
 * errno is saved and restored because main() inspects errno after select()
 * is interrupted: the final waitpid() call normally fails with ECHILD, which
 * would otherwise overwrite EINTR and make main() exit.
 */
void sigchild_handler(int signal)
{
	int saved_errno = errno;
	int status;

	(void) signal;

	/* TODO: catch errors and respawn? */
	while (waitpid(-1, &status, WNOHANG) > 0)
		;

	errno = saved_errno;
}
71
72 int main(int argc, char *argv[])
73 {
74 slave *s;
75 char buf[4096];
76 struct timeval to;
77
78 if (argc != 2) {
79 log(0, "Usage: %s <address>\n", argv[0]);
80 exit(-1);
81 }
82
83 master_address = inet_addr(argv[1]);
84 if (master_address == INADDR_NONE) {
85 log(0, "Invalid ip %s\n", argv[1]);
86 exit(-1);
87 }
88
89 cluster_init(master_address, 1);
90 slaves = ll_init();
91
92 signal(SIGCHLD, sigchild_handler);
93
94 log(0, "Cluster Manager $Id: cluster_master.c,v 1.1 2003-12-16 07:07:39 fred_nerk Exp $ starting\n");
95
96 to.tv_sec = 1;
97 to.tv_usec = 0;
98 while (1)
99 {
100 fd_set r;
101 int n;
102
103 FD_ZERO(&r);
104 FD_SET(cluster_sockfd, &r);
105 n = select(cluster_sockfd + 1, &r, 0, 0, &to);
106 if (n < 0)
107 {
108 if (errno != EINTR)
109 {
110 perror("select");
111 exit(-1);
112 }
113 continue;
114 }
115 else if (n)
116 {
117 struct sockaddr_in addr;
118 int alen = sizeof(addr);
119
120 memset(buf, 0, sizeof(buf));
121 if (FD_ISSET(cluster_sockfd, &r))
122 processmsg(buf, recvfrom(cluster_sockfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr);
123 continue;
124 }
125
126 // Handle slaves timing out
127 {
128 time_t now = time(NULL);
129 ll_reset(slaves);
130 while ((s = ll_next(slaves)))
131 {
132 if (s->down) continue;
133 if (s->last_message < (now - TIMEOUT))
134 {
135 log(4, "Slave \"%s\" s->last_message is %lu (timeout is %lu)\n", s->hostname, s->last_message, (now - TIMEOUT));
136 if (s->remove_from_cluster)
137 {
138 // Remove them from the cluster
139 ll_delete(slaves, s);
140 if (s->hostname) free(s->hostname);
141 free(s);
142 ll_reset(slaves);
143 continue;
144 }
145 backup_up(s);
146 }
147 }
148 }
149
150 to.tv_sec = 1;
151 to.tv_usec = 0;
152 }
153
154 return 0;
155 }
156
157 int processmsg(char *buf, int l, struct sockaddr_in *src_addr)
158 {
159 char mtype;
160 uint32_t addr;
161
162 log_hex(4, "Received", buf, l);
163 if (!buf || l <= sizeof(uint32_t)) return 0;
164
165 addr = ntohl(*(uint32_t*)buf);
166 buf += sizeof(uint32_t);
167 l -= sizeof(uint32_t);
168
169 mtype = *buf; buf++; l--;
170
171 switch (mtype)
172 {
173 case C_HELLO:
174 handle_hello(buf, l, src_addr, addr);
175 break;
176 case C_PING:
177 if (!find_slave(addr))
178 handle_hello(buf, l, src_addr, addr);
179 else
180 handle_ping(buf, l, addr);
181 break;
182 case C_TUNNEL:
183 if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr);
184 handle_tunnel(buf, l, addr);
185 break;
186 case C_SESSION:
187 if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr);
188 handle_session(buf, l, addr);
189 break;
190 }
191 return mtype;
192 }
193
194 int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr)
195 {
196 slave *s;
197 char *hostname;
198
199 hostname = calloc(l + 1, 1);
200 memcpy(hostname, buf, l);
201
202 // Is this a slave we have state information for?
203 if ((s = find_slave(addr)))
204 {
205 if (src_addr->sin_addr.s_addr == master_address)
206 {
207 log(1, "Got hello from \"%s\", local backup for %s.\n", hostname, inet_toa(s->ip_address));
208 }
209 else if (s->down)
210 {
211 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname, inet_toa(s->ip_address));
212 backup_down(s);
213 }
214 else
215 {
216 log(1, "Slave \"%s\" said hello and we didn't know it was down.\n", s->hostname);
217 }
218
219 /* Reset the hostname if needed */
220 free(s->hostname);
221 s->hostname = hostname;
222 } else {
223 // No state information, it's a new slave
224 s = calloc(sizeof(slave), 1);
225 s->ip_address = addr;
226 ll_push(slaves, s);
227 s->hostname = hostname;
228 log(1, "New slave added to cluster \"%s\"\n", s->hostname);
229 }
230
231 s->slave_address = src_addr->sin_addr.s_addr;
232
233 // Send state information back
234 return_state(s);
235
236 s->last_message = time(NULL);
237
238 return 0;
239 }
240
241 int handle_tunnel(char *buf, int l, uint32_t addr)
242 {
243 int tid;
244 slave *s;
245 if (!(s = find_slave(addr)))
246 {
247 log(0, "handle_tunnel() called with no valid slave\n");
248 return 0;
249 }
250 s->last_message = time(NULL);
251
252 // Skip hostname
253 tid = *(char *)buf;
254 buf += (tid + 1);
255 l -= (tid + 1);
256
257 // Grab tunnel ID
258 tid = *(int *)buf;
259 buf += sizeof(int);
260 l -= sizeof(int);
261
262 log(3, "Received tunnel %d from \"%s\" (%d bytes long)\n", tid, s->hostname, l);
263
264 // Allocate memory for it if it's not already
265 if (!s->tunnels[tid])
266 {
267 s->tunnels[tid] = malloc(l);
268 s->num_tunnels++;
269 s->tunnel_len = l;
270 }
271
272 memcpy(s->tunnels[tid], buf, l);
273 return l;
274 }
275
276 int handle_session(char *buf, int l, uint32_t addr)
277 {
278 slave *s;
279 int sid;
280 char hostname[4096] = {0};
281 if (!(s = find_slave(addr)))
282 {
283 log(0, "handle_session() called with no valid slave\n");
284 return 0;
285 }
286 s->last_message = time(NULL);
287
288 // Skip hostname
289 sid = *(char *)buf;
290 memcpy(hostname, (char *)(buf + 1), sid);
291 buf += (sid + 1);
292 l -= (sid + 1);
293 log(0, "Hostname is %s\n", hostname);
294
295 // Grab session ID
296 sid = *(int *)buf;
297 buf += sizeof(int);
298 l -= sizeof(int);
299
300 log(3, "Received session %d from \"%s\" (%d bytes long)\n", sid, s->hostname, l);
301
302 // Allocate memory for it if it's not already
303 if (!s->sessions[sid])
304 {
305 s->sessions[sid] = malloc(l);
306 s->num_sessions++;
307 s->session_len = l;
308 }
309
310 memcpy(s->sessions[sid], buf, l);
311 return l;
312 }
313
314 int handle_ping(char *buf, int l, uint32_t addr)
315 {
316 slave *s;
317 if (!(s = find_slave(addr)))
318 {
319 log(0, "handle_ping() called with no valid slave\n");
320 return 0;
321 }
322 s->last_message = time(NULL);
323
324 return 0;
325 }
326
327 int return_state(slave *s)
328 {
329 char *packet;
330 int i;
331 int num_tunnels = 0, num_sessions = 0;
332 int pktlen;
333
334 log(3, "Sending state information to \"%s\"\n", s->hostname);
335
336 for (i = 0; i < 1000; i++)
337 if (s->tunnels[i]) num_tunnels++;
338
339 for (i = 0; i < 13000; i++)
340 if (s->sessions[i]) num_sessions++;
341
342 if (!num_sessions && !num_tunnels) return 0;
343
344 packet = calloc(IL * 4, 1);
345 *(int *)(packet + IL * 0) = num_tunnels;
346 *(int *)(packet + IL * 1) = num_sessions;
347 *(int *)(packet + IL * 2) = s->tunnel_len;
348 *(int *)(packet + IL * 3) = s->session_len;
349 cluster_send_message(s->slave_address, s->ip_address, C_HELLO_RESPONSE, packet, IL * 4);
350 free(packet);
351
352 // Send tunnels one-by-one, in order
353 log(0, "Sending %d tunnels of %d bytes each\n", num_tunnels, s->tunnel_len);
354 pktlen = s->tunnel_len + sizeof(int);
355 packet = malloc(pktlen);
356 for (i = 0; i < 1000; i++)
357 {
358 if (s->tunnels[i])
359 {
360 *(int *)packet = i;
361 memcpy((char *)(packet + sizeof(int)), s->tunnels[i], s->tunnel_len);
362 log(0, "Sending tunnel %d\n", i);
363 cluster_send_message(s->slave_address, s->ip_address, C_TUNNEL, packet, pktlen);
364 }
365 }
366 free(packet);
367
368 // Send sessions one-by-one, in order
369 log(0, "Sending %d sessions of %d bytes each\n", num_sessions, s->session_len);
370 pktlen = s->session_len + sizeof(int);
371 packet = malloc(pktlen);
372 for (i = 0; i < 13000; i++)
373 {
374 if (s->sessions[i])
375 {
376 *(int *)packet = i;
377 memcpy((char *)(packet + sizeof(int)), s->sessions[i], s->session_len);
378 log(0, "Sending session %d\n", i);
379 cluster_send_message(s->slave_address, s->ip_address, C_SESSION, packet, pktlen);
380 }
381 }
382 free(packet);
383
384 return 0;
385 }
386
387 slave *find_slave(uint32_t address)
388 {
389 slave *s;
390
391 ll_reset(slaves);
392 while ((s = ll_next(slaves)))
393 {
394 if (s->ip_address == address)
395 {
396 return s;
397 }
398 }
399 return NULL;
400 }
401
402 void _log(int level, const char *format, ...)
403 {
404 va_list ap;
405 if (debug < level) return;
406
407 va_start(ap, format);
408 vfprintf(stderr, format, ap);
409 }
410
411 void log_hex(int level, const char *title, const char *data, int maxsize)
412 {
413 unsigned int i, j;
414 unsigned const char *d = (unsigned const char *)data;
415
416 if (debug < level) return;
417 log(level, "%s (%d bytes):\n", title, maxsize);
418 setvbuf(stderr, NULL, _IOFBF, 16384);
419 for (i = 0; i < maxsize; )
420 {
421 fprintf(stderr, "%4X: ", i);
422 for (j = i; j < maxsize && j < (i + 16); j++)
423 {
424 fprintf(stderr, "%02X ", d[j]);
425 if (j == i + 7)
426 fputs(": ", stderr);
427 }
428
429 for (; j < i + 16; j++)
430 {
431 fputs(" ", stderr);
432 if (j == i + 7)
433 fputs(": ", stderr);
434 }
435
436 fputs(" ", stderr);
437 for (j = i; j < maxsize && j < (i + 16); j++)
438 {
439 if (d[j] >= 0x20 && d[j] < 0x7f && d[j] != 0x20)
440 fputc(d[j], stderr);
441 else
442 fputc('.', stderr);
443
444 if (j == i + 7)
445 fputs(" ", stderr);
446 }
447
448 i = j;
449 fputs("\n", stderr);
450 }
451 fflush(stderr);
452 setbuf(stderr, NULL);
453 }
454
455 int backup_up(slave *s)
456 {
457 log(2, "Becoming backup for \"%s\" (%s).\n", s->hostname, inet_toa(s->ip_address));
458 s->pid = fork();
459 if (!s->pid)
460 {
461 if (execl(L2TPNS, L2TPNS, "-a", inet_toa(s->ip_address), NULL) < 0)
462 log(0, "Error execing backup " L2TPNS ": %s\n", strerror(errno));
463 exit(0);
464 }
465 s->down = 1;
466 return 0;
467 }
468
469 int backup_down(slave *s)
470 {
471 log(2, "Not being backup for \"%s\" (%s) anymore.\n", s->hostname, inet_toa(s->ip_address));
472 s->down = 0;
473 if (s->pid) {
474 kill(s->pid, SIGTERM);
475 sleep(2);
476 kill(s->pid, SIGKILL);
477 }
478 return 0;
479 }
480