typo
[l2tpns.git] / cluster.c
index 7a83858..4497f69 100644 (file)
--- a/cluster.c
+++ b/cluster.c
@@ -1,6 +1,6 @@
 // L2TPNS Clustering Stuff
 
-char const *cvs_id_cluster = "$Id: cluster.c,v 1.19 2004/11/29 02:17:17 bodea Exp $";
+char const *cvs_id_cluster = "$Id: cluster.c,v 1.24 2004/12/15 02:56:38 bodea Exp $";
 
 #include <stdio.h>
 #include <sys/file.h>
@@ -81,7 +81,7 @@ int cluster_init()
        struct sockaddr_in interface_addr;
        struct ip_mreq mreq;
        struct ifreq   ifr;
-       int opt = 0;
+       int opt;
 
        config->cluster_undefined_sessions = MAXSESSION-1;
        config->cluster_undefined_tunnels = MAXTUNNEL-1;
@@ -99,6 +99,9 @@ int cluster_init()
        addr.sin_addr.s_addr = INADDR_ANY;
        setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &addr, sizeof(addr));
 
+       opt = fcntl(cluster_sockfd, F_GETFL, 0);
+       fcntl(cluster_sockfd, F_SETFL, opt | O_NONBLOCK);
+
        if (bind(cluster_sockfd, (void *) &addr, sizeof(addr)) < 0)
        {
                LOG(0, 0, 0, "Failed to bind cluster socket: %s\n", strerror(errno));
@@ -129,7 +132,7 @@ int cluster_init()
                return -1;
        }
 
