0cc43ee586dca0471617705dc47e78245a5e16c3
1 // L2TPNS Cluster Master
2 // $Id: cluster_master.c,v 1.1 2003-12-16 07:07:39 fred_nerk Exp $
5 #include <netinet/in.h>
13 #include <sys/socket.h>
14 #include <sys/types.h>
16 #include <sys/ioctl.h>
17 #include <arpa/inet.h>
24 #define L2TPNS BINDIR "/l2tpns"
// NOTE(review): the four declarations below have the same names as the members
// accessed through `slave *` in this file (s->last_message, s->slave_address,
// s->remove_from_cluster, s->sessions[...]). Presumably they belong to the slave
// record type, whose opening line is not visible in this listing -- confirm.

// Set from time(NULL) whenever a hello, tunnel, session or ping is handled.
29 unsigned long last_message
;
// Set from the sender's sin_addr.s_addr when a hello is handled.
31 uint32_t slave_address
;
// Tested in main()'s timeout scan before a timed-out slave is deleted.
32 int remove_from_cluster
;
// One pointer per session id; handle_session() fills entries with malloc()ed
// copies of received session data, and return_state() scans all 13000 slots.
41 char *sessions
[13000];
// Address given as argv[1], parsed with inet_addr() in main(). handle_hello()
// compares a hello's source address against it.
44 uint32_t master_address
;
// Socket defined in another translation unit; main() select()s on it and
// reads datagrams from it with recvfrom().
46 extern int cluster_sockfd
;
// Forward declarations for functions defined later in this file.

