filters
[l2tpns.git] / l2tpns.c
index e61cf5d..54b4667 100644 (file)
--- a/l2tpns.c
+++ b/l2tpns.c
@@ -4,7 +4,7 @@
 // Copyright (c) 2002 FireBrick (Andrews & Arnold Ltd / Watchfront Ltd) - GPL licenced
 // vim: sw=8 ts=8
 
-char const *cvs_id_l2tpns = "$Id: l2tpns.c,v 1.50 2004/11/16 21:54:46 fred_nerk Exp $";
+char const *cvs_id_l2tpns = "$Id: l2tpns.c,v 1.57 2004/11/27 05:19:53 bodea Exp $";
 
 #include <arpa/inet.h>
 #include <assert.h>
@@ -54,7 +54,7 @@ char const *cvs_id_l2tpns = "$Id: l2tpns.c,v 1.50 2004/11/16 21:54:46 fred_nerk
 #endif /* BGP */
 
 // Globals
-struct configt *config = NULL; // all configuration
+configt *config = NULL;                // all configuration
 int tunfd = -1;                        // tun interface file handle. (network device)
 int udpfd = -1;                        // UDP file handle
 int controlfd = -1;            // Control signal handle
@@ -88,9 +88,9 @@ linked_list *loaded_plugins;
 linked_list *plugins[MAX_PLUGIN_TYPES];
 
 #define membersize(STRUCT, MEMBER) sizeof(((STRUCT *)0)->MEMBER)
-#define CONFIG(NAME, MEMBER, TYPE) { NAME, offsetof(struct configt, MEMBER), membersize(struct configt, MEMBER), TYPE }
+#define CONFIG(NAME, MEMBER, TYPE) { NAME, offsetof(configt, MEMBER), membersize(configt, MEMBER), TYPE }
 