-       if (setsockopt (cluster_sockfd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0)
+       if (setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0)
        {
                LOG(0, 0, 0, "Failed to setsockopt (set mcast interface): %s\n", strerror(errno));
                return -1;
@@ -744,6 +747,8 @@ void cluster_heartbeat()
        if (!config->cluster_iam_master)        // Only the master does this.
                return;
 
+       config->cluster_table_version += config->cluster_num_changes;
+
        // Fill out the heartbeat header.
        memset(&h, 0, sizeof(h));
 
@@ -759,6 +764,7 @@ void cluster_heartbeat()
        h.size_tunn = sizeof(tunnelt);
        h.interval = config->cluster_hb_interval;
        h.timeout  = config->cluster_hb_timeout;
+       h.table_version = config->cluster_table_version;
 
        add_type(&p, C_HEARTBEAT, HB_VERSION, (char*) &h, sizeof(h));
 
@@ -817,8 +823,10 @@ void cluster_heartbeat()
                exit(1);
        }
 
-       LOG(3, 0, 0, "Sending heartbeat #%d with %d changes (%d x-sess, %d x-tunnels, %d highsess, %d hightun, size %d)\n",
-           h.seq, config->cluster_num_changes, count, tcount, config->cluster_highest_sessionid,
+       LOG(3, 0, 0, "Sending v%d heartbeat #%d, change #%llu with %d changes "
+                    "(%d x-sess, %d x-tunnels, %d highsess, %d hightun, size %d)\n",
+           HB_VERSION, h.seq, h.table_version, config->cluster_num_changes,
+           count, tcount, config->cluster_highest_sessionid,
            config->cluster_highest_tunnelid, (p-buff));
 
        config->cluster_num_changes = 0;
@@ -891,7 +899,7 @@ static int cluster_catchup_slave(int seq, u32 slave)
                diff += HB_MAX_SEQ;
 
        if (diff >= HB_HISTORY_SIZE) {  // Ouch. We don't have the packet to send it!
-               LOG(0, 0, 0, "A slaved asked for message %d when our seq number is %d. Killing it.\n",
+               LOG(0, 0, 0, "A slave asked for message %d when our seq number is %d. Killing it.\n",
                        seq, config->cluster_seq_number);
                return peer_send_message(slave, C_KILL, seq, NULL, 0);// Kill the slave. Nothing else to do.
        }
@@ -1096,18 +1104,20 @@ static int cluster_recv_tunnel(int more, u8 *p)
 //
 // Process a heartbeat..
 //
+// v3: added interval, timeout
+// v4: added table_version
 static int cluster_process_heartbeat(u8 * data, int size, int more, u8 * p, u32 addr)
 {
        heartt * h;
        int s = size - (p-data);
        int i, type;
 
-#if HB_VERSION != 3
+#if HB_VERSION != 4
 # error "need to update cluster_process_heartbeat()"
 #endif
 
-       // we handle version 2+
-       if (more < 2 || more > HB_VERSION) {
+       // we handle versions 3 through 4
+       if (more < 3 || more > HB_VERSION) {
                LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", more);
                return -1; // Ignore it??
        }
@@ -1126,21 +1136,39 @@ static int cluster_process_heartbeat(u8 * data, int size, int more, u8 * p, u32
        if (config->cluster_iam_master) {       // Sanity...
                                // Note that this MUST match the election process above!
 
-               LOG(0, 0, 0, "I just got a packet claiming to be from a master but _I_ am the master!\n");
+               LOG(0, 0, 0, "I just got a heartbeat from master %s, but _I_ am the master!\n", fmtaddr(addr, 0));
                if (!h->basetime) {
-                       LOG(0, 0, 0, "Heartbeat from addr %s with zero basetime!\n", fmtaddr(addr, 0));
+                       LOG(0, 0, 0, "Heartbeat with zero basetime!  Ignoring\n");
                        return -1; // Skip it.
                }
+
+               if (more >= 4) {
+                       if (h->table_version > config->cluster_table_version) {
+                               LOG(0, 0, 0, "They've seen more state changes (%llu vs my %llu) so I'm gone!\n",
+                                       h->table_version, config->cluster_table_version);
+
+                               kill(0, SIGTERM);
+                               exit(1);
+                       }
+                       if (h->table_version < config->cluster_table_version)
+                               return -1;
+               }
+
                if (basetime > h->basetime) {
-                       LOG(0, 0, 0, "They're (%s) an older master than me so I'm gone!\n", fmtaddr(addr, 0));
+                       LOG(0, 0, 0, "They're an older master than me so I'm gone!\n");
                        kill(0, SIGTERM);
                        exit(1);
                }
-               if (basetime == h->basetime && my_address < addr) { // Tie breaker.
+
+               if (basetime < h->basetime)
+                       return -1;
+
+               if (my_address < addr) { // Tie breaker.
                        LOG(0, 0, 0, "They're a higher IP address than me, so I'm gone!\n");
                        kill(0, SIGTERM);
                        exit(1);
                }
+
                return -1; // Skip it.
        }
 
@@ -1173,23 +1201,20 @@ static int cluster_process_heartbeat(u8 * data, int size, int more, u8 * p, u32
                        // that the free session pointer is correct.
        cluster_check_sessions(h->highsession, h->freesession, h->hightunnel);
 
-       if (more > 2) // reserved section of heartt was not initialized prior to v3
+       if (h->interval != config->cluster_hb_interval)
        {
-               if (h->interval != config->cluster_hb_interval)
-               {
-                       LOG(2, 0, 0, "Master set ping/heartbeat interval to %u (was %u)\n",
-                               h->interval, config->cluster_hb_interval);
+               LOG(2, 0, 0, "Master set ping/heartbeat interval to %u (was %u)\n",
+                       h->interval, config->cluster_hb_interval);
 
-                       config->cluster_hb_interval = h->interval;
-               }
+               config->cluster_hb_interval = h->interval;
+       }
 
-               if (h->timeout != config->cluster_hb_timeout)
-               {
-                       LOG(2, 0, 0, "Master set heartbeat timeout to %u (was %u)\n",
-                               h->timeout, config->cluster_hb_timeout);
+       if (h->timeout != config->cluster_hb_timeout)
+       {
+               LOG(2, 0, 0, "Master set heartbeat timeout to %u (was %u)\n",
+                       h->timeout, config->cluster_hb_timeout);
 
-                       config->cluster_hb_timeout = h->timeout;
-               }
+               config->cluster_hb_timeout = h->timeout;
        }
 
                // Ok. process the packet...
@@ -1273,6 +1298,7 @@ static int cluster_process_heartbeat(u8 * data, int size, int more, u8 * p, u32
        }
 
        config->cluster_last_hb = TIME; // Successfully received a heartbeat!
+       config->cluster_table_version = h->table_version;
        return 0;
 
 shortpacket:
@@ -1414,10 +1440,12 @@ int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc)
                                : "Not defined",
                        0.1 * (TIME - config->cluster_last_hb));
                cli_print(cli, "Uptodate         : %s", config->cluster_iam_uptodate ? "Yes" : "No");
+               cli_print(cli, "Table version #  : %llu", config->cluster_table_version);
                cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number);
                cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid);
                cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid);
        } else {
+               cli_print(cli, "Table version #  : %llu", config->cluster_table_version);
                cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number);
                cli_print(cli, "Highest session  : %d", config->cluster_highest_sessionid);
                cli_print(cli, "Highest tunnel   : %d", config->cluster_highest_tunnelid);