// Message entry point: takes a received datagram, its length and the sender.
49 int processmsg(char *buf
, int l
, struct sockaddr_in
*src_addr
);
// Per-message-type handlers. `addr` is the address decoded from the first four
// bytes of the message by processmsg().
50 int handle_hello(char *buf
, int l
, struct sockaddr_in
*src_addr
, uint32_t addr
);
51 int handle_tunnel(char *buf
, int l
, uint32_t addr
);
52 int handle_session(char *buf
, int l
, uint32_t addr
);
53 int handle_ping(char *buf
, int l
, uint32_t addr
);
// Start / stop the local backup process for a slave.
54 int backup_up(slave
*s
);
55 int backup_down(slave
*s
);
// Send the stored tunnel/session state for a slave back to it.
56 int return_state(slave
*s
);
// Look up a slave record by its ip_address.
57 slave
*find_slave(uint32_t address
);
// Logging helpers; both produce no output when `debug` is lower than `level`.
59 void _log(int level
, const char *format
, ...);
60 void log_hex(int level
, const char *title
, const char *data
, int maxsize
);
// Signal handler that main() installs for SIGCHLD.
// NOTE(review): only the signature and the TODO below are visible in this
// listing; the statements of the body are not shown.
62 /* Catch our forked processes exiting */
63 void sigchild_handler(int signal
)
69 /* TODO: catch errors and respawn? */
// Program entry point for the cluster master.
//
// Visible steps: print a usage message, parse argv[1] as the master address,
// initialise the cluster layer, install the SIGCHLD handler, then wait on
// cluster_sockfd with select(), pass each received datagram to processmsg(),
// and scan the slave list for slaves that have timed out.
// NOTE(review): this listing omits some lines of the body (braces, local
// declarations, loop headers); the comments describe only what is visible.
72 int main(int argc
, char *argv
[])
79 log(0, "Usage: %s <address>\n", argv
[0]);
// inet_addr() returns INADDR_NONE when the string is not a valid address.
83 master_address
= inet_addr(argv
[1]);
84 if (master_address
== INADDR_NONE
) {
85 log(0, "Invalid ip %s\n", argv
[1]);
89 cluster_init(master_address
, 1);
92 signal(SIGCHLD
, sigchild_handler
);
94 log(0, "Cluster Manager $Id: cluster_master.c,v 1.1 2003-12-16 07:07:39 fred_nerk Exp $ starting\n");
// Wait for the cluster socket to become readable, with timeout `to`.
104 FD_SET(cluster_sockfd
, &r
);
105 n
= select(cluster_sockfd
+ 1, &r
, 0, 0, &to
);
117 struct sockaddr_in addr
;
// NOTE(review): recvfrom() takes a socklen_t * for the address length; `alen`
// is declared int here -- confirm this is acceptable on the target platforms.
118 int alen
= sizeof(addr
);
120 memset(buf
, 0, sizeof(buf
));
// NOTE(review): the recvfrom() return value is passed straight through as the
// message length. It is -1 on error, and processmsg() is then responsible for
// rejecting it.
121 if (FD_ISSET(cluster_sockfd
, &r
))
122 processmsg(buf
, recvfrom(cluster_sockfd
, buf
, sizeof(buf
), MSG_WAITALL
, (void *) &addr
, &alen
), &addr
);
126 // Handle slaves timing out
128 time_t now
= time(NULL
);
130 while ((s
= ll_next(slaves
)))
// Slaves already marked down are skipped.
132 if (s
->down
) continue;
// A slave has timed out when its last message is older than now - TIMEOUT.
133 if (s
->last_message
< (now
- TIMEOUT
))
// NOTE(review): the second %lu is given a time_t expression; that matches only
// where time_t is compatible with unsigned long -- confirm.
135 log(4, "Slave \"%s\" s->last_message is %lu (timeout is %lu)\n", s
->hostname
, s
->last_message
, (now
- TIMEOUT
));
136 if (s
->remove_from_cluster
)
138 // Remove them from the cluster
139 ll_delete(slaves
, s
);
// NOTE(review): the hostname is freed after the node is removed from the list.
// Whether the slave record itself and its tunnel/session buffers are released
// is not visible here -- confirm.
140 if (s
->hostname
) free(s
->hostname
);
// Decode one received cluster message and pass it to the matching handler.
//
// buf      - received datagram
// l        - number of bytes in buf, as returned by recvfrom() in main()
// src_addr - address of the sender
//
// Returns 0 when buf is NULL or the message is too short to hold the leading
// address field. Other return paths are not visible in this listing.
// NOTE(review): the switch/case lines that choose between the handler calls
// below are not shown, so which message type reaches which call cannot be
// stated from here.
157 int processmsg(char *buf
, int l
, struct sockaddr_in
*src_addr
)
162 log_hex(4, "Received", buf
, l
);
// NOTE(review): `l` is int and sizeof yields size_t, so the comparison is done
// in unsigned arithmetic. A negative `l` (e.g. -1 from a failed recvfrom())
// converts to a very large value and is NOT rejected here -- confirm and
// consider testing `l < 0` explicitly. log_hex() above is also called before
// this check.
163 if (!buf
|| l
<= sizeof(uint32_t)) return 0;
// The first four bytes are an address in network byte order.
// NOTE(review): reading through a uint32_t * cast assumes buf is suitably
// aligned; memcpy into a local would avoid that assumption.
165 addr
= ntohl(*(uint32_t*)buf
);
166 buf
+= sizeof(uint32_t);
167 l
-= sizeof(uint32_t);
// The next byte is the message type; buf/l now describe the payload only.
169 mtype
= *buf
; buf
++; l
--;
174 handle_hello(buf
, l
, src_addr
, addr
);
// Sender is not a known slave: treat the payload as a hello first so that a
// record exists before the ping is handled.
177 if (!find_slave(addr
))
178 handle_hello(buf
, l
, src_addr
, addr
);
180 handle_ping(buf
, l
, addr
);
// Unknown sender: call handle_hello() with the first payload byte as the length
// and the bytes after it as the data, then handle the tunnel message itself.
183 if (!find_slave(addr
)) handle_hello((char *)(buf
+ 1), *(char *)buf
, src_addr
, addr
);
184 handle_tunnel(buf
, l
, addr
);
// Same pre-registration step as above, followed by the session handler.
187 if (!find_slave(addr
)) handle_hello((char *)(buf
+ 1), *(char *)buf
, src_addr
, addr
);
188 handle_session(buf
, l
, addr
);
// Handle a hello message: register a new slave or refresh an existing record.
//
// buf      - hostname bytes (not NUL-terminated; length given by l)
// l        - number of hostname bytes
// src_addr - address the message was received from
// addr     - address decoded from the message header, used as the lookup key
//
// NOTE(review): this listing omits some lines of the body (braces, else
// branches, the conditions around some log calls and the return statements).
194 int handle_hello(char *buf
, int l
, struct sockaddr_in
*src_addr
, uint32_t addr
)
// Copy the hostname into a zero-filled buffer one byte longer than the data, so
// the copy is NUL-terminated.
// NOTE(review): the calloc() result is used by the very next statement without a
// NULL check.
199 hostname
= calloc(l
+ 1, 1);
200 memcpy(hostname
, buf
, l
);
202 // Is this a slave we have state information for?
203 if ((s
= find_slave(addr
)))
// A hello whose source address equals master_address is logged as coming from
// the local backup.
205 if (src_addr
->sin_addr
.s_addr
== master_address
)
207 log(1, "Got hello from \"%s\", local backup for %s.\n", hostname
, inet_toa(s
->ip_address
));
211 log(1, "Slave \"%s\" (for %s) has come back.\n", hostname
, inet_toa(s
->ip_address
));
216 log(1, "Slave \"%s\" said hello and we didn't know it was down.\n", s
->hostname
);
219 /* Reset the hostname if needed */
// NOTE(review): the record takes ownership of the new buffer; whether the
// previous s->hostname is freed first is not visible here -- confirm.
221 s
->hostname
= hostname
;
223 // No state information, it's a new slave
// NOTE(review): the calloc() result is dereferenced on the next line without a
// NULL check.
224 s
= calloc(sizeof(slave
), 1);
225 s
->ip_address
= addr
;
227 s
->hostname
= hostname
;
228 log(1, "New slave added to cluster \"%s\"\n", s
->hostname
);
// Remember where the hello came from; return_state() sends to this address.
231 s
->slave_address
= src_addr
->sin_addr
.s_addr
;
233 // Send state information back
236 s
->last_message
= time(NULL
);
// Store a copy of a tunnel record received from a slave.
//
// buf  - tunnel message payload
// l    - payload length in bytes
// addr - address identifying the sending slave
//
// NOTE(review): the lines that derive `tid` from the payload, and the return
// statements, are not visible in this listing.
241 int handle_tunnel(char *buf
, int l
, uint32_t addr
)
245 if (!(s
= find_slave(addr
)))
247 log(0, "handle_tunnel() called with no valid slave\n");
250 s
->last_message
= time(NULL
);
262 log(3, "Received tunnel %d from \"%s\" (%d bytes long)\n", tid
, s
->hostname
, l
);
264 // Allocate memory for it if it's not already
// NOTE(review): no bounds check on `tid` against the size of s->tunnels is
// visible here -- confirm one exists in the omitted lines.
265 if (!s
->tunnels
[tid
])
267 s
->tunnels
[tid
] = malloc(l
);
// NOTE(review): the slot is sized by the `l` of the first message for this tid
// but written with the current `l`; a later, longer message would overrun the
// buffer unless the omitted lines guard against it -- confirm.
272 memcpy(s
->tunnels
[tid
], buf
, l
);
// Store a copy of a session record received from a slave.
//
// buf  - session message payload
// l    - payload length in bytes
// addr - address identifying the sending slave
//
// NOTE(review): the lines that assign `sid`, and the return statements, are not
// visible in this listing.
276 int handle_session(char *buf
, int l
, uint32_t addr
)
280 char hostname
[4096] = {0};
281 if (!(s
= find_slave(addr
)))
283 log(0, "handle_session() called with no valid slave\n");
286 s
->last_message
= time(NULL
);
// Copies `sid` bytes starting at the second payload byte into the local buffer.
// NOTE(review): no check of `sid` against sizeof(hostname) is visible; a value
// of 4096 or more would overflow the stack buffer (or drop the terminator) --
// confirm the omitted lines bound it.
290 memcpy(hostname
, (char *)(buf
+ 1), sid
);
293 log(0, "Hostname is %s\n", hostname
);
300 log(3, "Received session %d from \"%s\" (%d bytes long)\n", sid
, s
->hostname
, l
);
302 // Allocate memory for it if it's not already
// NOTE(review): no bounds check on `sid` against the 13000-entry sessions table
// is visible here -- confirm.
303 if (!s
->sessions
[sid
])
305 s
->sessions
[sid
] = malloc(l
);
// NOTE(review): same sizing concern as in handle_tunnel(): the slot is
// allocated once but copied into with the current `l`.
310 memcpy(s
->sessions
[sid
], buf
, l
);
// Handle a ping from a slave by refreshing its last_message timestamp.
//
// buf, l - ping payload and its length (not referenced in the visible lines)
// addr   - address identifying the sending slave
//
// Logs an error when no slave record matches `addr`. Return values are not
// visible in this listing.
314 int handle_ping(char *buf
, int l
, uint32_t addr
)
317 if (!(s
= find_slave(addr
)))
319 log(0, "handle_ping() called with no valid slave\n");
// Keeps main()'s timeout scan from treating this slave as timed out.
322 s
->last_message
= time(NULL
);
// Send the tunnel and session state stored for a slave back to it.
//
// Visible sequence: count the populated tunnel and session slots, return 0 if
// there are none, send a C_HELLO_RESPONSE header, then send each tunnel as a
// C_TUNNEL message and each session as a C_SESSION message.
// NOTE(review): the loop bodies are only partly visible (the lines that skip
// empty slots and write the leading int of each packet are not shown), and no
// free() of `packet` is visible -- confirm.
327 int return_state(slave
*s
)
331 int num_tunnels
= 0, num_sessions
= 0;
334 log(3, "Sending state information to \"%s\"\n", s
->hostname
);
// Count populated slots: the first 1000 tunnel entries and all 13000 sessions.
336 for (i
= 0; i
< 1000; i
++)
337 if (s
->tunnels
[i
]) num_tunnels
++;
339 for (i
= 0; i
< 13000; i
++)
340 if (s
->sessions
[i
]) num_sessions
++;
// Nothing stored for this slave: nothing to send.
342 if (!num_sessions
&& !num_tunnels
) return 0;
// Header of four ints at IL-byte strides: tunnel count, session count, tunnel
// record length, session record length.
// NOTE(review): IL is presumably sizeof(int); its definition is not visible.
// The calloc() result is written through on the next line without a NULL check.
344 packet
= calloc(IL
* 4, 1);
345 *(int *)(packet
+ IL
* 0) = num_tunnels
;
346 *(int *)(packet
+ IL
* 1) = num_sessions
;
347 *(int *)(packet
+ IL
* 2) = s
->tunnel_len
;
348 *(int *)(packet
+ IL
* 3) = s
->session_len
;
349 cluster_send_message(s
->slave_address
, s
->ip_address
, C_HELLO_RESPONSE
, packet
, IL
* 4);
352 // Send tunnels one-by-one, in order
353 log(0, "Sending %d tunnels of %d bytes each\n", num_tunnels
, s
->tunnel_len
);
// Each tunnel packet is sizeof(int) bytes followed by tunnel_len bytes of data.
// NOTE(review): `packet` is reassigned here; the header buffer allocated above
// leaks unless it is freed in a line not shown. malloc() is also unchecked.
354 pktlen
= s
->tunnel_len
+ sizeof(int);
355 packet
= malloc(pktlen
);
356 for (i
= 0; i
< 1000; i
++)
// NOTE(review): the cast is applied after the addition, so the offset is scaled
// by packet's declared element type, which is not visible here -- confirm it is
// a byte pointer.
361 memcpy((char *)(packet
+ sizeof(int)), s
->tunnels
[i
], s
->tunnel_len
);
362 log(0, "Sending tunnel %d\n", i
);
363 cluster_send_message(s
->slave_address
, s
->ip_address
, C_TUNNEL
, packet
, pktlen
);
368 // Send sessions one-by-one, in order
369 log(0, "Sending %d sessions of %d bytes each\n", num_sessions
, s
->session_len
);
// Same layout as the tunnel packets, using session_len.
370 pktlen
= s
->session_len
+ sizeof(int);
371 packet
= malloc(pktlen
);
372 for (i
= 0; i
< 13000; i
++)
377 memcpy((char *)(packet
+ sizeof(int)), s
->sessions
[i
], s
->session_len
);
378 log(0, "Sending session %d\n", i
);
379 cluster_send_message(s
->slave_address
, s
->ip_address
, C_SESSION
, packet
, pktlen
);
// Look up a slave record by address.
//
// Walks the `slaves` list with ll_next() and compares each record's ip_address
// with `address`.
// NOTE(review): the return statements are not visible in this listing. Callers
// in this file treat a false (NULL) result as "no such slave".
387 slave
*find_slave(uint32_t address
)
392 while ((s
= ll_next(slaves
)))
394 if (s
->ip_address
== address
)
// printf-style logger writing to stderr.
//
// level  - verbosity of this message; it is dropped when `debug` is lower
// format - printf-style format string, followed by its arguments
//
// NOTE(review): no va_end(ap) is visible after vfprintf(). The C standard
// requires each va_start to be matched by va_end in the same function --
// confirm it is present in the lines not shown.
402 void _log(int level
, const char *format
, ...)
405 if (debug
< level
) return;
407 va_start(ap
, format
);
408 vfprintf(stderr
, format
, ap
);
// Write a hex dump of a buffer to stderr, 16 bytes per row.
//
// level   - verbosity of this dump; nothing is written when `debug` is lower
// title   - label printed ahead of the dump together with the byte count
// data    - bytes to dump (read as unsigned char)
// maxsize - number of bytes to dump
//
// NOTE(review): the statements inside the padding loop and the printable-
// character test, and the line that advances `i`, are not visible in this
// listing.
411 void log_hex(int level
, const char *title
, const char *data
, int maxsize
)
414 unsigned const char *d
= (unsigned const char *)data
;
416 if (debug
< level
) return;
417 log(level
, "%s (%d bytes):\n", title
, maxsize
);
// Switch stderr to full buffering (16384 bytes) while the dump is written.
418 setvbuf(stderr
, NULL
, _IOFBF
, 16384);
// Outer loop: one row per iteration; the for header has no increment clause.
419 for (i
= 0; i
< maxsize
; )
// Row prefix: offset of the row's first byte, in hex.
421 fprintf(stderr
, "%4X: ", i
);
// Hex column: up to 16 bytes of this row.
422 for (j
= i
; j
< maxsize
&& j
< (i
+ 16); j
++)
424 fprintf(stderr
, "%02X ", d
[j
]);
// Runs over the remaining positions of a short final row.
429 for (; j
< i
+ 16; j
++)
// Second pass over the same bytes of the row.
437 for (j
= i
; j
< maxsize
&& j
< (i
+ 16); j
++)
// True for bytes in 0x21..0x7e: the 0x20..0x7e range with space excluded.
439 if (d
[j
] >= 0x20 && d
[j
] < 0x7f && d
[j
] != 0x20)
// Make stderr unbuffered again once the dump is complete.
452 setbuf(stderr
, NULL
);
// Start acting as backup for a slave by executing the l2tpns binary with
// "-a <slave address>".
//
// NOTE(review): execl() replaces the calling process image on success, so this
// presumably runs in a forked child; the fork and the return statements are not
// visible in this listing -- confirm.
455 int backup_up(slave
*s
)
457 log(2, "Becoming backup for \"%s\" (%s).\n", s
->hostname
, inet_toa(s
->ip_address
));
// NOTE(review): POSIX documents the execl() argument list as terminated by
// (char *)NULL; a bare NULL in a variadic call may not have pointer type on
// every platform -- confirm.
461 if (execl(L2TPNS
, L2TPNS
, "-a", inet_toa(s
->ip_address
), NULL
) < 0)
// execl() only returns on failure; report why.
462 log(0, "Error execing backup " L2TPNS
": %s\n", strerror(errno
));
// Stop acting as backup for a slave by signalling the process recorded in
// s->pid.
//
// NOTE(review): whatever happens between the two kill() calls (for example a
// delay or a wait) and the return statements is not visible in this listing. No
// check that s->pid holds a valid process id is visible either -- confirm.
469 int backup_down(slave
*s
)
471 log(2, "Not being backup for \"%s\" (%s) anymore.\n", s
->hostname
, inet_toa(s
->ip_address
));
// Ask the process to terminate first ...
474 kill(s
->pid
, SIGTERM
);
// ... then force-kill it.
476 kill(s
->pid
, SIGKILL
);