-struct config_descriptt config_values[] = {
+config_descriptt config_values[] = {
        CONFIG("debug", debug, INT),
        CONFIG("log_file", log_filename, STRING),
        CONFIG("pid_file", pid_file, STRING),
@@ -146,6 +146,7 @@ sessiont *session = NULL;           // Array of session structures.
 sessioncountt *sess_count = NULL;      // Array of partial per-session traffic counters.
 radiust *radius = NULL;                        // Array of radius structures.
 ippoolt *ip_address_pool = NULL;       // Array of dynamic IP addresses.
+ip_filtert *ip_filters = NULL; // Array of named filters.
 static controlt *controlfree = 0;
 struct Tstats *_statistics = NULL;
 #ifdef RINGBUFFER
@@ -167,10 +168,10 @@ static void build_chap_response(char *challenge, u8 id, u16 challenge_length, ch
 static void update_config(void);
 static void read_config_file(void);
 static void initplugins(void);
-static void add_plugin(char *plugin_name);
-static void remove_plugin(char *plugin_name);
+static int add_plugin(char *plugin_name);
+static int remove_plugin(char *plugin_name);
 static void plugins_done(void);
-static void processcontrol(u8 * buf, int len, struct sockaddr_in *addr);
+static void processcontrol(u8 * buf, int len, struct sockaddr_in *addr, int alen);
 static tunnelidt new_tunnel(void);
 static int unhide_avp(u8 *avp, tunnelidt t, sessionidt s, u16 length);
 
@@ -434,8 +435,8 @@ static void initudp(void)
        // Control
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
-       addr.sin_port = htons(1702);
-       controlfd = socket(AF_INET, SOCK_DGRAM, 17);
+       addr.sin_port = htons(NSCTL_PORT);
+       controlfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        setsockopt(controlfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        if (bind(controlfd, (void *) &addr, sizeof(addr)) < 0)
        {
@@ -657,6 +658,7 @@ void tunnelsend(u8 * buf, u16 l, tunnelidt t)
                STAT(tunnel_tx_errors);
                return;
        }
+
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        *(u32 *) & addr.sin_addr = htonl(tunnel[t].ip);
@@ -783,11 +785,7 @@ static void processipout(u8 * buf, int len)
        // Add on L2TP header
        {
                u8 *p = makeppp(b, sizeof(b), buf, len, t, s, PPPIP);
-               if (!p)
-               {
-                       LOG(3, session[s].ip, s, t, "failed to send packet in processipout.\n");
-                       return;
-               }
+               if (!p) return;
                tunnelsend(b, len + (p-b), t); // send it...
        }
 
@@ -837,11 +835,7 @@ static void send_ipout(sessionidt s, u8 *buf, int len)
        // Add on L2TP header
        {
                u8 *p = makeppp(b, sizeof(b),  buf, len, t, s, PPPIP);
-               if (!p)
-               {
-                       LOG(3, session[s].ip, s, t, "failed to send packet in send_ipout.\n");
-                       return;
-               }
+               if (!p) return;
                tunnelsend(b, len + (p-b), t); // send it...
        }
 
@@ -949,8 +943,10 @@ static void controladd(controlt * c, tunnelidt t, sessionidt s)
                tunnel[t].controle->next = c;
        else
                tunnel[t].controls = c;
+
        tunnel[t].controle = c;
        tunnel[t].controlc++;
+
        // send now if space in window
        if (tunnel[t].controlc <= tunnel[t].window)
        {
@@ -1052,15 +1048,20 @@ void sessionshutdown(sessionidt s, char *reason)
        if (session[s].ip)
        {                          // IP allocated, clear and unroute
                int r;
+               int routed = 0;
                for (r = 0; r < MAXROUTE && session[s].route[r].ip; r++)
                {
+                       if ((session[s].ip & session[s].route[r].mask) ==
+                           (session[s].route[r].ip & session[s].route[r].mask))
+                               routed++;
+
                        routeset(s, session[s].route[r].ip, session[s].route[r].mask, 0, 0);
                        session[s].route[r].ip = 0;
                }
 
                if (session[s].ip_pool_index == -1) // static ip
                {
-                       routeset(s, session[s].ip, 0, 0, 0);    // Delete route.
+                       if (!routed) routeset(s, session[s].ip, 0, 0, 0);
                        session[s].ip = 0;
                }
                else
@@ -1109,11 +1110,7 @@ void sendipcp(tunnelidt t, sessionidt s)
        }
 
        q = makeppp(buf,sizeof(buf), 0, 0, t, s, PPPIPCP);
-       if (!q)
-       {
-               LOG(3, session[s].ip, s, t, "failed to send packet in sendipcp.\n");
-               return;
-       }
+       if (!q) return;
 
        *q = ConfigReq;
        q[1] = r << RADIUS_SHIFT;                    // ID, dont care, we only send one type of request
@@ -1312,10 +1309,9 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr)
                        STAT(tunnel_rx_errors);
                        return;
                }
-               LOG(3, ntohl(addr->sin_addr.s_addr), s, t, "Control message (%d bytes): (unacked %d) l-ns %d l-nr %d r-ns %d r-nr %d\n",
-                               l, tunnel[t].controlc, tunnel[t].ns, tunnel[t].nr, ns, nr);
-               // if no tunnel specified, assign one
-               if (!t)
+
+               // check for duplicate tunnel open message
+               if (!t && ns == 0)
                {
                        int i;
 
@@ -1329,10 +1325,15 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr)
                                        tunnel[i].port != ntohs(addr->sin_port) )
                                        continue;
                                t = i;
+                               LOG(3, ntohl(addr->sin_addr.s_addr), s, t, "Duplicate SCCRQ?\n");
                                break;
                        }
                }
 
+               LOG(3, ntohl(addr->sin_addr.s_addr), s, t, "Control message (%d bytes): (unacked %d) l-ns %d l-nr %d r-ns %d r-nr %d\n",
+                               l, tunnel[t].controlc, tunnel[t].ns, tunnel[t].nr, ns, nr);
+
+               // if no tunnel specified, assign one
                if (!t)
                {
                        if (!(t = new_tunnel()))
@@ -1345,8 +1346,28 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr)
                        tunnel[t].ip = ntohl(*(ipt *) & addr->sin_addr);
                        tunnel[t].port = ntohs(addr->sin_port);
                        tunnel[t].window = 4; // default window
