6f4695d7e9bf159db0b4ee7b0bbff480182a34a9
[l2tpns.git] / cluster.c
1 // L2TPNS Clustering Stuff
2 // $Id: cluster.c,v 1.3 2004-06-23 03:52:24 fred_nerk Exp $
3
4 #include <stdio.h>
5 #include <sys/file.h>
6 #include <sys/stat.h>
7 #include <sys/socket.h>
8 #include <netinet/in.h>
9 #include <arpa/inet.h>
10 #include <sys/ioctl.h>
11 #include <net/if.h>
12 #include <string.h>
13 #include <malloc.h>
14 #include <errno.h>
15 #include <stdlib.h>
16 #include <stdarg.h>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <libcli.h>
20
21 #include "l2tpns.h"
22 #include "cluster.h"
23 #include "util.h"
24 #include "tbf.h"
25
26 #ifdef BGP
27 #include "bgp.h"
28 #endif
29 /*
30 * All cluster packets have the same format.
31 *
32 * One or more instances of
33 * a 32 bit 'type' id.
34 * a 32 bit 'extra' data dependant on the 'type'.
35 * zero or more bytes of structure data, dependant on the type.
36 *
37 */
38
39 // Module variables.
40 int cluster_sockfd = 0; // The filedescriptor for the cluster communications port.
41
42 ipt my_address = 0; // The network address of my ethernet port.
43 static int walk_session_number = 0; // The next session to send when doing the slow table walk.
44 static int walk_tunnel_number = 0; // The next tunnel to send when doing the slow table walk.
45
46 static int hsess, fsess; // Saved copies of the highest used session id, and the first free one.
47
48 #define MAX_HEART_SIZE (8192) // Maximum size of heartbeat packet. Must be less than max IP packet size :)
49 #define MAX_CHANGES (MAX_HEART_SIZE/(sizeof(sessiont) + sizeof(int) ) - 2) // Assumes a session is the biggest type!
50
51 static struct {
52 int type;
53 int id;
54 } cluster_changes[MAX_CHANGES]; // Queue of changed structures that need to go out when next heartbeat.
55
56 static struct {
57 int seq;
58 int size;
59 char data[MAX_HEART_SIZE];
60 } past_hearts[HB_HISTORY_SIZE]; // Ring buffer of heartbeats that we've recently sent out. Needed so
61 // we can re-transmit if needed.
62
63 static struct {
64 u32 peer;
65 time_t basetime;
66 clockt timestamp;
67 int uptodate;
68 } peers[CLUSTER_MAX_SIZE]; // List of all the peers we've heard from.
69 static int num_peers; // Number of peers in list.
70 static int have_peers; // At least one peer
71
72 int rle_decompress(u8 ** src_p, int ssize, u8 *dst, int dsize);
73 int rle_compress(u8 ** src_p, int ssize, u8 *dst, int dsize);
74
75 //
76 // Create a listening socket
77 //
78 // This joins the cluster multi-cast group.
79 //
80 int cluster_init()
81 {
82 struct sockaddr_in addr;
83 struct sockaddr_in interface_addr;
84 struct ip_mreq mreq;
85 struct ifreq ifr;
86 int opt = 0;
87
88 config->cluster_undefined_sessions = MAXSESSION-1;
89 config->cluster_undefined_tunnels = MAXTUNNEL-1;
90
91 if (!config->cluster_address)
92 return 0;
93 if (!*config->cluster_interface)
94 return 0;
95
96 cluster_sockfd = socket(AF_INET, SOCK_DGRAM, UDP);
97
98 memset(&addr, 0, sizeof(addr));
99 addr.sin_family = AF_INET;
100 addr.sin_port = htons(CLUSTERPORT);
101 addr.sin_addr.s_addr = INADDR_ANY;
102 setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &addr, sizeof(addr));
103
104 if (bind(cluster_sockfd, (void *) &addr, sizeof(addr)) < 0)
105 {
106 log(0, 0, 0, 0, "Failed to bind cluster socket: %s\n", strerror(errno));
107 return -1;
108 }
109
110 strcpy(ifr.ifr_name, config->cluster_interface);
111 if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) < 0) {
112 log(0, 0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno));
113 return -1;
114 }
115
116 memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr) );
117 my_address = interface_addr.sin_addr.s_addr;
118
119 // Join multicast group.
120 mreq.imr_multiaddr.s_addr = config->cluster_address;
121 mreq.imr_interface = interface_addr.sin_addr;
122
123
124 opt = 0; // Turn off multicast loopback.
125 setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt));
126
127 if (setsockopt(cluster_sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
128 log(0, 0, 0, 0, "Failed to setsockopt (join mcast group): %s\n", strerror(errno));
129 return -1;
130 }
131
132 if (setsockopt (cluster_sockfd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0) {
133 log(0, 0, 0, 0, "Failed to setsockopt (set mcast interface): %s\n", strerror(errno));
134 return -1;
135 }
136
137 config->cluster_last_hb = config->current_time;
138 config->cluster_seq_number = -1;
139
140 return cluster_sockfd;
141 }
142
143
144 //
145 // Send a chunk of data to the entire cluster (usually via the multicast
146 // address ).
147 //
148
149 int cluster_send_data(void *data, int datalen)
150 {
151 struct sockaddr_in addr = {0};
152
153 if (!cluster_sockfd) return -1;
154 if (!config->cluster_address) return 0;
155
156 addr.sin_addr.s_addr = config->cluster_address;
157 addr.sin_port = htons(CLUSTERPORT);
158 addr.sin_family = AF_INET;
159
160 // log_hex(4, "Cluster send", data, datalen); // VERY big data packets. How about we don't..
161
162 log(5,0,0,0, "Cluster send data: %d bytes\n", datalen);
163
164 if (sendto(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0)
165 {
166 log(0, 0, 0, 0, "sendto: %s\n", strerror(errno));
167 return -1;
168 }
169
170 return 0;
171 }
172
173 //
174 // Add a chunk of data to a heartbeat packet.
175 // Maintains the format. Assumes that the caller
176 // has passed in a big enough buffer!
177 //
178 static void add_type(char ** p, int type, int more, char * data, int size)
179 {
180 * ( (u32*)(*p) ) = type;
181 *p += sizeof(u32);
182
183 * ( (u32*)(*p) ) = more;
184 *p += sizeof(u32);
185
186 if (data && size > 0) {
187 memcpy(*p, data, size);
188 (*p) += size;
189 }
190 }
191
192 void cluster_uptodate(void)
193 {
194 if (config->cluster_iam_uptodate)
195 return;
196
197 if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels)
198 return;
199
200 config->cluster_iam_uptodate = 1;
201
202 log(0,0,0,0, "Now uptodate with master.\n");
203
204 // If we're not a master, or if we have no slaves
205 // then start taking traffic..
206 if (!config->cluster_iam_master || !have_peers)
207 {
208 #ifdef BGP
209 if (bgp_configured)
210 bgp_enable_routing(1);
211 else
212 #endif /* BGP */
213 if (config->send_garp)
214 send_garp(config->bind_address); // Start taking traffic.
215 }
216 }
217
218 //
219 // Send a unicast UDP packet to a peer with 'data' as the
220 // contents.
221 //
222 int peer_send_data(u32 peer, char * data, int size)
223 {
224 struct sockaddr_in addr = {0};
225
226 if (!cluster_sockfd) return -1;
227 if (!config->cluster_address) return 0;
228
229 if (!peer) // Odd??
230 return -1;
231
232 addr.sin_addr.s_addr = peer;
233 addr.sin_port = htons(CLUSTERPORT);
234 addr.sin_family = AF_INET;
235
236 log_hex(5, "Peer send", data, size);
237
238 if (sendto(cluster_sockfd, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0)
239 {
240 log(0, 0, 0, 0, "sendto: %s\n", strerror(errno));
241 return -1;
242 }
243
244 return 0;
245 }
246
247 //
248 // Send a structured message to a peer with a single element of type 'type'.
249 //
250 int peer_send_message(u32 peer, int type, int more, char * data, int size)
251 {
252 char buf[65536]; // Vast overkill.
253 char * p = buf;
254
255 log(4,0,0,0, "Sending message to peer (type %d, more %d, size %d)\n", type, more, size);
256 add_type(&p, type, more, data, size);
257
258 return peer_send_data(peer, buf, (p-buf) );
259 }
260
261 //
262 // Forward a state changing packet to the master.
263 //
264 // The master just processes the payload as if it had
265 // received it off the tap device.
266 //
267 int master_forward_packet(char * data, int size, u32 addr, int port)
268 {
269 char buf[65536]; // Vast overkill.
270 char * p = buf;
271
272 if (!config->cluster_master_address) // No election has been held yet. Just skip it.
273 return -1;
274
275 log(4,0,0,0, "Forwarding packet from %s to master (size %d)\n", inet_toa(addr), size);
276
277 STAT(c_forwarded);
278 add_type(&p, C_FORWARD, addr, (char*) &port, sizeof(port) );
279 memcpy(p, data, size);
280 p += size;
281
282 return peer_send_data(config->cluster_master_address, buf, (p-buf) );
283
284 }
285
286 //
287 // Forward a throttled packet to the master for handling.
288 //
289 // The master just drops the packet into the appropriate
290 // token bucket queue, and lets normal processing take care
291 // of it.
292 //
293 int master_throttle_packet(int tbfid, char * data, int size)
294 {
295 char buf[65536]; // Vast overkill.
296 char * p = buf;
297
298 if (!config->cluster_master_address) // No election has been held yet. Just skip it.
299 return -1;
300
301 log(4,0,0,0, "Throttling packet master (size %d, tbfid %d)\n", size, tbfid);
302
303 add_type(&p, C_THROTTLE, tbfid, data, size);
304
305 return peer_send_data(config->cluster_master_address, buf, (p-buf) );
306
307 }
308
309 //
310 // Forward a walled garden packet to the master for handling.
311 //
312 // The master just writes the packet straight to the tun
313 // device (where is will normally loop through the
314 // firewall rules, and come back in on the tun device)
315 //
316 // (Note that this must be called with the tun header
317 // as the start of the data).
318 int master_garden_packet(sessionidt s, char *data, int size)
319 {
320 char buf[65536]; // Vast overkill.
321 char *p = buf;
322
323 if (!config->cluster_master_address) // No election has been held yet. Just skip it.
324 return -1;
325
326 log(4,0,0,0, "Walled garden packet to master (size %d)\n", size);
327
328 add_type(&p, C_GARDEN, s, data, size);
329
330 return peer_send_data(config->cluster_master_address, buf, (p-buf));
331
332 }
333
334 //
335 // Send a chunk of data as a heartbeat..
336 // We save it in the history buffer as we do so.
337 //
338 static void send_heartbeat(int seq, char * data, int size)
339 {
340 int i;
341
342 if (size > sizeof(past_hearts[0].data)) {
343 log(0,0,0,0, "Tried to heartbeat something larger than the maximum packet!\n");
344 kill(0, SIGTERM);
345 }
346 i = seq % HB_HISTORY_SIZE;
347 past_hearts[i].seq = seq;
348 past_hearts[i].size = size;
349 memcpy(&past_hearts[i].data, data, size); // Save it.
350 cluster_send_data(data, size);
351 }
352
353 //
354 // Send an 'i am alive' message to every machine in the cluster.
355 //
356 void cluster_send_ping(time_t basetime)
357 {
358 char buff[100 + sizeof(pingt)];
359 char *p = buff;
360 pingt x;
361
362 if (config->cluster_iam_master && basetime) // We're heartbeating so no need to ping.
363 return;
364
365 log(5,0,0,0, "Sending cluster ping...\n");
366
367 x.ver = 1;
368 x.addr = config->bind_address;
369 x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels;
370 x.basetime = basetime;
371
372 add_type(&p, C_PING, basetime, (char *) &x, sizeof(x));
373 cluster_send_data(buff, (p-buff) );
374 }
375
376 //
377 // Walk the session counters looking for non-zero ones to send
378 // to the master. We send up to 100 of them at one time.
379 // We examine a maximum of 2000 sessions.
380 // (50k max session should mean that we normally
381 // examine the entire session table every 25 seconds).
382
383 #define MAX_B_RECS (400)
384 void master_update_counts(void)
385 {
386 int i, c;
387 bytest b[MAX_B_RECS+1];
388
389 if (config->cluster_iam_master) // Only happens on the slaves.
390 return;
391
392 if (!config->cluster_master_address) // If we don't have a master, skip it for a while.
393 return;
394
395 i = MAX_B_RECS * 5; // Examine max 2000 sessions;
396 if (config->cluster_highest_sessionid > i)
397 i = config->cluster_highest_sessionid;
398
399 for ( c = 0; i > 0 ; --i) {
400 // Next session to look at.
401 walk_session_number++;
402 if ( walk_session_number > config->cluster_highest_sessionid)
403 walk_session_number = 1;
404
405 if (!sess_count[walk_session_number].cin && !sess_count[walk_session_number].cout)
406 continue; // Unused. Skip it.
407
408 b[c].sid = walk_session_number;
409 b[c].in = sess_count[walk_session_number].cin;
410 b[c].out = sess_count[walk_session_number].cout;
411
412 if (++c > MAX_B_RECS) // Send a max of 400 elements in a packet.
413 break;
414
415 // Reset counters.
416 sess_count[walk_session_number].cin = sess_count[walk_session_number].cout = 0;
417 }
418
419 if (!c) // Didn't find any that changes. Get out of here!
420 return;
421
422
423 // Forward the data to the master.
424 log(4,0,0,0, "Sending byte counters to master (%d elements)\n", c);
425 peer_send_message(config->cluster_master_address, C_BYTES, c, (char*) &b, sizeof(b[0]) * c);
426 return;
427 }
428
429 //
430 // Check that we have a master. If it's been too
431 // long since we heard from a master then hold an election.
432 //
433 void cluster_check_master(void)
434 {
435 int i, count, tcount, high_sid = 0;
436 int last_free = 0;
437 int had_peers = have_peers;
438 clockt t = config->current_time;
439
440 if (config->current_time < (config->cluster_last_hb + HB_TIMEOUT) )
441 return; // Everything's ok. return.
442
443 if (!config->cluster_iam_master)
444 log(0,0,0,0, "Master timed out! Holding election...\n");
445
446 config->cluster_last_hb = config->current_time + 1;
447
448 for (i = have_peers = 0; i < num_peers ; ++i) {
449 if ((peers[i].timestamp + HB_TIMEOUT) < t)
450 continue; // Stale peer! Skip them.
451
452 if (!peers[i].basetime)
453 continue; // Shutdown peer! Skip them.
454
455 have_peers = 1;
456 if (peers[i].basetime < basetime) {
457 log(1,0,0,0, "Expecting %s to become master\n", inet_toa(peers[i].peer) );
458 return; // They'll win the election. Get out of here.
459 }
460
461 if (peers[i].basetime == basetime &&
462 peers[i].peer > my_address) {
463 log(1,0,0,0, "Expecting %s to become master\n", inet_toa(peers[i].peer) );
464 return; // They'll win the election. Wait for them to come up.
465 }
466 }
467
468 if (config->cluster_iam_master) // If we're the master, we've already won
469 {
470 #ifdef BGP
471 // master lost all slaves, need to handle traffic ourself
472 if (bgp_configured && had_peers && !have_peers)
473 bgp_enable_routing(1);
474 #endif /* BGP */
475 return;
476 }
477
478 // Wow. it's been ages since I last heard a heartbeat
479 // and I'm better than an of my peers so it's time
480 // to become a master!!!
481
482 config->cluster_iam_master = 1;
483 config->cluster_master_address = 0;
484
485 log(0,0,0,0, "I am declaring myself the master!\n");
486
487 #ifdef BGP
488 if (bgp_configured && have_peers)
489 bgp_enable_routing(0); /* stop handling traffic */
490 #endif /* BGP */
491
492 if (config->cluster_seq_number == -1)
493 config->cluster_seq_number = 0;
494
495 //
496 // Go through and mark all the tunnels as defined.
497 // Count the highest used tunnel number as well.
498 //
499 config->cluster_highest_tunnelid = 0;
500 for (i = 0, tcount = 0; i < MAXTUNNEL; ++i) {
501 if (tunnel[i].state == TUNNELUNDEF)
502 tunnel[i].state = TUNNELFREE;
503
504 if (tunnel[i].state != TUNNELFREE && i > config->cluster_highest_tunnelid)
505 config->cluster_highest_tunnelid = i;
506 }
507
508 //
509 // Go through and mark all the sessions as being defined.
510 // reset the idle timeouts.
511 // add temporary byte counters to permanent ones.
512 // Re-string the free list.
513 // Find the ID of the highest session.
514 last_free = 0;
515 high_sid = 0;
516 config->cluster_highest_sessionid = 0;
517 for (i = 0, count = 0; i < MAXSESSION; ++i) {
518 if (session[i].tunnel == T_UNDEF) {
519 session[i].tunnel = T_FREE;
520 ++count;
521 }
522
523 if (session[i].tunnel == T_FREE) { // Unused session. Add to free list.
524 session[last_free].next = i;
525 session[i].next = 0;
526 last_free = i;
527 }
528
529 // Reset all the idle timeouts..
530 session[i].last_packet = time_now;
531
532 // Accumulate un-sent byte counters.
533 session[i].cin += sess_count[i].cin;
534 session[i].cout += sess_count[i].cout;
535 session[i].total_cin += sess_count[i].cin;
536 session[i].total_cout += sess_count[i].cout;
537
538 sess_count[i].cin = sess_count[i].cout = 0;
539
540 session[i].radius = 0; // Reset authentication as the radius blocks aren't up to date.
541
542 if (session[i].sid >= high_sid) // This is different to the index into the session table!!!
543 high_sid = session[i].sid+1;
544
545
546 session[i].tbf_in = session[i].tbf_out = 0; // Remove stale pointers from old master.
547 throttle_session(i, session[i].throttle);
548
549 // I'm unsure about this. --mo
550 // It's potentially a good thing, but it could send a
551 // LOT of packets.
552 // if (session[i].throttle)
553 // cluster_send_session(s); // Tell the slaves about the new tbf indexes.
554
555 if (session[i].tunnel != T_FREE && i > config->cluster_highest_sessionid)
556 config->cluster_highest_sessionid = i;
557
558 }
559
560 session[last_free].next = 0; // End of chain.
561 last_sid = high_sid; // Keep track of the highest used session ID.
562
563 become_master();
564
565 rebuild_address_pool();
566
567 // If we're not the very first master, this is a big issue!
568 if(count>0)
569 log(0,0,0,0, "Warning: Fixed %d uninitialized sessions in becoming master!\n", count);
570
571 config->cluster_undefined_sessions = 0;
572 config->cluster_undefined_tunnels = 0;
573
574 //
575 // FIXME. We need to fix up the tunnel control message
576 // queue here! There's a number of other variables we
577 // should also update.
578 cluster_uptodate();
579 }
580
581
582 //
583 // Check that our session table is validly matching what the
584 // master has in mind.
585 //
586 // In particular, if we have too many sessions marked 'undefined'
587 // we fix it up here, and we ensure that the 'first free session'
588 // pointer is valid.
589 //
590 static void cluster_check_sessions(int highsession, int freesession_ptr, int hightunnel)
591 {
592 int i;
593
594 sessionfree = freesession_ptr; // Keep the freesession ptr valid.
595
596 if (config->cluster_iam_uptodate)
597 return;
598
599 if (highsession > config->cluster_undefined_sessions && hightunnel > config->cluster_undefined_tunnels)
600 return;
601
602 // Clear out defined sessions, counting the number of
603 // undefs remaining.
604 config->cluster_undefined_sessions = 0;
605 for (i = 1 ; i < MAXSESSION; ++i) {
606 if (i > highsession) {
607 session[i].tunnel = 0; // Defined.
608 continue;
609 }
610 if (session[i].tunnel != T_UNDEF)
611 continue;
612 ++config->cluster_undefined_sessions;
613 }
614
615 // Clear out defined tunnels, counting the number of
616 // undefs remaining.
617 config->cluster_undefined_tunnels = 0;
618 for (i = 1 ; i < MAXTUNNEL; ++i) {
619 if (i > hightunnel) {
620 tunnel[i].state = TUNNELFREE; // Defined.
621 continue;
622 }
623 if (tunnel[i].state != TUNNELUNDEF)
624 continue;
625 ++config->cluster_undefined_tunnels;
626 }
627
628
629 if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels) {
630 log(2,0,0,0, "Cleared undefined sessions/tunnels. %d sess (high %d), %d tunn (high %d)\n",
631 config->cluster_undefined_sessions, highsession, config->cluster_undefined_tunnels, hightunnel);
632 return;
633 }
634
635 // Are we up to date?
636
637 if (!config->cluster_iam_uptodate)
638 cluster_uptodate();
639 }
640
641 int hb_add_type(char **p, int type, int id)
642 {
643 switch (type) {
644 case C_CSESSION: { // Compressed C_SESSION.
645 u8 c[sizeof(sessiont) * 2]; // Bigger than worst case.
646 u8 *d = (u8 *) &session[id];
647 u8 *orig = d;
648 int size;
649
650 size = rle_compress( &d, sizeof(sessiont), c, sizeof(c) );
651
652 // Did we compress the full structure, and is the size actually
653 // reduced??
654 if ( (d - orig) == sizeof(sessiont) && size < sizeof(sessiont) ) {
655 add_type(p, C_CSESSION, id, (char*) c, size);
656 break;
657 }
658 // Failed to compress : Fall through.
659 }
660 case C_SESSION: add_type(p, C_SESSION, id,
661 (char*) &session[id], sizeof(sessiont));
662 break;
663
664 case C_CTUNNEL: { // Compressed C_TUNNEL
665 u8 c[sizeof(tunnelt) * 2]; // Bigger than worst case.
666 u8 *d = (u8 *) &tunnel[id];
667 u8 *orig = d;
668 int size;
669
670 size = rle_compress( &d, sizeof(tunnelt), c, sizeof(c) );
671
672 // Did we compress the full structure, and is the size actually
673 // reduced??
674 if ( (d - orig) == sizeof(tunnelt) && size < sizeof(tunnelt) ) {
675 add_type(p, C_CTUNNEL, id, c, size);
676 break;
677 }
678 // Failed to compress : Fall through.
679 }
680 case C_TUNNEL: add_type(p, C_TUNNEL, id,
681 (char*) &tunnel[id], sizeof(tunnelt));
682 break;
683 default:
684 log(0,0,0,0, "Found an invalid type in heart queue! (%d)\n", type);
685 kill(0, SIGTERM);
686 }
687 return 0;
688 }
689
690 //
691 // Send a heartbeat, incidently sending out any queued changes..
692 //
693 void cluster_heartbeat(int highsession, int freesession, int hightunnel)
694 {
695 int i, count = 0, tcount = 0;
696 char buff[MAX_HEART_SIZE + sizeof(heartt) + sizeof(int) ];
697 heartt h;
698 char * p = buff;
699
700 if (!config->cluster_iam_master) // Only the master does this.
701 return;
702
703 hsess = highsession;
704 fsess = freesession;
705 // Fill out the heartbeat header.
706 h.version = HB_VERSION;
707 h.seq = config->cluster_seq_number;
708 h.basetime = basetime;
709 h.clusterid = config->bind_address; // Will this do??
710 h.basetime = basetime;
711 h.highsession = highsession;
712 h.freesession = freesession;
713 h.hightunnel = hightunnel;
714 h.size_sess = sizeof(sessiont); // Just in case.
715 h.size_tunn = sizeof(tunnelt);
716
717 add_type(&p, C_HEARTBEAT, HB_VERSION, (char*) &h, sizeof(h) );
718
719 for (i = 0; i < config->cluster_num_changes; ++i) {
720 hb_add_type(&p, cluster_changes[i].type, cluster_changes[i].id);
721 }
722
723 if (p > (buff + sizeof(buff))) { // Did we somehow manage to overun the buffer?
724 log(0,0,0,0, "Overrun the heartbeat buffer! This is fatal. Exiting. (size %d)\n", p - buff);
725 kill(0, SIGTERM);
726 }
727
728 //
729 // Fill out the packet with sessions from the session table...
730 // (not forgetting to leave space so we can get some tunnels in too )
731 while ( (p + sizeof(u32) * 2 + sizeof(sessiont) * 2 ) < (buff + MAX_HEART_SIZE) ) {
732
733 if (!walk_session_number) // session #0 isn't valid.
734 ++walk_session_number;
735
736 if (count >= highsession) // If we're a small cluster, don't go wild.
737 break;
738
739 hb_add_type(&p, C_CSESSION, walk_session_number);
740 walk_session_number = (1+walk_session_number)%(highsession+1); // +1 avoids divide by zero.
741
742 ++count; // Count the number of extra sessions we're sending.
743 }
744
745 //
746 // Fill out the packet with tunnels from the tunnel table...
747 //
748 while ( (p + sizeof(u32) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE) ) {
749
750 if (!walk_tunnel_number) // tunnel #0 isn't valid.
751 ++walk_tunnel_number;
752
753 if (tcount >= config->cluster_highest_tunnelid)
754 break;
755
756 hb_add_type(&p, C_CTUNNEL, walk_tunnel_number);
757 walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1); // +1 avoids divide by zero.
758
759 ++tcount;
760 }
761
762 //
763 // Did we do something wrong?
764 if (p > (buff + sizeof(buff))) { // Did we somehow manage to overun the buffer?
765 log(0,0,0,0, "Overran the heartbeat buffer now! This is fatal. Exiting. (size %d)\n", p - buff);
766 kill(0, SIGTERM);
767 }
768
769 log(3,0,0,0, "Sending heartbeat with %d changes (%d x-sess, %d x-tunnels, %d highsess, %d hightun size %d)\n",
770 config->cluster_num_changes, count, tcount, config->cluster_highest_sessionid,
771 config->cluster_highest_tunnelid, (p-buff));
772
773 config->cluster_num_changes = 0;
774
775 send_heartbeat(h.seq, buff, (p-buff) ); // Send out the heartbeat to the cluster, keeping a copy of it.
776
777 config->cluster_seq_number = (config->cluster_seq_number+1)%HB_MAX_SEQ; // Next seq number to use.
778 }
779
780 //
781 // A structure of type 'type' has changed; Add it to the queue to send.
782 //
783 int type_changed(int type, int id)
784 {
785 int i;
786
787 for (i = 0 ; i < config->cluster_num_changes ; ++i)
788 if ( cluster_changes[i].id == id &&
789 cluster_changes[i].type == type)
790 return 0; // Already marked for change.
791
792 cluster_changes[i].type = type;
793 cluster_changes[i].id = id;
794 ++config->cluster_num_changes;
795
796 if (config->cluster_num_changes > MAX_CHANGES)
797 cluster_heartbeat(config->cluster_highest_sessionid, fsess, config->cluster_highest_tunnelid);
798
799 return 1;
800 }
801
802
803 // A particular session has been changed!
804 int cluster_send_session(int sid)
805 {
806 if (!config->cluster_iam_master) {
807 log(0,0,sid,0, "I'm not a master, but I just tried to change a session!\n");
808 return -1;
809 }
810
811 return type_changed(C_CSESSION, sid);
812 }
813
814 // A particular tunnel has been changed!
815 int cluster_send_tunnel(int tid)
816 {
817 if (!config->cluster_iam_master) {
818 log(0,0,0,tid, "I'm not a master, but I just tried to change a tunnel!\n");
819 return -1;
820 }
821
822 return type_changed(C_CTUNNEL, tid);
823 }
824
825
826 //
827 // We're a master, and a slave has just told us that it's
828 // missed a packet. We'll resend it every packet since
829 // the last one it's seen.
830 //
831 int cluster_catchup_slave(int seq, u32 slave)
832 {
833 int s;
834 int diff;
835
836 log(1,0,0,0, "Slave %s sent LASTSEEN with seq %d\n", inet_toa(slave), seq);
837
838 diff = config->cluster_seq_number - seq; // How many packet do we need to send?
839 if (diff < 0)
840 diff += HB_MAX_SEQ;
841
842 if (diff >= HB_HISTORY_SIZE) { // Ouch. We don't have the packet to send it!
843 log(0,0,0,0, "A slaved asked for message %d when our seq number is %d. Killing it.\n",
844 seq, config->cluster_seq_number);
845 return peer_send_message(slave, C_KILL, seq, NULL, 0);// Kill the slave. Nothing else to do.
846 }
847
848 // Now resend every packet that it missed, in order.
849 while (seq != config->cluster_seq_number) {
850 s = seq%HB_HISTORY_SIZE;
851 if (seq != past_hearts[s].seq) {
852 log(0,0,0,0, "Tried to re-send heartbeat for %s but %d doesn't match %d! (%d,%d)\n",
853 inet_toa(slave), seq, past_hearts[s].seq, s, config->cluster_seq_number);
854 return -1; // What to do here!?
855 }
856 peer_send_data(slave, past_hearts[s].data, past_hearts[s].size);
857 seq = (seq+1)%HB_MAX_SEQ; // Increment to next seq number.
858 }
859 return 0; // All good!
860 }
861
862 //
863 // We've heard from another peer! Add it to the list
864 // that we select from at election time.
865 //
866 int cluster_add_peer(u32 peer, time_t basetime, pingt *p)
867 {
868 int i;
869 u32 clusterid;
870
871 clusterid = p->addr;
872 if (clusterid != config->bind_address)
873 {
874 // Is this for us?
875 log(4,0,0,0, "Skipping ping from %s (different cluster)\n", inet_toa(peer));
876 return 0;
877 }
878
879 // Is this the master shutting down??
880 if (peer == config->cluster_master_address && !basetime) {
881 config->cluster_master_address = 0;
882 config->cluster_last_hb = 0; // Force an election.
883 cluster_check_master();
884 return 0;
885 }
886
887 for (i = 0; i < num_peers ; ++i)
888 {
889 if (peers[i].peer != peer)
890 continue;
891
892 // This peer already exists. Just update the timestamp.
893 peers[i].basetime = basetime;
894 peers[i].timestamp = config->current_time;
895 break;
896 }
897
898 if (i >= num_peers)
899 {
900 log(4,0,0,0, "Adding %s as a peer\n", inet_toa(peer));
901
902 // Not found. Is there a stale slot to re-use?
903 for (i = 0; i < num_peers ; ++i)
904 {
905 if (peers[i].peer != peer)
906 continue;
907 if ((peers[i].timestamp + HB_TIMEOUT * 10) < config->current_time) // Stale.
908 break;
909 }
910
911 if (i >= CLUSTER_MAX_SIZE)
912 {
913 // Too many peers!!
914 log(0,0,0,0, "Tried to add %s as a peer, but I already have %d of them!\n", inet_toa(peer), i);
915 return -1;
916 }
917
918 peers[i].peer = peer;
919 peers[i].basetime = basetime;
920 peers[i].timestamp = config->current_time;
921 if (i == num_peers)
922 ++num_peers;
923
924 log(1,0,0,0, "Added %s as a new peer. Now %d peers\n", inet_toa(peer), num_peers);
925 }
926
927 #ifdef BGP
928 /* drop routes if we've now got a peer */
929 if (bgp_configured && config->cluster_iam_master && !have_peers)
930 bgp_enable_routing(0);
931 #endif /* BGP */
932
933 have_peers = 1;
934
935 return 1;
936 }
937
938 /* Handle the slave updating the byte counters for the master. */
939 //
940 // Note that we don't mark the session as dirty; We rely on
941 // the slow table walk to propogate this back out to the slaves.
942 //
943 int cluster_handle_bytes(char * data, int size)
944 {
945 bytest * b;
946
947 b = (bytest*) data;
948
949 log(3,0,0,0, "Got byte counter update (size %d)\n", size);
950
951 /* Loop around, adding the byte
952 counts to each of the sessions. */
953
954 while (size >= sizeof(*b) ) {
955 if (b->sid > MAXSESSION) {
956 log(0,0,0,0, "Got C_BYTES with session #%d!\n", b->sid);
957 return -1; /* Abort processing */
958 }
959
960 session[b->sid].total_cin += b->in;
961 session[b->sid].total_cout += b->out;
962
963 session[b->sid].cin += b->in;
964 session[b->sid].cout += b->out;
965 session[b->sid].last_packet = time_now; // Reset idle timer!
966
967 size -= sizeof(*b);
968 ++b;
969 }
970
971 if (size != 0)
972 log(0,0,0,0, "Got C_BYTES with %d bytes of trailing junk!\n", size);
973
974 return size;
975 }
976
977 //
978 // Handle receiving a session structure in a heartbeat packet.
979 //
980 static int cluster_recv_session(int more , u8 * p)
981 {
982 if (more >= MAXSESSION) {
983 log(0,0,0,0, "DANGER: Received a heartbeat session id > MAXSESSION!\n");
984 return -1;
985 }
986
987 if (session[more].tunnel == T_UNDEF) {
988 if (config->cluster_iam_uptodate) { // Sanity.
989 log(0,0,0,0, "I thought I was uptodate but I just found an undefined session!\n");
990 } else {
991 --config->cluster_undefined_sessions;
992 }
993 }
994
995 load_session(more, (sessiont*) p); // Copy session into session table..
996
997 log(5,0,more,0, "Received session update (%d undef)\n", config->cluster_undefined_sessions);
998
999 if (!config->cluster_iam_uptodate)
1000 cluster_uptodate(); // Check to see if we're up to date.
1001 return 0;
1002 }
1003
1004 static int cluster_recv_tunnel(int more, u8 *p)
1005 {
1006 if (more >= MAXTUNNEL) {
1007 log(0,0,0,0, "DANGER: Received a tunnel session id > MAXTUNNEL!\n");
1008 return -1;
1009 }
1010
1011 if (tunnel[more].state == TUNNELUNDEF) {
1012 if (config->cluster_iam_uptodate) { // Sanity.
1013 log(0,0,0,0, "I thought I was uptodate but I just found an undefined tunnel!\n");
1014 } else {
1015 --config->cluster_undefined_tunnels;
1016 }
1017 }
1018
1019 memcpy(&tunnel[more], p, sizeof(tunnel[more]) );
1020
1021 //
1022 // Clear tunnel control messages. These are dynamically allocated.
1023 // If we get unlucky, this may cause the tunnel to drop!
1024 //
1025 tunnel[more].controls = tunnel[more].controle = NULL;
1026 tunnel[more].controlc = 0;
1027
1028 log(5,0,0,more, "Received tunnel update\n");
1029
1030 if (!config->cluster_iam_uptodate)
1031 cluster_uptodate(); // Check to see if we're up to date.
1032
1033 return 0;
1034 }
1035
1036
1037 //
1038 // Process a version one heartbeat..
1039 //
1040 static int cluster_process_heartbeat_v2(u8 * data, int size, int more, u8 * p, u32 addr)
1041 {
1042 heartt * h;
1043 int s = size - (p-data);
1044 int i, type;
1045
1046 if (more != HB_VERSION) {
1047 log(0,0,0,0, "Received a heartbeat version that I don't understand!\n");
1048 return -1; // Ignore it??
1049 }
1050 // Ok. It's a heartbeat packet from a cluster master!
1051 if (s < sizeof(*h))
1052 goto shortpacket;
1053
1054
1055 h = (heartt*) p;
1056 p += sizeof(*h);
1057 s -= sizeof(*h);
1058
1059 if (h->clusterid != config->bind_address)
1060 return -1; // It's not part of our cluster.
1061
1062 if (config->cluster_iam_master) { // Sanity...
1063 // Note that this MUST match the election process above!
1064
1065 log(0,0,0,0, "I just got a packet claiming to be from a master but _I_ am the master!\n");
1066 if (!h->basetime) {
1067 log(0,0,0,0, "Heartbeat from addr %s with zero basetime!\n", inet_toa(htonl(addr)) );
1068 return -1; // Skip it.
1069 }
1070 if (basetime > h->basetime) {
1071 log(0,0,0,0, "They're (%s) an older master than me so I'm gone!\n", inet_toa(htonl(addr)));
1072 kill(0, SIGTERM);
1073 }
1074 if (basetime == h->basetime && my_address < addr) { // Tie breaker.
1075 log(0,0,0,0, "They're a higher IP address than me, so I'm gone!\n");
1076 kill(0, SIGTERM);
1077 }
1078 return -1; // Skip it.
1079 }
1080
1081 if (config->cluster_seq_number == -1) // Don't have one. Just align to the master...
1082 config->cluster_seq_number = h->seq;
1083
1084 config->cluster_last_hb = config->current_time; // Reset to ensure that we don't become master!!
1085
1086 if (config->cluster_seq_number != h->seq) { // Out of sequence heartbeat!
1087 log(1,0,0,0, "HB: Got seq# %d but was expecting %d. asking for resend.\n", h->seq, config->cluster_seq_number);
1088
1089 peer_send_message(addr, C_LASTSEEN, config->cluster_seq_number, NULL, 0);
1090
1091 config->cluster_last_hb = config->current_time; // Reset to ensure that we don't become master!!
1092
1093 // Just drop the packet. The master will resend it as part of the catchup.
1094
1095 return 0;
1096 }
1097 // Save the packet in our buffer.
1098 // This is needed in case we become the master.
1099 config->cluster_seq_number = (h->seq+1)%HB_MAX_SEQ;
1100 i = h->seq % HB_HISTORY_SIZE;
1101 past_hearts[i].seq = h->seq;
1102 past_hearts[i].size = size;
1103 memcpy(&past_hearts[i].data, data, size); // Save it.
1104
1105
1106 // Check that we don't have too many undefined sessions, and
1107 // that the free session pointer is correct.
1108 cluster_check_sessions(h->highsession, h->freesession, h->hightunnel);
1109
1110 // Ok. process the packet...
1111 while ( s > 0) {
1112
1113 type = * ((u32*) p);
1114 p += sizeof(u32);
1115 s -= sizeof(u32);
1116
1117 more = * ((u32*) p);
1118 p += sizeof(u32);
1119 s -= sizeof(u32);
1120
1121 switch (type) {
1122 case C_CSESSION: { // Compressed session structure.
1123 u8 c [ sizeof(sessiont) + 2];
1124 int size;
1125 u8 * orig_p = p;
1126
1127 size = rle_decompress((u8 **) &p, s, c, sizeof(c) );
1128 s -= (p - orig_p);
1129
1130 if (size != sizeof(sessiont) ) { // Ouch! Very very bad!
1131 log(0,0,0,0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
1132 // Now what? Should exit! No-longer up to date!
1133 break;
1134 }
1135
1136 cluster_recv_session(more, c);
1137 break;
1138 }
1139 case C_SESSION:
1140 if ( s < sizeof(session[more]))
1141 goto shortpacket;
1142
1143 cluster_recv_session(more, p);
1144
1145 p += sizeof(session[more]);
1146 s -= sizeof(session[more]);
1147 break;
1148
1149 case C_CTUNNEL: { // Compressed tunnel structure.
1150 u8 c [ sizeof(tunnelt) + 2];
1151 int size;
1152 u8 * orig_p = p;
1153
1154 size = rle_decompress( (u8 **) &p, s, c, sizeof(c) );
1155 s -= (p - orig_p);
1156
1157 if (size != sizeof(tunnelt) ) { // Ouch! Very very bad!
1158 log(0,0,0,0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
1159 // Now what? Should exit! No-longer up to date!
1160 break;
1161 }
1162
1163 cluster_recv_tunnel(more, c);
1164 break;
1165
1166 }
1167 case C_TUNNEL:
1168 if ( s < sizeof(tunnel[more]))
1169 goto shortpacket;
1170
1171 cluster_recv_tunnel(more, p);
1172
1173 p += sizeof(tunnel[more]);
1174 s -= sizeof(tunnel[more]);
1175 break;
1176 default:
1177 log(0,0,0,0, "DANGER: I received a heartbeat element where I didn't understand the type! (%d)\n", type);
1178 return -1; // can't process any more of the packet!!
1179 }
1180 }
1181 if (config->cluster_master_address != addr)
1182 {
1183 char *str;
1184 str = strdup(inet_toa(config->cluster_master_address));
1185 log(0,0,0,0, "My master just changed from %s to %s!\n", str, inet_toa(addr));
1186 if (str) free(str);
1187 }
1188
1189 config->cluster_master_address = addr;
1190 config->cluster_last_hb = config->current_time; // Successfully received a heartbeat!
1191 return 0;
1192
1193 shortpacket:
1194 log(0,0,0,0, "I got an incomplete heartbeat packet! This means I'm probably out of sync!!\n");
1195 return -1;
1196 }
1197
1198 //
1199 // We got a packet on the cluster port!
1200 // Handle pings, lastseens, and heartbeats!
1201 //
1202 int processcluster(char * data, int size, u32 addr)
1203 {
1204 int type, more;
1205 char * p = data;
1206 int s = size;
1207
1208 if (addr == my_address)
1209 return -1; // Ignore it. Something looped back the multicast!
1210
1211 log(5,0,0,0, "Process cluster: %d bytes from %s\n", size, inet_toa(addr));
1212
1213 if (s <= 0) // Any data there??
1214 return -1;
1215
1216 if (s < 8)
1217 goto shortpacket;
1218
1219 type = * ((u32*) p);
1220 p += sizeof(u32);
1221 s -= sizeof(u32);
1222
1223 more = * ((u32*) p);
1224 p += sizeof(u32);
1225 s -= sizeof(u32);
1226
1227 switch (type) {
1228 case C_PING: // Update the peers table.
1229 return cluster_add_peer(addr, more, (pingt*)p);
1230
1231 case C_LASTSEEN: // Catch up a slave (slave missed a packet).
1232 return cluster_catchup_slave(more, addr);
1233
1234 case C_FORWARD: { // Forwarded control packet. pass off to processudp.
1235 struct sockaddr_in a;
1236 a.sin_addr.s_addr = more;
1237
1238 a.sin_port = * (int*) p;
1239 s -= sizeof(int);
1240 p += sizeof(int);
1241
1242 if (!config->cluster_iam_master) { // huh?
1243 log(0,0,0,0, "I'm not the master, but I got a C_FORWARD from %s?\n", inet_toa(addr));
1244 return -1;
1245 }
1246
1247 log(4,0,0,0, "Got a forwarded packet... (%s:%d)\n", inet_toa(more), a.sin_port);
1248 STAT(recv_forward);
1249 processudp(p, s, &a);
1250 return 0;
1251 }
1252 case C_THROTTLE: { // Receive a forwarded packet from a slave.
1253 if (!config->cluster_iam_master) {
1254 log(0,0,0,0, "I'm not the master, but I got a C_THROTTLE from %s?\n", inet_toa(addr));
1255 return -1;
1256 }
1257
1258 tbf_queue_packet(more, p, s); // The TBF id tells wether it goes in or out.
1259 return 0;
1260 }
1261 case C_GARDEN:
1262 // Receive a walled garden packet from a slave.
1263 if (!config->cluster_iam_master) {
1264 log(0,0,0,0, "I'm not the master, but I got a C_GARDEN from %s?\n", inet_toa(addr));
1265 return -1;
1266 }
1267
1268 tun_write(p, s);
1269 return 0;
1270
1271 case C_BYTES:
1272 return cluster_handle_bytes(p, s);
1273
1274 case C_KILL: // The master asked us to die!? (usually because we're too out of date).
1275 if (config->cluster_iam_master) {
1276 log(0,0,0,0, "_I_ am master, but I received a C_KILL from %s! (Seq# %d)\n", inet_toa(addr), more);
1277 return -1;
1278 }
1279 if (more != config->cluster_seq_number) {
1280 log(0,0,0,0, "The master asked us to die but the seq number didn't match!?\n");
1281 return -1;
1282 }
1283
1284 if (addr != config->cluster_master_address) {
1285 log(0,0,0,0, "Received a C_KILL from %s which doesn't match config->cluster_master_address (%x)",
1286 inet_toa(addr), config->cluster_master_address);
1287 // We can only warn about it. The master might really have switched!
1288 }
1289
1290 log(0,0,0,0, "Received a valid C_KILL: I'm going to die now.");
1291 kill(0, SIGTERM);
1292 exit(0); // Lets be paranoid;
1293 return -1; // Just signalling the compiler.
1294
1295 case C_HEARTBEAT:
1296 log(4,0,0,0, "Got a heartbeat from %s\n", inet_toa(addr));
1297
1298 return cluster_process_heartbeat_v2(data, size, more, p, addr);
1299
1300 default:
1301 log(0,0,0,0, "Strange type packet received on cluster socket (%d)\n", type);
1302 return -1;
1303 }
1304 return 0;
1305 shortpacket:
1306 log(0,0,0,0, "I got an cluster heartbeat packet! This means I'm probably out of sync!!\n");
1307 return -1;
1308 }
1309
1310 //====================================================================================================
1311
1312 int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc)
1313 {
1314 int i;
1315
1316 cli_print(cli, "Cluster status : %s", config->cluster_iam_master ? "Master" : "Slave" );
1317 cli_print(cli, "My address : %s", inet_toa(my_address));
1318 cli_print(cli, "VIP address : %s", inet_toa(config->bind_address));
1319 cli_print(cli, "Multicast address: %s", inet_toa(config->cluster_address));
1320 cli_print(cli, "Multicast i'face : %s", config->cluster_interface);
1321
1322 if (!config->cluster_iam_master) {
1323 cli_print(cli, "My master : %s (last heartbeat %.1f seconds old)",
1324 config->cluster_master_address ? inet_toa(config->cluster_master_address) : "Not defined",
1325 0.1 * (config->current_time - config->cluster_last_hb));
1326 cli_print(cli, "Uptodate : %s", config->cluster_iam_uptodate ? "Yes" : "No");
1327 cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number);
1328 cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid);
1329 cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid);
1330 } else {
1331 cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number);
1332 cli_print(cli, "Highest session : %d", config->cluster_highest_sessionid);
1333 cli_print(cli, "Highest tunnel : %d", config->cluster_highest_tunnelid);
1334 cli_print(cli, "%d changes queued for sending", config->cluster_num_changes);
1335 }
1336 cli_print(cli, "%d peers.", num_peers);
1337
1338 if (num_peers)
1339 cli_print(cli, "%20s %10s %8s", "Address", "Basetime", "Age");
1340 for (i = 0; i < num_peers; ++i) {
1341 cli_print(cli, "%20s %10d %8d", inet_toa(peers[i].peer),
1342 peers[i].basetime, config->current_time - peers[i].timestamp);
1343 }
1344 return CLI_OK;
1345 }
1346
1347 //
1348 // Simple run-length-encoding compression.
1349 // Format is
1350 // 1 byte < 128 = count of non-zero bytes following. // Not legal to be zero.
1351 // n non-zero bytes;
1352 // or
1353 // 1 byte > 128 = (count - 128) run of zero bytes. //
1354 // repeat.
1355 // count == 0 indicates end of compressed stream.
1356 //
1357 // Compress from 'src' into 'dst'. return number of bytes
1358 // used from 'dst'.
1359 // Updates *src_p to indicate 1 past last bytes used.
1360 //
1361 // We could get an extra byte in the zero runs by storing (count-1)
1362 // but I'm playing it safe.
1363 //
1364 // Worst case is a 50% expansion in space required (trying to
1365 // compress { 0x00, 0x01 } * N )
1366 int rle_compress(u8 ** src_p, int ssize, u8 *dst, int dsize)
1367 {
1368 int count;
1369 int orig_dsize = dsize;
1370 u8 * x,*src;
1371 src = *src_p;
1372
1373 while (ssize > 0 && dsize > 2) {
1374 count = 0;
1375 x = dst++; --dsize; // Reserve space for count byte..
1376
1377 if (*src) { // Copy a run of non-zero bytes.
1378 while (*src && count < 127 && ssize > 0 && dsize > 1) { // Count number of non-zero bytes.
1379 *dst++ = *src++;
1380 --dsize; --ssize;
1381 ++count;
1382 }
1383 *x = count; // Store number of non-zero bytes. Guarenteed to be non-zero!
1384
1385 } else { // Compress a run of zero bytes.
1386 while (*src == 0 && count < 127 && ssize > 0) {
1387 ++src;
1388 --ssize;
1389 ++count;
1390 }
1391 *x = count | 0x80 ;
1392 }
1393 }
1394
1395 *dst++ = 0x0; // Add Stop byte.
1396 --dsize;
1397
1398 *src_p = src;
1399 return (orig_dsize - dsize);
1400 }
1401
1402 //
1403 // Decompress the buffer into **p.
1404 // 'psize' is the size of the decompression buffer available.
1405 //
1406 // Returns the number of bytes decompressed.
1407 //
1408 // Decompresses from '*src_p' into 'dst'.
1409 // Return the number of dst bytes used.
1410 // Updates the 'src_p' pointer to point to the
1411 // first un-used byte.
1412 int rle_decompress(u8 ** src_p, int ssize, u8 *dst, int dsize)
1413 {
1414 int count;
1415 int orig_dsize = dsize;
1416 char * src = *src_p;
1417
1418 while (ssize >0 && dsize > 0) { // While there's more to decompress, and there's room in the decompress buffer...
1419 count = *src++; --ssize; // get the count byte from the source.
1420 if (count == 0x0) // End marker reached? If so, finish.
1421 break;
1422
1423 if (count & 0x80) { // Decompress a run of zeros
1424 for (count &= 0x7f ; count > 0 && dsize > 0; --count) {
1425 *dst++ = 0x0;
1426 --dsize;
1427 }
1428 } else { // Copy run of non-zero bytes.
1429 for ( ; count > 0 && ssize && dsize; --count) { // Copy non-zero bytes across.
1430 *dst++ = *src++;
1431 --ssize; --dsize;
1432 }
1433 }
1434 }
1435 *src_p = src;
1436 return (orig_dsize - dsize);
1437 }