1 // L2TPNS Cluster Master
2 // $Id: cluster_master.c,v 1.3 2004-05-24 04:12:34 fred_nerk Exp $
5 #include <netinet/in.h>
13 #include <sys/socket.h>
14 #include <sys/types.h>
16 #include <sys/ioctl.h>
17 #include <arpa/inet.h>
24 #define L2TPNS BINDIR "/l2tpns"
29 unsigned long last_message
;
31 uint32_t slave_address
;
32 int remove_from_cluster
;
41 char *sessions
[13000];
44 uint32_t master_address
;
46 extern int cluster_sockfd
;
49 int processmsg(char *buf
, int l
, struct sockaddr_in
*src_addr
);
50 int handle_hello(char *buf
, int l
, struct sockaddr_in
*src_addr
, uint32_t addr
);
51 int handle_tunnel(char *buf
, int l
, uint32_t addr
);
52 int handle_session(char *buf
, int l
, uint32_t addr
);
53 int handle_ping(char *buf
, int l
, uint32_t addr
);
54 int handle_goodbye(char *buf
, int l
, uint32_t addr
);
55 int backup_up(slave
*s
);
56 int backup_down(slave
*s
);
57 int return_state(slave
*s
);
58 slave
*find_slave(uint32_t address
);
60 void _log(int level
, const char *format
, ...) __attribute__((format (printf
, 2, 3)));
61 void log_hex(int level
, const char *title
, const char *data
, int maxsize
);
63 /* Catch our forked processes exiting */
64 void sigchild_handler(int signal
)
70 /* TODO: catch errors and respawn? */
73 int main(int argc
, char *argv
[])
80 log(0, "Usage: %s <address>\n", argv
[0]);
84 master_address
= inet_addr(argv
[1]);
85 if (master_address
== INADDR_NONE
) {
86 log(0, "Invalid ip %s\n", argv
[1]);
90 cluster_init(master_address
, 1);
93 signal(SIGCHLD
, sigchild_handler
);
95 log(0, "Cluster Manager $Id: cluster_master.c,v 1.3 2004-05-24 04:12:34 fred_nerk Exp $ starting\n");
105 FD_SET(cluster_sockfd
, &r
);
106 n
= select(cluster_sockfd
+ 1, &r
, 0, 0, &to
);
118 struct sockaddr_in addr
;
119 int alen
= sizeof(addr
);
121 memset(buf
, 0, sizeof(buf
));
122 if (FD_ISSET(cluster_sockfd
, &r
))
123 processmsg(buf
, recvfrom(cluster_sockfd
, buf
, sizeof(buf
), MSG_WAITALL
, (void *) &addr
, &alen
), &addr
);
127 // Handle slaves timing out
129 time_t now
= time(NULL
);
131 while ((s
= ll_next(slaves
)))
133 if (s
->down
) continue;
134 if (s
->last_message
< (now
- TIMEOUT
))
136 log(4, "Slave \"%s\" s->last_message is %lu (timeout is %lu)\n", s
->hostname
, s
->last_message
, (now
- TIMEOUT
));
137 if (s
->remove_from_cluster
)
139 // Remove them from the cluster
140 ll_delete(slaves
, s
);
141 if (s
->hostname
) free(s
->hostname
);
158 int processmsg(char *buf
, int l
, struct sockaddr_in
*src_addr
)
164 log_hex(4, "Received", buf
, l
);
165 if (!buf
|| l
<= sizeof(uint32_t)) return 0;
167 addr
= ntohl(*(uint32_t*)buf
);
168 buf
+= sizeof(uint32_t);
169 l
-= sizeof(uint32_t);
171 mtype
= *buf
; buf
++; l
--;
173 if (mtype
!= C_GOODBYE
&& (s
= find_slave(addr
)) && s
->down
)
176 hostname
= calloc(l
+ 1, 1);
177 memcpy(hostname
, buf
, l
);
178 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname
, inet_toa(s
->ip_address
));
186 handle_hello(buf
, l
, src_addr
, addr
);
189 if (!find_slave(addr
))
190 handle_hello(buf
, l
, src_addr
, addr
);
192 handle_ping(buf
, l
, addr
);
195 if (!find_slave(addr
)) handle_hello((char *)(buf
+ 1), *(char *)buf
, src_addr
, addr
);
196 handle_tunnel(buf
, l
, addr
);
199 if (!find_slave(addr
)) handle_hello((char *)(buf
+ 1), *(char *)buf
, src_addr
, addr
);
200 handle_session(buf
, l
, addr
);
203 if (!find_slave(addr
)) break;
204 handle_goodbye(buf
, l
, addr
);
210 int handle_hello(char *buf
, int l
, struct sockaddr_in
*src_addr
, uint32_t addr
)
215 hostname
= calloc(l
+ 1, 1);
216 memcpy(hostname
, buf
, l
);
218 // Is this a slave we have state information for?
219 if ((s
= find_slave(addr
)))
221 if (src_addr
->sin_addr
.s_addr
== master_address
)
223 log(1, "Got hello from \"%s\", local backup for %s.\n", hostname
, inet_toa(s
->ip_address
));
227 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname
, inet_toa(s
->ip_address
));
232 log(1, "Slave \"%s\" said hello and we didn't know it was down.\n", s
->hostname
);
235 /* Reset the hostname if needed */
237 s
->hostname
= hostname
;
239 // No state information, it's a new slave
240 s
= calloc(sizeof(slave
), 1);
241 s
->ip_address
= addr
;
243 s
->hostname
= hostname
;
244 log(1, "New slave added to cluster \"%s\"\n", s
->hostname
);
247 s
->slave_address
= src_addr
->sin_addr
.s_addr
;
249 // Send state information back
252 s
->last_message
= time(NULL
);
257 int handle_tunnel(char *buf
, int l
, uint32_t addr
)
261 if (!(s
= find_slave(addr
)))
263 log(0, "handle_tunnel() called with no valid slave\n");
266 s
->last_message
= time(NULL
);
278 log(3, "Received tunnel %d from \"%s\" (%d bytes long)\n", tid
, s
->hostname
, l
);
280 // Allocate memory for it if it's not already
281 if (!s
->tunnels
[tid
])
283 s
->tunnels
[tid
] = malloc(l
);
288 memcpy(s
->tunnels
[tid
], buf
, l
);
292 int handle_session(char *buf
, int l
, uint32_t addr
)
296 char hostname
[4096] = {0};
297 if (!(s
= find_slave(addr
)))
299 log(0, "handle_session() called with no valid slave\n");
302 s
->last_message
= time(NULL
);
306 memcpy(hostname
, (char *)(buf
+ 1), sid
);
309 log(0, "Hostname is %s\n", hostname
);
316 log(3, "Received session %d from \"%s\" (%d bytes long)\n", sid
, s
->hostname
, l
);
318 // Allocate memory for it if it's not already
319 if (!s
->sessions
[sid
])
321 s
->sessions
[sid
] = malloc(l
);
326 memcpy(s
->sessions
[sid
], buf
, l
);
330 int handle_ping(char *buf
, int l
, uint32_t addr
)
333 if (!(s
= find_slave(addr
)))
335 log(0, "handle_ping() called with no valid slave\n");
338 s
->last_message
= time(NULL
);
343 int return_state(slave
*s
)
347 int num_tunnels
= 0, num_sessions
= 0;
350 log(3, "Sending state information to \"%s\"\n", s
->hostname
);
352 for (i
= 0; i
< 1000; i
++)
353 if (s
->tunnels
[i
]) num_tunnels
++;
355 for (i
= 0; i
< 13000; i
++)
356 if (s
->sessions
[i
]) num_sessions
++;
358 if (!num_sessions
&& !num_tunnels
) return 0;
360 packet
= calloc(IL
* 4, 1);
361 *(int *)(packet
+ IL
* 0) = num_tunnels
;
362 *(int *)(packet
+ IL
* 1) = num_sessions
;
363 *(int *)(packet
+ IL
* 2) = s
->tunnel_len
;
364 *(int *)(packet
+ IL
* 3) = s
->session_len
;
365 cluster_send_message(s
->slave_address
, s
->ip_address
, C_HELLO_RESPONSE
, packet
, IL
* 4);
368 // Send tunnels one-by-one, in order
369 log(0, "Sending %d tunnels of %d bytes each\n", num_tunnels
, s
->tunnel_len
);
370 pktlen
= s
->tunnel_len
+ sizeof(int);
371 packet
= malloc(pktlen
);
372 for (i
= 0; i
< 1000; i
++)
377 memcpy((char *)(packet
+ sizeof(int)), s
->tunnels
[i
], s
->tunnel_len
);
378 log(0, "Sending tunnel %d\n", i
);
379 cluster_send_message(s
->slave_address
, s
->ip_address
, C_TUNNEL
, packet
, pktlen
);
384 // Send sessions one-by-one, in order
385 log(0, "Sending %d sessions of %d bytes each\n", num_sessions
, s
->session_len
);
386 pktlen
= s
->session_len
+ sizeof(int);
387 packet
= malloc(pktlen
);
388 for (i
= 0; i
< 13000; i
++)
393 memcpy((char *)(packet
+ sizeof(int)), s
->sessions
[i
], s
->session_len
);
394 log(0, "Sending session %d\n", i
);
395 cluster_send_message(s
->slave_address
, s
->ip_address
, C_SESSION
, packet
, pktlen
);
403 slave
*find_slave(uint32_t address
)
408 while ((s
= ll_next(slaves
)))
410 if (s
->ip_address
== address
)
418 void _log(int level
, const char *format
, ...)
421 if (debug
< level
) return;
423 va_start(ap
, format
);
424 vfprintf(stderr
, format
, ap
);
427 void log_hex(int level
, const char *title
, const char *data
, int maxsize
)
430 unsigned const char *d
= (unsigned const char *)data
;
432 if (debug
< level
) return;
433 log(level
, "%s (%d bytes):\n", title
, maxsize
);
434 setvbuf(stderr
, NULL
, _IOFBF
, 16384);
435 for (i
= 0; i
< maxsize
; )
437 fprintf(stderr
, "%4X: ", i
);
438 for (j
= i
; j
< maxsize
&& j
< (i
+ 16); j
++)
440 fprintf(stderr
, "%02X ", d
[j
]);
445 for (; j
< i
+ 16; j
++)
453 for (j
= i
; j
< maxsize
&& j
< (i
+ 16); j
++)
455 if (d
[j
] >= 0x20 && d
[j
] < 0x7f && d
[j
] != 0x20)
468 setbuf(stderr
, NULL
);
471 int backup_up(slave
*s
)
473 log(2, "Becoming backup for \"%s\" (%s).\n", s
->hostname
, inet_toa(s
->ip_address
));
477 if (execl(L2TPNS
, L2TPNS
, "-a", inet_toa(s
->ip_address
), NULL
) < 0)
478 log(0, "Error execing backup " L2TPNS
": %s\n", strerror(errno
));
485 int backup_down(slave
*s
)
487 log(2, "Not being backup for \"%s\" (%s) anymore.\n", s
->hostname
, inet_toa(s
->ip_address
));
490 kill(s
->pid
, SIGTERM
);
492 kill(s
->pid
, SIGKILL
);
497 int handle_goodbye(char *buf
, int l
, uint32_t addr
)
502 // Is this a slave we have state information for?
503 if ((s
= find_slave(addr
)))
505 log(0, "Received goodbye for slave %s\n", s
->hostname
);
506 ll_delete(slaves
, s
);
507 for (i
= 0; i
< s
->num_tunnels
; i
++)
508 if (s
->tunnels
[i
]) free(s
->tunnels
[i
]);
509 for (i
= 0; i
< s
->num_sessions
; i
++)
510 if (s
->sessions
[i
]) free(s
->sessions
[i
]);
511 if (s
->hostname
) free(s
->hostname
);