-                       LOG(1, ntohl(addr->sin_addr.s_addr), 0, t, "   New tunnel from %u.%u.%u.%u/%u ID %d\n", tunnel[t].ip >> 24, tunnel[t].ip >> 16 & 255, tunnel[t].ip >> 8 & 255, tunnel[t].ip & 255, tunnel[t].port, t);
                        STAT(tunnel_created);
+                       LOG(1, ntohl(addr->sin_addr.s_addr), 0, t, "   New tunnel from %u.%u.%u.%u/%u ID %d\n",
+                               tunnel[t].ip >> 24, tunnel[t].ip >> 16 & 255,
+                               tunnel[t].ip >> 8 & 255, tunnel[t].ip & 255, tunnel[t].port, t);
+               }
+
+                       // If the 'ns' just received is not the 'nr' we're
+                       // expecting, just send an ack and drop it.
+                       //
+                       // if 'ns' is less, then we got a retransmitted packet.
+                       // if 'ns' is greater then we missed a packet. Either way
+                       // we should ignore it.
+               if (ns != tunnel[t].nr)
+               {
+                       // is this the sequence we were expecting?
+                       STAT(tunnel_rx_errors);
+                       LOG(1, ntohl(addr->sin_addr.s_addr), 0, t, "   Out of sequence tunnel %d, (%d is not the expected %d)\n",
+                               t, ns, tunnel[t].nr);
+
+                       if (l)  // Is this not a ZLB?
+                               controlnull(t);
+                       return;
                }
 
                // This is used to time out old tunnels
@@ -1356,7 +1377,7 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr)
                {
                        int skip = tunnel[t].window; // track how many in-window packets are still in queue
                                // some to clear maybe?
-                       while (tunnel[t].controlc && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000))
+                       while (tunnel[t].controlc > 0 && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000))
                        {
                                controlt *c = tunnel[t].controls;
                                tunnel[t].controls = c->next;
@@ -1367,22 +1388,6 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr)
                                tunnel[t].try = 0; // we have progress
                        }
 
-                               // If the 'ns' just received is not the 'nr' we're
-                               // expecting, just send an ack and drop it.
-                               //
-                               // if 'ns' is less, then we got a retransmitted packet.
-                               // if 'ns' is greater than missed a packet. Either way
-                               // we should ignore it.
-                       if (ns != tunnel[t].nr)
-                       {
-                               // is this the sequence we were expecting?
-                               LOG(1, ntohl(addr->sin_addr.s_addr), 0, t, "   Out of sequence tunnel %d, (%d is not the expected %d)\n", t, ns, tunnel[t].nr);
-                               STAT(tunnel_rx_errors);
-
-                               if (l)  // Is this not a ZLB?
-                                       controlnull(t);
-                               return;
-                       }
                        // receiver advance (do here so quoted correctly in any sends below)
                        if (l) tunnel[t].nr = (ns + 1);
                        if (skip < 0) skip = 0;
@@ -2067,11 +2072,7 @@ static int regular_cleanups(void)
                        u8 b[MAXCONTROL] = {0};
 
                        u8 *q = makeppp(b, sizeof(b), 0, 0, session[s].tunnel, s, PPPLCP);
-                       if (!q)
-                       {
-                               LOG(3, session[s].ip, s, t, "failed to send ECHO packet.\n");
-                               continue;
-                       }
+                       if (!q) continue;
 
                        *q = EchoReq;
                        *(u8 *)(q + 1) = (time_now % 255); // ID
@@ -2324,7 +2325,7 @@ static void mainloop(void)
                        }
 
                        if (FD_ISSET(controlfd, &r))
-                               processcontrol(buf, recvfrom(controlfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr);
+                               processcontrol(buf, recvfrom(controlfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr, alen);
 
                        if (FD_ISSET(clifd, &r))
                        {
@@ -2490,12 +2491,12 @@ static void initdata(int optdebug, char *optconfig)
                LOG(0, 0, 0, 0, "Error doing malloc for _statistics: %s\n", strerror(errno));
                exit(1);
        }
-       if (!(config = shared_malloc(sizeof(struct configt))))
+       if (!(config = shared_malloc(sizeof(configt))))
        {
                LOG(0, 0, 0, 0, "Error doing malloc for configuration: %s\n", strerror(errno));
                exit(1);
        }
-       memset(config, 0, sizeof(struct configt));
+       memset(config, 0, sizeof(configt));
        time(&config->start_time);
        strncpy(config->config_file, optconfig, strlen(optconfig));
        config->debug = optdebug;
@@ -2531,6 +2532,13 @@ static void initdata(int optdebug, char *optconfig)
                exit(1);
        }
 
+if (!(ip_filters = shared_malloc(sizeof(ip_filtert) * MAXFILTER)))
+{
+       LOG(0, 0, 0, 0, "Error doing malloc for ip_filters: %s\n", strerror(errno));
+       exit(1);
+}
+memset(ip_filters, 0, sizeof(ip_filtert) * MAXFILTER);
+
 #ifdef RINGBUFFER
        if (!(ringbuffer = shared_malloc(sizeof(struct Tringbuffer))))
        {
@@ -2582,11 +2590,11 @@ static void initdata(int optdebug, char *optconfig)
        _statistics->start_time = _statistics->last_reset = time(NULL);
 
 #ifdef BGP
-       if (!(bgp_peers = shared_malloc(sizeof(struct bgp_peer) * BGP_NUM_PEERS)))
-       {
-               LOG(0, 0, 0, 0, "Error doing malloc for bgp: %s\n", strerror(errno));
-               exit(1);
-       }
+       if (!(bgp_peers = shared_malloc(sizeof(struct bgp_peer) * BGP_NUM_PEERS)))
+       {
+               LOG(0, 0, 0, 0, "Error doing malloc for bgp: %s\n", strerror(errno));
+               exit(1);
+       }
 #endif /* BGP */
 }
 
@@ -2934,31 +2942,31 @@ int main(int argc, char *argv[])
        {
                switch (i)
                {
-                       case 'd':
-                               if (fork()) exit(0);
-                               setsid();
-                               freopen("/dev/null", "r", stdin);
-                               freopen("/dev/null", "w", stdout);
-                               freopen("/dev/null", "w", stderr);
-                               break;
-                       case 'v':
-                               optdebug++;
-                               break;
-                       case 'c':
-                               optconfig = optarg;
-                               break;
-                       case 'h':
-                               snprintf(hostname, sizeof(hostname), "%s", optarg);
-                               break;
-                       default:
-                               printf("Args are:\n"
-                                      "\t-d\t\tDetach from terminal\n"
-                                      "\t-c <file>\tConfig file\n"
-                                      "\t-h <hostname>\tForce hostname\n"
-                                      "\t-v\t\tDebug\n");
-
-                               return (0);
-                               break;
+               case 'd':
+                       if (fork()) exit(0);
+                       setsid();
+                       freopen("/dev/null", "r", stdin);
+                       freopen("/dev/null", "w", stdout);
+                       freopen("/dev/null", "w", stderr);
+                       break;
+               case 'v':
+                       optdebug++;
+                       break;
+               case 'c':
+                       optconfig = optarg;
+                       break;
+               case 'h':
+                       snprintf(hostname, sizeof(hostname), "%s", optarg);
+                       break;
+               default:
+                       printf("Args are:\n"
+                              "\t-d\t\tDetach from terminal\n"
+                              "\t-c <file>\tConfig file\n"
+                              "\t-h <hostname>\tForce hostname\n"
+                              "\t-v\t\tDebug\n");
+
+                       return (0);
+                       break;
                }
        }
 
@@ -3576,18 +3584,30 @@ int sessionsetup(tunnelidt t, sessionidt s)
                }
        }
 
+       {
+               int routed = 0;
+
                // Add the route for this session.
-               //
-               // Static IPs need to be routed. Anything else
-               // is part of the IP address pool and is already routed,
-               // it just needs to be added to the IP cache.
-       if (session[s].ip_pool_index == -1) // static ip
-               routeset(s, session[s].ip, 0, 0, 1);
-       else
-               cache_ipmap(session[s].ip, s);
+               for (r = 0; r < MAXROUTE && session[s].route[r].ip; r++)
+               {
+                       if ((session[s].ip & session[s].route[r].mask) ==
+                           (session[s].route[r].ip & session[s].route[r].mask))
+                               routed++;
 
-       for (r = 0; r < MAXROUTE && session[s].route[r].ip; r++)
-               routeset(s, session[s].route[r].ip, session[s].route[r].mask, 0, 1);
+                       routeset(s, session[s].route[r].ip, session[s].route[r].mask, 0, 1);
+               }
+
+               // Static IPs need to be routed if not already
+               // covered by a Framed-Route.  Anything else is part
+               // of the IP address pool and is already routed, it
+               // just needs to be added to the IP cache.
+               if (session[s].ip_pool_index == -1) // static ip
+               {
+                       if (!routed) routeset(s, session[s].ip, 0, 0, 1);
+               }
+               else
+                       cache_ipmap(session[s].ip, s);
+       }
 
        if (!session[s].unique_id)
        {
@@ -3631,6 +3651,7 @@ int sessionsetup(tunnelidt t, sessionidt s)
 int load_session(sessionidt s, sessiont *new)
 {
        int i;
+       int newip = 0;
 
                // Sanity checks.
        if (new->ip_pool_index >= MAXIPPOOL ||
@@ -3648,48 +3669,66 @@ int load_session(sessionidt s, sessiont *new)
 
        session[s].tunnel = new->tunnel; // For logging in cache_ipmap
 
+       // See if routes/ip cache need updating
+       if (new->ip != session[s].ip)
+               newip++;
+
+       for (i = 0; !newip && i < MAXROUTE && (session[s].route[i].ip || new->route[i].ip); i++)
+               if (new->route[i].ip != session[s].route[i].ip ||
+                   new->route[i].mask != session[s].route[i].mask)
+                       newip++;
 
-       if (new->ip != session[s].ip)   // Changed ip. fix up hash tables.
+       // needs update
+       if (newip)
        {
-               if (session[s].ip)      // If there's an old one, remove it.
+               int routed = 0;
+
+               // remove old routes...
+               for (i = 0; i < MAXROUTE && session[s].route[i].ip; i++)
                {
-                       // Remove any routes if the IP has changed
-                       for (i = 0; i < MAXROUTE && session[s].route[i].ip; i++)
-                       {
-                               routeset(s, session[s].route[i].ip, session[s].route[i].mask, 0, 0);
-                               session[s].route[i].ip = 0;
-                       }
+                       if ((session[s].ip & session[s].route[i].mask) ==
+                           (session[s].route[i].ip & session[s].route[i].mask))
+                               routed++;
+
+                       routeset(s, session[s].route[i].ip, session[s].route[i].mask, 0, 0);
+               }
 
+               // ...ip
+               if (session[s].ip)
+               {
                        if (session[s].ip_pool_index == -1) // static IP
-                               routeset(s, session[s].ip, 0, 0, 0);
+                       {
+                               if (!routed) routeset(s, session[s].ip, 0, 0, 0);
+                       }
                        else            // It's part of the IP pool, remove it manually.
                                uncache_ipmap(session[s].ip);
                }
 
+               routed = 0;
+
+               // add new routes...
+               for (i = 0; i < MAXROUTE && new->route[i].ip; i++)
+               {
+                       if ((new->ip & new->route[i].mask) ==
+                           (new->route[i].ip & new->route[i].mask))
+                               routed++;
+
+                       routeset(s, new->route[i].ip, new->route[i].mask, 0, 1);
+               }
+
+               // ...ip
                if (new->ip)
                {
                        // If there's a new one, add it.
                        if (new->ip_pool_index == -1)
-                               routeset(s, new->ip, 0, 0, 1);
+                       {
+                               if (!routed) routeset(s, new->ip, 0, 0, 1);
+                       }
                        else
                                cache_ipmap(new->ip, s);
                }
        }
 
-       // Update routed networks
-       for (i = 0; i < MAXROUTE && (session[s].route[i].ip || new->route[i].ip); i++)
-       {
-               if (new->route[i].ip == session[s].route[i].ip &&
-                   new->route[i].mask == session[s].route[i].mask)
-                       continue;
-
-               if (session[s].route[i].ip) // Remove the old one if it exists.
-                       routeset(s, session[s].route[i].ip, session[s].route[i].mask, 0, 0);
-
-               if (new->route[i].ip)   // Add the new one if it exists.
-                       routeset(s, new->route[i].ip, new->route[i].mask, 0, 1);
-       }
-
        if (new->tunnel && s > config->cluster_highest_sessionid)       // Maintain this in the slave. It's used
                                        // for walking the sessions to forward byte counts to the master.
                config->cluster_highest_sessionid = s;
@@ -3751,7 +3790,7 @@ static void *getconfig(char *key, enum config_typet type)
        return 0;
 }
 
-static void add_plugin(char *plugin_name)
+static int add_plugin(char *plugin_name)
 {
        static struct pluginfuncs funcs = {
                _log,
@@ -3760,10 +3799,12 @@ static void add_plugin(char *plugin_name)
                sessionbyuser,
                sessiontbysessionidt,
                sessionidtbysessiont,
-               sessionkill,
                radiusnew,
                radiussend,
                getconfig,
+               sessionkill,
+               throttle_session,
+               cluster_send_session,
        };
 
        void *p = open_plugin(plugin_name, 1);
@@ -3773,22 +3814,22 @@ static void add_plugin(char *plugin_name)
        if (!p)
        {
                LOG(1, 0, 0, 0, "   Plugin load failed: %s\n", dlerror());
-               return;
+               return -1;
        }
 
        if (ll_contains(loaded_plugins, p))
        {
                dlclose(p);
-               return;
+               return 0; // already loaded
        }
 
        {
-               int *v = dlsym(p, "__plugin_api_version");
+               int *v = dlsym(p, "plugin_api_version");
                if (!v || *v != PLUGIN_API_VERSION)
                {
                        LOG(1, 0, 0, 0, "   Plugin load failed: API version mismatch: %s\n", dlerror());
                        dlclose(p);
-                       return;
+                       return -1;
                }
        }
 
@@ -3798,7 +3839,7 @@ static void add_plugin(char *plugin_name)
                {
                        LOG(1, 0, 0, 0, "   Plugin load failed: plugin_init() returned FALSE: %s\n", dlerror());
                        dlclose(p);
-                       return;
+                       return -1;
                }
        }
 
@@ -3815,6 +3856,7 @@ static void add_plugin(char *plugin_name)
        }
 
        LOG(2, 0, 0, 0, "   Loaded plugin %s\n", plugin_name);
+       return 1;
 }
 
 static void run_plugin_done(void *plugin)
@@ -3825,45 +3867,51 @@ static void run_plugin_done(void *plugin)
                donefunc();
 }
 
-static void remove_plugin(char *plugin_name)
+static int remove_plugin(char *plugin_name)
 {
        void *p = open_plugin(plugin_name, 0);
-       int i;
+       int loaded = 0; // result: set to 1 only when the plugin is found in loaded_plugins
 
        if (!p)
-               return;
-
-       for (i = 0; i < max_plugin_functions; i++)
-       {
-               void *x;
-               if (plugin_functions[i] && (x = dlsym(p, plugin_functions[i])))
-                       ll_delete(plugins[i], x);
-       }
+               return -1; // open_plugin() returned no handle
 
        if (ll_contains(loaded_plugins, p))
        {
+               int i;
+               for (i = 0; i < max_plugin_functions; i++) // unhook each named callback this plugin exports
+               {
+                       void *x;
+                       if (plugin_functions[i] && (x = dlsym(p, plugin_functions[i])))
+                               ll_delete(plugins[i], x);
+               }
+               // hooks are now unregistered; drop the handle from the loaded list and run its done hook
                ll_delete(loaded_plugins, p);
                run_plugin_done(p);
+               loaded = 1;
        }
 
        dlclose(p);
        LOG(2, 0, 0, 0, "Removed plugin %s\n", plugin_name);
+       return loaded; // 1 = unloaded, 0 = was not loaded. NOTE(review): the log line above is emitted in both cases
 }
 
 int run_plugins(int plugin_type, void *data)
 {
        int (*func)(void *data);
-       if (!plugins[plugin_type] || plugin_type > max_plugin_functions) return 1;
+
+       if (!plugins[plugin_type] || plugin_type > max_plugin_functions)
+               return PLUGIN_RET_ERROR;
 
        ll_reset(plugins[plugin_type]);
        while ((func = ll_next(plugins[plugin_type])))
        {
-               int rc;
-               rc = func(data);
-               if (rc == PLUGIN_RET_STOP) return 1;
-               if (rc == PLUGIN_RET_ERROR) return 0;
+               int r = func(data);
+
+               if (r != PLUGIN_RET_OK)
+                       return r; // stop here
        }
-       return 1;
+
+       return PLUGIN_RET_OK;
 }
 
 static void plugins_done()
@@ -3875,39 +3923,174 @@ static void plugins_done()
                run_plugin_done(p);
 }
 
-static void processcontrol(u8 * buf, int len, struct sockaddr_in *addr)
+static void processcontrol(u8 * buf, int len, struct sockaddr_in *addr, int alen)
 {
-       char *resp;
-       int l;
-       struct param_control param = { buf, len, ntohl(addr->sin_addr.s_addr), ntohs(addr->sin_port), NULL, 0, 0 };
-
+       struct nsctl request;   // decoded form of the incoming packet
+       struct nsctl response;  // reply assembled by the switch below
+       int type = unpack_control(&request, buf, len); // a negative value is treated as a bogus packet
+       int r;
+       void *p;
 
        if (log_stream && config->debug >= 4)
        {
-               LOG(4, ntohl(addr->sin_addr.s_addr), 0, 0, "Received ");
-               dump_packet(buf, log_stream);
+               if (type < 0)
+               {
+                       LOG(4, ntohl(addr->sin_addr.s_addr), 0, 0, "Bogus control message (%d)\n", type);
+               }
+               else
+               {
+                       LOG(4, ntohl(addr->sin_addr.s_addr), 0, 0, "Received ");
+                       dump_control(&request, log_stream);
+               }
        }
 
-       resp = calloc(1400, 1);
-       l = new_packet(PKT_RESP_ERROR, resp);
-       *(int *)(resp + 6) = *(int *)(buf + 6);
+       switch (type)
+       {
+       case NSCTL_REQ_LOAD:
+               if (request.argc != 1)
+               {
+                       response.type = NSCTL_RES_ERR;
+                       response.argc = 1;
+                       response.argv[0] = "name of plugin required";
+               }
+               else if ((r = add_plugin(request.argv[0])) < 1) // 0 = already loaded, negative = load failure
+               {
+                       response.type = NSCTL_RES_ERR;
+                       response.argc = 1;
+                       response.argv[0] = !r
+                               ? "plugin already loaded"
+                               : "error loading plugin";
+               }
+               else
+               {
+                       response.type = NSCTL_RES_OK;
+                       response.argc = 0;
+               }
 
-       param.type = ntohs(*(short *)(buf + 2));
-       param.id = ntohl(*(int *)(buf + 6));
-       param.data_length = ntohs(*(short *)(buf + 4)) - 10;
-       param.data = (param.data_length > 0) ? (char *)(buf + 10) : NULL;
-       param.response = resp;
-       param.response_length = l;
+               break;
 
-       run_plugins(PLUGIN_CONTROL, &param);
+       case NSCTL_REQ_UNLOAD:
+               if (request.argc != 1)
+               {
+                       response.type = NSCTL_RES_ERR;
+                       response.argc = 1;
+                       response.argv[0] = "name of plugin required";
+               }
+               else if ((r = remove_plugin(request.argv[0])) < 1) // 0 = not loaded, negative = could not be opened
+               {
+                       response.type = NSCTL_RES_ERR;
+                       response.argc = 1;
+                       response.argv[0] = !r
+                               ? "plugin not loaded"
+                               : "plugin not found";
+               }
+               else
+               {
+                       response.type = NSCTL_RES_OK;
+                       response.argc = 0;
+               }
+
+               break;
+
+       case NSCTL_REQ_HELP:
+               response.type = NSCTL_RES_OK;
+               response.argc = 0;
+               // append the "plugin_control_help" strings of every loaded plugin, at most 0xff in total
+               ll_reset(loaded_plugins);
+               while ((p = ll_next(loaded_plugins)))
+               {
+                       char **help = dlsym(p, "plugin_control_help");
+                       while (response.argc < 0xff && help && *help)
+                               response.argv[response.argc++] = *help++;
+               }
+
+               break;
 
-       if (param.send_response)
+       case NSCTL_REQ_CONTROL:
+               {
+                       struct param_control param = {
+                               config->cluster_iam_master,
+                               request.argc,
+                               request.argv,
+                               0,
+                               NULL,
+                       };
+                       // dispatch to the PLUGIN_CONTROL hooks; the result selects the reply built below
+                       r = run_plugins(PLUGIN_CONTROL, &param);
+
+                       if (r == PLUGIN_RET_ERROR)
+                       {
+                               response.type = NSCTL_RES_ERR;
+                               response.argc = 1;
+                               response.argv[0] = param.additional
+                                       ? param.additional
+                                       : "error returned by plugin";
+                       }
+                       else if (r == PLUGIN_RET_NOTMASTER)
+                       {
+                               static char msg[] = "must be run on master: 000.000.000.000"; // zeros reserve room for the address
+
+                               response.type = NSCTL_RES_ERR;
+                               response.argc = 1;
+                               if (config->cluster_master_address)
+                               {
+                                       strcpy(msg + 23, inet_toa(config->cluster_master_address)); // 23 == length of the text before the address
+                                       response.argv[0] = msg;
+                               }
+                               else
+                               {
+                                       response.argv[0] = "must be run on master: none elected";
+                               }
+                       }
+                       else if (!(param.response & NSCTL_RESPONSE))
+                       {
+                               response.type = NSCTL_RES_ERR;
+                               response.argc = 1;
+                               response.argv[0] = param.response
+                                       ? "unrecognised response value from plugin"
+                                       : "unhandled action";
+                       }
+                       else
+                       {
+                               response.type = param.response;
+                               response.argc = 0;
+                               if (param.additional)
+                               {
+                                       response.argc = 1;
+                                       response.argv[0] = param.additional;
+                               }
+                       }
+               }
+
+               break;
+
+       default:
+               response.type = NSCTL_RES_ERR;
+               response.argc = 1;
+               response.argv[0] = "error unpacking control packet";
+       }
+       // from here on buf points at a freshly allocated buffer for the packed reply
+       buf = calloc(NSCTL_MAX_PKT_SZ, 1);
+       if (!buf)
+       {
+               LOG(2, ntohl(addr->sin_addr.s_addr), 0, 0, "Failed to allocate nsctl response\n");
+               return;
+       }
+
+       r = pack_control(buf, NSCTL_MAX_PKT_SZ, response.type, response.argc, response.argv);
+       if (r > 0)
        {
-               send_packet(controlfd, ntohl(addr->sin_addr.s_addr), ntohs(addr->sin_port), param.response, param.response_length);
-               LOG(4, ntohl(addr->sin_addr.s_addr), 0, 0, "Sent Control packet response\n");
+               sendto(controlfd, buf, r, 0, (const struct sockaddr *) addr, alen); // r is passed as the byte count to send
+               if (log_stream && config->debug >= 4)
+               {
+                       LOG(4, ntohl(addr->sin_addr.s_addr), 0, 0, "Sent ");
+                       dump_control(&response, log_stream);
+               }
        }
+       else
+               LOG(2, ntohl(addr->sin_addr.s_addr), 0, 0, "Failed to pack nsctl response (%d)\n", r);
 
-       free(resp);
+       free(buf);
 }
 
 static tunnelidt new_tunnel()