From: fred_nerk Date: Wed, 23 Jun 2004 03:52:24 +0000 (+0000) Subject: * Wed Jun 23 2004 David Parrish 2.0.0 X-Git-Tag: release_2_0_1~18 X-Git-Url: http://git.sameswireless.fr/l2tpns.git/commitdiff_plain/ed90ea49e025e0c8083f9d98768d870e6a880f13?ds=inline * Wed Jun 23 2004 David Parrish 2.0.0 - Major release - Completely replace active/standby clustering with a new peer-to-peer clustering method which allows much greater throughput and is a lot more fault tolerant - Add internal tbf implementation for throttling without relying on tc and kernel HTB - Add support for iBGP and eBGP to advertise routes - Add cli commands "show cluster", "show bgp", "show ipcache", "show throttle", "show tbf", "suspend bgp", "restart bgp", "show user" - Interception destination must be set per-user - If SMP machine, allow use of SCHED_FIFO, which should improve performance - Added config option to send GARP at startup - Added plugin_become_master and plugin_new_session_master plugin hooks - Remove useless sessionsendarp(). This isn't needed now that we are using TUN instead of TAP. - ICMP rate limiting so not every unreachable packet is replied with an ICMP unreachable message - mangle table is not required on anything but the cluster master, so slaves will drop the mangle table and attempt to unload the ip_conntrack module - Statically assigned IP addresses (by Radius) work now - Add -d command-line flag to detach and become a daemon - Configuration file is now "/etc/l2tpns/startup-config" - Reduced MIN_IP_SIZE to 0x19 to stop a pile of Short IP warnings - Resend initial IPCP request until it's acknowleged by the client - Better radius session cleanup logic - Many miscellaenous bugfixes and performance enhancements - Thanks to Michael O'Reilly and Brendan O'Dea for most of these new features --- diff --git a/Changes b/Changes index 5900809..d026dfe 100644 --- a/Changes +++ b/Changes @@ -1,3 +1,32 @@ +* Wed Jun 23 2004 David Parrish 2.0.0 +- Major release +- Completely replace active/standby clustering with a new peer-to-peer + clustering method which allows much greater throughput and is a lot more fault + tolerant +- Add internal tbf implementation for throttling without relying on tc and + kernel HTB +- Add support for iBGP and eBGP to advertise routes +- Add cli commands "show cluster", "show bgp", "show ipcache", "show throttle", + "show tbf", "suspend bgp", "restart bgp", "show user" +- Interception destination must be set per-user +- If SMP machine, allow use of SCHED_FIFO, which should improve performance +- Added config option to send GARP at startup +- Added plugin_become_master and plugin_new_session_master plugin hooks +- Remove useless sessionsendarp(). This isn't needed now that we are using TUN + instead of TAP. +- ICMP rate limiting so not every unreachable packet is replied with an ICMP + unreachable message +- mangle table is not required on anything but the cluster master, so slaves + will drop the mangle table and attempt to unload the ip_conntrack module +- Statically assigned IP addresses (by Radius) work now +- Add -d command-line flag to detach and become a daemon +- Configuration file is now "/etc/l2tpns/startup-config" +- Reduced MIN_IP_SIZE to 0x19 to stop a pile of Short IP warnings +- Resend initial IPCP request until it's acknowleged by the client +- Better radius session cleanup logic +- Many miscellaenous bugfixes and performance enhancements +- Thanks to Michael O'Reilly and Brendan O'Dea for most of these new features + * Mon May 24 2004 David Parrish 1.2.0 - Fix SEGFAULT in garden module - Use multiple radius sockets to allow more concurrent authentication requests diff --git a/Makefile b/Makefile index 18be310..04bf448 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,9 @@ etcdir = $(PREFIX)/etc/l2tpns libdir = $(PREFIX)/usr/lib/l2tpns CC = gcc -CFLAGS=-Wall -g -O3 -funroll-loops -fomit-frame-pointer -finline-functions +DEFINES= -DBGP -DRINGBUFFER -DSTAT_CALLS -DSTATISTICS +OPTIM=-g -O3 -funroll-loops -fomit-frame-pointer -finline-functions +CFLAGS=-Wall $(OPTIM) $(DEFINES) LDFLAGS = LIBS = -lm -ldl -lcli INSTALL = /usr/bin/install -c @@ -15,36 +17,38 @@ OBJS= md5.o \ l2tpns.o \ ppp.o \ radius.o \ - throttle.o \ - rl.o \ ll.o \ cluster.o \ - cluster_slave.o \ arp.o \ constants.o \ ll.o \ control.o \ util.o \ + tbf.o \ + bgp.o \ PLUGINS=garden.so autothrottle.so autosnoop.so -all: l2tpns cluster_master nsctl $(PLUGINS) +all: l2tpns nsctl $(PLUGINS) l2tpns: $(OBJS) $(CC) $(CFLAGS) -o $@ $^ $(LIBS) $(DEFS) -cluster_master: cluster_master.o ll.o cluster.o util.o - $(CC) $(CFLAGS) -o $@ $^ $(DEFS) - nsctl: nsctl.o control.o $(CC) $(CFLAGS) -o $@ $^ $(DEFS) clean: - /bin/rm -f *.o *.so l2tpns cluster_master nsctl + /bin/rm -f *.o *.so l2tpns nsctl + +depend: + (sed -n 'p; /^## Dependencies: (autogenerated) ##/q' Makefile && \ + gcc -MM $(DEFINES) $(OBJS:.o=.c) && \ + gcc -MM $(DEFINES) $(PLUGINS:.so=.c) | sed 's/\.o/.so/') >Makefile.tmp + mv Makefile Makefile.bak + mv Makefile.tmp Makefile install: all $(INSTALL) -D -o root -g root -m 0755 l2tpns $(bindir)/l2tpns - $(INSTALL) -D -o root -g root -m 0755 cluster_master $(bindir)/cluster_master $(INSTALL) -D -o root -g root -m 0755 nsctl $(bindir)/nsctl $(INSTALL) -D -o root -g root -m 0600 etc/l2tpns.cfg.default $(etcdir)/l2tpns.cfg $(INSTALL) -D -o root -g root -m 0644 etc/ip_pool.default $(etcdir)/l2tpns.ip_pool @@ -57,8 +61,32 @@ install: all mknod /dev/net/tun c 10 200; \ fi +%.o: %.c + $(CC) -c $(CFLAGS) -o $@ $< + %.so: %.c - $(CC) -fPIC -shared -o $@ $^ $(LDFLAGS) $(LIBS) $(LIBPATH) + $(CC) -fPIC -shared $(CFLAGS) -o $@ $< $(LDFLAGS) $(LIBS) $(LIBPATH) + +.PHONY: all clean depend -%.o: %.c l2tpns.h - $(CC) -c -o $@ $< $(CFLAGS) +## Dependencies: (autogenerated) ## +md5.o: md5.c md5.h +icmp.o: icmp.c l2tpns.h config.h +cli.o: cli.c l2tpns.h config.h util.h cluster.h tbf.h bgp.h +l2tpns.o: l2tpns.c md5.h l2tpns.h config.h cluster.h plugin.h ll.h \ + constants.h control.h util.h tbf.h bgp.h +ppp.o: ppp.c l2tpns.h config.h constants.h plugin.h util.h tbf.h \ + cluster.h +radius.o: radius.c md5.h constants.h l2tpns.h config.h plugin.h util.h +ll.o: ll.c ll.h +cluster.o: cluster.c l2tpns.h config.h cluster.h util.h tbf.h bgp.h +arp.o: arp.c l2tpns.h config.h +constants.o: constants.c constants.h +ll.o: ll.c ll.h +control.o: control.c control.h +util.o: util.c l2tpns.h config.h +tbf.o: tbf.c l2tpns.h config.h tbf.h +bgp.o: bgp.c l2tpns.h config.h bgp.h util.h +garden.so: garden.c l2tpns.h config.h plugin.h control.h +autothrottle.so: autothrottle.c l2tpns.h config.h plugin.h control.h +autosnoop.so: autosnoop.c l2tpns.h config.h plugin.h control.h diff --git a/autosnoop.c b/autosnoop.c index 1c27190..7ab4321 100644 --- a/autosnoop.c +++ b/autosnoop.c @@ -8,21 +8,27 @@ #include "control.h" int __plugin_api_version = 1; -struct pluginfuncs p; +struct pluginfuncs *p; int plugin_radius_response(struct param_radius_response *data) { if (strcmp(data->key, "intercept") == 0) { - if (strcmp(data->value, "yes") == 0) + char *x; + data->s->snoop_ip = 0; + data->s->snoop_port = 0; + if ((x = strchr(data->value, ':'))) { - p.log(3, 0, 0, 0, " Intercepting user\n"); - data->s->snoop = 1; + *x++ = 0; + if (*data->value) data->s->snoop_ip = inet_addr(data->value); + if (data->s->snoop_ip == INADDR_NONE) data->s->snoop_ip = 0; + if (*x) data->s->snoop_port = atoi(x); + p->log(3, 0, 0, 0, " Intercepting user to %s:%d\n", + p->inet_toa(data->s->snoop_ip), data->s->snoop_port); } - else if (strcmp(data->value, "no") == 0) + else { - p.log(3, 0, 0, 0, " Not intercepting user\n"); - data->s->snoop = 0; + p->log(3, 0, 0, 0, " Not Intercepting user (reply string should be snoop=ip:port)\n"); } } return PLUGIN_RET_OK; @@ -30,10 +36,7 @@ int plugin_radius_response(struct param_radius_response *data) int plugin_init(struct pluginfuncs *funcs) { - if (!funcs) return 0; - memcpy(&p, funcs, sizeof(p)); - - return 1; + return ((p = funcs)) ? 1 : 0; } void plugin_done() diff --git a/autothrottle.c b/autothrottle.c index 4d985a2..4def160 100644 --- a/autothrottle.c +++ b/autothrottle.c @@ -8,7 +8,7 @@ #include "control.h" int __plugin_api_version = 1; -struct pluginfuncs p; +struct pluginfuncs *p; int plugin_radius_response(struct param_radius_response *data) { @@ -16,12 +16,12 @@ int plugin_radius_response(struct param_radius_response *data) { if (strcmp(data->value, "yes") == 0) { - p.log(3, 0, 0, 0, " Throttling user\n"); + p->log(3, 0, 0, 0, " Throttling user\n"); data->s->throttle = 1; } else if (strcmp(data->value, "no") == 0) { - p.log(3, 0, 0, 0, " Not throttling user\n"); + p->log(3, 0, 0, 0, " Not throttling user\n"); data->s->throttle = 0; } } @@ -30,10 +30,7 @@ int plugin_radius_response(struct param_radius_response *data) int plugin_init(struct pluginfuncs *funcs) { - if (!funcs) return 0; - memcpy(&p, funcs, sizeof(p)); - - return 1; + return ((p = funcs)) ? 1 : 0; } void plugin_done() diff --git a/bgp.c b/bgp.c new file mode 100644 index 0000000..ec084d1 --- /dev/null +++ b/bgp.c @@ -0,0 +1,1308 @@ +/* + * BGPv4 + * Used to advertise routes for upstream (l2tp port, rather than gratiutious + * arp) and downstream--allowing routers to load-balance both. + * + * Implementation limitations: + * - We never listen for incoming connections (session always initiated by us). + * - Any routes advertised by the peer are accepted, but ignored. + * - No password support; neither RFC1771 (which no-one seems to do anyway) + * nor RFC2385 (which requires a kernel patch on 2.4 kernels). + */ + +/* $Id: bgp.c,v 1.1 2004/06/23 03:52:24 fred_nerk Exp $ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "l2tpns.h" +#include "bgp.h" +#include "util.h" + +static void bgp_clear(struct bgp_peer *peer); +static void bgp_set_retry(struct bgp_peer *peer); +static void bgp_cidr(in_addr_t ip, in_addr_t mask, struct bgp_ip_prefix *pfx); +static struct bgp_route_list *bgp_insert_route(struct bgp_route_list *head, + struct bgp_route_list *new); + +static void bgp_free_routes(struct bgp_route_list *routes); +static char const *bgp_state_str(enum bgp_state state); +static char const *bgp_msg_type_str(u8 type); +static int bgp_connect(struct bgp_peer *peer); +static int bgp_handle_connect(struct bgp_peer *peer); +static int bgp_write(struct bgp_peer *peer); +static int bgp_read(struct bgp_peer *peer); +static int bgp_handle_input(struct bgp_peer *peer); +static int bgp_send_open(struct bgp_peer *peer); +static int bgp_send_keepalive(struct bgp_peer *peer); +static int bgp_send_update(struct bgp_peer *peer); +static int bgp_send_notification(struct bgp_peer *peer, u8 code, u8 subcode); + +static u16 our_as; + +/* prepare peer structure, globals */ +int bgp_setup(int as) +{ + int i; + struct bgp_peer *peer; + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + peer = &bgp_peers[i]; + memset(peer, 0, sizeof(*peer)); + + peer->addr = INADDR_NONE; + peer->sock = -1; + peer->state = peer->next_state = Disabled; + + if (!((peer->outbuf = malloc(sizeof(*peer->outbuf))) + && (peer->inbuf = malloc(sizeof(*peer->inbuf))))) + { + log(0, 0, 0, 0, "Can't allocate buffers for bgp peer (%s)\n", + strerror(errno)); + + return 0; + } + } + + if (as < 1) + as = 0; + + if ((our_as = as)) + return 0; + + bgp_routes = 0; + bgp_configured = 0; /* set by bgp_start */ + + return 1; +} + +/* start connection with a peer */ +int bgp_start(struct bgp_peer *peer, char *name, int as, int enable) +{ + struct hostent *h; + int ibgp; + int i; + struct bgp_path_attr a; + char path_attrs[64]; + char *p = path_attrs; + in_addr_t ip; + u32 metric = htonl(BGP_METRIC); + u32 no_export = htonl(BGP_COMMUNITY_NO_EXPORT); + + if (!our_as) + return 0; + + if (peer->state != Disabled) + bgp_halt(peer); + + snprintf(peer->name, sizeof(peer->name), "%s", name); + + if (!(h = gethostbyname(name)) || h->h_addrtype != AF_INET) + { + log(0, 0, 0, 0, "Can't get address for BGP peer %s (%s)\n", + name, h ? "no address" : hstrerror(h_errno)); + + return 0; + } + + memcpy(&peer->addr, h->h_addr, sizeof(peer->addr)); + peer->as = as > 0 ? as : our_as; + ibgp = peer->as == our_as; + + /* clear buffers, go to Idle state */ + peer->next_state = Idle; + bgp_clear(peer); + + /* set initial routing state */ + peer->routing = enable; + + /* all our routes use the same attributes, so prepare it in advance */ + if (peer->path_attrs) + free(peer->path_attrs); + + peer->path_attr_len = 0; + + /* ORIGIN */ + a.flags = BGP_PATH_ATTR_FLAG_TRANS; + a.code = BGP_PATH_ATTR_CODE_ORIGIN; + a.data.s.len = 1; + a.data.s.value[0] = BGP_PATH_ATTR_CODE_ORIGIN_IGP; + +#define ADD_ATTRIBUTE() do { \ + i = BGP_PATH_ATTR_SIZE(a); \ + memcpy(p, &a, i); \ + p += i; \ + peer->path_attr_len += i; } while (0) + + ADD_ATTRIBUTE(); + + /* AS_PATH */ + a.flags = BGP_PATH_ATTR_FLAG_TRANS; + a.code = BGP_PATH_ATTR_CODE_AS_PATH; + if (ibgp) + { + /* empty path */ + a.data.s.len = 0; + } + else + { + /* just our AS */ + struct { + u8 type; + u8 len; + u16 value; + } as_path = { + BGP_PATH_ATTR_CODE_AS_PATH_AS_SEQUENCE, + 1, + htons(our_as), + }; + + a.data.s.len = sizeof(as_path); + memcpy(&a.data.s.value, &as_path, sizeof(as_path)); + } + + ADD_ATTRIBUTE(); + + /* NEXT_HOP */ + a.flags = BGP_PATH_ATTR_FLAG_TRANS; + a.code = BGP_PATH_ATTR_CODE_NEXT_HOP; + ip = my_address; /* we're it */ + a.data.s.len = sizeof(ip); + memcpy(a.data.s.value, &ip, sizeof(ip)); + + ADD_ATTRIBUTE(); + + /* MULTI_EXIT_DISC */ + a.flags = BGP_PATH_ATTR_FLAG_OPTIONAL; + a.code = BGP_PATH_ATTR_CODE_MULTI_EXIT_DISC; + a.data.s.len = sizeof(metric); + memcpy(a.data.s.value, &metric, sizeof(metric)); + + ADD_ATTRIBUTE(); + + if (ibgp) + { + u32 local_pref = htonl(BGP_LOCAL_PREF); + + /* LOCAL_PREF */ + a.flags = BGP_PATH_ATTR_FLAG_TRANS; + a.code = BGP_PATH_ATTR_CODE_LOCAL_PREF; + a.data.s.len = sizeof(local_pref); + memcpy(a.data.s.value, &local_pref, sizeof(local_pref)); + + ADD_ATTRIBUTE(); + } + + /* COMMUNITIES */ + a.flags = BGP_PATH_ATTR_FLAG_OPTIONAL | BGP_PATH_ATTR_FLAG_TRANS; + a.code = BGP_PATH_ATTR_CODE_COMMUNITIES; + a.data.s.len = sizeof(no_export); + memcpy(a.data.s.value, &no_export, sizeof(no_export)); + + ADD_ATTRIBUTE(); + + if (!(peer->path_attrs = malloc(peer->path_attr_len))) + { + log(0, 0, 0, 0, "Can't allocate path_attrs for %s (%s)\n", + name, strerror(errno)); + + return 0; + } + + memcpy(peer->path_attrs, path_attrs, peer->path_attr_len); + + log(4, 0, 0, 0, "Initiating BGP connection to %s (routing %s)\n", + name, enable ? "enabled" : "suspended"); + + /* we have at least one peer configured */ + bgp_configured = 1; + + /* connect */ + return bgp_connect(peer); +} + +/* clear counters, timers, routes and buffers; close socket; move to + next_state, which may be Disabled or Idle */ +static void bgp_clear(struct bgp_peer *peer) +{ + if (peer->sock != -1) + { + close(peer->sock); + peer->sock = -1; + } + + peer->keepalive_time = 0; + peer->hold = 0; + peer->expire_time = 0; + + bgp_free_routes(peer->routes); + peer->routes = 0; + + peer->outbuf->packet.header.len = 0; + peer->outbuf->done = 0; + peer->inbuf->packet.header.len = 0; + peer->inbuf->done = 0; + + peer->cli_flag = 0; + + if (peer->state != peer->next_state) + { + peer->state = peer->next_state; + peer->state_time = time_now; + + log(4, 0, 0, 0, "BGP peer %s: state %s\n", peer->name, + bgp_state_str(peer->next_state)); + } +} + +/* initiate a clean shutdown */ +void bgp_stop(struct bgp_peer *peer) +{ + log(4, 0, 0, 0, "Terminating BGP connection to %s\n", peer->name); + bgp_send_notification(peer, BGP_ERR_CEASE, 0); +} + +/* drop connection (if any) and set state to Disabled */ +void bgp_halt(struct bgp_peer *peer) +{ + log(4, 0, 0, 0, "Aborting BGP connection to %s\n", peer->name); + peer->next_state = Disabled; + bgp_clear(peer); +} + +/* drop connection (if any) and set to Idle for connection retry */ +int bgp_restart(struct bgp_peer *peer) +{ + peer->next_state = Idle; + bgp_clear(peer); + + /* restart now */ + peer->retry_time = time_now; + peer->retry_count = 0; + + /* connect */ + return bgp_connect(peer); +} + +static void bgp_set_retry(struct bgp_peer *peer) +{ + if (peer->retry_count++ < BGP_MAX_RETRY) + { + peer->retry_time = time_now + (BGP_RETRY_BACKOFF * peer->retry_count); + peer->next_state = Idle; + bgp_clear(peer); + } + else + bgp_halt(peer); /* give up */ +} + +/* convert ip/mask to CIDR notation */ +static void bgp_cidr(in_addr_t ip, in_addr_t mask, struct bgp_ip_prefix *pfx) +{ + int i; + u32 b; + + /* convert to prefix notation */ + pfx->len = 32; + pfx->prefix = ip; + + if (!mask) /* bogus */ + mask = 0xffffffff; + + for (i = 0; i < 32 && ((b = ntohl(1 << i)), !(mask & b)); i++) + { + pfx->len--; + pfx->prefix &= ~b; + } +} + +/* insert route into list; sorted */ +static struct bgp_route_list *bgp_insert_route(struct bgp_route_list *head, + struct bgp_route_list *new) +{ + struct bgp_route_list *p = head; + struct bgp_route_list *e = 0; + + while (p && memcmp(&p->dest, &new->dest, sizeof(p->dest)) < 0) + { + e = p; + p = p->next; + } + + if (e) + { + new->next = e->next; + e->next = new; + } + else + { + new->next = head; + head = new; + } + + return head; +} + +/* add route to list for peers */ +/* + * Note: this doesn't do route aggregation, nor drop routes if a less + * specific match already exists (partly because I'm lazy, but also so + * that if that route is later deleted we don't have to be concerned + * about adding back the more specific one). + */ +int bgp_add_route(in_addr_t ip, in_addr_t mask) +{ + struct bgp_route_list *r = bgp_routes; + struct bgp_route_list add; + int i; + + bgp_cidr(ip, mask, &add.dest); + add.next = 0; + + /* check for duplicate */ + while (r) + { + i = memcmp(&r->dest, &add.dest, sizeof(r->dest)); + if (!i) + return 1; /* already covered */ + + if (i > 0) + break; + + r = r->next; + } + + /* insert into route list; sorted */ + if (!(r = malloc(sizeof(*r)))) + { + log(0, 0, 0, 0, "Can't allocate route for %s/%d (%s)\n", + inet_toa(add.dest.prefix), add.dest.len, strerror(errno)); + + return 0; + } + + memcpy(r, &add, sizeof(*r)); + bgp_routes = bgp_insert_route(bgp_routes, r); + + /* flag established peers for update */ + for (i = 0; i < BGP_NUM_PEERS; i++) + if (bgp_peers[i].state == Established) + bgp_peers[i].update_routes = 1; + + log(4, 0, 0, 0, "Registered BGP route %s/%d\n", inet_toa(add.dest.prefix), + add.dest.len); + + return 1; +} + +/* remove route from list for peers */ +int bgp_del_route(in_addr_t ip, in_addr_t mask) +{ + struct bgp_route_list *r = bgp_routes; + struct bgp_route_list *e = 0; + struct bgp_route_list del; + int i; + + bgp_cidr(ip, mask, &del.dest); + del.next = 0; + + /* find entry in routes list and remove */ + while (r) + { + i = memcmp(&r->dest, &del.dest, sizeof(r->dest)); + if (!i) + { + if (e) + e->next = r->next; + else + bgp_routes = r->next; + + free(r); + break; + } + + e = r; + + if (i > 0) + r = 0; /* stop */ + else + r = r->next; + } + + /* not found */ + if (!r) + return 1; + + /* flag established peers for update */ + for (i = 0; i < BGP_NUM_PEERS; i++) + if (bgp_peers[i].state == Established) + bgp_peers[i].update_routes = 1; + + log(4, 0, 0, 0, "Removed BGP route %s/%d\n", inet_toa(del.dest.prefix), + del.dest.len); + + return 1; +} + +/* enable or disable routing */ +void bgp_enable_routing(int enable) +{ + int i; + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + bgp_peers[i].routing = enable; + + /* flag established peers for update */ + if (bgp_peers[i].state == Established) + bgp_peers[i].update_routes = 1; + } + + log(4, 0, 0, 0, "%s BGP routing\n", enable ? "Enabled" : "Suspended"); +} + +/* return a bitmask indicating if the socket should be added to the + read set (1) and or write set (2) for select */ +int bgp_select_state(struct bgp_peer *peer) +{ + int flags = 0; + + if (!bgp_configured) + return 0; + + if (peer->state == Disabled || peer->state == Idle) + return 0; + + if (peer->inbuf->done < BGP_MAX_PACKET_SIZE) + flags |= 1; + + if (peer->state == Connect || /* connection in progress */ + peer->update_routes || /* routing updates */ + peer->outbuf->packet.header.len) /* pending output */ + flags |= 2; + + return flags; +} + +/* process bgp peer */ +int bgp_process(struct bgp_peer *peer, int readable, int writable) +{ + if (!bgp_configured) + return 0; + + if (*peer->name && peer->cli_flag == BGP_CLI_RESTART) + return bgp_restart(peer); + + if (peer->state == Disabled) + return 1; + + if (peer->cli_flag) + { + switch (peer->cli_flag) + { + case BGP_CLI_SUSPEND: + if (peer->routing) + { + peer->routing = 0; + if (peer->state == Established) + peer->update_routes = 1; + } + + break; + + case BGP_CLI_ENABLE: + if (!peer->routing) + { + peer->routing = 1; + if (peer->state == Established) + peer->update_routes = 1; + } + + break; + } + + peer->cli_flag = 0; + } + + /* handle empty/fill of buffers */ + if (writable) + { + int r = 1; + if (peer->state == Connect) + r = bgp_handle_connect(peer); + else if (peer->outbuf->packet.header.len) + r = bgp_write(peer); + + if (!r) + return 0; + } + + if (readable) + { + if (!bgp_read(peer)) + return 0; + } + + /* process input buffer contents */ + while (peer->inbuf->done >= sizeof(peer->inbuf->packet.header) + && !peer->outbuf->packet.header.len) /* may need to queue a response */ + { + if (bgp_handle_input(peer) < 0) + return 0; + } + + /* process pending updates */ + if (peer->update_routes + && !peer->outbuf->packet.header.len) /* ditto */ + { + if (!bgp_send_update(peer)) + return 0; + } + + /* process timers */ + if (peer->state == Established) + { + if (time_now > peer->expire_time) + { + log(1, 0, 0, 0, "No message from BGP peer %s in %ds\n", + peer->name, peer->hold); + + bgp_send_notification(peer, BGP_ERR_HOLD_TIMER_EXP, 0); + return 0; + } + + if (time_now > peer->keepalive_time && !peer->outbuf->packet.header.len) + bgp_send_keepalive(peer); + } + else if (peer->state == Idle) + { + if (time_now > peer->retry_time) + return bgp_connect(peer); + } + else if (time_now > peer->state_time + BGP_KEEPALIVE_TIME) + { + log(1, 0, 0, 0, "%s timer expired for BGP peer %s\n", + bgp_state_str(peer->state), peer->name); + + return bgp_restart(peer); + } + + return 1; +} + +static void bgp_free_routes(struct bgp_route_list *routes) +{ + struct bgp_route_list *tmp; + + while ((tmp = routes)) + { + routes = tmp->next; + free(tmp); + } +} + +static char const *bgp_state_str(enum bgp_state state) +{ + switch (state) + { + case Disabled: return "Disabled"; + case Idle: return "Idle"; + case Connect: return "Connect"; + case Active: return "Active"; + case OpenSent: return "OpenSent"; + case OpenConfirm: return "OpenConfirm"; + case Established: return "Established"; + } + + return "?"; +} + +static char const *bgp_msg_type_str(u8 type) +{ + switch (type) + { + case BGP_MSG_OPEN: return "OPEN"; + case BGP_MSG_UPDATE: return "UPDATE"; + case BGP_MSG_NOTIFICATION: return "NOTIFICATION"; + case BGP_MSG_KEEPALIVE: return "KEEPALIVE"; + } + + return "?"; +} + +/* attempt to connect to peer */ +static int bgp_connect(struct bgp_peer *peer) +{ + static int bgp_port = 0; + struct sockaddr_in addr; + + if (!bgp_port) + { + struct servent *serv; + if (!(serv = getservbyname("bgp", "tcp"))) + { + log(0, 0, 0, 0, "Can't get bgp service (%s)\n", strerror(errno)); + return 0; + } + + bgp_port = serv->s_port; + } + + if ((peer->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + { + log(0, 0, 0, 0, "Can't create a socket for BGP peer %s (%s)\n", + peer->name, strerror(errno)); + + peer->state = peer->next_state = Disabled; + return 0; + } + + /* set to non-blocking */ + fcntl(peer->sock, F_SETFL, fcntl(peer->sock, F_GETFL, 0) | O_NONBLOCK); + + /* try connect */ + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = bgp_port; + addr.sin_addr.s_addr = peer->addr; + + while (connect(peer->sock, (struct sockaddr *) &addr, sizeof(addr)) == -1) + { + if (errno == EINTR) /* SIGALARM handler */ + continue; + + if (errno != EINPROGRESS) + { + log(1, 0, 0, 0, "Can't connect to BGP peer %s (%s)\n", + inet_ntoa(addr.sin_addr), strerror(errno)); + + bgp_set_retry(peer); + return 0; + } + + peer->state = Connect; + peer->state_time = time_now; + + log(4, 0, 0, 0, "BGP peer %s: state Connect\n", peer->name); + return 1; + } + + peer->state = Active; + peer->state_time = time_now; + peer->retry_time = peer->retry_count = 0; + + log(4, 0, 0, 0, "BGP peer %s: state Active\n", inet_ntoa(addr.sin_addr)); + + return bgp_send_open(peer); +} + +/* complete partial connection (state = Connect) */ +static int bgp_handle_connect(struct bgp_peer *peer) +{ + int err = 0; + int len = sizeof(int); + getsockopt(peer->sock, SOL_SOCKET, SO_ERROR, &err, &len); + if (err) + { + log(1, 0, 0, 0, "Can't connect to BGP peer %s (%s)\n", peer->name, + strerror(err)); + + bgp_set_retry(peer); + return 0; + } + + peer->state = Active; + peer->state_time = time_now; + + log(4, 0, 0, 0, "BGP peer %s: state Active\n", peer->name); + + return bgp_send_open(peer); +} + +/* initiate a write */ +static int bgp_write(struct bgp_peer *peer) +{ + int len = htons(peer->outbuf->packet.header.len); + int r; + + while ((r = write(peer->sock, &peer->outbuf->packet + peer->outbuf->done, + len - peer->outbuf->done)) == -1) + { + if (errno == EINTR) + continue; + + if (errno == EAGAIN) + return 1; + + if (errno == EPIPE) + log(1, 0, 0, 0, "Connection to BGP peer %s closed\n", peer->name); + else + log(1, 0, 0, 0, "Can't write to BGP peer %s (%s)\n", peer->name, + strerror(errno)); + + bgp_set_retry(peer); + return 0; + } + + if (r < len) + { + peer->outbuf->done += r; + return 1; + } + + log(4, 0, 0, 0, "Sent %s to BGP peer %s\n", + bgp_msg_type_str(peer->outbuf->packet.header.type), peer->name); + + peer->outbuf->packet.header.len = 0; + peer->outbuf->done = 0; + + if (peer->state == Established) + peer->keepalive_time = time_now + BGP_KEEPALIVE_TIME; + + if (peer->state != peer->next_state) + { + if (peer->next_state == Disabled || peer->next_state == Idle) + { + bgp_clear(peer); + return 0; + } + + peer->state = peer->next_state; + peer->state_time = time_now; + + log(4, 0, 0, 0, "BGP peer %s: state %s\n", peer->name, + bgp_state_str(peer->state)); + } + + return 1; +} + +/* initiate a read */ +static int bgp_read(struct bgp_peer *peer) +{ + int r; + + while ((r = read(peer->sock, &peer->inbuf->packet + peer->inbuf->done, + BGP_MAX_PACKET_SIZE - peer->inbuf->done)) < 1) + { + if (!r) + { + log(1, 0, 0, 0, "Connection to BGP peer %s closed\n", peer->name); + } + else + { + if (errno == EINTR) + continue; + + if (errno == EAGAIN) + return 1; + + log(1, 0, 0, 0, "Can't read from BGP peer %s (%s)\n", peer->name, + strerror(errno)); + } + + bgp_set_retry(peer); + return 0; + } + + peer->inbuf->done += r; + return 1; +} + +/* process buffered packets */ +static int bgp_handle_input(struct bgp_peer *peer) +{ + struct bgp_packet *p = &peer->inbuf->packet; + int len = ntohs(p->header.len); + + if (len > BGP_MAX_PACKET_SIZE) + { + log(1, 0, 0, 0, "Bad header length from BGP %s\n", peer->name); + bgp_send_notification(peer, BGP_ERR_HEADER, BGP_ERR_HDR_BAD_LEN); + return 0; + } + + if (peer->inbuf->done < len) + return 0; + + log(4, 0, 0, 0, "Received %s from BGP peer %s\n", + bgp_msg_type_str(p->header.type), peer->name); + + switch (p->header.type) + { + case BGP_MSG_OPEN: + { + struct bgp_data_open data; + int i; + + for (i = 0; i < sizeof(p->header.marker); i++) + { + if ((unsigned char) p->header.marker[i] != 0xff) + { + log(1, 0, 0, 0, "Invalid marker from BGP peer %s\n", + peer->name); + + bgp_send_notification(peer, BGP_ERR_HEADER, + BGP_ERR_HDR_NOT_SYNC); + + return 0; + } + } + + if (peer->state != OpenSent) + { + log(1, 0, 0, 0, "OPEN from BGP peer %s in %s state\n", + peer->name, bgp_state_str(peer->state)); + + bgp_send_notification(peer, BGP_ERR_FSM, 0); + return 0; + } + + memcpy(&data, p->data, len - sizeof(p->header)); + + if (data.version != BGP_VERSION) + { + log(1, 0, 0, 0, "Bad version (%d) sent by BGP peer %s\n", + (int) data.version, peer->name); + + bgp_send_notification(peer, BGP_ERR_OPEN, BGP_ERR_OPN_VERSION); + return 0; + } + + if (ntohs(data.as) != peer->as) + { + log(1, 0, 0, 0, "Bad AS sent by BGP peer %s (got %d, " + "expected %d)\n", peer->name, (int) htons(data.as), + (int) peer->as); + + bgp_send_notification(peer, BGP_ERR_OPEN, BGP_ERR_OPN_BAD_AS); + return 0; + } + + if ((peer->hold = ntohs(data.hold_time)) < 10) + { + log(1, 0, 0, 0, "Bad hold time (%d) from BGP peer %s\n", + peer->hold, peer->name); + + bgp_send_notification(peer, BGP_ERR_OPEN, BGP_ERR_OPN_HOLD_TIME); + return 0; + } + + /* next transition requires an exchange of keepalives */ + bgp_send_keepalive(peer); + + /* FIXME: may need to check for optional params */ + } + + break; + + case BGP_MSG_KEEPALIVE: + if (peer->state == OpenConfirm) + { + peer->state = peer->next_state = Established; + peer->state_time = time_now; + peer->keepalive_time = time_now + BGP_KEEPALIVE_TIME; + peer->update_routes = 1; + peer->retry_count = 0; + peer->retry_time = 0; + + log(4, 0, 0, 0, "BGP peer %s: state Established\n", peer->name); + } + + break; + + case BGP_MSG_NOTIFICATION: + if (len > sizeof(p->header)) + { + struct bgp_data_notification *notification = + (struct bgp_data_notification *) p->data; + + if (notification->error_code == BGP_ERR_CEASE) + { + log(4, 0, 0, 0, "BGP peer %s sent CEASE\n", peer->name); + bgp_halt(peer); + return 0; + } + + /* FIXME: should handle more notifications */ + log(4, 0, 0, 0, "BGP peer %s sent unhandled NOTIFICATION %d\n", + peer->name, (int) notification->error_code); + } + + break; + } + + /* reset timer */ + peer->expire_time = time_now + peer->hold; + + /* see if there's another message in the same packet/buffer */ + if (peer->inbuf->done > len) + { + peer->inbuf->done -= len; + memmove(p, (char *) p + len, peer->inbuf->done); + } + else + { + peer->inbuf->packet.header.len = 0; + peer->inbuf->done = 0; + } + + return peer->inbuf->done; +} + +/* send/buffer OPEN message */ +static int bgp_send_open(struct bgp_peer *peer) +{ + struct bgp_data_open data; + u16 len = sizeof(peer->outbuf->packet.header); + + memset(peer->outbuf->packet.header.marker, 0xff, + sizeof(peer->outbuf->packet.header.marker)); + + peer->outbuf->packet.header.type = BGP_MSG_OPEN; + + data.version = BGP_VERSION; + data.as = htons(our_as); + data.hold_time = htons(BGP_HOLD_TIME); + data.identifier = my_address; + data.opt_len = 0; + + memcpy(peer->outbuf->packet.data, &data, BGP_DATA_OPEN_SIZE); + len += BGP_DATA_OPEN_SIZE; + + peer->outbuf->packet.header.len = htons(len); + peer->outbuf->done = 0; + peer->next_state = OpenSent; + + return bgp_write(peer); +} + +/* send/buffer KEEPALIVE message */ +static int bgp_send_keepalive(struct bgp_peer *peer) +{ + memset(peer->outbuf->packet.header.marker, 0xff, + sizeof(peer->outbuf->packet.header.marker)); + + peer->outbuf->packet.header.type = BGP_MSG_KEEPALIVE; + peer->outbuf->packet.header.len = + htons(sizeof(peer->outbuf->packet.header)); + + peer->outbuf->done = 0; + peer->next_state = (peer->state == OpenSent) ? OpenConfirm : peer->state; + + return bgp_write(peer); +} + +/* send/buffer UPDATE message */ +static int bgp_send_update(struct bgp_peer *peer) +{ + u16 unf_len = 0; + u16 attr_len; + u16 len = sizeof(peer->outbuf->packet.header); + struct bgp_route_list *have = peer->routes; + struct bgp_route_list *want = peer->routing ? bgp_routes : 0; + struct bgp_route_list *e = 0; + struct bgp_route_list *add = 0; + int s; + + char *data = (char *) &peer->outbuf->packet.data; + + /* need leave room for attr_len, bgp_path_attrs and one prefix */ + char *max = (char *) &peer->outbuf->packet.data + + sizeof(peer->outbuf->packet.data) + - sizeof(attr_len) - peer->path_attr_len - sizeof(struct bgp_ip_prefix); + + /* skip over unf_len */ + data += sizeof(unf_len); + len += sizeof(unf_len); + + memset(peer->outbuf->packet.header.marker, 0xff, + sizeof(peer->outbuf->packet.header.marker)); + + peer->outbuf->packet.header.type = BGP_MSG_UPDATE; + + peer->update_routes = 0; /* tentatively clear */ + + /* find differences */ + while ((have || want) && data < (max - sizeof(struct bgp_ip_prefix))) + { + if (have) + s = want + ? memcmp(&have->dest, &want->dest, sizeof(have->dest)) + : -1; + else + s = 1; + + if (s < 0) /* found one to delete */ + { + struct bgp_route_list *tmp = have; + have = have->next; + + s = BGP_IP_PREFIX_SIZE(tmp->dest); + memcpy(data, &tmp->dest, s); + data += s; + unf_len += s; + len += s; + + log(5, 0, 0, 0, "Withdrawing route %s/%d from BGP peer %s\n", + inet_toa(tmp->dest.prefix), tmp->dest.len, peer->name); + + free(tmp); + + if (e) + e->next = have; + else + peer->routes = have; + } + else + { + if (!s) /* same */ + { + e = have; /* stash the last found to relink above */ + have = have->next; + want = want->next; + } + else if (s > 0) /* addition reqd. */ + { + if (add) + { + peer->update_routes = 1; /* only one add per packet */ + if (!have) + break; + } + else + add = want; + + if (want) + want = want->next; + } + } + } + + if (have || want) + peer->update_routes = 1; /* more to do */ + + /* anything changed? */ + if (!(unf_len || add)) + return 1; + + /* go back and insert unf_len */ + unf_len = htons(unf_len); + memcpy(&peer->outbuf->packet.data, &unf_len, sizeof(unf_len)); + + if (add) + { + if (!(e = malloc(sizeof(*e)))) + { + log(0, 0, 0, 0, "Can't allocate route for %s/%d (%s)\n", + inet_toa(add->dest.prefix), add->dest.len, strerror(errno)); + + return 0; + } + + memcpy(e, add, sizeof(*e)); + e->next = 0; + peer->routes = bgp_insert_route(peer->routes, e); + + attr_len = htons(peer->path_attr_len); + memcpy(data, &attr_len, sizeof(attr_len)); + data += sizeof(attr_len); + len += sizeof(attr_len); + + memcpy(data, peer->path_attrs, peer->path_attr_len); + data += peer->path_attr_len; + len += peer->path_attr_len; + + s = BGP_IP_PREFIX_SIZE(add->dest); + memcpy(data, &add->dest, s); + data += s; + len += s; + + log(5, 0, 0, 0, "Advertising route %s/%d to BGP peer %s\n", + inet_toa(add->dest.prefix), add->dest.len, peer->name); + } + else + { + attr_len = 0; + memcpy(data, &attr_len, sizeof(attr_len)); + data += sizeof(attr_len); + len += sizeof(attr_len); + } + + peer->outbuf->packet.header.len = htons(len); + peer->outbuf->done = 0; + + return bgp_write(peer); +} + +/* send/buffer NOTIFICATION message */ +static int bgp_send_notification(struct bgp_peer *peer, u8 code, u8 subcode) +{ + struct bgp_data_notification data; + u16 len = 0; + + data.error_code = code; + len += sizeof(data.error_code); + + data.error_subcode = subcode; + len += sizeof(data.error_code); + + memset(peer->outbuf->packet.header.marker, 0xff, + sizeof(peer->outbuf->packet.header.marker)); + + peer->outbuf->packet.header.type = BGP_MSG_NOTIFICATION; + peer->outbuf->packet.header.len = + htons(sizeof(peer->outbuf->packet.header) + len); + + memcpy(peer->outbuf->packet.data, &data, len); + + peer->outbuf->done = 0; + peer->next_state = code == BGP_ERR_CEASE ? Disabled : Idle; + + /* we're dying; ignore any pending input */ + peer->inbuf->packet.header.len = 0; + peer->inbuf->done = 0; + + return bgp_write(peer); +} + +/* CLI stuff */ + +#include + +int cmd_show_bgp(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + int hdr = 0; + char *addr; + + if (!bgp_configured) + return CLI_OK; + + cli_print(cli, "BGPv%d router identifier %s, local AS number %d, " + "hold time %ds", BGP_VERSION, inet_toa(my_address), (int) our_as, + BGP_HOLD_TIME); + + time(&time_now); + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + if (!*bgp_peers[i].name) + continue; + + addr = inet_toa(bgp_peers[i].addr); + if (argc && strcmp(addr, argv[0]) && + strncmp(bgp_peers[i].name, argv[0], strlen(argv[0]))) + continue; + + if (!hdr++) + { + cli_print(cli, ""); + cli_print(cli, "Peer AS Address " + "State Retries Retry in Route Pend"); + cli_print(cli, "------------------ ----- --------------- " + "----------- ------- -------- ----- ----"); + } + + cli_print(cli, "%-18.18s %5d %15s %-11s %7d %7ds %5s %4s", + bgp_peers[i].name, + bgp_peers[i].as, + addr, + bgp_state_str(bgp_peers[i].state), + bgp_peers[i].retry_count, + bgp_peers[i].retry_time ? bgp_peers[i].retry_time - time_now : 0, + bgp_peers[i].routing ? "yes" : "no", + bgp_peers[i].update_routes ? "yes" : "no"); + } + + return CLI_OK; +} + +int cmd_suspend_bgp(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + char *addr; + + if (!bgp_configured) + return CLI_OK; + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + if (bgp_peers[i].state != Established) + continue; + + if (!bgp_peers[i].routing) + continue; + + addr = inet_toa(bgp_peers[i].addr); + if (argc && strcmp(addr, argv[0]) && strcmp(bgp_peers[i].name, argv[0])) + continue; + + bgp_peers[i].cli_flag = BGP_CLI_SUSPEND; + cli_print(cli, "Suspending peer %s", bgp_peers[i].name); + } + + return CLI_OK; +} + +int cmd_no_suspend_bgp(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + char *addr; + + if (!bgp_configured) + return CLI_OK; + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + if (bgp_peers[i].state != Established) + continue; + + if (bgp_peers[i].routing) + continue; + + addr = inet_toa(bgp_peers[i].addr); + if (argc && strcmp(addr, argv[0]) && + strncmp(bgp_peers[i].name, argv[0], strlen(argv[0]))) + continue; + + bgp_peers[i].cli_flag = BGP_CLI_ENABLE; + cli_print(cli, "Un-suspending peer %s", bgp_peers[i].name); + } + + return CLI_OK; +} + +int cmd_restart_bgp(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + char *addr; + + if (!bgp_configured) + return CLI_OK; + + for (i = 0; i < BGP_NUM_PEERS; i++) + { + if (!*bgp_peers[i].name) + continue; + + addr = inet_toa(bgp_peers[i].addr); + if (argc && strcmp(addr, argv[0]) && + strncmp(bgp_peers[i].name, argv[0], strlen(argv[0]))) + continue; + + bgp_peers[i].cli_flag = BGP_CLI_RESTART; + cli_print(cli, "Restarting peer %s", bgp_peers[i].name); + } + + return CLI_OK; +} diff --git a/bgp.h b/bgp.h new file mode 100644 index 0000000..fc47a02 --- /dev/null +++ b/bgp.h @@ -0,0 +1,202 @@ +/* BGPv4 (RFC1771) */ +/* $Id: bgp.h,v 1.1 2004/06/23 03:52:24 fred_nerk Exp $ */ + +#ifndef __BGP_H__ +#define __BGP_H__ + +#define BGP_MAX_PACKET_SIZE 4096 +#define BGP_HOLD_TIME 180 /* seconds before peer times us out */ +#define BGP_KEEPALIVE_TIME 60 /* seconds between messages */ +#define BGP_MAX_RETRY 42 /* maximum number of times to retry */ +#define BGP_RETRY_BACKOFF 60 /* number of seconds between retries, + cumulative */ + +#define BGP_METRIC 1 /* multi_exit_disc */ +#define BGP_LOCAL_PREF 100 /* local preference value */ + +struct bgp_header { + char marker[16]; + u16 len; + u8 type; +} __attribute__ ((packed)); + +/* bgp_header.type */ +#define BGP_MSG_OPEN 1 +#define BGP_MSG_UPDATE 2 +#define BGP_MSG_NOTIFICATION 3 +#define BGP_MSG_KEEPALIVE 4 + +struct bgp_packet { + struct bgp_header header; + char data[BGP_MAX_PACKET_SIZE - sizeof(struct bgp_header)]; /* variable */ +} __attribute__ ((packed)); + +struct bgp_data_open { + u8 version; +#define BGP_VERSION 4 + u16 as; + u16 hold_time; + u32 identifier; + u8 opt_len; +#define BGP_DATA_OPEN_SIZE 10 /* size of struct excluding opt_params */ + char opt_params[sizeof(((struct bgp_packet *)0)->data) - BGP_DATA_OPEN_SIZE]; /* variable */ +} __attribute__ ((packed)); + +struct bgp_ip_prefix { + u8 len; + u32 prefix; /* variable */ +} __attribute__ ((packed)); + +#define BGP_IP_PREFIX_SIZE(p) (1 + ((p).len / 8) + ((p).len % 8 != 0)) + +struct bgp_path_attr { + u8 flags; + u8 code; + union { + struct { + u8 len; + char value[29]; /* semi-random size, adequate for l2tpns */ + } __attribute__ ((packed)) s; /* short */ + struct { + u16 len; + char value[28]; + } __attribute__ ((packed)) e; /* extended */ + } data; /* variable */ +} __attribute__ ((packed)); + +/* bgp_path_attr.flags (bitfields) */ +#define BGP_PATH_ATTR_FLAG_OPTIONAL (1 << 7) +#define BGP_PATH_ATTR_FLAG_TRANS (1 << 6) +#define BGP_PATH_ATTR_FLAG_PARTIAL (1 << 5) +#define BGP_PATH_ATTR_FLAG_EXTLEN (1 << 4) + +/* bgp_path_attr.code, ...value */ +#define BGP_PATH_ATTR_CODE_ORIGIN 1 /* well-known, mandatory */ +# define BGP_PATH_ATTR_CODE_ORIGIN_IGP 0 +# define BGP_PATH_ATTR_CODE_ORIGIN_EGP 1 +# define BGP_PATH_ATTR_CODE_ORIGIN_INCOMPLETE 2 +#define BGP_PATH_ATTR_CODE_AS_PATH 2 /* well-known, mandatory */ +# define BGP_PATH_ATTR_CODE_AS_PATH_AS_SET 1 +# define BGP_PATH_ATTR_CODE_AS_PATH_AS_SEQUENCE 2 +#define BGP_PATH_ATTR_CODE_NEXT_HOP 3 /* well-known, mandatory */ +#define BGP_PATH_ATTR_CODE_MULTI_EXIT_DISC 4 /* optional, non-transitive */ +#define BGP_PATH_ATTR_CODE_LOCAL_PREF 5 /* well-known, discretionary */ +#define BGP_PATH_ATTR_CODE_ATOMIC_AGGREGATE 6 /* well-known, discretionary */ +#define BGP_PATH_ATTR_CODE_AGGREGATOR 7 /* optional, transitive */ +#define BGP_PATH_ATTR_CODE_COMMUNITIES 8 /* optional, transitive (RFC1997) */ + +#define BGP_PATH_ATTR_SIZE(p) ((((p).flags & BGP_PATH_ATTR_FLAG_EXTLEN) \ + ? ((p).data.e.len + 1) : (p).data.s.len) + 3) + +/* well known COMMUNITIES */ +#define BGP_COMMUNITY_NO_EXPORT 0xffffff01 /* don't advertise outside confederation */ +#define BGP_COMMUNITY_NO_ADVERTISE 0xffffff02 /* don't advertise to any peer */ +#define BGP_COMMUNITY_NO_EXPORT_SUBCONFED 0xffffff03 /* don't advertise to any other AS */ + +struct bgp_data_notification { + u8 error_code; + u8 error_subcode; + char data[sizeof(((struct bgp_packet *)0)->data) - 2]; /* variable */ +} __attribute__ ((packed)); + +/* bgp_data_notification.error_code, .error_subcode */ +#define BGP_ERR_HEADER 1 +# define BGP_ERR_HDR_NOT_SYNC 1 +# define BGP_ERR_HDR_BAD_LEN 2 +# define BGP_ERR_HDR_BAD_TYPE 3 +#define BGP_ERR_OPEN 2 +# define BGP_ERR_OPN_VERSION 1 +# define BGP_ERR_OPN_BAD_AS 2 +# define BGP_ERR_OPN_BAD_IDENT 3 +# define BGP_ERR_OPN_UNSUP_PARAM 4 +# define BGP_ERR_OPN_AUTH_FAILURE 5 +# define BGP_ERR_OPN_HOLD_TIME 6 +#define BGP_ERR_UPDATE 3 +# define BGP_ERR_UPD_BAD_ATTR_LIST 1 +# define BGP_ERR_UPD_UNKN_WK_ATTR 2 +# define BGP_ERR_UPD_MISS_WK_ATTR 3 +# define BGP_ERR_UPD_BAD_ATTR_FLAG 4 +# define BGP_ERR_UPD_BAD_ATTR_LEN 5 +# define BGP_ERR_UPD_BAD_ORIGIN 6 +# define BGP_ERR_UPD_ROUTING_LOOP 7 +# define BGP_ERR_UPD_BAD_NEXT_HOP 8 +# define BGP_ERR_UPD_BAD_OPT_ATTR 9 +# define BGP_ERR_UPD_BAD_NETWORK 10 +# define BGP_ERR_UPD_BAD_AS_PATH 11 +#define BGP_ERR_HOLD_TIMER_EXP 4 +#define BGP_ERR_FSM 5 +#define BGP_ERR_CEASE 6 + +enum bgp_state { + Disabled, /* initial, or failed */ + Idle, /* trying to connect */ + Connect, /* connect issued */ + Active, /* connected, waiting to send OPEN */ + OpenSent, /* OPEN sent, waiting for peer OPEN */ + OpenConfirm, /* KEEPALIVE sent, waiting for peer KEEPALIVE */ + Established, /* established */ +}; + +struct bgp_route_list { + struct bgp_ip_prefix dest; + struct bgp_route_list *next; +}; + +struct bgp_buf { + struct bgp_packet packet; /* BGP packet */ + size_t done; /* bytes sent/recvd */ +}; + +/* state */ +struct bgp_peer { + char name[32]; /* peer name */ + in_addr_t addr; /* peer address */ + int as; /* AS number */ + int sock; + enum bgp_state state; /* FSM state */ + enum bgp_state next_state; /* next state after outbuf cleared */ + time_t state_time; /* time of last state change */ + time_t keepalive_time; /* time to send next keepalive */ + time_t retry_time; /* time for connection retry */ + int retry_count; /* connection retry count */ + int hold; /* hold time from peer */ + time_t expire_time; /* time next peer packet expected */ + int routing; /* propagate routes */ + int update_routes; /* UPDATE required */ + struct bgp_route_list *routes; /* routes known by this peer */ + struct bgp_buf *outbuf; /* pending output */ + struct bgp_buf *inbuf; /* pending input */ + int cli_flag; /* updates requested from CLI */ + char *path_attrs; /* path attrs to send in UPDATE message */ + int path_attr_len; /* length of path attrs */ +}; + +/* bgp_peer.cli_flag */ +#define BGP_CLI_SUSPEND 1 +#define BGP_CLI_ENABLE 2 +#define BGP_CLI_RESTART 3 + +#define BGP_NUM_PEERS 2 +extern struct bgp_peer *bgp_peers; +extern struct bgp_route_list *bgp_routes; +extern int bgp_configured; + +/* actions */ +int bgp_setup(int as); +int bgp_start(struct bgp_peer *peer, char *name, int as, int enable); +void bgp_stop(struct bgp_peer *peer); +void bgp_halt(struct bgp_peer *peer); +int bgp_restart(struct bgp_peer *peer); +int bgp_add_route(in_addr_t ip, in_addr_t mask); +int bgp_del_route(in_addr_t ip, in_addr_t mask); +void bgp_enable_routing(int enable); +int bgp_select_state(struct bgp_peer *peer); +int bgp_process(struct bgp_peer *peer, int readable, int writable); + +/* CLI */ +int cmd_show_bgp(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_suspend_bgp(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_no_suspend_bgp(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_restart_bgp(struct cli_def *cli, char *command, char **argv, int argc); + +#endif /* __BGP_H__ */ diff --git a/cli.c b/cli.c index 321f029..af13505 100644 --- a/cli.c +++ b/cli.c @@ -1,5 +1,5 @@ // L2TPNS Command Line Interface -// $Id: cli.c,v 1.4 2004/05/24 04:12:02 fred_nerk Exp $ +// $Id: cli.c,v 1.5 2004/06/23 03:52:24 fred_nerk Exp $ // vim: sw=4 ts=8 #include @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -16,23 +17,26 @@ #include #include #include +#include #include "l2tpns.h" -#include "libcli.h" #include "util.h" +#include "cluster.h" +#include "tbf.h" +#ifdef BGP +#include "bgp.h" +#endif extern tunnelt *tunnel; extern sessiont *session; extern radiust *radius; extern ippoolt *ip_address_pool; extern struct Tstats *_statistics; -extern int cli_pid; struct cli_def *cli = NULL; int cli_quit = 0; extern int clifd, udpfd, tapfd, snoopfd, ifrfd, cluster_sockfd; extern int *radfds; extern sessionidt *cli_session_kill; extern tunnelidt *cli_tunnel_kill; -extern tbft *filter_buckets; extern struct configt *config; extern struct config_descriptt config_values[]; extern char hostname[]; @@ -40,7 +44,7 @@ extern char hostname[]; extern struct Tringbuffer *ringbuffer; #endif -char *rcs_id = "$Id: cli.c,v 1.4 2004/05/24 04:12:02 fred_nerk Exp $"; +char *rcs_id = "$Id: cli.c,v 1.5 2004/06/23 03:52:24 fred_nerk Exp $"; char *debug_levels[] = { "CRIT", @@ -77,6 +81,7 @@ int cmd_show_run(struct cli_def *cli, char *command, char **argv, int argc); int cmd_show_banana(struct cli_def *cli, char *command, char **argv, int argc); int cmd_show_plugins(struct cli_def *cli, char *command, char **argv, int argc); int cmd_show_throttle(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc); int cmd_write_memory(struct cli_def *cli, char *command, char **argv, int argc); int cmd_clear_counters(struct cli_def *cli, char *command, char **argv, int argc); int cmd_drop_user(struct cli_def *cli, char *command, char **argv, int argc); @@ -99,22 +104,33 @@ void init_cli() FILE *f; char buf[4096]; struct cli_command *c; + struct cli_command *c2; int on = 1; struct sockaddr_in addr; cli = cli_init(); c = cli_register_command(cli, NULL, "show", NULL, NULL); + cli_register_command(cli, c, "banana", cmd_show_banana, "Show a banana"); +#ifdef BGP + cli_register_command(cli, c, "bgp", cmd_show_bgp, "Show BGP status"); +#endif /* BGP */ + cli_register_command(cli, c, "cluster", cmd_show_cluster, "Show cluster information"); + cli_register_command(cli, c, "ipcache", cmd_show_ipcache, "Show contents of the IP cache"); + cli_register_command(cli, c, "plugins", cmd_show_plugins, "List all installed plugins"); + cli_register_command(cli, c, "pool", cmd_show_pool, "Show the IP address allocation pool"); + cli_register_command(cli, c, "radius", cmd_show_radius, "Show active radius queries"); + cli_register_command(cli, c, "running-config", cmd_show_run, "Show the currently running configuration"); cli_register_command(cli, c, "session", cmd_show_session, "Show a list of sessions or details for a single session"); + cli_register_command(cli, c, "tbf", cmd_show_tbf, "List all token bucket filters in use"); + cli_register_command(cli, c, "throttle", cmd_show_throttle, "List all throttled sessions and associated TBFs"); cli_register_command(cli, c, "tunnels", cmd_show_tunnels, "Show a list of tunnels or details for a single tunnel"); cli_register_command(cli, c, "users", cmd_show_users, "Show a list of all connected users or details of selected user"); cli_register_command(cli, c, "version", cmd_show_version, "Show currently running software version"); - cli_register_command(cli, c, "banana", cmd_show_banana, "Show a banana"); - cli_register_command(cli, c, "pool", cmd_show_pool, "Show the IP address allocation pool"); - cli_register_command(cli, c, "running-config", cmd_show_run, "Show the currently running configuration"); - cli_register_command(cli, c, "radius", cmd_show_radius, "Show active radius queries"); - cli_register_command(cli, c, "plugins", cmd_show_plugins, "List all installed plugins"); - cli_register_command(cli, c, "throttle", cmd_show_throttle, "List all token bucket filters in use"); + + c2 = cli_register_command(cli, c, "histogram", NULL, NULL); + cli_register_command(cli, c2, "idle", cmd_show_hist_idle, "Show histogram of session idle times"); + cli_register_command(cli, c2, "open", cmd_show_hist_open, "Show histogram of session durations"); #ifdef STATISTICS cli_register_command(cli, c, "counters", cmd_show_counters, "Display all the internal counters and running totals"); @@ -131,18 +147,25 @@ void init_cli() cli_register_command(cli, NULL, "snoop", cmd_snoop, "Temporarily enable interception for a user"); cli_register_command(cli, NULL, "throttle", cmd_throttle, "Temporarily enable throttling for a user"); + cli_register_command(cli, NULL, "debug", cmd_debug, "Set the level of logging that is shown on the console"); + + c = cli_register_command(cli, NULL, "suspend", NULL, NULL); + cli_register_command(cli, c, "bgp", cmd_suspend_bgp, "Withdraw routes from BGP peer"); c = cli_register_command(cli, NULL, "no", NULL, NULL); cli_register_command(cli, c, "snoop", cmd_no_snoop, "Temporarily disable interception for a user"); cli_register_command(cli, c, "throttle", cmd_no_throttle, "Temporarily disable throttling for a user"); cli_register_command(cli, c, "debug", cmd_no_debug, "Turn off logging of a certain level of debugging"); + c2 = cli_register_command(cli, c, "suspend", NULL, NULL); + cli_register_command(cli, c2, "bgp", cmd_no_suspend_bgp, "Advertise routes to BGP peer"); c = cli_register_command(cli, NULL, "drop", NULL, NULL); cli_register_command(cli, c, "user", cmd_drop_user, "Disconnect a user"); cli_register_command(cli, c, "tunnel", cmd_drop_tunnel, "Disconnect a tunnel and all sessions on that tunnel"); cli_register_command(cli, c, "session", cmd_drop_session, "Disconnect a session"); - cli_register_command(cli, NULL, "debug", cmd_debug, "Set the level of logging that is shown on the console"); + c = cli_register_command(cli, NULL, "restart", NULL, NULL); + cli_register_command(cli, c, "bgp", cmd_restart_bgp, "Restart BGP"); c = cli_register_command(cli, NULL, "load", NULL, NULL); cli_register_command(cli, c, "plugin", cmd_load_plugin, "Load a plugin"); @@ -200,6 +223,30 @@ void cli_do(int sockfd) int i; if (fork()) return; + if (config->scheduler_fifo) + { + int ret; + struct sched_param params = {0}; + params.sched_priority = 0; + if ((ret = sched_setscheduler(0, SCHED_OTHER, ¶ms)) == 0) + { + log(3, 0, 0, 0, "Dropped FIFO scheduler\n"); + } + else + { + log(0, 0, 0, 0, "Error setting scheduler to OTHER: %s\n", strerror(errno)); + log(0, 0, 0, 0, "This is probably really really bad.\n"); + } + } + + signal(SIGPIPE, SIG_DFL); + signal(SIGCHLD, SIG_DFL); + signal(SIGHUP, SIG_DFL); + signal(SIGUSR1, SIG_DFL); + signal(SIGQUIT, SIG_DFL); + signal(SIGKILL, SIG_DFL); + signal(SIGALRM, SIG_DFL); + signal(SIGTERM, SIG_DFL); // Close sockets if (udpfd) close(udpfd); udpfd = 0; @@ -210,14 +257,11 @@ void cli_do(int sockfd) if (ifrfd) close(ifrfd); ifrfd = 0; if (cluster_sockfd) close(cluster_sockfd); cluster_sockfd = 0; if (clifd) close(clifd); clifd = 0; - - signal(SIGPIPE, SIG_DFL); - signal(SIGCHLD, SIG_DFL); - signal(SIGHUP, SIG_DFL); - signal(SIGUSR1, SIG_DFL); - signal(SIGQUIT, SIG_DFL); - signal(SIGKILL, SIG_DFL); - signal(SIGALRM, SIG_DFL); +#ifdef BGP + for (i = 0; i < BGP_NUM_PEERS; i++) + if (bgp_peers[i].sock != -1) + close(bgp_peers[i].sock); +#endif /* BGP */ log(3, 0, 0, 0, "Accepted connection to CLI\n"); @@ -231,7 +275,7 @@ void cli_do(int sockfd) { char prompt[1005]; - snprintf(prompt, 1005, "%s> ", hostname); + snprintf(prompt, 1005, "l2tpns> "); cli_loop(cli, sockfd, prompt); } @@ -281,16 +325,20 @@ int cmd_show_session(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, " Idle time: %u seconds", abs(time_now - session[s].last_packet)); cli_print(cli, " Next Recv: %u", session[s].nr); cli_print(cli, " Next Send: %u", session[s].ns); - cli_print(cli, " Bytes In/Out: %lu/%lu", (unsigned long)session[s].cin, (unsigned long)session[s].total_cout); - cli_print(cli, " Pkts In/Out: %lu/%lu", (unsigned long)session[s].pin, (unsigned long)session[s].pout); + cli_print(cli, " Bytes In/Out: %lu/%lu", (unsigned long)session[s].total_cout, (unsigned long)session[s].total_cin); + cli_print(cli, " Pkts In/Out: %lu/%lu", (unsigned long)session[s].pout, (unsigned long)session[s].pin); cli_print(cli, " MRU: %d", session[s].mru); cli_print(cli, " Radius Session: %u", session[s].radius); cli_print(cli, " Rx Speed: %lu", session[s].rx_connect_speed); cli_print(cli, " Tx Speed: %lu", session[s].tx_connect_speed); - cli_print(cli, " Intercepted: %s", session[s].snoop ? "YES" : "no"); + if (session[s].snoop_ip && session[s].snoop_port) + cli_print(cli, " Intercepted: %s:%d", inet_toa(session[s].snoop_ip), session[s] .snoop_port); + else + cli_print(cli, " Intercepted: no"); cli_print(cli, " Throttled: %s", session[s].throttle ? "YES" : "no"); cli_print(cli, " Walled Garden: %s", session[s].walled_garden ? "YES" : "no"); - cli_print(cli, " Filter Bucket: %s", session[s].tbf ? filter_buckets[session[s].tbf].handle : "none"); + cli_print(cli, " Filter BucketI: %d", session[s].tbf_in); + cli_print(cli, " Filter BucketO: %d", session[s].tbf_out); } return CLI_OK; } @@ -321,7 +369,7 @@ int cmd_show_session(struct cli_def *cli, char *command, char **argv, int argc) session[i].tunnel, session[i].user[0] ? session[i].user : "*", userip, - (session[i].snoop) ? "Y" : "N", + (session[i].snoop_ip && session[i].snoop_port) ? "Y" : "N", (session[i].throttle) ? "Y" : "N", (session[i].walled_garden) ? "Y" : "N", abs(time_now - (unsigned long)session[i].opened), @@ -490,6 +538,9 @@ int cmd_show_counters(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, "%-30s%lu", "session_overflow", GET_STAT(session_overflow)); cli_print(cli, "%-30s%lu", "ip_allocated", GET_STAT(ip_allocated)); cli_print(cli, "%-30s%lu", "ip_freed", GET_STAT(ip_freed)); + cli_print(cli, "%-30s%lu", "cluster_forwarded", GET_STAT(c_forwarded)); + cli_print(cli, "%-30s%lu", "recv_forward", GET_STAT(recv_forward)); + #ifdef STAT_CALLS cli_print(cli, "\n%-30s%-10s", "Counter", "Value"); @@ -549,7 +600,7 @@ int cmd_show_pool(struct cli_def *cli, char *command, char **argv, int argc) if (ip_address_pool[i].assigned) { cli_print(cli, "%-15s Y %8d %s", - inet_toa(ip_address_pool[i].address), ip_address_pool[i].session, session[ip_address_pool[i].session].user); + inet_toa(htonl(ip_address_pool[i].address)), ip_address_pool[i].session, session[ip_address_pool[i].session].user); used++; } @@ -557,10 +608,10 @@ int cmd_show_pool(struct cli_def *cli, char *command, char **argv, int argc) { if (ip_address_pool[i].last) cli_print(cli, "%-15s N %8s [%s] %ds", - inet_toa(ip_address_pool[i].address), "", + inet_toa(htonl(ip_address_pool[i].address)), "", ip_address_pool[i].user, time_now - ip_address_pool[i].last); else if (show_all) - cli_print(cli, "%-15s N", inet_toa(ip_address_pool[i].address)); + cli_print(cli, "%-15s N", inet_toa(htonl(ip_address_pool[i].address))); free++; } @@ -588,7 +639,6 @@ int cmd_write_memory(struct cli_def *cli, char *command, char **argv, int argc) cmd_show_run(cli, command, argv, argc); cli_print_callback(cli, NULL); fclose(save_config_fh); - sleep(1); } else { @@ -698,13 +748,13 @@ int cmd_show_throttle(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, "%-6s %8s %-4s", "ID", "Handle", "Used"); for (i = 0; i < MAXSESSION; i++) { - if (!*filter_buckets[i].handle) + if (!session[i].throttle) continue; - cli_print(cli, "%-6d %8s %c", + cli_print(cli, "%-6d %8d %8d", i, - filter_buckets[i].handle, - (filter_buckets[i].in_use) ? 'Y' : 'N'); + session[i].tbf_in, + session[i].tbf_out); } return CLI_OK; } @@ -741,6 +791,11 @@ int cmd_drop_user(struct cli_def *cli, char *command, char **argv, int argc) int i; sessionidt s; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } if (!argc) { cli_print(cli, "Specify a user to drop"); @@ -787,6 +842,11 @@ int cmd_drop_tunnel(struct cli_def *cli, char *command, char **argv, int argc) int i; tunnelidt tid; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } if (!argc) { cli_print(cli, "Specify a tunnel to drop"); @@ -842,6 +902,11 @@ int cmd_drop_session(struct cli_def *cli, char *command, char **argv, int argc) int i; sessionidt s; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } if (!argc) { cli_print(cli, "Specify a session id to drop"); @@ -889,33 +954,56 @@ int cmd_drop_session(struct cli_def *cli, char *command, char **argv, int argc) int cmd_snoop(struct cli_def *cli, char *command, char **argv, int argc) { int i; + ipt ip; + u16 port; sessionidt s; - if (!argc) + if (!config->cluster_iam_master) { - cli_print(cli, "Specify a user"); + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); return CLI_OK; } + + if (argc < 3) + { + cli_print(cli, "Specify username ip port"); + return CLI_OK; + } + for (i = 0; i < argc; i++) { if (strchr(argv[i], '?')) { - cli_print(cli, "username ..."); + cli_print(cli, "username ip port"); return CLI_OK; } } - for (i = 0; i < argc; i++) + + if (!(s = sessionbyuser(argv[0]))) { - if (!(s = sessionbyuser(argv[i]))) - { - cli_print(cli, "User %s is not connected", argv[i]); - continue; - } - session[s].snoop = 1; + cli_print(cli, "User %s is not connected", argv[0]); + return CLI_OK; + } + + ip = inet_addr(argv[1]); + if (!ip || ip == INADDR_NONE) + { + cli_print(cli, "Cannot parse IP \"%s\"", argv[1]); + return CLI_OK; + } - cli_print(cli, "Snooping user %s", argv[i]); + port = atoi(argv[2]); + if (!port) + { + cli_print(cli, "Invalid port %s", argv[2]); + return CLI_OK; } + + session[s].snoop_ip = ip; + session[s].snoop_port = port; + + cli_print(cli, "Snooping user %s to %s:%d", argv[0], inet_toa(session[s].snoop_ip), session[s].snoop_port); return CLI_OK; } @@ -924,6 +1012,12 @@ int cmd_no_snoop(struct cli_def *cli, char *command, char **argv, int argc) int i; sessionidt s; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } + if (!argc) { cli_print(cli, "Specify a user"); @@ -945,7 +1039,8 @@ int cmd_no_snoop(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, "User %s is not connected", argv[i]); continue; } - session[s].snoop = 0; + session[s].snoop_ip = 0; + session[s].snoop_port = 0; cli_print(cli, "Not snooping user %s", argv[i]); } @@ -957,6 +1052,11 @@ int cmd_throttle(struct cli_def *cli, char *command, char **argv, int argc) int i; sessionidt s; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } if (!argc) { cli_print(cli, "Specify a user"); @@ -978,10 +1078,10 @@ int cmd_throttle(struct cli_def *cli, char *command, char **argv, int argc) cli_print(cli, "User %s is not connected", argv[i]); continue; } - if (!throttle_session(s, 1)) - cli_print(cli, "error throttling %s", argv[i]); + if (!throttle_session(s, config->rl_rate)) + cli_print(cli, "Error throttling %s", argv[i]); else - cli_print(cli, "throttling user %s", argv[i]); + cli_print(cli, "Throttling user %s", argv[i]); } return CLI_OK; } @@ -991,6 +1091,11 @@ int cmd_no_throttle(struct cli_def *cli, char *command, char **argv, int argc) int i; sessionidt s; + if (!config->cluster_iam_master) + { + cli_print(cli, "Can't do this on a slave. Do it on %s", inet_toa(config->cluster_master_address)); + return CLI_OK; + } if (!argc) { cli_print(cli, "Specify a user"); @@ -1265,7 +1370,7 @@ int regular_stuff(struct cli_def *cli) if (show_message) { - ipt address = ntohl(ringbuffer->buffer[i].address); + ipt address = htonl(ringbuffer->buffer[i].address); char *ipaddr; struct in_addr addr; diff --git a/cluster.c b/cluster.c index 9a988e2..06ae93e 100644 --- a/cluster.c +++ b/cluster.c @@ -1,8 +1,9 @@ // L2TPNS Clustering Stuff -// $Id: cluster.c,v 1.2 2004/03/05 00:09:03 fred_nerk Exp $ #include #include +// $Id: cluster.c,v 1.3 2004/06/23 03:52:24 fred_nerk Exp $ +#include +#include #include -#include #include #include #include @@ -15,71 +16,1422 @@ #include #include #include +#include + +#include "l2tpns.h" #include "cluster.h" +#include "util.h" +#include "tbf.h" -int cluster_sockfd = 0; -int cluster_server = 0; -uint32_t vip_address; -extern int debug; -void _log_hex(int level, const char *title, const char *data, int maxsize); -#define log_hex(a,b,c,d) -#ifndef log_hex -#define log_hex(a,b,c,d) do{if (a > debug) _log_hex(a,b,c,d);}while (0) +#ifdef BGP +#include "bgp.h" #endif +/* + * All cluster packets have the same format. + * + * One or more instances of + * a 32 bit 'type' id. + * a 32 bit 'extra' data dependant on the 'type'. + * zero or more bytes of structure data, dependant on the type. + * + */ + +// Module variables. +int cluster_sockfd = 0; // The filedescriptor for the cluster communications port. + +ipt my_address = 0; // The network address of my ethernet port. +static int walk_session_number = 0; // The next session to send when doing the slow table walk. +static int walk_tunnel_number = 0; // The next tunnel to send when doing the slow table walk. + +static int hsess, fsess; // Saved copies of the highest used session id, and the first free one. + +#define MAX_HEART_SIZE (8192) // Maximum size of heartbeat packet. Must be less than max IP packet size :) +#define MAX_CHANGES (MAX_HEART_SIZE/(sizeof(sessiont) + sizeof(int) ) - 2) // Assumes a session is the biggest type! +static struct { + int type; + int id; +} cluster_changes[MAX_CHANGES]; // Queue of changed structures that need to go out when next heartbeat. + +static struct { + int seq; + int size; + char data[MAX_HEART_SIZE]; +} past_hearts[HB_HISTORY_SIZE]; // Ring buffer of heartbeats that we've recently sent out. Needed so + // we can re-transmit if needed. + +static struct { + u32 peer; + time_t basetime; + clockt timestamp; + int uptodate; +} peers[CLUSTER_MAX_SIZE]; // List of all the peers we've heard from. +static int num_peers; // Number of peers in list. +static int have_peers; // At least one peer + +int rle_decompress(u8 ** src_p, int ssize, u8 *dst, int dsize); +int rle_compress(u8 ** src_p, int ssize, u8 *dst, int dsize); + +// // Create a listening socket -int cluster_init(uint32_t bind_address, int server) +// +// This joins the cluster multi-cast group. +// +int cluster_init() { struct sockaddr_in addr; + struct sockaddr_in interface_addr; + struct ip_mreq mreq; + struct ifreq ifr; + int opt = 0; + + config->cluster_undefined_sessions = MAXSESSION-1; + config->cluster_undefined_tunnels = MAXTUNNEL-1; - vip_address = bind_address; - cluster_server = !!server; + if (!config->cluster_address) + return 0; + if (!*config->cluster_interface) + return 0; cluster_sockfd = socket(AF_INET, SOCK_DGRAM, UDP); memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_port = htons(cluster_server ? CLUSTERPORT : CLUSTERCLIENTPORT); + addr.sin_port = htons(CLUSTERPORT); addr.sin_addr.s_addr = INADDR_ANY; setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &addr, sizeof(addr)); + if (bind(cluster_sockfd, (void *) &addr, sizeof(addr)) < 0) { - perror("bind"); - exit(-1); + log(0, 0, 0, 0, "Failed to bind cluster socket: %s\n", strerror(errno)); + return -1; + } + + strcpy(ifr.ifr_name, config->cluster_interface); + if (ioctl(cluster_sockfd, SIOCGIFADDR, &ifr) < 0) { + log(0, 0, 0, 0, "Failed to get interface address for (%s): %s\n", config->cluster_interface, strerror(errno)); + return -1; } + memcpy(&interface_addr, &ifr.ifr_addr, sizeof(interface_addr) ); + my_address = interface_addr.sin_addr.s_addr; + + // Join multicast group. + mreq.imr_multiaddr.s_addr = config->cluster_address; + mreq.imr_interface = interface_addr.sin_addr; + + + opt = 0; // Turn off multicast loopback. + setsockopt(cluster_sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &opt, sizeof(opt)); + + if (setsockopt(cluster_sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + log(0, 0, 0, 0, "Failed to setsockopt (join mcast group): %s\n", strerror(errno)); + return -1; + } + + if (setsockopt (cluster_sockfd, IPPROTO_IP, IP_MULTICAST_IF, &interface_addr, sizeof(interface_addr)) < 0) { + log(0, 0, 0, 0, "Failed to setsockopt (set mcast interface): %s\n", strerror(errno)); + return -1; + } + + config->cluster_last_hb = config->current_time; + config->cluster_seq_number = -1; + return cluster_sockfd; } -int cluster_send_message(unsigned long ip_address, uint32_t vip, char type, void *data, int datalen) + +// +// Send a chunk of data to the entire cluster (usually via the multicast +// address ). +// + +int cluster_send_data(void *data, int datalen) { - size_t l = 1 + sizeof(uint32_t) + datalen; - char *buf = NULL; struct sockaddr_in addr = {0}; if (!cluster_sockfd) return -1; - if (!ip_address) return 0; + if (!config->cluster_address) return 0; + + addr.sin_addr.s_addr = config->cluster_address; + addr.sin_port = htons(CLUSTERPORT); + addr.sin_family = AF_INET; + +// log_hex(4, "Cluster send", data, datalen); // VERY big data packets. How about we don't.. + + log(5,0,0,0, "Cluster send data: %d bytes\n", datalen); + + if (sendto(cluster_sockfd, data, datalen, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) + { + log(0, 0, 0, 0, "sendto: %s\n", strerror(errno)); + return -1; + } + + return 0; +} + +// +// Add a chunk of data to a heartbeat packet. +// Maintains the format. Assumes that the caller +// has passed in a big enough buffer! +// +static void add_type(char ** p, int type, int more, char * data, int size) +{ + * ( (u32*)(*p) ) = type; + *p += sizeof(u32); + + * ( (u32*)(*p) ) = more; + *p += sizeof(u32); - buf = calloc(l, 1); - *(uint32_t *)(buf) = htonl(vip); - *(char *)(buf+sizeof(uint32_t)) = type; + if (data && size > 0) { + memcpy(*p, data, size); + (*p) += size; + } +} + +void cluster_uptodate(void) +{ + if (config->cluster_iam_uptodate) + return; + + if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels) + return; + + config->cluster_iam_uptodate = 1; + + log(0,0,0,0, "Now uptodate with master.\n"); + + // If we're not a master, or if we have no slaves + // then start taking traffic.. + if (!config->cluster_iam_master || !have_peers) + { +#ifdef BGP + if (bgp_configured) + bgp_enable_routing(1); + else +#endif /* BGP */ + if (config->send_garp) + send_garp(config->bind_address); // Start taking traffic. + } +} + +// +// Send a unicast UDP packet to a peer with 'data' as the +// contents. +// +int peer_send_data(u32 peer, char * data, int size) +{ + struct sockaddr_in addr = {0}; - if (data && datalen > 0) - memcpy((char *)(buf + sizeof(uint32_t) + 1), data, datalen); + if (!cluster_sockfd) return -1; + if (!config->cluster_address) return 0; + + if (!peer) // Odd?? + return -1; - addr.sin_addr.s_addr = ip_address; - addr.sin_port = htons(cluster_server ? CLUSTERCLIENTPORT : CLUSTERPORT); + addr.sin_addr.s_addr = peer; + addr.sin_port = htons(CLUSTERPORT); addr.sin_family = AF_INET; - log_hex(4, "Cluster send", buf, l); + log_hex(5, "Peer send", data, size); - if (sendto(cluster_sockfd, buf, l, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) + if (sendto(cluster_sockfd, data, size, MSG_NOSIGNAL, (void *) &addr, sizeof(addr)) < 0) { - perror("sendto"); - free(buf); + log(0, 0, 0, 0, "sendto: %s\n", strerror(errno)); return -1; } - free(buf); return 0; } + +// +// Send a structured message to a peer with a single element of type 'type'. +// +int peer_send_message(u32 peer, int type, int more, char * data, int size) +{ + char buf[65536]; // Vast overkill. + char * p = buf; + + log(4,0,0,0, "Sending message to peer (type %d, more %d, size %d)\n", type, more, size); + add_type(&p, type, more, data, size); + + return peer_send_data(peer, buf, (p-buf) ); +} + +// +// Forward a state changing packet to the master. +// +// The master just processes the payload as if it had +// received it off the tap device. +// +int master_forward_packet(char * data, int size, u32 addr, int port) +{ + char buf[65536]; // Vast overkill. + char * p = buf; + + if (!config->cluster_master_address) // No election has been held yet. Just skip it. + return -1; + + log(4,0,0,0, "Forwarding packet from %s to master (size %d)\n", inet_toa(addr), size); + + STAT(c_forwarded); + add_type(&p, C_FORWARD, addr, (char*) &port, sizeof(port) ); + memcpy(p, data, size); + p += size; + + return peer_send_data(config->cluster_master_address, buf, (p-buf) ); + +} + +// +// Forward a throttled packet to the master for handling. +// +// The master just drops the packet into the appropriate +// token bucket queue, and lets normal processing take care +// of it. +// +int master_throttle_packet(int tbfid, char * data, int size) +{ + char buf[65536]; // Vast overkill. + char * p = buf; + + if (!config->cluster_master_address) // No election has been held yet. Just skip it. + return -1; + + log(4,0,0,0, "Throttling packet master (size %d, tbfid %d)\n", size, tbfid); + + add_type(&p, C_THROTTLE, tbfid, data, size); + + return peer_send_data(config->cluster_master_address, buf, (p-buf) ); + +} + +// +// Forward a walled garden packet to the master for handling. +// +// The master just writes the packet straight to the tun +// device (where is will normally loop through the +// firewall rules, and come back in on the tun device) +// +// (Note that this must be called with the tun header +// as the start of the data). +int master_garden_packet(sessionidt s, char *data, int size) +{ + char buf[65536]; // Vast overkill. + char *p = buf; + + if (!config->cluster_master_address) // No election has been held yet. Just skip it. + return -1; + + log(4,0,0,0, "Walled garden packet to master (size %d)\n", size); + + add_type(&p, C_GARDEN, s, data, size); + + return peer_send_data(config->cluster_master_address, buf, (p-buf)); + +} + +// +// Send a chunk of data as a heartbeat.. +// We save it in the history buffer as we do so. +// +static void send_heartbeat(int seq, char * data, int size) +{ + int i; + + if (size > sizeof(past_hearts[0].data)) { + log(0,0,0,0, "Tried to heartbeat something larger than the maximum packet!\n"); + kill(0, SIGTERM); + } + i = seq % HB_HISTORY_SIZE; + past_hearts[i].seq = seq; + past_hearts[i].size = size; + memcpy(&past_hearts[i].data, data, size); // Save it. + cluster_send_data(data, size); +} + +// +// Send an 'i am alive' message to every machine in the cluster. +// +void cluster_send_ping(time_t basetime) +{ + char buff[100 + sizeof(pingt)]; + char *p = buff; + pingt x; + + if (config->cluster_iam_master && basetime) // We're heartbeating so no need to ping. + return; + + log(5,0,0,0, "Sending cluster ping...\n"); + + x.ver = 1; + x.addr = config->bind_address; + x.undef = config->cluster_undefined_sessions + config->cluster_undefined_tunnels; + x.basetime = basetime; + + add_type(&p, C_PING, basetime, (char *) &x, sizeof(x)); + cluster_send_data(buff, (p-buff) ); +} + +// +// Walk the session counters looking for non-zero ones to send +// to the master. We send up to 100 of them at one time. +// We examine a maximum of 2000 sessions. +// (50k max session should mean that we normally +// examine the entire session table every 25 seconds). + +#define MAX_B_RECS (400) +void master_update_counts(void) +{ + int i, c; + bytest b[MAX_B_RECS+1]; + + if (config->cluster_iam_master) // Only happens on the slaves. + return; + + if (!config->cluster_master_address) // If we don't have a master, skip it for a while. + return; + + i = MAX_B_RECS * 5; // Examine max 2000 sessions; + if (config->cluster_highest_sessionid > i) + i = config->cluster_highest_sessionid; + + for ( c = 0; i > 0 ; --i) { + // Next session to look at. + walk_session_number++; + if ( walk_session_number > config->cluster_highest_sessionid) + walk_session_number = 1; + + if (!sess_count[walk_session_number].cin && !sess_count[walk_session_number].cout) + continue; // Unused. Skip it. + + b[c].sid = walk_session_number; + b[c].in = sess_count[walk_session_number].cin; + b[c].out = sess_count[walk_session_number].cout; + + if (++c > MAX_B_RECS) // Send a max of 400 elements in a packet. + break; + + // Reset counters. + sess_count[walk_session_number].cin = sess_count[walk_session_number].cout = 0; + } + + if (!c) // Didn't find any that changes. Get out of here! + return; + + + // Forward the data to the master. + log(4,0,0,0, "Sending byte counters to master (%d elements)\n", c); + peer_send_message(config->cluster_master_address, C_BYTES, c, (char*) &b, sizeof(b[0]) * c); + return; +} + +// +// Check that we have a master. If it's been too +// long since we heard from a master then hold an election. +// +void cluster_check_master(void) +{ + int i, count, tcount, high_sid = 0; + int last_free = 0; + int had_peers = have_peers; + clockt t = config->current_time; + + if (config->current_time < (config->cluster_last_hb + HB_TIMEOUT) ) + return; // Everything's ok. return. + + if (!config->cluster_iam_master) + log(0,0,0,0, "Master timed out! Holding election...\n"); + + config->cluster_last_hb = config->current_time + 1; + + for (i = have_peers = 0; i < num_peers ; ++i) { + if ((peers[i].timestamp + HB_TIMEOUT) < t) + continue; // Stale peer! Skip them. + + if (!peers[i].basetime) + continue; // Shutdown peer! Skip them. + + have_peers = 1; + if (peers[i].basetime < basetime) { + log(1,0,0,0, "Expecting %s to become master\n", inet_toa(peers[i].peer) ); + return; // They'll win the election. Get out of here. + } + + if (peers[i].basetime == basetime && + peers[i].peer > my_address) { + log(1,0,0,0, "Expecting %s to become master\n", inet_toa(peers[i].peer) ); + return; // They'll win the election. Wait for them to come up. + } + } + + if (config->cluster_iam_master) // If we're the master, we've already won + { +#ifdef BGP + // master lost all slaves, need to handle traffic ourself + if (bgp_configured && had_peers && !have_peers) + bgp_enable_routing(1); +#endif /* BGP */ + return; + } + + // Wow. it's been ages since I last heard a heartbeat + // and I'm better than an of my peers so it's time + // to become a master!!! + + config->cluster_iam_master = 1; + config->cluster_master_address = 0; + + log(0,0,0,0, "I am declaring myself the master!\n"); + +#ifdef BGP + if (bgp_configured && have_peers) + bgp_enable_routing(0); /* stop handling traffic */ +#endif /* BGP */ + + if (config->cluster_seq_number == -1) + config->cluster_seq_number = 0; + + // + // Go through and mark all the tunnels as defined. + // Count the highest used tunnel number as well. + // + config->cluster_highest_tunnelid = 0; + for (i = 0, tcount = 0; i < MAXTUNNEL; ++i) { + if (tunnel[i].state == TUNNELUNDEF) + tunnel[i].state = TUNNELFREE; + + if (tunnel[i].state != TUNNELFREE && i > config->cluster_highest_tunnelid) + config->cluster_highest_tunnelid = i; + } + + // + // Go through and mark all the sessions as being defined. + // reset the idle timeouts. + // add temporary byte counters to permanent ones. + // Re-string the free list. + // Find the ID of the highest session. + last_free = 0; + high_sid = 0; + config->cluster_highest_sessionid = 0; + for (i = 0, count = 0; i < MAXSESSION; ++i) { + if (session[i].tunnel == T_UNDEF) { + session[i].tunnel = T_FREE; + ++count; + } + + if (session[i].tunnel == T_FREE) { // Unused session. Add to free list. + session[last_free].next = i; + session[i].next = 0; + last_free = i; + } + + // Reset all the idle timeouts.. + session[i].last_packet = time_now; + + // Accumulate un-sent byte counters. + session[i].cin += sess_count[i].cin; + session[i].cout += sess_count[i].cout; + session[i].total_cin += sess_count[i].cin; + session[i].total_cout += sess_count[i].cout; + + sess_count[i].cin = sess_count[i].cout = 0; + + session[i].radius = 0; // Reset authentication as the radius blocks aren't up to date. + + if (session[i].sid >= high_sid) // This is different to the index into the session table!!! + high_sid = session[i].sid+1; + + + session[i].tbf_in = session[i].tbf_out = 0; // Remove stale pointers from old master. + throttle_session(i, session[i].throttle); + +// I'm unsure about this. --mo +// It's potentially a good thing, but it could send a +// LOT of packets. +// if (session[i].throttle) +// cluster_send_session(s); // Tell the slaves about the new tbf indexes. + + if (session[i].tunnel != T_FREE && i > config->cluster_highest_sessionid) + config->cluster_highest_sessionid = i; + + } + + session[last_free].next = 0; // End of chain. + last_sid = high_sid; // Keep track of the highest used session ID. + + become_master(); + + rebuild_address_pool(); + + // If we're not the very first master, this is a big issue! + if(count>0) + log(0,0,0,0, "Warning: Fixed %d uninitialized sessions in becoming master!\n", count); + + config->cluster_undefined_sessions = 0; + config->cluster_undefined_tunnels = 0; + + // + // FIXME. We need to fix up the tunnel control message + // queue here! There's a number of other variables we + // should also update. + cluster_uptodate(); +} + + +// +// Check that our session table is validly matching what the +// master has in mind. +// +// In particular, if we have too many sessions marked 'undefined' +// we fix it up here, and we ensure that the 'first free session' +// pointer is valid. +// +static void cluster_check_sessions(int highsession, int freesession_ptr, int hightunnel) +{ + int i; + + sessionfree = freesession_ptr; // Keep the freesession ptr valid. + + if (config->cluster_iam_uptodate) + return; + + if (highsession > config->cluster_undefined_sessions && hightunnel > config->cluster_undefined_tunnels) + return; + + // Clear out defined sessions, counting the number of + // undefs remaining. + config->cluster_undefined_sessions = 0; + for (i = 1 ; i < MAXSESSION; ++i) { + if (i > highsession) { + session[i].tunnel = 0; // Defined. + continue; + } + if (session[i].tunnel != T_UNDEF) + continue; + ++config->cluster_undefined_sessions; + } + + // Clear out defined tunnels, counting the number of + // undefs remaining. + config->cluster_undefined_tunnels = 0; + for (i = 1 ; i < MAXTUNNEL; ++i) { + if (i > hightunnel) { + tunnel[i].state = TUNNELFREE; // Defined. + continue; + } + if (tunnel[i].state != TUNNELUNDEF) + continue; + ++config->cluster_undefined_tunnels; + } + + + if (config->cluster_undefined_sessions || config->cluster_undefined_tunnels) { + log(2,0,0,0, "Cleared undefined sessions/tunnels. %d sess (high %d), %d tunn (high %d)\n", + config->cluster_undefined_sessions, highsession, config->cluster_undefined_tunnels, hightunnel); + return; + } + + // Are we up to date? + + if (!config->cluster_iam_uptodate) + cluster_uptodate(); +} + +int hb_add_type(char **p, int type, int id) +{ + switch (type) { + case C_CSESSION: { // Compressed C_SESSION. + u8 c[sizeof(sessiont) * 2]; // Bigger than worst case. + u8 *d = (u8 *) &session[id]; + u8 *orig = d; + int size; + + size = rle_compress( &d, sizeof(sessiont), c, sizeof(c) ); + + // Did we compress the full structure, and is the size actually + // reduced?? + if ( (d - orig) == sizeof(sessiont) && size < sizeof(sessiont) ) { + add_type(p, C_CSESSION, id, (char*) c, size); + break; + } + // Failed to compress : Fall through. + } + case C_SESSION: add_type(p, C_SESSION, id, + (char*) &session[id], sizeof(sessiont)); + break; + + case C_CTUNNEL: { // Compressed C_TUNNEL + u8 c[sizeof(tunnelt) * 2]; // Bigger than worst case. + u8 *d = (u8 *) &tunnel[id]; + u8 *orig = d; + int size; + + size = rle_compress( &d, sizeof(tunnelt), c, sizeof(c) ); + + // Did we compress the full structure, and is the size actually + // reduced?? + if ( (d - orig) == sizeof(tunnelt) && size < sizeof(tunnelt) ) { + add_type(p, C_CTUNNEL, id, c, size); + break; + } + // Failed to compress : Fall through. + } + case C_TUNNEL: add_type(p, C_TUNNEL, id, + (char*) &tunnel[id], sizeof(tunnelt)); + break; + default: + log(0,0,0,0, "Found an invalid type in heart queue! (%d)\n", type); + kill(0, SIGTERM); + } + return 0; +} + +// +// Send a heartbeat, incidently sending out any queued changes.. +// +void cluster_heartbeat(int highsession, int freesession, int hightunnel) +{ + int i, count = 0, tcount = 0; + char buff[MAX_HEART_SIZE + sizeof(heartt) + sizeof(int) ]; + heartt h; + char * p = buff; + + if (!config->cluster_iam_master) // Only the master does this. + return; + + hsess = highsession; + fsess = freesession; + // Fill out the heartbeat header. + h.version = HB_VERSION; + h.seq = config->cluster_seq_number; + h.basetime = basetime; + h.clusterid = config->bind_address; // Will this do?? + h.basetime = basetime; + h.highsession = highsession; + h.freesession = freesession; + h.hightunnel = hightunnel; + h.size_sess = sizeof(sessiont); // Just in case. + h.size_tunn = sizeof(tunnelt); + + add_type(&p, C_HEARTBEAT, HB_VERSION, (char*) &h, sizeof(h) ); + + for (i = 0; i < config->cluster_num_changes; ++i) { + hb_add_type(&p, cluster_changes[i].type, cluster_changes[i].id); + } + + if (p > (buff + sizeof(buff))) { // Did we somehow manage to overun the buffer? + log(0,0,0,0, "Overrun the heartbeat buffer! This is fatal. Exiting. (size %d)\n", p - buff); + kill(0, SIGTERM); + } + + // + // Fill out the packet with sessions from the session table... + // (not forgetting to leave space so we can get some tunnels in too ) + while ( (p + sizeof(u32) * 2 + sizeof(sessiont) * 2 ) < (buff + MAX_HEART_SIZE) ) { + + if (!walk_session_number) // session #0 isn't valid. + ++walk_session_number; + + if (count >= highsession) // If we're a small cluster, don't go wild. + break; + + hb_add_type(&p, C_CSESSION, walk_session_number); + walk_session_number = (1+walk_session_number)%(highsession+1); // +1 avoids divide by zero. + + ++count; // Count the number of extra sessions we're sending. + } + + // + // Fill out the packet with tunnels from the tunnel table... + // + while ( (p + sizeof(u32) * 2 + sizeof(tunnelt) ) < (buff + MAX_HEART_SIZE) ) { + + if (!walk_tunnel_number) // tunnel #0 isn't valid. + ++walk_tunnel_number; + + if (tcount >= config->cluster_highest_tunnelid) + break; + + hb_add_type(&p, C_CTUNNEL, walk_tunnel_number); + walk_tunnel_number = (1+walk_tunnel_number)%(config->cluster_highest_tunnelid+1); // +1 avoids divide by zero. + + ++tcount; + } + + // + // Did we do something wrong? + if (p > (buff + sizeof(buff))) { // Did we somehow manage to overun the buffer? + log(0,0,0,0, "Overran the heartbeat buffer now! This is fatal. Exiting. (size %d)\n", p - buff); + kill(0, SIGTERM); + } + + log(3,0,0,0, "Sending heartbeat with %d changes (%d x-sess, %d x-tunnels, %d highsess, %d hightun size %d)\n", + config->cluster_num_changes, count, tcount, config->cluster_highest_sessionid, + config->cluster_highest_tunnelid, (p-buff)); + + config->cluster_num_changes = 0; + + send_heartbeat(h.seq, buff, (p-buff) ); // Send out the heartbeat to the cluster, keeping a copy of it. + + config->cluster_seq_number = (config->cluster_seq_number+1)%HB_MAX_SEQ; // Next seq number to use. +} + +// +// A structure of type 'type' has changed; Add it to the queue to send. +// +int type_changed(int type, int id) +{ + int i; + + for (i = 0 ; i < config->cluster_num_changes ; ++i) + if ( cluster_changes[i].id == id && + cluster_changes[i].type == type) + return 0; // Already marked for change. + + cluster_changes[i].type = type; + cluster_changes[i].id = id; + ++config->cluster_num_changes; + + if (config->cluster_num_changes > MAX_CHANGES) + cluster_heartbeat(config->cluster_highest_sessionid, fsess, config->cluster_highest_tunnelid); + + return 1; +} + + +// A particular session has been changed! +int cluster_send_session(int sid) +{ + if (!config->cluster_iam_master) { + log(0,0,sid,0, "I'm not a master, but I just tried to change a session!\n"); + return -1; + } + + return type_changed(C_CSESSION, sid); +} + +// A particular tunnel has been changed! +int cluster_send_tunnel(int tid) +{ + if (!config->cluster_iam_master) { + log(0,0,0,tid, "I'm not a master, but I just tried to change a tunnel!\n"); + return -1; + } + + return type_changed(C_CTUNNEL, tid); +} + + +// +// We're a master, and a slave has just told us that it's +// missed a packet. We'll resend it every packet since +// the last one it's seen. +// +int cluster_catchup_slave(int seq, u32 slave) +{ + int s; + int diff; + + log(1,0,0,0, "Slave %s sent LASTSEEN with seq %d\n", inet_toa(slave), seq); + + diff = config->cluster_seq_number - seq; // How many packet do we need to send? + if (diff < 0) + diff += HB_MAX_SEQ; + + if (diff >= HB_HISTORY_SIZE) { // Ouch. We don't have the packet to send it! + log(0,0,0,0, "A slaved asked for message %d when our seq number is %d. Killing it.\n", + seq, config->cluster_seq_number); + return peer_send_message(slave, C_KILL, seq, NULL, 0);// Kill the slave. Nothing else to do. + } + + // Now resend every packet that it missed, in order. + while (seq != config->cluster_seq_number) { + s = seq%HB_HISTORY_SIZE; + if (seq != past_hearts[s].seq) { + log(0,0,0,0, "Tried to re-send heartbeat for %s but %d doesn't match %d! (%d,%d)\n", + inet_toa(slave), seq, past_hearts[s].seq, s, config->cluster_seq_number); + return -1; // What to do here!? + } + peer_send_data(slave, past_hearts[s].data, past_hearts[s].size); + seq = (seq+1)%HB_MAX_SEQ; // Increment to next seq number. + } + return 0; // All good! +} + +// +// We've heard from another peer! Add it to the list +// that we select from at election time. +// +int cluster_add_peer(u32 peer, time_t basetime, pingt *p) +{ + int i; + u32 clusterid; + + clusterid = p->addr; + if (clusterid != config->bind_address) + { + // Is this for us? + log(4,0,0,0, "Skipping ping from %s (different cluster)\n", inet_toa(peer)); + return 0; + } + + // Is this the master shutting down?? + if (peer == config->cluster_master_address && !basetime) { + config->cluster_master_address = 0; + config->cluster_last_hb = 0; // Force an election. + cluster_check_master(); + return 0; + } + + for (i = 0; i < num_peers ; ++i) + { + if (peers[i].peer != peer) + continue; + + // This peer already exists. Just update the timestamp. + peers[i].basetime = basetime; + peers[i].timestamp = config->current_time; + break; + } + + if (i >= num_peers) + { + log(4,0,0,0, "Adding %s as a peer\n", inet_toa(peer)); + + // Not found. Is there a stale slot to re-use? + for (i = 0; i < num_peers ; ++i) + { + if (peers[i].peer != peer) + continue; + if ((peers[i].timestamp + HB_TIMEOUT * 10) < config->current_time) // Stale. + break; + } + + if (i >= CLUSTER_MAX_SIZE) + { + // Too many peers!! + log(0,0,0,0, "Tried to add %s as a peer, but I already have %d of them!\n", inet_toa(peer), i); + return -1; + } + + peers[i].peer = peer; + peers[i].basetime = basetime; + peers[i].timestamp = config->current_time; + if (i == num_peers) + ++num_peers; + + log(1,0,0,0, "Added %s as a new peer. Now %d peers\n", inet_toa(peer), num_peers); + } + +#ifdef BGP + /* drop routes if we've now got a peer */ + if (bgp_configured && config->cluster_iam_master && !have_peers) + bgp_enable_routing(0); +#endif /* BGP */ + + have_peers = 1; + + return 1; +} + +/* Handle the slave updating the byte counters for the master. */ +// +// Note that we don't mark the session as dirty; We rely on +// the slow table walk to propogate this back out to the slaves. +// +int cluster_handle_bytes(char * data, int size) +{ + bytest * b; + + b = (bytest*) data; + + log(3,0,0,0, "Got byte counter update (size %d)\n", size); + + /* Loop around, adding the byte + counts to each of the sessions. */ + + while (size >= sizeof(*b) ) { + if (b->sid > MAXSESSION) { + log(0,0,0,0, "Got C_BYTES with session #%d!\n", b->sid); + return -1; /* Abort processing */ + } + + session[b->sid].total_cin += b->in; + session[b->sid].total_cout += b->out; + + session[b->sid].cin += b->in; + session[b->sid].cout += b->out; + session[b->sid].last_packet = time_now; // Reset idle timer! + + size -= sizeof(*b); + ++b; + } + + if (size != 0) + log(0,0,0,0, "Got C_BYTES with %d bytes of trailing junk!\n", size); + + return size; +} + +// +// Handle receiving a session structure in a heartbeat packet. +// +static int cluster_recv_session(int more , u8 * p) +{ + if (more >= MAXSESSION) { + log(0,0,0,0, "DANGER: Received a heartbeat session id > MAXSESSION!\n"); + return -1; + } + + if (session[more].tunnel == T_UNDEF) { + if (config->cluster_iam_uptodate) { // Sanity. + log(0,0,0,0, "I thought I was uptodate but I just found an undefined session!\n"); + } else { + --config->cluster_undefined_sessions; + } + } + + load_session(more, (sessiont*) p); // Copy session into session table.. + + log(5,0,more,0, "Received session update (%d undef)\n", config->cluster_undefined_sessions); + + if (!config->cluster_iam_uptodate) + cluster_uptodate(); // Check to see if we're up to date. + return 0; +} + +static int cluster_recv_tunnel(int more, u8 *p) +{ + if (more >= MAXTUNNEL) { + log(0,0,0,0, "DANGER: Received a tunnel session id > MAXTUNNEL!\n"); + return -1; + } + + if (tunnel[more].state == TUNNELUNDEF) { + if (config->cluster_iam_uptodate) { // Sanity. + log(0,0,0,0, "I thought I was uptodate but I just found an undefined tunnel!\n"); + } else { + --config->cluster_undefined_tunnels; + } + } + + memcpy(&tunnel[more], p, sizeof(tunnel[more]) ); + + // + // Clear tunnel control messages. These are dynamically allocated. + // If we get unlucky, this may cause the tunnel to drop! + // + tunnel[more].controls = tunnel[more].controle = NULL; + tunnel[more].controlc = 0; + + log(5,0,0,more, "Received tunnel update\n"); + + if (!config->cluster_iam_uptodate) + cluster_uptodate(); // Check to see if we're up to date. + + return 0; +} + + +// +// Process a version one heartbeat.. +// +static int cluster_process_heartbeat_v2(u8 * data, int size, int more, u8 * p, u32 addr) +{ + heartt * h; + int s = size - (p-data); + int i, type; + + if (more != HB_VERSION) { + log(0,0,0,0, "Received a heartbeat version that I don't understand!\n"); + return -1; // Ignore it?? + } + // Ok. It's a heartbeat packet from a cluster master! + if (s < sizeof(*h)) + goto shortpacket; + + + h = (heartt*) p; + p += sizeof(*h); + s -= sizeof(*h); + + if (h->clusterid != config->bind_address) + return -1; // It's not part of our cluster. + + if (config->cluster_iam_master) { // Sanity... + // Note that this MUST match the election process above! + + log(0,0,0,0, "I just got a packet claiming to be from a master but _I_ am the master!\n"); + if (!h->basetime) { + log(0,0,0,0, "Heartbeat from addr %s with zero basetime!\n", inet_toa(htonl(addr)) ); + return -1; // Skip it. + } + if (basetime > h->basetime) { + log(0,0,0,0, "They're (%s) an older master than me so I'm gone!\n", inet_toa(htonl(addr))); + kill(0, SIGTERM); + } + if (basetime == h->basetime && my_address < addr) { // Tie breaker. + log(0,0,0,0, "They're a higher IP address than me, so I'm gone!\n"); + kill(0, SIGTERM); + } + return -1; // Skip it. + } + + if (config->cluster_seq_number == -1) // Don't have one. Just align to the master... + config->cluster_seq_number = h->seq; + + config->cluster_last_hb = config->current_time; // Reset to ensure that we don't become master!! + + if (config->cluster_seq_number != h->seq) { // Out of sequence heartbeat! + log(1,0,0,0, "HB: Got seq# %d but was expecting %d. asking for resend.\n", h->seq, config->cluster_seq_number); + + peer_send_message(addr, C_LASTSEEN, config->cluster_seq_number, NULL, 0); + + config->cluster_last_hb = config->current_time; // Reset to ensure that we don't become master!! + + // Just drop the packet. The master will resend it as part of the catchup. + + return 0; + } + // Save the packet in our buffer. + // This is needed in case we become the master. + config->cluster_seq_number = (h->seq+1)%HB_MAX_SEQ; + i = h->seq % HB_HISTORY_SIZE; + past_hearts[i].seq = h->seq; + past_hearts[i].size = size; + memcpy(&past_hearts[i].data, data, size); // Save it. + + + // Check that we don't have too many undefined sessions, and + // that the free session pointer is correct. + cluster_check_sessions(h->highsession, h->freesession, h->hightunnel); + + // Ok. process the packet... + while ( s > 0) { + + type = * ((u32*) p); + p += sizeof(u32); + s -= sizeof(u32); + + more = * ((u32*) p); + p += sizeof(u32); + s -= sizeof(u32); + + switch (type) { + case C_CSESSION: { // Compressed session structure. + u8 c [ sizeof(sessiont) + 2]; + int size; + u8 * orig_p = p; + + size = rle_decompress((u8 **) &p, s, c, sizeof(c) ); + s -= (p - orig_p); + + if (size != sizeof(sessiont) ) { // Ouch! Very very bad! + log(0,0,0,0, "DANGER: Received a CSESSION that didn't decompress correctly!\n"); + // Now what? Should exit! No-longer up to date! + break; + } + + cluster_recv_session(more, c); + break; + } + case C_SESSION: + if ( s < sizeof(session[more])) + goto shortpacket; + + cluster_recv_session(more, p); + + p += sizeof(session[more]); + s -= sizeof(session[more]); + break; + + case C_CTUNNEL: { // Compressed tunnel structure. + u8 c [ sizeof(tunnelt) + 2]; + int size; + u8 * orig_p = p; + + size = rle_decompress( (u8 **) &p, s, c, sizeof(c) ); + s -= (p - orig_p); + + if (size != sizeof(tunnelt) ) { // Ouch! Very very bad! + log(0,0,0,0, "DANGER: Received a CSESSION that didn't decompress correctly!\n"); + // Now what? Should exit! No-longer up to date! + break; + } + + cluster_recv_tunnel(more, c); + break; + + } + case C_TUNNEL: + if ( s < sizeof(tunnel[more])) + goto shortpacket; + + cluster_recv_tunnel(more, p); + + p += sizeof(tunnel[more]); + s -= sizeof(tunnel[more]); + break; + default: + log(0,0,0,0, "DANGER: I received a heartbeat element where I didn't understand the type! (%d)\n", type); + return -1; // can't process any more of the packet!! + } + } + if (config->cluster_master_address != addr) + { + char *str; + str = strdup(inet_toa(config->cluster_master_address)); + log(0,0,0,0, "My master just changed from %s to %s!\n", str, inet_toa(addr)); + if (str) free(str); + } + + config->cluster_master_address = addr; + config->cluster_last_hb = config->current_time; // Successfully received a heartbeat! + return 0; + +shortpacket: + log(0,0,0,0, "I got an incomplete heartbeat packet! This means I'm probably out of sync!!\n"); + return -1; +} + +// +// We got a packet on the cluster port! +// Handle pings, lastseens, and heartbeats! +// +int processcluster(char * data, int size, u32 addr) +{ + int type, more; + char * p = data; + int s = size; + + if (addr == my_address) + return -1; // Ignore it. Something looped back the multicast! + + log(5,0,0,0, "Process cluster: %d bytes from %s\n", size, inet_toa(addr)); + + if (s <= 0) // Any data there?? + return -1; + + if (s < 8) + goto shortpacket; + + type = * ((u32*) p); + p += sizeof(u32); + s -= sizeof(u32); + + more = * ((u32*) p); + p += sizeof(u32); + s -= sizeof(u32); + + switch (type) { + case C_PING: // Update the peers table. + return cluster_add_peer(addr, more, (pingt*)p); + + case C_LASTSEEN: // Catch up a slave (slave missed a packet). + return cluster_catchup_slave(more, addr); + + case C_FORWARD: { // Forwarded control packet. pass off to processudp. + struct sockaddr_in a; + a.sin_addr.s_addr = more; + + a.sin_port = * (int*) p; + s -= sizeof(int); + p += sizeof(int); + + if (!config->cluster_iam_master) { // huh? + log(0,0,0,0, "I'm not the master, but I got a C_FORWARD from %s?\n", inet_toa(addr)); + return -1; + } + + log(4,0,0,0, "Got a forwarded packet... (%s:%d)\n", inet_toa(more), a.sin_port); + STAT(recv_forward); + processudp(p, s, &a); + return 0; + } + case C_THROTTLE: { // Receive a forwarded packet from a slave. + if (!config->cluster_iam_master) { + log(0,0,0,0, "I'm not the master, but I got a C_THROTTLE from %s?\n", inet_toa(addr)); + return -1; + } + + tbf_queue_packet(more, p, s); // The TBF id tells wether it goes in or out. + return 0; + } + case C_GARDEN: + // Receive a walled garden packet from a slave. + if (!config->cluster_iam_master) { + log(0,0,0,0, "I'm not the master, but I got a C_GARDEN from %s?\n", inet_toa(addr)); + return -1; + } + + tun_write(p, s); + return 0; + + case C_BYTES: + return cluster_handle_bytes(p, s); + + case C_KILL: // The master asked us to die!? (usually because we're too out of date). + if (config->cluster_iam_master) { + log(0,0,0,0, "_I_ am master, but I received a C_KILL from %s! (Seq# %d)\n", inet_toa(addr), more); + return -1; + } + if (more != config->cluster_seq_number) { + log(0,0,0,0, "The master asked us to die but the seq number didn't match!?\n"); + return -1; + } + + if (addr != config->cluster_master_address) { + log(0,0,0,0, "Received a C_KILL from %s which doesn't match config->cluster_master_address (%x)", + inet_toa(addr), config->cluster_master_address); + // We can only warn about it. The master might really have switched! + } + + log(0,0,0,0, "Received a valid C_KILL: I'm going to die now."); + kill(0, SIGTERM); + exit(0); // Lets be paranoid; + return -1; // Just signalling the compiler. + + case C_HEARTBEAT: + log(4,0,0,0, "Got a heartbeat from %s\n", inet_toa(addr)); + + return cluster_process_heartbeat_v2(data, size, more, p, addr); + + default: + log(0,0,0,0, "Strange type packet received on cluster socket (%d)\n", type); + return -1; + } + return 0; +shortpacket: + log(0,0,0,0, "I got an cluster heartbeat packet! This means I'm probably out of sync!!\n"); + return -1; +} + +//==================================================================================================== + +int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + + cli_print(cli, "Cluster status : %s", config->cluster_iam_master ? "Master" : "Slave" ); + cli_print(cli, "My address : %s", inet_toa(my_address)); + cli_print(cli, "VIP address : %s", inet_toa(config->bind_address)); + cli_print(cli, "Multicast address: %s", inet_toa(config->cluster_address)); + cli_print(cli, "Multicast i'face : %s", config->cluster_interface); + + if (!config->cluster_iam_master) { + cli_print(cli, "My master : %s (last heartbeat %.1f seconds old)", + config->cluster_master_address ? inet_toa(config->cluster_master_address) : "Not defined", + 0.1 * (config->current_time - config->cluster_last_hb)); + cli_print(cli, "Uptodate : %s", config->cluster_iam_uptodate ? "Yes" : "No"); + cli_print(cli, "Next sequence number expected: %d", config->cluster_seq_number); + cli_print(cli, "%d sessions undefined of %d", config->cluster_undefined_sessions, config->cluster_highest_sessionid); + cli_print(cli, "%d tunnels undefined of %d", config->cluster_undefined_tunnels, config->cluster_highest_tunnelid); + } else { + cli_print(cli, "Next heartbeat # : %d", config->cluster_seq_number); + cli_print(cli, "Highest session : %d", config->cluster_highest_sessionid); + cli_print(cli, "Highest tunnel : %d", config->cluster_highest_tunnelid); + cli_print(cli, "%d changes queued for sending", config->cluster_num_changes); + } + cli_print(cli, "%d peers.", num_peers); + + if (num_peers) + cli_print(cli, "%20s %10s %8s", "Address", "Basetime", "Age"); + for (i = 0; i < num_peers; ++i) { + cli_print(cli, "%20s %10d %8d", inet_toa(peers[i].peer), + peers[i].basetime, config->current_time - peers[i].timestamp); + } + return CLI_OK; +} + +// +// Simple run-length-encoding compression. +// Format is +// 1 byte < 128 = count of non-zero bytes following. // Not legal to be zero. +// n non-zero bytes; +// or +// 1 byte > 128 = (count - 128) run of zero bytes. // +// repeat. +// count == 0 indicates end of compressed stream. +// +// Compress from 'src' into 'dst'. return number of bytes +// used from 'dst'. +// Updates *src_p to indicate 1 past last bytes used. +// +// We could get an extra byte in the zero runs by storing (count-1) +// but I'm playing it safe. +// +// Worst case is a 50% expansion in space required (trying to +// compress { 0x00, 0x01 } * N ) +int rle_compress(u8 ** src_p, int ssize, u8 *dst, int dsize) +{ + int count; + int orig_dsize = dsize; + u8 * x,*src; + src = *src_p; + + while (ssize > 0 && dsize > 2) { + count = 0; + x = dst++; --dsize; // Reserve space for count byte.. + + if (*src) { // Copy a run of non-zero bytes. + while (*src && count < 127 && ssize > 0 && dsize > 1) { // Count number of non-zero bytes. + *dst++ = *src++; + --dsize; --ssize; + ++count; + } + *x = count; // Store number of non-zero bytes. Guarenteed to be non-zero! + + } else { // Compress a run of zero bytes. + while (*src == 0 && count < 127 && ssize > 0) { + ++src; + --ssize; + ++count; + } + *x = count | 0x80 ; + } + } + + *dst++ = 0x0; // Add Stop byte. + --dsize; + + *src_p = src; + return (orig_dsize - dsize); +} + +// +// Decompress the buffer into **p. +// 'psize' is the size of the decompression buffer available. +// +// Returns the number of bytes decompressed. +// +// Decompresses from '*src_p' into 'dst'. +// Return the number of dst bytes used. +// Updates the 'src_p' pointer to point to the +// first un-used byte. +int rle_decompress(u8 ** src_p, int ssize, u8 *dst, int dsize) +{ + int count; + int orig_dsize = dsize; + char * src = *src_p; + + while (ssize >0 && dsize > 0) { // While there's more to decompress, and there's room in the decompress buffer... + count = *src++; --ssize; // get the count byte from the source. + if (count == 0x0) // End marker reached? If so, finish. + break; + + if (count & 0x80) { // Decompress a run of zeros + for (count &= 0x7f ; count > 0 && dsize > 0; --count) { + *dst++ = 0x0; + --dsize; + } + } else { // Copy run of non-zero bytes. + for ( ; count > 0 && ssize && dsize; --count) { // Copy non-zero bytes across. + *dst++ = *src++; + --ssize; --dsize; + } + } + } + *src_p = src; + return (orig_dsize - dsize); +} diff --git a/cluster.h b/cluster.h index 23ce345..2bfba31 100644 --- a/cluster.h +++ b/cluster.h @@ -1,19 +1,85 @@ // L2TPNS Clustering Stuff -// $Id: cluster.h,v 1.2 2004/03/05 00:09:03 fred_nerk Exp $ +// $Id: cluster.h,v 1.3 2004/06/23 03:52:24 fred_nerk Exp $ -#define C_HELLO 1 -#define C_HELLO_RESPONSE 2 +#ifndef __CLUSTER_H__ +#define __CLUSTER_H__ + + +#define C_HEARTBEAT 1 +#define C_ACK 2 #define C_PING 3 -#define C_TUNNEL 4 -#define C_SESSION 5 +#define C_TUNNEL 4 // Tunnel structure. +#define C_SESSION 5 // Session structure. #define C_GOODBYE 6 +#define C_LASTSEEN 7 // Tell master the last heartbeat that I handled. +#define C_KILL 8 // Tell a slave to die. +#define C_FORWARD 9 // Forwarded packet.. +#define C_BYTES 10 // Update byte counters. +#define C_THROTTLE 11 // A packet for the master to throttle. (The TBF tells direction). +#define C_CSESSION 12 // Compressed session structure. +#define C_CTUNNEL 13 // Compressed tunnel structure. +#define C_GARDEN 14 // Gardened packet + +#define HB_VERSION 2 // Protocol version number.. +#define HB_MAX_SEQ (1<<30) // Maximum sequence number. (MUST BE A POWER OF 2!) +#define HB_HISTORY_SIZE 64 // How many old heartbeats we remember?? (Must be a factor of HB_MAX_SEQ) + +#define PING_INTERVAL 5 // 0.5 seconds. Needs to be short to keep session tables fresh. +#define HB_TIMEOUT (15*2*PING_INTERVAL) // 15 seconds without heartbeat triggers an election.. #define CLUSTERPORT 32792 -#define CLUSTERCLIENTPORT 32793 #define UDP 17 #define TIMEOUT 20 #define IL sizeof(int) -int cluster_init(uint32_t bind_address, int server); -int cluster_send_message(unsigned long ip_address, uint32_t vip, char type, void *data, int datalen); -int processcluster(char *buf, int l); +#define CLUSTER_MAX_SIZE 32 // No more than 32 machines in a cluster! + +#define DEFAULT_MCAST_ADDR "239.192.13.13" // Need an assigned number! +#define DEFAULT_MCAST_INTERFACE "eth0" + +typedef struct { + u32 version; // protocol version. + u32 seq; // Sequence number for this heatbeat. + u32 basetime; // What time I started + u32 clusterid; // Id for this cluster? + + u32 highsession; // Id of the highest in-use session. + u32 freesession; // Id of the first free session. + u32 hightunnel; // Id of the highest used tunnel. + u32 size_sess; // Size of the session structure. + + u32 size_tunn; // size of the tunnel structure. + + char reserved[128 - 9*sizeof(u32)]; // Pad out to 128 bytes. +} heartt; + +typedef struct { /* Used to update byte counters on the */ + /* master. */ + u32 sid; + u32 in; + u32 out; +} bytest; + +typedef struct { + u32 addr; // + u32 ver; // version of structure. + u32 undef; // Number of undefined structures. 0 if up-to-date. + u32 basetime; // start time of this peer. +} pingt; + +int cluster_init(); +int processcluster(char *buf, int size, u32 addr); +int cluster_forward_packet(char *buf, int size, u32 addr); +int cluster_send_session(int sid); +int cluster_send_tunnel(int tid); +int master_forward_packet(char * data, int size, u32 addr, int port); +int master_throttle_packet(int tid, char * data, int size); +int master_garden_packet(sessionidt s, char * data, int size); +void master_update_counts(void); + +void cluster_send_ping(time_t basetime); +void cluster_heartbeat(int highsession, int freesession, int hightunnel); +void cluster_check_master(void); +int show_cluster(struct cli_def *cli, char *command, char **argv, int argc); + +#endif /* __CLUSTER_H__ */ diff --git a/cluster_master.c b/cluster_master.c deleted file mode 100644 index 31e9332..0000000 --- a/cluster_master.c +++ /dev/null @@ -1,517 +0,0 @@ -// L2TPNS Cluster Master -// $Id: cluster_master.c,v 1.3 2004/05/24 04:12:34 fred_nerk Exp $ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "cluster.h" -#include "ll.h" -#include "util.h" -#include "config.h" - -#define L2TPNS BINDIR "/l2tpns" - -typedef struct -{ - char *hostname; - unsigned long last_message; - uint32_t ip_address; - uint32_t slave_address; - int remove_from_cluster; - int down; - int tunnel_len; - int session_len; - pid_t pid; - - int num_tunnels; - char *tunnels[1000]; - int num_sessions; - char *sessions[13000]; -} slave; - -uint32_t master_address; -linked_list *slaves; -extern int cluster_sockfd; -int debug = 4; - -int processmsg(char *buf, int l, struct sockaddr_in *src_addr); -int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr); -int handle_tunnel(char *buf, int l, uint32_t addr); -int handle_session(char *buf, int l, uint32_t addr); -int handle_ping(char *buf, int l, uint32_t addr); -int handle_goodbye(char *buf, int l, uint32_t addr); -int backup_up(slave *s); -int backup_down(slave *s); -int return_state(slave *s); -slave *find_slave(uint32_t address); -#define log _log -void _log(int level, const char *format, ...) __attribute__((format (printf, 2, 3))); -void log_hex(int level, const char *title, const char *data, int maxsize); - -/* Catch our forked processes exiting */ -void sigchild_handler(int signal) -{ - int status; - int pid; - - pid = wait(&status); - /* TODO: catch errors and respawn? */ -} - -int main(int argc, char *argv[]) -{ - slave *s; - char buf[4096]; - struct timeval to; - - if (argc != 2) { - log(0, "Usage: %s
\n", argv[0]); - exit(-1); - } - - master_address = inet_addr(argv[1]); - if (master_address == INADDR_NONE) { - log(0, "Invalid ip %s\n", argv[1]); - exit(-1); - } - - cluster_init(master_address, 1); - slaves = ll_init(); - - signal(SIGCHLD, sigchild_handler); - - log(0, "Cluster Manager $Id: cluster_master.c,v 1.3 2004/05/24 04:12:34 fred_nerk Exp $ starting\n"); - - to.tv_sec = 1; - to.tv_usec = 0; - while (1) - { - fd_set r; - int n; - - FD_ZERO(&r); - FD_SET(cluster_sockfd, &r); - n = select(cluster_sockfd + 1, &r, 0, 0, &to); - if (n < 0) - { - if (errno != EINTR) - { - perror("select"); - exit(-1); - } - continue; - } - else if (n) - { - struct sockaddr_in addr; - int alen = sizeof(addr); - - memset(buf, 0, sizeof(buf)); - if (FD_ISSET(cluster_sockfd, &r)) - processmsg(buf, recvfrom(cluster_sockfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr); - continue; - } - - // Handle slaves timing out - { - time_t now = time(NULL); - ll_reset(slaves); - while ((s = ll_next(slaves))) - { - if (s->down) continue; - if (s->last_message < (now - TIMEOUT)) - { - log(4, "Slave \"%s\" s->last_message is %lu (timeout is %lu)\n", s->hostname, s->last_message, (now - TIMEOUT)); - if (s->remove_from_cluster) - { - // Remove them from the cluster - ll_delete(slaves, s); - if (s->hostname) free(s->hostname); - free(s); - ll_reset(slaves); - continue; - } - backup_up(s); - } - } - } - - to.tv_sec = 1; - to.tv_usec = 0; - } - - return 0; -} - -int processmsg(char *buf, int l, struct sockaddr_in *src_addr) -{ - slave *s; - char mtype; - uint32_t addr; - - log_hex(4, "Received", buf, l); - if (!buf || l <= sizeof(uint32_t)) return 0; - - addr = ntohl(*(uint32_t*)buf); - buf += sizeof(uint32_t); - l -= sizeof(uint32_t); - - mtype = *buf; buf++; l--; - - if (mtype != C_GOODBYE && (s = find_slave(addr)) && s->down) - { - char *hostname; - hostname = calloc(l + 1, 1); - memcpy(hostname, buf, l); - log(1, "Slave \"%s\" (for %s) has come back.\n", hostname, inet_toa(s->ip_address)); - backup_down(s); - free(hostname); - } - - switch (mtype) - { - case C_HELLO: - handle_hello(buf, l, src_addr, addr); - break; - case C_PING: - if (!find_slave(addr)) - handle_hello(buf, l, src_addr, addr); - else - handle_ping(buf, l, addr); - break; - case C_TUNNEL: - if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr); - handle_tunnel(buf, l, addr); - break; - case C_SESSION: - if (!find_slave(addr)) handle_hello((char *)(buf + 1), *(char *)buf, src_addr, addr); - handle_session(buf, l, addr); - break; - case C_GOODBYE: - if (!find_slave(addr)) break; - handle_goodbye(buf, l, addr); - break; - } - return mtype; -} - -int handle_hello(char *buf, int l, struct sockaddr_in *src_addr, uint32_t addr) -{ - slave *s; - char *hostname; - - hostname = calloc(l + 1, 1); - memcpy(hostname, buf, l); - - // Is this a slave we have state information for? - if ((s = find_slave(addr))) - { - if (src_addr->sin_addr.s_addr == master_address) - { - log(1, "Got hello from \"%s\", local backup for %s.\n", hostname, inet_toa(s->ip_address)); - } - else if (s->down) - { - log(1, "Slave \"%s\" (for %s) has come back.\n", hostname, inet_toa(s->ip_address)); - backup_down(s); - } - else - { - log(1, "Slave \"%s\" said hello and we didn't know it was down.\n", s->hostname); - } - - /* Reset the hostname if needed */ - free(s->hostname); - s->hostname = hostname; - } else { - // No state information, it's a new slave - s = calloc(sizeof(slave), 1); - s->ip_address = addr; - ll_push(slaves, s); - s->hostname = hostname; - log(1, "New slave added to cluster \"%s\"\n", s->hostname); - } - - s->slave_address = src_addr->sin_addr.s_addr; - - // Send state information back - return_state(s); - - s->last_message = time(NULL); - - return 0; -} - -int handle_tunnel(char *buf, int l, uint32_t addr) -{ - int tid; - slave *s; - if (!(s = find_slave(addr))) - { - log(0, "handle_tunnel() called with no valid slave\n"); - return 0; - } - s->last_message = time(NULL); - - // Skip hostname - tid = *(char *)buf; - buf += (tid + 1); - l -= (tid + 1); - - // Grab tunnel ID - tid = *(int *)buf; - buf += sizeof(int); - l -= sizeof(int); - - log(3, "Received tunnel %d from \"%s\" (%d bytes long)\n", tid, s->hostname, l); - - // Allocate memory for it if it's not already - if (!s->tunnels[tid]) - { - s->tunnels[tid] = malloc(l); - s->num_tunnels++; - s->tunnel_len = l; - } - - memcpy(s->tunnels[tid], buf, l); - return l; -} - -int handle_session(char *buf, int l, uint32_t addr) -{ - slave *s; - int sid; - char hostname[4096] = {0}; - if (!(s = find_slave(addr))) - { - log(0, "handle_session() called with no valid slave\n"); - return 0; - } - s->last_message = time(NULL); - - // Skip hostname - sid = *(char *)buf; - memcpy(hostname, (char *)(buf + 1), sid); - buf += (sid + 1); - l -= (sid + 1); - log(0, "Hostname is %s\n", hostname); - - // Grab session ID - sid = *(int *)buf; - buf += sizeof(int); - l -= sizeof(int); - - log(3, "Received session %d from \"%s\" (%d bytes long)\n", sid, s->hostname, l); - - // Allocate memory for it if it's not already - if (!s->sessions[sid]) - { - s->sessions[sid] = malloc(l); - s->num_sessions++; - s->session_len = l; - } - - memcpy(s->sessions[sid], buf, l); - return l; -} - -int handle_ping(char *buf, int l, uint32_t addr) -{ - slave *s; - if (!(s = find_slave(addr))) - { - log(0, "handle_ping() called with no valid slave\n"); - return 0; - } - s->last_message = time(NULL); - - return 0; -} - -int return_state(slave *s) -{ - char *packet; - int i; - int num_tunnels = 0, num_sessions = 0; - int pktlen; - - log(3, "Sending state information to \"%s\"\n", s->hostname); - - for (i = 0; i < 1000; i++) - if (s->tunnels[i]) num_tunnels++; - - for (i = 0; i < 13000; i++) - if (s->sessions[i]) num_sessions++; - - if (!num_sessions && !num_tunnels) return 0; - - packet = calloc(IL * 4, 1); - *(int *)(packet + IL * 0) = num_tunnels; - *(int *)(packet + IL * 1) = num_sessions; - *(int *)(packet + IL * 2) = s->tunnel_len; - *(int *)(packet + IL * 3) = s->session_len; - cluster_send_message(s->slave_address, s->ip_address, C_HELLO_RESPONSE, packet, IL * 4); - free(packet); - - // Send tunnels one-by-one, in order - log(0, "Sending %d tunnels of %d bytes each\n", num_tunnels, s->tunnel_len); - pktlen = s->tunnel_len + sizeof(int); - packet = malloc(pktlen); - for (i = 0; i < 1000; i++) - { - if (s->tunnels[i]) - { - *(int *)packet = i; - memcpy((char *)(packet + sizeof(int)), s->tunnels[i], s->tunnel_len); - log(0, "Sending tunnel %d\n", i); - cluster_send_message(s->slave_address, s->ip_address, C_TUNNEL, packet, pktlen); - } - } - free(packet); - - // Send sessions one-by-one, in order - log(0, "Sending %d sessions of %d bytes each\n", num_sessions, s->session_len); - pktlen = s->session_len + sizeof(int); - packet = malloc(pktlen); - for (i = 0; i < 13000; i++) - { - if (s->sessions[i]) - { - *(int *)packet = i; - memcpy((char *)(packet + sizeof(int)), s->sessions[i], s->session_len); - log(0, "Sending session %d\n", i); - cluster_send_message(s->slave_address, s->ip_address, C_SESSION, packet, pktlen); - } - } - free(packet); - - return 0; -} - -slave *find_slave(uint32_t address) -{ - slave *s; - - ll_reset(slaves); - while ((s = ll_next(slaves))) - { - if (s->ip_address == address) - { - return s; - } - } - return NULL; -} - -void _log(int level, const char *format, ...) -{ - va_list ap; - if (debug < level) return; - - va_start(ap, format); - vfprintf(stderr, format, ap); -} - -void log_hex(int level, const char *title, const char *data, int maxsize) -{ - unsigned int i, j; - unsigned const char *d = (unsigned const char *)data; - - if (debug < level) return; - log(level, "%s (%d bytes):\n", title, maxsize); - setvbuf(stderr, NULL, _IOFBF, 16384); - for (i = 0; i < maxsize; ) - { - fprintf(stderr, "%4X: ", i); - for (j = i; j < maxsize && j < (i + 16); j++) - { - fprintf(stderr, "%02X ", d[j]); - if (j == i + 7) - fputs(": ", stderr); - } - - for (; j < i + 16; j++) - { - fputs(" ", stderr); - if (j == i + 7) - fputs(": ", stderr); - } - - fputs(" ", stderr); - for (j = i; j < maxsize && j < (i + 16); j++) - { - if (d[j] >= 0x20 && d[j] < 0x7f && d[j] != 0x20) - fputc(d[j], stderr); - else - fputc('.', stderr); - - if (j == i + 7) - fputs(" ", stderr); - } - - i = j; - fputs("\n", stderr); - } - fflush(stderr); - setbuf(stderr, NULL); -} - -int backup_up(slave *s) -{ - log(2, "Becoming backup for \"%s\" (%s).\n", s->hostname, inet_toa(s->ip_address)); - s->pid = fork(); - if (!s->pid) - { - if (execl(L2TPNS, L2TPNS, "-a", inet_toa(s->ip_address), NULL) < 0) - log(0, "Error execing backup " L2TPNS ": %s\n", strerror(errno)); - exit(0); - } - s->down = 1; - return 0; -} - -int backup_down(slave *s) -{ - log(2, "Not being backup for \"%s\" (%s) anymore.\n", s->hostname, inet_toa(s->ip_address)); - s->down = 0; - if (s->pid) { - kill(s->pid, SIGTERM); - sleep(2); - kill(s->pid, SIGKILL); - } - return 0; -} - -int handle_goodbye(char *buf, int l, uint32_t addr) -{ - int i; - slave *s; - - // Is this a slave we have state information for? - if ((s = find_slave(addr))) - { - log(0, "Received goodbye for slave %s\n", s->hostname); - ll_delete(slaves, s); - for (i = 0; i < s->num_tunnels; i++) - if (s->tunnels[i]) free(s->tunnels[i]); - for (i = 0; i < s->num_sessions; i++) - if (s->sessions[i]) free(s->sessions[i]); - if (s->hostname) free(s->hostname); - free(s); - } - - return 0; -} - diff --git a/cluster_slave.c b/cluster_slave.c deleted file mode 100644 index d1f316d..0000000 --- a/cluster_slave.c +++ /dev/null @@ -1,270 +0,0 @@ -// L2TPNS Cluster Master -// $Id: cluster_slave.c,v 1.4 2004/05/24 04:12:48 fred_nerk Exp $ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "l2tpns.h" -#include "cluster.h" -#include "ll.h" -#include "util.h" - -// vim: sw=4 ts=8 - -extern int cluster_sockfd; -extern char hostname[1000]; -extern ippoolt *ip_address_pool; -extern uint32_t vip_address; -extern struct configt *config; - -int handle_tunnel(char *buf, int l); -int handle_session(char *buf, int l); -int handle_hello_response(char *buf, int l); - -int processcluster(char *buf, int l) -{ - char mtype; - uint32_t addr; - - log_hex(4, "Cluster receive", buf, l); - if (!buf || l <= sizeof(uint32_t)) return 0; - - addr = ntohl(*(uint32_t*)buf); - buf += sizeof(uint32_t); - l -= sizeof(uint32_t); - - if (addr != vip_address) - { - log(0, 0, 0, 0, "Received cluster message for VIP %s, which isn't ours\n", inet_toa(addr)); - } - - mtype = *buf; buf++; l--; - - switch (mtype) - { - case C_HELLO: - break; - case C_HELLO_RESPONSE: - handle_hello_response(buf, l); - break; - case C_PING: - break; - case C_TUNNEL: - handle_tunnel(buf, l); - break; - case C_SESSION: - handle_session(buf, l); - break; - } - return mtype; - - return 0; -} - -int handle_tunnel(char *buf, int l) -{ - int t; - - // Ignore tunnel message if NOSTATEFILE exists - if (config->ignore_cluster_updates) - { - log(1, 0, 0, 0, "Discarding tunnel message from cluster master.\n"); - return 0; - } - - t = *(int *)buf; - log(1, 0, 0, t, "Receiving tunnel %d from cluster master (%d bytes)\n", t, l); - buf += sizeof(int); l -= sizeof(int); - - if (t > MAXTUNNEL) - { - log(0, 0, 0, t, "Cluster master tried to send tunnel %d, which is bigger than MAXTUNNEL (%d)\n", t, MAXTUNNEL); - return 0; - } - - if (l != sizeof(tunnelt)) - { - log(1, 0, 0, t, "Discarding bogus tunnel message (%d bytes instead of %d).\n", l, sizeof(tunnelt)); - return 0; - } - - memcpy(&tunnel[t], buf, l); - log(3, 0, 0, t, "Cluster master sent tunnel for %s\n", tunnel[t].hostname); - - tunnel[t].controlc = 0; - tunnel[t].controls = NULL; - tunnel[t].controle = NULL; - return 0; -} - -int handle_session(char *buf, int l) -{ - int s; - - // Ignore tunnel message if NOSTATEFILE exists - if (config->ignore_cluster_updates) - { - log(1, 0, 0, 0, "Discarding session message from cluster master.\n"); - return 0; - } - - s = *(int *)buf; - log(1, 0, s, 0, "Receiving session %d from cluster master (%d bytes)\n", s, l); - buf += sizeof(int); l -= sizeof(int); - - if (s > MAXSESSION) - { - log(0, 0, s, 0, "Cluster master tried to send session %d, which is bigger than MAXSESSION (%d)\n", s, MAXSESSION); - return 0; - } - - if (l != sizeof(sessiont)) - { - log(1, 0, s, 0, "Discarding short session message (%d bytes instead of %d).\n", l, sizeof(sessiont)); - return 0; - } - - if (s > 1) - { - session[s-1].next = session[s].next; - } - - if (sessionfree == s) - { - sessionfree = session[s].next; - } - - memcpy(&session[s], buf, l); - session[s].tbf = 0; - session[s].throttle = 0; - if (session[s].opened) - { - log(2, 0, s, session[s].tunnel, "Cluster master sent active session for user %s\n", session[s].user); - sessionsetup(session[s].tunnel, s, 0); - if (session[s].ip && session[s].ip != 0xFFFFFFFE) - { - int x; - for (x = 0; x < MAXIPPOOL && ip_address_pool[x].address; x++) - { - if (ip_address_pool[x].address == session[s].ip) - { - ip_address_pool[x].assigned = 1; - break; - } - } - } - } - return 0; -} - -int handle_hello_response(char *buf, int l) -{ - int numtunnels, numsessions; - - /* The cluster master has downed the address, so send another garp */ - send_garp(vip_address); - - if (!l) return 0; - - if (l < (4 * IL)) - { - log(1, 0, 0, 0, "Cluster master sent invalid hello response: %d bytes instead of %d\n", l, (4 * IL)); - return 0; - } - numtunnels = *(int *)(buf + IL * 0); - numsessions = *(int *)(buf + IL * 1); - if (numtunnels == 0 && numsessions == 0) - { - log(2, 0, 0, 0, "Cluster master has no state information for us.\n"); - return 0; - } - log(2, 0, 0, 0, "The cluster master will send %d tunnels and %d sessions.\n", numtunnels, numsessions); - return 0; -} - -int cluster_send_session(int s) -{ - char *packet; - int len = 0; - - if (!cluster_sockfd) return 1; - - packet = malloc(4096); - - // Hostname - len = strlen(hostname); - *(char *)packet = len; - memcpy((char *)(packet + 1), hostname, len); - len++; - - // Session ID - *(int *)(packet + len) = s; - len += sizeof(int); - - // Session data - memcpy((char *)(packet + len), &session[s], sizeof(sessiont)); - len += sizeof(sessiont); - - cluster_send_message(config->cluster_address, vip_address, C_SESSION, packet, len); - free(packet); - - return 1; -} - -int cluster_send_tunnel(int t) -{ - char *packet; - int len = 0; - - packet = malloc(4096); - - // Hostname - len = strlen(hostname); - *(char *)packet = len; - memcpy((char *)(packet + 1), hostname, len); - len++; - - // Tunnel ID - *(int *)(packet + len) = t; - len += sizeof(int); - - // Tunnel data - memcpy((char *)(packet + len), &tunnel[t], sizeof(tunnelt)); - len += sizeof(tunnelt); - - cluster_send_message(config->cluster_address, vip_address, C_TUNNEL, packet, len); - free(packet); - - return 1; -} - -int cluster_send_goodbye() -{ - char *packet; - int len = 0; - - packet = malloc(4096); - - log(2, 0, 0, 0, "Sending goodbye to cluster master\n"); - // Hostname - len = strlen(hostname); - *(char *)packet = len; - memcpy((char *)(packet + 1), hostname, len); - len++; - - cluster_send_message(config->cluster_address, vip_address, C_GOODBYE, packet, len); - free(packet); - - return 1; -} - diff --git a/config.h b/config.h index b5b3405..bd17966 100644 --- a/config.h +++ b/config.h @@ -1,3 +1,4 @@ #define LIBDIR "/usr/lib/l2tpns" #define ETCDIR "/etc/l2tpns" #define BINDIR "/usr/sbin" +#define DATADIR "/tmp" diff --git a/constants.c b/constants.c index 46258b1..c53a26f 100644 --- a/constants.c +++ b/constants.c @@ -4,7 +4,7 @@ const char *lcp_types[MAX_LCP_TYPE+1] = { "Reserved", "Maximum-Receive-Unit", - "Reserved 2", + "Async-Control-Map", "Authentication-Protocol", "Quality-Protocol", "Magic-Number", diff --git a/constants.h b/constants.h index 3ac8983..e561260 100644 --- a/constants.h +++ b/constants.h @@ -1,5 +1,5 @@ - -// enum these ? +#ifndef __CONSTANTS_H__ +#define __CONSTANTS_H__ #define MAX_LCP_TYPE 8 extern const char *lcp_types[MAX_LCP_TYPE+1]; @@ -25,3 +25,5 @@ extern const char *radius_states[MAX_RADIUS_STATE+1]; #define MAX_L2TP_MESSAGE_TYPE 16 extern const char *l2tp_message_types[MAX_L2TP_MESSAGE_TYPE+1]; + +#endif /* __CONSTANTS_H__ */ diff --git a/control.h b/control.h index 041d3ce..60e0d03 100644 --- a/control.h +++ b/control.h @@ -15,4 +15,4 @@ int send_packet(int sockfd, int dest_ip, int dest_port, char *packet, int len); void dump_packet(char *packet, FILE *stream); int read_packet(int sockfd, char *packet); -#endif +#endif /* __CONTROL_H__ */ diff --git a/garden.c b/garden.c index c3ce34a..7477089 100644 --- a/garden.c +++ b/garden.c @@ -8,21 +8,28 @@ #include "control.h" int __plugin_api_version = 1; -struct pluginfuncs p; +static struct pluginfuncs *p = 0; -char *init_commands[] = { - // This is for incoming connections to a gardened user - "iptables -t nat -N garden_users 2>&1 >/dev/null", +static int iam_master = 0; // We're all slaves! Slaves I tell you! + +char *up_commands[] = { + "iptables -t nat -N garden >/dev/null 2>&1", // Create a chain that all gardened users will go through + "iptables -t nat -F garden", + ". " PLUGINCONF "/build-garden", // Populate with site-specific DNAT rules + "iptables -t nat -N garden_users >/dev/null 2>&1",// Empty chain, users added/removed by garden_session "iptables -t nat -F garden_users", - "iptables -t nat -N garden 2>&1", /* Don't flush - init script sets this up */ - "iptables -t nat -A l2tpns -j garden_users", - NULL + "iptables -t nat -A PREROUTING -j garden_users", // DNAT any users on the garden_users chain + NULL, }; -char *done_commands[] = { - "iptables -t nat -F garden_users 2>&1 >/dev/null", - "iptables -t nat -D l2tpns -j garden_users", - NULL +char *down_commands[] = { + "iptables -t nat -F PREROUTING", + "iptables -t nat -F garden_users", + "iptables -t nat -X garden_users", + "iptables -t nat -F garden", + "iptables -t nat -X garden", + "rmmod iptable_nat ip_conntrack", + NULL, }; int garden_session(sessiont *s, int flag); @@ -32,7 +39,7 @@ int plugin_post_auth(struct param_post_auth *data) // Ignore if user authentication was successful if (data->auth_allowed) return PLUGIN_RET_OK; - p.log(3, 0, 0, 0, "Walled Garden allowing login\n"); + p->log(3, 0, 0, 0, "Walled Garden allowing login\n"); data->auth_allowed = 1; data->s->walled_garden = 1; return PLUGIN_RET_OK; @@ -40,13 +47,23 @@ int plugin_post_auth(struct param_post_auth *data) int plugin_new_session(struct param_new_session *data) { - if (data->s->walled_garden) garden_session(data->s, 1); + if (!iam_master) + return PLUGIN_RET_OK; // Slaves don't do walled garden processing. + + if (data->s->walled_garden) + garden_session(data->s, 1); + return PLUGIN_RET_OK; } int plugin_kill_session(struct param_new_session *data) { - if (data->s->walled_garden) garden_session(data->s, 0); + if (!iam_master) + return PLUGIN_RET_OK; // Slaves don't do walled garden processing. + + if (data->s->walled_garden) + garden_session(data->s, 0); + return PLUGIN_RET_OK; } @@ -55,12 +72,21 @@ int plugin_control(struct param_control *data) sessiont *s; sessionidt session; - if (data->type != PKT_GARDEN && data->type != PKT_UNGARDEN) return PLUGIN_RET_OK; - if (!data->data && data->data_length) return PLUGIN_RET_OK; + if (!iam_master) // All garden processing happens on the master. + return PLUGIN_RET_OK; + + if (data->type != PKT_GARDEN && data->type != PKT_UNGARDEN) + return PLUGIN_RET_OK; + + if (!data->data && data->data_length) + return PLUGIN_RET_OK; + session = atoi((char*)(data->data)); - if (!session) return PLUGIN_RET_OK; // Really? + if (!session) + return PLUGIN_RET_OK; + data->send_response = 1; - s = p.get_session_by_id(session); + s = p->get_session_by_id(session); if (!s || !s->ip) { char *errormsg = "Session not connected"; @@ -68,7 +94,7 @@ int plugin_control(struct param_control *data) sprintf((data->response + data->response_length), "%s", errormsg); data->response_length += strlen(errormsg) + 1; - p.log(3, 0, 0, 0, "Unknown session %d\n", session); + p->log(3, 0, 0, 0, "Unknown session %d\n", session); return PLUGIN_RET_STOP; } *(short *)(data->response + 2) = ntohs(PKT_RESP_OK); @@ -80,9 +106,33 @@ int plugin_control(struct param_control *data) sprintf((data->response + data->response_length), "%s", errormsg); data->response_length += strlen(errormsg) + 1; } + return PLUGIN_RET_STOP; } +int plugin_become_master(void) +{ + int i; + iam_master = 1; // We just became the master. Wow! + + for (i = 0; up_commands[i] && *up_commands[i]; i++) + { + p->log(3, 0, 0, 0, "Running %s\n", up_commands[i]); + system(up_commands[i]); + } + + return PLUGIN_RET_OK; +} + +// Called for each active session after becoming master +int plugin_new_session_master(sessiont * s) +{ + if (s->walled_garden) + garden_session(s, 1); + + return PLUGIN_RET_OK; +} + int garden_session(sessiont *s, int flag) { char cmd[2048]; @@ -90,34 +140,11 @@ int garden_session(sessiont *s, int flag) if (!s) return 0; if (!s->opened) return 0; - /* Note that we don't handle throttling/snooping/etc here - * To do that, we'd need to send an end accounting record - * then a radius auth, then start accouting again. - * That means that we need the password (which garden has) - * and a lot of code to check that the new set of params - * (routes, IP, ACLs, etc) 'matched' the old one in a - * 'compatable' way. (ie user's system doesn't need to be told - * of the change) - * - * Thats a lot of pain/code for very little gain. - * If we want them redone from scratch, just sessionkill them - - * a user on garden isn't going to have any open TCP - * connections which are worth caring about, anyway. - * - * Note that the user will be rethrottled shortly by the scan - * script thingy if appropriate. - * - * Currently, garden only directly ungardens someone if - * they haven't paid their bill, and then subsequently do so - * online. This isn't something which can be set up by a malicious - * customer at will. - */ if (flag == 1) { - // Gardened User - p.log(2, 0, 0, s->tunnel, "Trap user %s (%s) in walled garden\n", s->user, p.inet_toa(ntohl(s->ip))); - snprintf(cmd, 2048, "iptables -t nat -A garden_users -s %s -j garden", p.inet_toa(ntohl(s->ip))); - p.log(3, 0, 0, s->tunnel, "%s\n", cmd); + p->log(2, 0, 0, s->tunnel, "Garden user %s (%s)\n", s->user, p->inet_toa(htonl(s->ip))); + snprintf(cmd, 2048, "iptables -t nat -A garden_users -s %s -j garden", p->inet_toa(htonl(s->ip))); + p->log(3, 0, 0, s->tunnel, "%s\n", cmd); system(cmd); s->walled_garden = 1; } @@ -127,18 +154,18 @@ int garden_session(sessiont *s, int flag) int count = 40; // Normal User - p.log(2, 0, 0, s->tunnel, "Release user %s (%s) from walled garden\n", s->user, p.inet_toa(ntohl(s->ip))); + p->log(2, 0, 0, s->tunnel, "Un-Garden user %s (%s)\n", s->user, p->inet_toa(htonl(s->ip))); // Kick off any duplicate usernames // but make sure not to kick off ourself - if (s->ip && !s->die && (other = p.get_session_by_username(s->user)) && s != p.get_session_by_id(other)) { - p.sessionkill(other, "Duplicate session when user un-gardened"); + if (s->ip && !s->die && (other = p->get_session_by_username(s->user)) && s != p->get_session_by_id(other)) { + p->sessionkill(other, "Duplicate session when user released from walled garden"); } /* Clean up counters */ s->cin = s->cout = 0; s->pin = s->pout = 0; - snprintf(cmd, 2048, "iptables -t nat -D garden_users -s %s -j garden", p.inet_toa(ntohl(s->ip))); - p.log(3, 0, 0, s->tunnel, "%s\n", cmd); + snprintf(cmd, 2048, "iptables -t nat -D garden_users -s %s -j garden", p->inet_toa(htonl(s->ip))); + p->log(3, 0, 0, s->tunnel, "%s\n", cmd); while (--count) { int status = system(cmd); @@ -149,8 +176,8 @@ int garden_session(sessiont *s, int flag) if (!s->die) { /* OK, we're up! */ - u16 r = p.radiusnew(p.get_id_by_session(s)); - p.radiussend(r, RADIUSSTART); + u16 r = p->radiusnew(p->get_id_by_session(s)); + p->radiussend(r, RADIUSSTART); } } s->walled_garden = flag; @@ -159,15 +186,32 @@ int garden_session(sessiont *s, int flag) int plugin_init(struct pluginfuncs *funcs) { - int i; + FILE *tables; + int found_nat = 0; + + if (!funcs) + return 0; - if (!funcs) return 0; - memcpy(&p, funcs, sizeof(p)); + p = funcs; - for (i = 0; init_commands[i] && *init_commands[i]; i++) + if ((tables = fopen("/proc/net/ip_tables_names", "r"))) { - p.log(3, 0, 0, 0, "Running %s\n", init_commands[i]); - system(init_commands[i]); + char buf[1024]; + while (fgets(buf, sizeof(buf), tables) && !found_nat) + found_nat = !strcmp(buf, "nat\n"); + + fclose(tables); + } + + /* master killed/crashed? */ + if (found_nat) + { + int i; + for (i = 0; down_commands[i] && *down_commands[i]; i++) + { + p->log(3, 0, 0, 0, "Running %s\n", down_commands[i]); + system(down_commands[i]); + } } return 1; @@ -176,10 +220,14 @@ int plugin_init(struct pluginfuncs *funcs) void plugin_done() { int i; - for (i = 0; done_commands[i] && *done_commands[i]; i++) + + if (!iam_master) // Never became master. nothing to do. + return; + + for (i = 0; down_commands[i] && *down_commands[i]; i++) { - p.log(3, 0, 0, 0, "Running %s\n", done_commands[i]); - system(done_commands[i]); + p->log(3, 0, 0, 0, "Running %s\n", down_commands[i]); + system(down_commands[i]); } } diff --git a/icmp.c b/icmp.c index 0a56740..f7d73c6 100644 --- a/icmp.c +++ b/icmp.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -9,9 +10,8 @@ #include #include #include -#include "l2tpns.h" -extern ipt myip; +#include "l2tpns.h" __u16 _checksum(unsigned char *addr, int count); diff --git a/l2tpns.c b/l2tpns.c index 7b8822d..abdfb9c 100644 --- a/l2tpns.c +++ b/l2tpns.c @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -27,13 +26,16 @@ #include #include #include -#include #include #include #include #include #include #include +#include +#include +#include + #include "md5.h" #include "l2tpns.h" #include "cluster.h" @@ -42,6 +44,7 @@ #include "constants.h" #include "control.h" #include "util.h" +#include "tbf.h" // Globals struct configt *config = NULL; // all configuration @@ -53,7 +56,6 @@ int *radfds = NULL; // RADIUS requests file handles int ifrfd = -1; // File descriptor for routing, etc time_t basetime = 0; // base clock char hostname[1000] = ""; // us. -ipt myip = 0; // MY IP u16 tapmac[3]; // MAC of tap interface int tapidx; // ifr_ifindex of tap device u32 sessionid = 0; // session id for radius accounting @@ -61,21 +63,28 @@ int syslog_log = 0; // are we logging to syslog FILE *log_stream = NULL; struct sockaddr_in snoop_addr = {0}; extern int cluster_sockfd; -unsigned long last_sid = 0; +u32 last_sid = 0; int clifd = 0; sessionidt *cli_session_kill = NULL; tunnelidt *cli_tunnel_kill = NULL; static void *ip_hash[256]; -unsigned long udp_tx = 0, udp_rx = 0, udp_rx_pkt = 0; -unsigned long eth_tx = 0, eth_rx = 0, eth_rx_pkt = 0; -unsigned int ip_pool_size = 0; -time_t time_now; +u32 udp_tx = 0, udp_rx = 0, udp_rx_pkt = 0; +u32 eth_tx = 0, eth_rx = 0, eth_rx_pkt = 0; +u32 ip_pool_size = 1; +time_t time_now = 0; char time_now_string[64] = {0}; char main_quit = 0; char *_program_name = NULL; linked_list *loaded_plugins; linked_list *plugins[MAX_PLUGIN_TYPES]; +#ifdef BGP +#include "bgp.h" +struct bgp_peer *bgp_peers = 0; +struct bgp_route_list *bgp_routes = 0; +int bgp_configured = 0; +#endif /* BGP */ + #define membersize(STRUCT, MEMBER) sizeof(((STRUCT *)0)->MEMBER) #define CONFIG(NAME, MEMBER, TYPE) { NAME, offsetof(struct configt, MEMBER), membersize(struct configt, MEMBER), TYPE } @@ -86,20 +95,29 @@ struct config_descriptt config_values[] = { CONFIG("primary_dns", default_dns1, IP), CONFIG("secondary_dns", default_dns2, IP), CONFIG("save_state", save_state, BOOL), - CONFIG("snoop_host", snoop_destination_host, IP), - CONFIG("snoop_port", snoop_destination_port, SHORT), CONFIG("primary_radius", radiusserver[0], IP), CONFIG("secondary_radius", radiusserver[1], IP), CONFIG("radius_accounting", radius_accounting, BOOL), CONFIG("radius_secret", radiussecret, STRING), CONFIG("bind_address", bind_address, IP), - CONFIG("cluster_master", cluster_address, IP), + CONFIG("send_garp", send_garp, BOOL), CONFIG("throttle_speed", rl_rate, UNSIGNED_LONG), CONFIG("accounting_dir", accounting_dir, STRING), CONFIG("setuid", target_uid, INT), CONFIG("dump_speed", dump_speed, BOOL), CONFIG("cleanup_interval", cleanup_interval, INT), CONFIG("multi_read_count", multi_read_count, INT), + CONFIG("scheduler_fifo", scheduler_fifo, BOOL), + CONFIG("icmp_rate", icmp_rate, INT), + CONFIG("cluster_address", cluster_address, IP), + CONFIG("cluster_interface", cluster_interface, STRING), +#ifdef BGP + CONFIG("as_number", as_number, SHORT), + CONFIG("bgp_peer1", bgp_peer[0], STRING), + CONFIG("bgp_peer1_as", bgp_peer_as[0], SHORT), + CONFIG("bgp_peer2", bgp_peer[1], STRING), + CONFIG("bgp_peer2_as", bgp_peer_as[1], SHORT), +#endif /* BGP */ { NULL, 0, 0, 0 }, }; @@ -114,11 +132,15 @@ char *plugin_functions[] = { "plugin_kill_session", "plugin_control", "plugin_radius_response", + "plugin_become_master", + "plugin_new_session_master", }; + #define max_plugin_functions (sizeof(plugin_functions) / sizeof(char *)) tunnelt *tunnel = NULL; // 1000 * 45 = 45000 = 45k sessiont *session = NULL; // 5000 * 213 = 1065000 = 1 Mb +sessioncountt *sess_count = NULL; radiust *radius = NULL; ippoolt *ip_address_pool = NULL; controlt *controlfree = 0; @@ -126,7 +148,6 @@ struct Tstats *_statistics = NULL; #ifdef RINGBUFFER struct Tringbuffer *ringbuffer = NULL; #endif -tbft *filter_buckets = NULL; void sigalrm_handler(int); void sighup_handler(int); @@ -140,6 +161,9 @@ void tunnel_clean(); tunnelidt new_tunnel(); void update_config(); +static void cache_ipmap(ipt ip, int s); +static void uncache_ipmap(ipt ip); + // return internal time (10ths since run) clockt now(void) { @@ -157,8 +181,8 @@ clockt backoff(u8 try) void _log(int level, ipt address, sessionidt s, tunnelidt t, const char *format, ...) { - static char message[65535] = {0}; - static char message2[65535] = {0}; + static char message[65536] = {0}; + static char message2[65536] = {0}; va_list ap; #ifdef RINGBUFFER @@ -187,7 +211,7 @@ void _log(int level, ipt address, sessionidt s, tunnelidt t, const char *format, { vsnprintf(message2, 65535, format, ap); snprintf(message, 65535, "%s %02d/%02d %s", time_now_string, t, s, message2); - fprintf(log_stream, message); + fprintf(log_stream, "%s", message); } else if (syslog_log) { @@ -201,7 +225,7 @@ void _log(int level, ipt address, sessionidt s, tunnelidt t, const char *format, void _log_hex(int level, ipt address, sessionidt s, tunnelidt t, const char *title, const char *data, int maxsize) { int i, j; - unsigned const char *d = (unsigned const char *)data; + const u8 *d = (const u8 *)data; if (config->debug < level) return; @@ -251,9 +275,22 @@ void _log_hex(int level, ipt address, sessionidt s, tunnelidt t, const char *tit // Add a route -void routeset(ipt ip, ipt mask, ipt gw, u8 add) +// +// This adds it to the routing table, advertises it +// via iBGP if enabled, and stuffs it into the +// 'sessionbyip' cache. +// +// 'ip' and 'mask' must be in _host_ order. +// +void routeset(sessionidt s, ipt ip, ipt mask, ipt gw, u8 add) { struct rtentry r; + int i; + + if (!mask) mask = 0xffffffff; + + ip = ip & mask; // Force the ip to be the first one in the route. + memset(&r, 0, sizeof(r)); r.rt_dev = config->tapdevice; r.rt_dst.sa_family = AF_INET; @@ -261,14 +298,44 @@ void routeset(ipt ip, ipt mask, ipt gw, u8 add) r.rt_gateway.sa_family = AF_INET; *(u32 *) & (((struct sockaddr_in *) &r.rt_gateway)->sin_addr.s_addr) = htonl(gw); r.rt_genmask.sa_family = AF_INET; - *(u32 *) & (((struct sockaddr_in *) &r.rt_genmask)->sin_addr.s_addr) = htonl(mask ? mask : 0xFFFFFFF); + *(u32 *) & (((struct sockaddr_in *) &r.rt_genmask)->sin_addr.s_addr) = htonl(mask); r.rt_flags = (RTF_UP | RTF_STATIC); if (gw) r.rt_flags |= RTF_GATEWAY; - else + else if (mask == 0xffffffff) r.rt_flags |= RTF_HOST; - if (ioctl(ifrfd, add ? SIOCADDRT : SIOCDELRT, (void *) &r) < 0) perror("routeset"); - log(1, ip, 0, 0, "Route %s %u.%u.%u.%u/%u.%u.%u.%u %u.%u.%u.%u\n", add ? "Add" : "Del", ip >> 24, ip >> 16 & 255, ip >> 8 & 255, ip & 255, mask >> 24, mask >> 16 & 255, mask >> 8 & 255, mask & 255, gw >> 24, gw >> 16 & 255, gw >> 8 & 255, gw & 255); + if (ioctl(ifrfd, add ? SIOCADDRT : SIOCDELRT, (void *) &r) < 0) + log(0, 0, 0, 0, "routeset() error in ioctl: %s\n", strerror(errno)); + + log(1, ip, 0, 0, "Route %s %u.%u.%u.%u/%u.%u.%u.%u %u.%u.%u.%u\n", + add ? "add" : "del", + ip >> 24, ip >> 16 & 0xff, ip >> 8 & 0xff, ip & 0xff, + mask >> 24, mask >> 16 & 0xff, mask >> 8 & 0xff, mask & 0xff, + gw >> 24, gw >> 16 & 0xff, gw >> 8 & 0xff, gw & 0xff); + +#ifdef BGP + if (add) + bgp_add_route(htonl(ip), htonl(mask)); + else + bgp_del_route(htonl(ip), htonl(mask)); +#endif /* BGP */ + + // Add/Remove the IPs to the 'sessionbyip' cache. + // Note that we add the zero address in the case of + // a network route. Roll on CIDR. + + // Note that 's == 0' implies this is the address pool. + // We still cache it here, because it will pre-fill + // the malloc'ed tree. + + if (s) + { + if (!add) // Are we deleting a route? + s = 0; // Caching the session as '0' is the same as uncaching. + + for (i = ip; (i&mask) == (ip&mask) ; ++i) + cache_ipmap(i, s); + } } // Set up TAP interface @@ -283,7 +350,7 @@ void inittap(void) if (tapfd < 0) { // fatal log(0, 0, 0, 0, "Can't open %s: %s\n", TAPDEVICE, strerror(errno)); - exit(-1); + exit(1); } { int flags = fcntl(tapfd, F_GETFL, 0); @@ -292,7 +359,7 @@ void inittap(void) if (ioctl(tapfd, TUNSETIFF, (void *) &ifr) < 0) { log(0, 0, 0, 0, "Can't set tap interface: %s\n", strerror(errno)); - exit(-1); + exit(1); } assert(strlen(ifr.ifr_name) < sizeof(config->tapdevice)); strncpy(config->tapdevice, ifr.ifr_name, sizeof(config->tapdevice) - 1); @@ -304,32 +371,32 @@ void inittap(void) if (ioctl(ifrfd, SIOCSIFADDR, (void *) &ifr) < 0) { - perror("set tap addr"); - exit( -1); + log(0, 0, 0, 0, "Error setting tun address: %s\n", strerror(errno)); + exit(1); } /* Bump up the qlen to deal with bursts from the network */ ifr.ifr_qlen = 1000; if (ioctl(ifrfd, SIOCSIFTXQLEN, (void *) &ifr) < 0) { - perror("set tap qlen"); - exit( -1); + log(0, 0, 0, 0, "Error setting tun queue length: %s\n", strerror(errno)); + exit(1); } ifr.ifr_flags = IFF_UP; if (ioctl(ifrfd, SIOCSIFFLAGS, (void *) &ifr) < 0) { - perror("set tap flags"); - exit( -1); + log(0, 0, 0, 0, "Error setting tun flags: %s\n", strerror(errno)); + exit(1); } if (ioctl(ifrfd, SIOCGIFHWADDR, (void *) &ifr) < 0) { - perror("get tap hwaddr"); - exit( -1); + log(0, 0, 0, 0, "Error setting tun hardware address: %s\n", strerror(errno)); + exit(1); } memcpy(&tapmac, 2 + (u8 *) & ifr.ifr_hwaddr, 6); if (ioctl(ifrfd, SIOCGIFINDEX, (void *) &ifr) < 0) { - perror("get tap ifindex"); - exit( -1); + log(0, 0, 0, 0, "Error setting tun ifindex: %s\n", strerror(errno)); + exit(1); } tapidx = ifr.ifr_ifindex; } @@ -353,10 +420,11 @@ void initudp(void) } if (bind(udpfd, (void *) &addr, sizeof(addr)) < 0) { - perror("udp bind"); - exit( -1); + log(0, 0, 0, 0, "Error in UDP bind: %s\n", strerror(errno)); + exit(1); } snoopfd = socket(AF_INET, SOCK_DGRAM, UDP); + snoop_addr.sin_family = AF_INET; // Control memset(&addr, 0, sizeof(addr)); @@ -366,35 +434,62 @@ void initudp(void) setsockopt(controlfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); if (bind(controlfd, (void *) &addr, sizeof(addr)) < 0) { - perror("bind"); - exit(-1); + log(0, 0, 0, 0, "Error in control bind: %s\n", strerror(errno)); + exit(1); } } -// Find session by IP, 0 for not found -sessionidt sessionbyip(ipt ip) +// +// Find session by IP, < 1 for not found +// +// Confusingly enough, this 'ip' must be +// in _network_ order. This being the common +// case when looking it up from IP packet headers. +// +// We actually use this cache for two things. +// #1. For used IP addresses, this maps to the +// session ID that it's used by. +// #2. For un-used IP addresses, this maps to the +// index into the pool table that contains that +// IP address. +// + +int lookup_ipmap(ipt ip) { - unsigned char *a = (unsigned char *)&ip; + u8 *a = (u8 *)&ip; char **d = (char **) ip_hash; - sessionidt s; - -#ifdef STAT_CALLS - STAT(call_sessionbyip); -#endif + int s; if (!(d = (char **) d[(size_t) *a++])) return 0; if (!(d = (char **) d[(size_t) *a++])) return 0; if (!(d = (char **) d[(size_t) *a++])) return 0; s = (ipt) d[(size_t) *a]; - if (s && session[s].tunnel) + return s; +} + +sessionidt sessionbyip(ipt ip) +{ + int s = lookup_ipmap(ip); + +#ifdef STAT_CALLS + STAT(call_sessionbyip); +#endif + if (s > 0 && s < MAXSESSION && session[s].tunnel) return s; return 0; } -void cache_sessionid(ipt ip, sessionidt s) +// +// Take an IP address in HOST byte order and +// add it to the sessionid by IP cache. +// +// (It's actually cached in network order) +// +static void cache_ipmap(ipt ip, int s) { - unsigned char *a = (unsigned char *) &ip; + ipt nip = htonl(ip); // MUST be in network order. I.e. MSB must in be ((char*)(&ip))[0] + u8 *a = (u8 *) &nip; char **d = (char **) ip_hash; int i; @@ -409,44 +504,81 @@ void cache_sessionid(ipt ip, sessionidt s) d = (char **) d[(size_t) a[i]]; } - log(4, ip, s, session[s].tunnel, "Caching session ID %d for ip address\n", s); d[(size_t) a[3]] = (char *)((int)s); + + if (s > 0) + log(4, ip, s, session[s].tunnel, "Caching ip address %s\n", inet_toa(nip)); + else if (s == 0) + log(4, ip, 0, 0, "Un-caching ip address %s\n", inet_toa(nip)); + // else a map to an ip pool index. } -void uncache_sessionid(ipt ip) +static void uncache_ipmap(ipt ip) { - unsigned char *a = (unsigned char *) &ip; - char **d = (char **) ip_hash; - int i; + cache_ipmap(ip, 0); // Assign it to the NULL session. +} - for (i = 0; i < 3; i++) - { - if (!d[(size_t) a[i]]) return; - d = (char **) d[(size_t) a[i]]; +// +// CLI list to dump current ipcache. +// +int cmd_show_ipcache(struct cli_def *cli, char *command, char **argv, int argc) +{ + char **d = (char **) ip_hash, **e, **f, **g; + int i, j, k, l; + int count = 0; + + cli_print(cli, "%7s %s", "Sess#", "IP Address"); + + for (i = 0; i < 256; ++i) { + if (!d[i]) continue; + e = (char**) d[i]; + for (j = 0; j < 256; ++j) { + if (!e[j]) continue; + f = (char**) e[j]; + for (k = 0; k < 256; ++k) { + if (!f[k]) continue; + g = (char**)f[k]; + for (l = 0; l < 256; ++l) { + if (!g[l]) continue; + cli_print(cli, "%7d %d.%d.%d.%d", (int) g[l], i, j, k, l); + ++count; + } + } + } } - d[(size_t) a[3]] = NULL; + cli_print(cli, "%d entries in cache", count); + return CLI_OK; } + // Find session by username, 0 for not found // walled garden users aren't authenticated, so the username is // reasonably useless. Ignore them to avoid incorrect actions +// +// This is VERY inefficent. Don't call it often. :) +// sessionidt sessionbyuser(char *username) { int s; #ifdef STAT_CALLS STAT(call_sessionbyuser); #endif - for (s = 1; s < MAXSESSION && (session[s].walled_garden || strncmp(session[s].user, username, 128)); s++); - if (s < MAXSESSION) - return s; - return 0; + for (s = 1; s < MAXSESSION ; ++s) { + if (session[s].walled_garden) + continue; // Skip walled garden users. + + if (!strncmp(session[s].user, username, 128)) + return s; + + } + return 0; // Not found. } void send_garp(ipt ip) { int s; struct ifreq ifr; - unsigned char mac[6]; + u8 mac[6]; s = socket(PF_INET, SOCK_DGRAM, 0); if (s < 0) @@ -487,20 +619,6 @@ sessionidt sessionidtbysessiont(sessiont *s) return val; } -// send gratuitous ARP to set ARP table for newly allocated IP -void sessionsendarp(sessionidt s) -{ - unsigned char mac[6]; -#ifdef STAT_CALLS - STAT(call_sendarp); -#endif - *(u16 *) (mac + 0) = htons(tapmac[0]); // set source address - *(u16 *) (mac + 2) = htons(tapmac[1]); - *(u16 *) (mac + 4) = htons(tapmac[2]); - sendarp(tapidx, mac, session[s].ip); - STAT(arp_sent); -} - // Handle ARP requests void processarp(u8 * buf, int len) { @@ -553,27 +671,27 @@ void processarp(u8 * buf, int len) STAT(arp_errors); return; } - ip = ntohl(*(u32 *) (buf + 42)); + ip = *(u32 *) (buf + 42); // look up session - s = sessionbyip(htonl(ip)); + s = sessionbyip(ip); if (s) { - log(3, ip, s, session[s].tunnel, "ARP reply for %u.%u.%u.%u\n", ip >> 24, ip >> 16 & 255, ip >> 8 & 255, ip & 255); + log(3, ntohl(ip), s, session[s].tunnel, "ARP reply for %s\n", inet_toa(ip)); memcpy(buf + 4, buf + 10, 6); // set destination as source - *(u16 *) (buf + 10) = htons(tapmac[0]); // set soucre address + *(u16 *) (buf + 10) = htons(tapmac[0]); // set source address *(u16 *) (buf + 12) = htons(tapmac[1]); *(u16 *) (buf + 14) = htons(tapmac[2]); *(u16 *) (buf + 24) = htons(0x0002); // ARP reply memcpy(buf + 26, buf + 10, 6); // sender ethernet memcpy(buf + 36, buf + 4, 6); // target ethernet *(u32 *) (buf + 42) = *(u32 *) (buf + 32); // target IP - *(u32 *) (buf + 32) = htonl(ip); // sender IP + *(u32 *) (buf + 32) = ip; // sender IP write(tapfd, buf, len); STAT(arp_replies); } else { - log(3, ip, 0, 0, "ARP request for unknown IP %u.%u.%u.%u\n", ip >> 24, ip >> 16 & 255, ip >> 8 & 255, ip & 255); + log(3, ntohl(ip), 0, 0, "ARP request for unknown IP %s\n", inet_toa(ip)); STAT(arp_discarded); } } @@ -636,14 +754,28 @@ void tunnelsend(u8 * buf, u16 l, tunnelidt t) INC_STAT(tunnel_tx_bytes, l); } +// +// Tiny helper function to write data to +// the 'tun' device. +// +int tun_write(u8 * data, int size) +{ + return write(tapfd, data, size); +} + // process outgoing (to tunnel) IP +// void processipout(u8 * buf, int len) { sessionidt s; sessiont *sp; tunnelidt t; ipt ip; - u8 b[MAXETHER]; + + char * data = buf; // Keep a copy of the originals. + int size = len; + + u8 b[MAXETHER + 20]; #ifdef STAT_CALLS STAT(call_processipout); #endif @@ -653,6 +785,12 @@ void processipout(u8 * buf, int len) STAT(tunnel_tx_errors); return; } + if (len >= MAXETHER) + { + log(1, 0, 0, 0, "Oversize IP packet %d bytes\n", len); + STAT(tunnel_tx_errors); + return; + } // Skip the tun header buf += 4; @@ -668,27 +806,112 @@ void processipout(u8 * buf, int len) ip = *(u32 *)(buf + 16); if (!(s = sessionbyip(ip))) { - log(4, 0, 0, 0, "IP: Sending ICMP host unreachable to %s\n", inet_toa(*(u32 *)(buf + 12))); - host_unreachable(*(u32 *)(buf + 12), *(u16 *)(buf + 4), ip, buf, (len < 64) ? 64 : len); + // Is this a packet for a session that doesn't exist? + static int rate = 0; // Number of ICMP packets we've sent this second. + static int last = 0; // Last time we reset the ICMP packet counter 'rate'. + + if (last != time_now) { + last = time_now; + rate = 0; + } + + if (rate++ < config->icmp_rate) // Only send a max of icmp_rate per second. + { + log(4, 0, 0, 0, "IP: Sending ICMP host unreachable to %s\n", inet_toa(*(u32 *)(buf + 12))); + host_unreachable(*(u32 *)(buf + 12), *(u16 *)(buf + 4), ip, buf, (len < 64) ? 64 : len); + } return; } t = session[s].tunnel; sp = &session[s]; - // Snooping this session, send it to ASIO - if (sp->snoop) snoop_send_packet(buf, len); + if (sp->tbf_out) + { + // Are we throttling this session? + if (config->cluster_iam_master) + tbf_queue_packet(sp->tbf_out, data, size); + else + master_throttle_packet(sp->tbf_out, data, size); + return; + } + else if (sp->walled_garden && !config->cluster_iam_master) + { + // We are walled-gardening this + master_garden_packet(s, data, size); + return; + } + + // Snooping this session, send it to intercept box + if (sp->snoop_ip && sp->snoop_port) + snoop_send_packet(buf, len, sp->snoop_ip, sp->snoop_port); + + log(5, session[s].ip, s, t, "Ethernet -> Tunnel (%d bytes)\n", len); + + // Add on L2TP header + { + u8 *p = makeppp(b, sizeof(b), buf, len, t, s, PPPIP); + if (!p) { + log(3, session[s].ip, s, t, "failed to send packet in processipout.\n"); + return; + } + tunnelsend(b, len + (p-b), t); // send it... + } + + sp->cout += len; // byte count + sp->total_cout += len; // byte count + sp->pout++; + udp_tx += len; + sess_count[s].cout += len; // To send to master.. +} + +// +// Helper routine for the TBF filters. +// Used to send queued data in to the user! +// +void send_ipout(sessionidt s, u8 *buf, int len) +{ + sessiont *sp; + tunnelidt t; + ipt ip; + + u8 b[MAXETHER + 20]; + + if (len < 0 || len > MAXETHER) { + log(1,0,0,0, "Odd size IP packet: %d bytes\n", len); + return; + } + + // Skip the tun header + buf += 4; + len -= 4; + + ip = *(u32 *)(buf + 16); + + if (!session[s].ip) + return; + t = session[s].tunnel; + sp = &session[s]; log(5, session[s].ip, s, t, "Ethernet -> Tunnel (%d bytes)\n", len); + // Snooping this session, send it to ASIO + if (sp->snoop_ip && sp->snoop_port) + snoop_send_packet(buf, len, sp->snoop_ip, sp->snoop_port); + // Add on L2TP header { - u8 *p = makeppp(b, buf, len, t, s, PPPIP); + u8 *p = makeppp(b, sizeof(b), buf, len, t, s, PPPIP); + if (!p) { + log(3, session[s].ip, s, t, "failed to send packet in send_ipout.\n"); + return; + } tunnelsend(b, len + (p-b), t); // send it... - sp->cout += len; // byte count - sp->total_cout += len; // byte count - sp->pout++; - udp_tx += len; } + sp->cout += len; // byte count + sp->total_cout += len; // byte count + sp->pout++; + udp_tx += len; + sess_count[s].cout += len; // To send to master.. } // add an AVP (16 bit) @@ -755,11 +978,13 @@ controlt *controlnew(u16 mtype) } // send zero block if nothing is waiting +// (ZLB send). void controlnull(tunnelidt t) { u8 buf[12]; - if (tunnel[t].controlc) + if (tunnel[t].controlc) // Messages queued; They will carry the ack. return; + *(u16 *) (buf + 0) = htons(0xC802); // flags/ver *(u16 *) (buf + 2) = htons(12); // length *(u16 *) (buf + 4) = htons(tunnel[t].far); // tunnel @@ -792,6 +1017,61 @@ void controladd(controlt * c, tunnelidt t, sessionidt s) } } +// +// Throttle or Unthrottle a session +// +// Throttle the data folling through a session +// to be no more than 'throttle' kbit/sec each way. +// +int throttle_session(sessionidt s, int throttle) +{ + if (!session[s].tunnel) + return 0; // No-one home. + + if (!*session[s].user) + return 0; // User not logged in + + if (throttle) { + if (session[s].tbf_in || session[s].tbf_out) { + if (throttle == session[s].throttle) + return 1; + + // Currently throttled but the rate is changing. + + free_tbf(session[s].tbf_in); + free_tbf(session[s].tbf_out); + } + + session[s].tbf_in = new_tbf(s, throttle*1024/4, throttle*1024/8, send_ipin); + session[s].tbf_out = new_tbf(s, throttle*1024/4, throttle*1024/8, send_ipout); + + if (throttle != session[s].throttle) { // Changed. Flood to slaves. + session[s].throttle = throttle; + cluster_send_session(s); + } + + return 1; + } + + // else Unthrottling. + + if (!session[s].tbf_in && !session[s].tbf_out && !session[s].throttle) + return 0; + + free_tbf(session[s].tbf_in); + session[s].tbf_in = 0; + + free_tbf(session[s].tbf_out); + session[s].tbf_out = 0; + + if (throttle != session[s].throttle) { // Changed. Flood to slaves. + session[s].throttle = throttle; + cluster_send_session(s); + } + + return 0; +} + // start tidy shutdown of session void sessionshutdown(sessionidt s, char *reason) { @@ -840,21 +1120,23 @@ void sessionshutdown(sessionidt s, char *reason) if (session[s].ip) { // IP allocated, clear and unroute - u16 r; - if (session[s].route[0].ip) + int r; + for (r = 0; r < MAXROUTE && session[s].route[r].ip; r++) { - routeset(session[s].ip, 0, 0, 0); - for (r = 0; r < MAXROUTE; r++) - { - if (session[s].route[r].ip) - { - routeset(session[s].route[r].ip, session[s].route[r].mask, session[s].ip, 0); - session[s].route[r].ip = 0; - } - } + routeset(s, session[s].route[r].ip, session[s].route[r].mask, session[s].ip, 0); + session[s].route[r].ip = 0; } - if (session[s].throttle) throttle_session(s, 0); session[s].throttle = 0; - free_ip_address(s); + + if (session[s].ip_pool_index == -1) // static ip + { + routeset(s, session[s].ip, 0, 0, 0); // Delete route. + session[s].ip = 0; + } + else + free_ip_address(s); + + if (session[s].throttle) // Unthrottle if throttled. + throttle_session(s, 0); } { // Send CDN controlt *c = controlnew(14); // sending CDN @@ -886,14 +1168,21 @@ void sendipcp(tunnelidt t, sessionidt s) sessionshutdown(s, "No reply on IPCP"); return; } - q = makeppp(buf, 0, 0, t, s, PPPIPCP); + + q = makeppp(buf,sizeof(buf), 0, 0, t, s, PPPIPCP); + if (!q) { + log(3, session[s].ip, s, t, "failed to send packet in sendipcp.\n"); + return; + } + *q = ConfigReq; q[1] = r << RADIUS_SHIFT; // ID, dont care, we only send one type of request *(u16 *) (q + 2) = htons(10); q[4] = 3; q[5] = 6; - *(u32 *) (q + 6) = htonl(myip ? myip : session[s].ip); // send my IP (use theirs if I dont have one) + *(u32 *) (q + 6) = config->bind_address; // send my IP tunnelsend(buf, 10 + (q - buf), t); // send it + session[s].flags &= ~SF_IPCP_ACKED; // Clear flag. } // kill a session now @@ -905,8 +1194,12 @@ void sessionkill(sessionidt s, char *reason) sessionshutdown(s, reason); // close radius/routes, etc. if (session[s].radius) radiusclear(session[s].radius, 0); // cant send clean accounting data, session is killed - log(2, 0, s, session[s].tunnel, "Kill session %d: %s\n", s, reason); + log(2, 0, s, session[s].tunnel, "Kill session %d (%s): %s\n", s, session[s].user, reason); + + throttle_session(s, 0); // Force session to be un-throttle. Free'ing TBF structures. + memset(&session[s], 0, sizeof(session[s])); + session[s].tunnel = T_FREE; // Mark it as free. session[s].next = sessionfree; sessionfree = s; cluster_send_session(s); @@ -1025,12 +1318,6 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) STAT(tunnel_rx_errors); return; } - if (s && !session[s].tunnel) - { - log(1, ntohl(addr->sin_addr.s_addr), s, t, "UDP packet contains session %d but no session[%d].tunnel exists (LAC said tunnel = %d). Dropping packet.\n", s, s, t); - STAT(tunnel_rx_errors); - return; - } if (*buf & 0x08) { // ns/nr ns = ntohs(*(u16 *) p); @@ -1063,17 +1350,39 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) int requestchap = 0; // do we request PAP instead of original CHAP request? char called[MAXTEL] = ""; // called number char calling[MAXTEL] = ""; // calling number + + if (!config->cluster_iam_master) { + master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); + return; + } + if ((*buf & 0xCA) != 0xC8) { log(1, ntohl(addr->sin_addr.s_addr), s, t, "Bad control header %02X\n", *buf); STAT(tunnel_rx_errors); return; } - log(3, ntohl(addr->sin_addr.s_addr), s, t, "Control message (%d bytes): %d ns %d nr %d ns %d nr %d\n", + log(3, ntohl(addr->sin_addr.s_addr), s, t, "Control message (%d bytes): (unacked %d) l-ns %d l-nr %d r-ns %d r-nr %d\n", l, tunnel[t].controlc, tunnel[t].ns, tunnel[t].nr, ns, nr); // if no tunnel specified, assign one if (!t) { + int i; + + // + // Is this a duplicate of the first packet? (SCCRQ) + // + for ( i = 1; i <= config->cluster_highest_tunnelid ; ++i) { + if (tunnel[t].state != TUNNELOPENING || + tunnel[t].ip != ntohl(*(ipt *) & addr->sin_addr) || + tunnel[t].port != ntohs(addr->sin_port) ) + continue; + t = i; + break; + } + } + + if (!t) { if (!(t = new_tunnel())) { log(1, ntohl(addr->sin_addr.s_addr), 0, 0, "No more tunnels\n"); @@ -1094,29 +1403,36 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) // check sequence of this message { int skip = tunnel[t].window; // track how many in-window packets are still in queue - if (tunnel[t].controlc) - { // some to clear maybe - while (tunnel[t].controlc && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000)) - { - controlt *c = tunnel[t].controls; - tunnel[t].controls = c->next; - tunnel[t].controlc--; - c->next = controlfree; - controlfree = c; - skip--; - tunnel[t].try = 0; // we have progress - } + // some to clear maybe? + while (tunnel[t].controlc && (((tunnel[t].ns - tunnel[t].controlc) - nr) & 0x8000)) + { + controlt *c = tunnel[t].controls; + tunnel[t].controls = c->next; + tunnel[t].controlc--; + c->next = controlfree; + controlfree = c; + skip--; + tunnel[t].try = 0; // we have progress } - if (tunnel[t].nr < ns && tunnel[t].nr != 0) + + // If the 'ns' just received is not the 'nr' we're + // expecting, just send an ack and drop it. + // + // if 'ns' is less, then we got a retransmitted packet. + // if 'ns' is greater than missed a packet. Either way + // we should ignore it. + if (ns != tunnel[t].nr) { // is this the sequence we were expecting? - log(1, ntohl(addr->sin_addr.s_addr), 0, t, " Out of sequence tunnel %d, (%d not %d)\n", t, ns, tunnel[t].nr); + log(1, ntohl(addr->sin_addr.s_addr), 0, t, " Out of sequence tunnel %d, (%d is not the expected %d)\n", t, ns, tunnel[t].nr); STAT(tunnel_rx_errors); -// controlnull(t); + + if (l) // Is this not a ZLB? + controlnull(t); return; } // receiver advance (do here so quoted correctly in any sends below) - if (l) tunnel[t].nr++; + if (l) tunnel[t].nr = (ns + 1); if (skip < 0) skip = 0; if (skip < tunnel[t].controlc) { @@ -1323,7 +1639,7 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) memcpy(tmp, b, (n >= 30) ? 30 : n); session[s].tx_connect_speed = atol(tmp); } - log(4, ntohl(addr->sin_addr.s_addr), s, t, " TX connect speed <%lu>\n", + log(4, ntohl(addr->sin_addr.s_addr), s, t, " TX connect speed <%u>\n", session[s].tx_connect_speed); break; case 38: // rx connect speed @@ -1338,7 +1654,7 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) memcpy(tmp, b, (n >= 30) ? 30 : n); session[s].rx_connect_speed = atol(tmp); } - log(4, ntohl(addr->sin_addr.s_addr), s, t, " RX connect speed <%lu>\n", + log(4, ntohl(addr->sin_addr.s_addr), s, t, " RX connect speed <%u>\n", session[s].rx_connect_speed); break; case 25: // Physical Channel ID @@ -1493,10 +1809,14 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) sessionfree = session[s].next; memset(&session[s], 0, sizeof(session[s])); + if (s > config->cluster_highest_sessionid) + config->cluster_highest_sessionid = s; + // make a RADIUS session if (!(r = radiusnew(s))) { log(1, ntohl(addr->sin_addr.s_addr), s, t, "No free RADIUS sessions for ICRQ\n"); +// sessionkill(s, "no free RADIUS sesions"); return; } @@ -1526,7 +1846,7 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) break; case 12: // ICCN session[s].magic = amagic; // set magic number - session[s].flags = aflags; // set flags received + session[s].l2tp_flags = aflags; // set flags received log(3, ntohl(addr->sin_addr.s_addr), s, t, "Magic %X Flags %X\n", amagic, aflags); controlnull(t); // ack // In CHAP state, request PAP instead @@ -1561,11 +1881,6 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) u16 prot; log_hex(5, "Receive Tunnel Data", p, l); - if (session[s].die) - { - log(3, ntohl(addr->sin_addr.s_addr), s, t, "Session %d is closing. Don't process PPP packets\n", s); - return; // closing session, PPP not processed - } if (l > 2 && p[0] == 0xFF && p[1] == 0x03) { // HDLC address header, discard p += 2; @@ -1588,34 +1903,83 @@ void processudp(u8 * buf, int len, struct sockaddr_in *addr) p += 2; l -= 2; } + + if (s && !session[s].tunnel) // Is something wrong?? + { + if (!config->cluster_iam_master) + { + // Pass it off to the master to deal with.. + master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); + return; + } + + + log(1, ntohl(addr->sin_addr.s_addr), s, t, "UDP packet contains session %d " + "but no session[%d].tunnel exists (LAC said" + " tunnel = %d). Dropping packet.\n", s, s, t); + STAT(tunnel_rx_errors); + return; + } + + if (session[s].die) + { + log(3, ntohl(addr->sin_addr.s_addr), s, t, "Session %d is closing. Don't process PPP packets\n", s); +// I'm pretty sure this isn't right -- mo. +// return; // closing session, PPP not processed + } if (prot == PPPPAP) { session[s].last_packet = time_now; + if (!config->cluster_iam_master) { master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); return; } processpap(t, s, p, l); } else if (prot == PPPCHAP) { session[s].last_packet = time_now; + if (!config->cluster_iam_master) { master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); return; } processchap(t, s, p, l); } else if (prot == PPPLCP) { session[s].last_packet = time_now; + if (!config->cluster_iam_master) { master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); return; } processlcp(t, s, p, l); } else if (prot == PPPIPCP) { session[s].last_packet = time_now; + if (!config->cluster_iam_master) { master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); return; } processipcp(t, s, p, l); } else if (prot == PPPCCP) { session[s].last_packet = time_now; + if (!config->cluster_iam_master) { master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); return; } processccp(t, s, p, l); } else if (prot == PPPIP) { - session[s].last_packet = time_now; + if (!config->cluster_iam_master) + { + // We're a slave. Should we forward this packet to the master? + + // Is this a walled garden session, or something that needs it's + // idle time updated?? + + // Maintain the idle timeouts on the master. If this would + // significantly reset the idletimeout, run it via the master + // to refresh the master's idle timer. + // Not sure this is ideal: It may re-order packets. + + if (session[s].walled_garden || (session[s].last_packet + (ECHO_TIMEOUT/2)) < time_now) + { + master_forward_packet(buf, len, addr->sin_addr.s_addr, addr->sin_port); + session[s].last_packet = time_now; + return; + } + // fall through to processipin. + } else + session[s].last_packet = time_now; processipin(t, s, p, l); } else @@ -1647,65 +2011,291 @@ void processtap(u8 * buf, int len) processarp(buf, len); else if (*(u16 *) (buf + 2) == htons(PKTIP)) // IP processipout(buf, len); + // Else discard. } -// main loop - gets packets on tap or udp and processes them -void mainloop(void) +// +// Maximum number of actions to complete. +// This is to avoid sending out too many packets +// at once. +#define MAX_ACTIONS 500 + +int regular_cleanups(void) { - fd_set cr; - int cn, i; - u8 buf[65536]; - struct timeval to; + static sessionidt s = 0; // Next session to check for actions on. + tunnelidt t; + int count=0,i; + u16 r; + static clockt next_acct = 0; - clockt slow = now(); // occasional functions like session/tunnel expiry, tunnel hello, etc - clockt next_acct = slow + ACCT_TIME; - clockt next_cluster_ping = slow + 50; - clockt next_clean = time_now + config->cleanup_interval; - to.tv_sec = 1; - to.tv_usec = 0; - log(4, 0, 0, 0, "Beginning of main loop. udpfd=%d, tapfd=%d, cluster_sockfd=%d, controlfd=%d\n", - udpfd, tapfd, cluster_sockfd, controlfd); + log(3, 0, 0, 0, "Begin regular cleanup\n"); - FD_ZERO(&cr); - FD_SET(udpfd, &cr); - FD_SET(tapfd, &cr); - FD_SET(controlfd, &cr); - FD_SET(clifd, &cr); - if (cluster_sockfd) FD_SET(cluster_sockfd, &cr); - cn = udpfd; - if (cn < tapfd) cn = tapfd; - if (cn < controlfd) cn = controlfd; - if (cn < clifd) cn = clifd; - if (cn < cluster_sockfd) cn = cluster_sockfd; - for (i = 0; i < config->num_radfds; i++) + for (r = 1; r < MAXRADIUS; r++) { - if (!radfds[i]) continue; - FD_SET(radfds[i], &cr); - if (radfds[i] > cn) - cn = radfds[i]; + if (!radius[r].state) + continue; + if (radius[r].retry) + { + if (radius[r].retry <= config->current_time) + radiusretry(r); + } else + radius[r].retry = backoff(radius[r].try+1); // Is this really needed? --mo } - - while (!main_quit) + for (t = 1; t < config->cluster_highest_tunnelid; t++) { - fd_set r; - int n = cn; - - if (config->reload_config) + // check for expired tunnels + if (tunnel[t].die && tunnel[t].die <= config->current_time) { - // Update the config state based on config settings - update_config(); + STAT(tunnel_timeout); + tunnelkill(t, "Expired"); + continue; } - - memcpy(&r, &cr, sizeof(fd_set)); - n = select(n + 1, &r, 0, 0, &to); - if (n < 0) + // check for message resend + if (tunnel[t].retry && tunnel[t].controlc) { - if (errno != EINTR) + // resend pending messages as timeout on reply + if (tunnel[t].retry <= config->current_time) { - perror("select"); - exit( -1); + controlt *c = tunnel[t].controls; + u8 w = tunnel[t].window; + tunnel[t].try++; // another try + if (tunnel[t].try > 5) + tunnelkill(t, "Timeout on control message"); // game over + else + while (c && w--) + { + tunnelsend(c->buf, c->length, t); + c = c->next; + } } } + // Send hello + if (tunnel[t].state == TUNNELOPEN && tunnel[t].lastrec < config->current_time + 600) + { + controlt *c = controlnew(6); // sending HELLO + controladd(c, t, 0); // send the message + log(3, tunnel[t].ip, 0, t, "Sending HELLO message\n"); + } + } + + // Check for sessions that have been killed from the CLI + if (cli_session_kill[0]) + { + int i; + for (i = 0; i < MAXSESSION && cli_session_kill[i]; i++) + { + log(2, 0, cli_session_kill[i], 0, "Dropping session by CLI\n"); + sessionshutdown(cli_session_kill[i], "Requested by administrator"); + cli_session_kill[i] = 0; + } + } + // Check for tunnels that have been killed from the CLI + if (cli_tunnel_kill[0]) + { + int i; + for (i = 1; i < MAXTUNNEL && cli_tunnel_kill[i]; i++) + { + log(2, 0, cli_tunnel_kill[i], 0, "Dropping tunnel by CLI\n"); + tunnelshutdown(cli_tunnel_kill[i], "Requested by administrator"); + cli_tunnel_kill[i] = 0; + } + } + + count = 0; + for (i = 1; i < config->cluster_highest_sessionid; i++) + { + + s++; + if (s >= config->cluster_highest_sessionid) + s = 1; + + if (!session[s].tunnel) // Session isn't in use + continue; + + if (!session[s].die && session[s].ip && !(session[s].flags & SF_IPCP_ACKED) ) + { + // IPCP has not completed yet. Resend + log(3, session[s].ip, s, session[s].tunnel, "No ACK for initial IPCP ConfigReq... resending\n"); + sendipcp(session[s].tunnel, s); + } + + // check for expired sessions + if (session[s].die && session[s].die <= config->current_time) + { + sessionkill(s, "Expired"); + if (++count >= MAX_ACTIONS) break; + continue; + } + + // Drop sessions who have not responded within IDLE_TIMEOUT seconds + if (session[s].last_packet && (time_now - session[s].last_packet >= IDLE_TIMEOUT)) + { + sessionkill(s, "No response to LCP ECHO requests"); + STAT(session_timeout); + if (++count >= MAX_ACTIONS) break; + continue; + } + + // No data in IDLE_TIMEOUT seconds, send LCP ECHO + if (session[s].user[0] && (time_now - session[s].last_packet >= ECHO_TIMEOUT)) + { + u8 b[MAXCONTROL] = {0}; + + u8 *q = makeppp(b, sizeof(b), 0, 0, session[s].tunnel, s, PPPLCP); + if (!q) { + log(3, session[s].ip, s, t, "failed to send ECHO packet.\n"); + continue; + } + + *q = EchoReq; + *(u8 *)(q + 1) = (time_now % 255); // ID + *(u16 *)(q + 2) = htons(8); // Length + *(u32 *)(q + 4) = 0; // Magic Number (not supported) + + log(4, session[s].ip, s, session[s].tunnel, "No data in %d seconds, sending LCP ECHO\n", + (int)(time_now - session[s].last_packet)); + tunnelsend(b, 24, session[s].tunnel); // send it + if (++count >= MAX_ACTIONS) break; + continue; + } + } + if (config->accounting_dir && next_acct <= config->current_time) + { + // Dump accounting data + next_acct = config->current_time + ACCT_TIME; + dump_acct_info(); + } + + if (count >= MAX_ACTIONS) + return 1; // Didn't finish! + + log(3, 0, 0, 0, "End regular cleanup (%d actions), next in %d seconds\n", count, config->cleanup_interval); + return 0; +} + + +// +// Are we in the middle of a tunnel update, or radius +// requests?? +// +int still_busy(void) +{ + int i; + static int last_talked = 0; + for (i = config->cluster_highest_tunnelid ; i > 0 ; --i) { + if (!tunnel[i].controlc) + continue; + + if (last_talked != config->current_time) { + log(2,0,0,0, "Tunnel %d still has an-acked control messages.\n", i); + last_talked = config->current_time; + } + return 1; + } + + for (i = 1; i < MAXRADIUS; i++) + { + if (radius[i].state == RADIUSNULL) + continue; + if (radius[i].state == RADIUSWAIT) + continue; + + if (last_talked != config->current_time) { + log(2,0,0,0, "Radius session %d is still busy (sid %d)\n", i, radius[i].session); + last_talked = config->current_time; + } + return 1; + } + + return 0; +} + +// main loop - gets packets on tap or udp and processes them +void mainloop(void) +{ + fd_set cr; + int cn, i; + u8 buf[65536]; + struct timeval to; + time_t next_cluster_ping = 0; // default 1 second pings. + clockt next_clean = time_now + config->cleanup_interval; + + log(4, 0, 0, 0, "Beginning of main loop. udpfd=%d, tapfd=%d, cluster_sockfd=%d, controlfd=%d\n", + udpfd, tapfd, cluster_sockfd, controlfd); + + FD_ZERO(&cr); + FD_SET(udpfd, &cr); + FD_SET(tapfd, &cr); + FD_SET(controlfd, &cr); + FD_SET(clifd, &cr); + if (cluster_sockfd) FD_SET(cluster_sockfd, &cr); + cn = udpfd; + if (cn < tapfd) cn = tapfd; + if (cn < controlfd) cn = controlfd; + if (cn < clifd) cn = clifd; + if (cn < cluster_sockfd) cn = cluster_sockfd; + for (i = 0; i < config->num_radfds; i++) + { + if (!radfds[i]) continue; + FD_SET(radfds[i], &cr); + if (radfds[i] > cn) + cn = radfds[i]; + } + + while (!main_quit || still_busy()) + { + fd_set r; + int n = cn; +#ifdef BGP + fd_set w; + int bgp_set[BGP_NUM_PEERS]; +#endif /* BGP */ + + if (config->reload_config) + { + // Update the config state based on config settings + update_config(); + } + + memcpy(&r, &cr, sizeof(fd_set)); + to.tv_sec = 0; + to.tv_usec = 100000; // 1/10th of a second. + +#ifdef BGP + FD_ZERO(&w); + for (i = 0; i < BGP_NUM_PEERS; i++) + { + bgp_set[i] = bgp_select_state(&bgp_peers[i]); + if (bgp_set[i] & 1) + { + FD_SET(bgp_peers[i].sock, &r); + if (bgp_peers[i].sock > n) + n = bgp_peers[i].sock; + } + + if (bgp_set[i] & 2) + { + FD_SET(bgp_peers[i].sock, &w); + if (bgp_peers[i].sock > n) + n = bgp_peers[i].sock; + } + } + + n = select(n + 1, &r, &w, 0, &to); +#else /* BGP */ + n = select(n + 1, &r, 0, 0, &to); +#endif /* BGP */ + + config->current_time = now(); + if (n < 0) + { + if (errno == EINTR) + continue; + + log(0, 0, 0, 0, "Error returned from select(): %s\n", strerror(errno)); + main_quit++; + break; + } else if (n) { struct sockaddr_in addr; @@ -1735,8 +2325,11 @@ void mainloop(void) for (i = 0; i < config->num_radfds; i++) if (FD_ISSET(radfds[i], &r)) processrad(buf, recv(radfds[i], buf, sizeof(buf), 0), i); - if (FD_ISSET(cluster_sockfd, &r)) - processcluster(buf, recvfrom(cluster_sockfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen)); + if (FD_ISSET(cluster_sockfd, &r)) { + int size; + size = recvfrom(cluster_sockfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen); + processcluster(buf, size, addr.sin_addr.s_addr); + } if (FD_ISSET(controlfd, &r)) processcontrol(buf, recvfrom(controlfd, buf, sizeof(buf), MSG_WAITALL, (void *) &addr, &alen), &addr); if (FD_ISSET(clifd, &r)) @@ -1758,152 +2351,65 @@ void mainloop(void) } } - /* Handle timeouts. Make sure that this gets run anyway, even if there was - * something to read, else under load this will never actually run.... - */ - if (n == 0 || next_clean <= time_now) { - clockt when = now(); - clockt best = when + 100; // default timeout - sessionidt s; - tunnelidt t; - int count; - u16 r; - - log(3, 0, 0, 0, "Begin regular cleanup\n"); - for (r = 1; r < MAXRADIUS; r++) - { - if (radius[r].state && radius[r].retry) - { - if (radius[r].retry <= when) - radiusretry(r); - if (radius[r].retry && radius[r].retry < best) - best = radius[r].retry; - } - else if (radius[r].state && !radius[r].retry) - radius[r].retry = backoff(radius[r].try+1); - } - for (t = 1; t < MAXTUNNEL; t++) - { - // check for expired tunnels - if (tunnel[t].die && tunnel[t].die <= when) - { - STAT(tunnel_timeout); - tunnelkill(t, "Expired"); - continue; - } - // check for message resend - if (tunnel[t].retry && tunnel[t].controlc) - { - // resend pending messages as timeout on reply - if (tunnel[t].retry <= when) - { - controlt *c = tunnel[t].controls; - u8 w = tunnel[t].window; - tunnel[t].try++; // another try - if (tunnel[t].try > 5) - tunnelkill(t, "Timeout on control message"); // game over - else - while (c && w--) - { - tunnelsend(c->buf, c->length, t); - c = c->next; - } - } - if (tunnel[t].retry && tunnel[t].retry < best) - best = tunnel[t].retry; - } - // Send hello - if (tunnel[t].state == TUNNELOPEN && tunnel[t].lastrec < when + 600) - { - controlt *c = controlnew(6); // sending HELLO - controladd(c, t, 0); // send the message - log(3, tunnel[t].ip, 0, t, "Sending HELLO message\n"); - } - } - - // Check for sessions that have been killed from the CLI - if (cli_session_kill[0]) - { - int i; - for (i = 0; i < MAXSESSION && cli_session_kill[i]; i++) - { - log(2, 0, cli_session_kill[i], 0, "Dropping session by CLI\n"); - sessionshutdown(cli_session_kill[i], "Requested by administrator"); - cli_session_kill[i] = 0; - } - } - // Check for tunnels that have been killed from the CLI - if (cli_tunnel_kill[0]) - { - int i; - for (i = 1; i < MAXTUNNEL && cli_tunnel_kill[i]; i++) - { - log(2, 0, cli_tunnel_kill[i], 0, "Dropping tunnel by CLI\n"); - tunnelshutdown(cli_tunnel_kill[i], "Requested by administrator"); - cli_tunnel_kill[i] = 0; - } - } + // Runs on every machine (master and slaves). + if (cluster_sockfd && next_cluster_ping <= time_now) + { + // Check to see which of the cluster is still alive.. + next_cluster_ping = time_now + 1; + cluster_send_ping(basetime); - count = 0; - for (s = 1; s < MAXSESSION; s++) - { - // check for expired sessions - if (session[s].die && session[s].die <= when) - { - sessionkill(s, "Expired"); - if (++count >= 1000) break; - continue; - } + cluster_check_master(); - // Drop sessions who have not responded within IDLE_TIMEOUT seconds - if (session[s].last_packet && (time_now - session[s].last_packet >= IDLE_TIMEOUT)) - { - sessionkill(s, "No response to LCP ECHO requests"); - STAT(session_timeout); - if (++count >= 1000) break; - continue; - } + cluster_heartbeat(config->cluster_highest_sessionid, sessionfree, config->cluster_highest_tunnelid); // Only does anything if we're a master. + master_update_counts(); // If we're a slave, send our byte counters to our master. + } - // No data in IDLE_TIMEOUT seconds, send LCP ECHO - if (session[s].user[0] && (time_now - session[s].last_packet >= ECHO_TIMEOUT)) - { - u8 b[MAXCONTROL] = {0}; - u8 *q = makeppp(b, 0, 0, session[s].tunnel, s, PPPLCP); - - *q = EchoReq; - *(u8 *)(q + 1) = (time_now % 255); // ID - *(u16 *)(q + 2) = htons(8); // Length - *(u32 *)(q + 4) = 0; // Magic Number (not supported) - - log(4, session[s].ip, s, session[s].tunnel, "No data in %d seconds, sending LCP ECHO\n", - (int)(time_now - session[s].last_packet)); - tunnelsend(b, 24, session[s].tunnel); // send it - if (++count >= 1000) break; - continue; - } - } - if (config->accounting_dir && next_acct <= when) - { - // Dump accounting data - next_acct = when + ACCT_TIME; - dump_acct_info(); + // Run token bucket filtering queue.. + // Only run it every 1/10th of a second. + // Runs on all machines both master and slave. + { + static clockt last_run = 0; + if (last_run != config->current_time) { + last_run = config->current_time; + tbf_run_timer(); } + } - if (cluster_sockfd && next_cluster_ping <= when) - { - // Dump accounting data - next_cluster_ping = when + 50; - cluster_send_message(config->cluster_address, config->bind_address, C_PING, hostname, strlen(hostname)); + /* Handle timeouts. Make sure that this gets run anyway, even if there was + * something to read, else under load this will never actually run.... + * + */ + if (config->cluster_iam_master && next_clean <= time_now) { + if (regular_cleanups()) { // Did it finish? + next_clean = time_now + 1 ; // Didn't finish. Check quickly. + } else { + next_clean = time_now + config->cleanup_interval; // Did. Move to next interval. } + } - if (best < when + config->cleanup_interval) - best = when + config->cleanup_interval; // Throttle to at most once per 10 seconds - next_clean = time_now + config->cleanup_interval; - to.tv_sec = config->cleanup_interval; - to.tv_usec = 0; - log(3, 0, 0, 0, "End regular cleanup, next in %d seconds\n", config->cleanup_interval); +#ifdef BGP + for (i = 0; i < BGP_NUM_PEERS; i++) + { + bgp_process(&bgp_peers[i], + bgp_set[i] ? FD_ISSET(bgp_peers[i].sock, &r) : 0, + bgp_set[i] ? FD_ISSET(bgp_peers[i].sock, &w) : 0); } +#endif /* BGP */ + } + + // Are we the master and shutting down?? + if (config->cluster_iam_master) { + + cluster_heartbeat(config->cluster_highest_sessionid, sessionfree, + config->cluster_highest_tunnelid); // Flush any queued changes.. } + + // Ok. Notify everyone we're shutting down. If we're + // the master, this will force an election. + cluster_send_ping(0); + + // + // Important!!! We MUST not process any packets past this point! } // Init data structures @@ -1938,6 +2444,14 @@ void initdata(void) log(0, 0, 0, 0, "Error doing mmap for sessions: %s\n", strerror(errno)); exit(1); } + + sess_count = mmap(NULL, sizeof(sessioncountt) * MAXSESSION, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); + if (sess_count == MAP_FAILED) + { + log(0, 0, 0, 0, "Error doing mmap for sessions_count: %s\n", strerror(errno)); + exit(1); + } + radius = mmap(NULL, sizeof(radiust) * MAXRADIUS, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); if (radius == MAP_FAILED) { @@ -1975,48 +2489,52 @@ void initdata(void) } memset(cli_tunnel_kill, 0, sizeof(tunnelidt) * MAXSESSION); - filter_buckets = mmap(NULL, sizeof(tbft) * MAXSESSION, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); - if (filter_buckets == MAP_FAILED) - { - log(0, 0, 0, 0, "Error doing mmap for filter buckets: %s\n", strerror(errno)); - exit(1); - } - memset(filter_buckets, 0, sizeof(tbft) * MAXSESSION); - memset(tunnel, 0, sizeof(tunnelt) * MAXTUNNEL); memset(session, 0, sizeof(sessiont) * MAXSESSION); memset(radius, 0, sizeof(radiust) * MAXRADIUS); memset(ip_address_pool, 0, sizeof(ippoolt) * MAXIPPOOL); - for (i = 1; i < MAXSESSION - 1; i++) + + // Put all the sessions on the free list marked as undefined. + for (i = 1; i < MAXSESSION - 1; i++) { session[i].next = i + 1; + session[i].tunnel = T_UNDEF; // mark it as not filled in. + } session[MAXSESSION - 1].next = 0; sessionfree = 1; + + // Mark all the tunnels as undefined (waiting to be filled in by a download). + for (i = 1; i < MAXTUNNEL- 1; i++) { + tunnel[i].state = TUNNELUNDEF; // mark it as not filled in. + } + if (!*hostname) { - char *p; // Grab my hostname unless it's been specified gethostname(hostname, sizeof(hostname)); - { - struct hostent *h = gethostbyname(hostname); - if (h) - myip = ntohl(*(u32 *) h->h_addr); - } - - if ((p = strstr(hostname, ".optusnet.com.au"))) *p = 0; } _statistics->start_time = _statistics->last_reset = time(NULL); + +#ifdef BGP + bgp_peers = mmap(NULL, sizeof(struct bgp_peer) * BGP_NUM_PEERS, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); + if (bgp_peers == MAP_FAILED) + { + log(0, 0, 0, 0, "Error doing mmap for bgp: %s\n", strerror(errno)); + exit(1); + } +#endif /* BGP */ } void initiptables(void) { /* Flush the tables here so that we have a clean slate */ - system("iptables -t nat -F l2tpns"); - system("iptables -t mangle -F l2tpns"); + +// Not needed. 'nat' is setup by garden.c +// mangle isn't used (as throttling is done by tbf inhouse). } int assign_ip_address(sessionidt s) { - unsigned i; + u32 i; int best = -1; clockt best_time = time_now; char *u = session[s].user; @@ -2025,7 +2543,7 @@ int assign_ip_address(sessionidt s) #ifdef STAT_CALLS STAT(call_assign_ip_address); #endif - for (i = 0; i < ip_pool_size; i++) + for (i = 1; i < ip_pool_size; i++) { if (!ip_address_pool[i].address || ip_address_pool[i].assigned) continue; @@ -2051,7 +2569,7 @@ int assign_ip_address(sessionidt s) return 0; } - session[s].ip = ntohl(ip_address_pool[best].address); + session[s].ip = ip_address_pool[best].address; session[s].ip_pool_index = best; ip_address_pool[best].assigned = 1; ip_address_pool[best].last = time_now; @@ -2075,40 +2593,150 @@ void free_ip_address(sessionidt s) { int i = session[s].ip_pool_index; -#ifdef STAT_CALLS - STAT(call_free_ip_address); -#endif - if (!session[s].ip) return; // what the? + if (i < 0) // Is this actually part of the ip pool? + i = 0; + STAT(ip_freed); - uncache_sessionid(session[s].ip); + cache_ipmap(session[s].ip, -i); // Change the mapping to point back to the ip pool index. session[s].ip = 0; ip_address_pool[i].assigned = 0; ip_address_pool[i].session = 0; ip_address_pool[i].last = time_now; + +#ifdef STAT_CALLS + STAT(call_free_ip_address); +#endif +} + +// +// Fsck the address pool against the session table. +// Normally only called when we become a master. +// +// This isn't perfect: We aren't keep tracking of which +// users used to have an IP address. +// +void rebuild_address_pool(void) +{ + int i; + + // + // Zero the IP pool allocation, and build + // a map from IP address to pool index. + for (i = 1; i < MAXIPPOOL; ++i) { + ip_address_pool[i].assigned = 0; + ip_address_pool[i].session = 0; + if (!ip_address_pool[i].address) + continue; + + cache_ipmap(ip_address_pool[i].address, -i); // Map pool IP to pool index. + } + + for (i = 0; i < MAXSESSION; ++i) { + int ipid; + if (!session[i].ip || !session[i].tunnel) + continue; + ipid = - lookup_ipmap(htonl(session[i].ip)); + + if (session[i].ip_pool_index < 0) { // Not allocated out of the pool. + if (ipid < 1) // Not found in the pool either? good. + continue; + + log(0, 0, i, 0, "Session %d has an IP address (%s) that was marked static, but is in the pool (%d)!", + i, inet_toa(session[i].ip), ipid); + + // Fall through and process it as part of the pool. + } + + + if (ipid > MAXIPPOOL || ipid < 0) { + log(0, 0, i, 0, "Session %d has a pool IP that's not found in the pool! (%d)\n", i, ipid); + ipid = -1; + session[i].ip_pool_index = ipid; + continue; + } + + ip_address_pool[ipid].assigned = 1; + ip_address_pool[ipid].session = i; + ip_address_pool[ipid].last = time_now; + strncpy(ip_address_pool[ipid].user, session[i].user, sizeof(ip_address_pool[ipid].user) - 1); + session[i].ip_pool_index = ipid; + cache_ipmap(session[i].ip, i); // Fix the ip map. + } +} + +// +// Fix the address pool to match a changed session. +// (usually when the master sends us an update). +void fix_address_pool(int sid) +{ + int ipid; + + ipid = session[sid].ip_pool_index; + + if (ipid > ip_pool_size) + return; // Ignore it. rebuild_address_pool will fix it up. + + if (ip_address_pool[ipid].address != session[sid].ip) + return; // Just ignore it. rebuild_address_pool will take care of it. + + ip_address_pool[ipid].assigned = 1; + ip_address_pool[ipid].session = sid; + ip_address_pool[ipid].last = time_now; + strncpy(ip_address_pool[ipid].user, session[sid].user, sizeof(ip_address_pool[ipid].user) - 1); +} + +// +// Add a block of addresses to the IP pool to hand out. +// +void add_to_ip_pool(u32 addr, u32 mask) +{ + int i; + if (mask == 0) + mask = 0xffffffff; // Host route only. + + addr &= mask; + + if (ip_pool_size >= MAXIPPOOL) // Pool is full! + return ; + + for (i = addr ;(i & mask) == addr; ++i) + { + if ((i & 0xff) == 0 || (i&0xff) == 255) + continue; // Skip 0 and broadcast addresses. + + ip_address_pool[ip_pool_size].address = i; + ip_address_pool[ip_pool_size].assigned = 0; + ++ip_pool_size; + if (ip_pool_size >= MAXIPPOOL) + { + log(0,0,0,0, "Overflowed IP pool adding %s\n", inet_toa(htonl(addr)) ); + return; + } + } } // Initialize the IP address pool void initippool() { FILE *f; - char *buf, *p; - int pi = 0; + char *p; + char buf[4096]; memset(ip_address_pool, 0, sizeof(ip_address_pool)); if (!(f = fopen(IPPOOLFILE, "r"))) { log(0, 0, 0, 0, "Can't load pool file " IPPOOLFILE ": %s\n", strerror(errno)); - exit(-1); + exit(1); } - buf = (char *)malloc(4096); - - while (pi < MAXIPPOOL && fgets(buf, 4096, f)) + while (ip_pool_size < MAXIPPOOL && fgets(buf, 4096, f)) { - char* pool = buf; + char *pool = buf; + buf[4095] = 0; // Force it to be zero terminated/ + if (*buf == '#' || *buf == '\n') continue; // Skip comments / blank lines if ((p = (char *)strrchr(buf, '\n'))) *p = 0; @@ -2120,7 +2748,7 @@ void initippool() if (src == INADDR_NONE) { log(0, 0, 0, 0, "Invalid address pool IP %s", buf); - exit(-1); + exit(1); } // This entry is for a specific IP only if (src != config->bind_address) @@ -2132,8 +2760,7 @@ void initippool() { // It's a range int numbits = 0; - unsigned long start = 0, end = 0, mask = 0, ip; - struct rtentry r; + u32 start = 0, mask = 0; log(2, 0, 0, 0, "Adding IP address range %s\n", buf); *p++ = 0; @@ -2142,51 +2769,35 @@ void initippool() log(0, 0, 0, 0, "Invalid pool range %s\n", buf); continue; } - start = end = ntohl(inet_addr(pool)); - mask = (unsigned long)(pow(2, numbits) - 1) << (32 - numbits); - start &= mask; - end = start + (int)(pow(2, (32 - numbits))) - 1; - for (ip = (start + 1); ip < end && pi < MAXIPPOOL; ip++) - { - if ((ip & 0xFF) == 0 || (ip & 0xFF) == 255) - continue; - ip_address_pool[pi++].address = htonl(ip); - } + start = ntohl(inet_addr(pool)); + mask = (u32)(pow(2, numbits) - 1) << (32 - numbits); // Add a static route for this pool - log(5, 0, 0, 0, "Adding route for address pool %s/%lu\n", inet_toa(htonl(start)), 32 + mask); - memset(&r, 0, sizeof(r)); - r.rt_dev = config->tapdevice; - r.rt_dst.sa_family = AF_INET; - *(u32 *) & (((struct sockaddr_in *) &r.rt_dst)->sin_addr.s_addr) = htonl(start); - r.rt_genmask.sa_family = AF_INET; - *(u32 *) & (((struct sockaddr_in *) &r.rt_genmask)->sin_addr.s_addr) = htonl(mask); - r.rt_flags = (RTF_UP | RTF_STATIC); - if (ioctl(ifrfd, SIOCADDRT, (void *) &r) < 0) - { - log(0, 0, 0, 0, "Error adding ip address pool route %s/%lu: %s\n", - inet_toa(start), mask, strerror(errno)); - } + log(5, 0, 0, 0, "Adding route for address pool %s/%u\n", inet_toa(htonl(start)), 32 + mask); + routeset(0, start, mask, 0, 1); + + add_to_ip_pool(start, mask); } else { // It's a single ip address - ip_address_pool[pi++].address = inet_addr(pool); + add_to_ip_pool(inet_addr(pool), 0); } } - - free(buf); fclose(f); - log(1, 0, 0, 0, "IP address pool is %d addresses\n", pi); - ip_pool_size = pi; + log(1, 0, 0, 0, "IP address pool is %d addresses\n", ip_pool_size - 1); } -void snoop_send_packet(char *packet, u16 size) +void snoop_send_packet(char *packet, u16 size, ipt destination, u16 port) { - if (!snoop_addr.sin_port || snoopfd <= 0 || size <= 0 || !packet) + if (!destination || !port || snoopfd <= 0 || size <= 0 || !packet) return; - log(5, 0, 0, 0, "Snooping packet at %p (%d bytes) to %s:%d\n", packet, size, inet_toa(snoop_addr.sin_addr.s_addr), htons(snoop_addr.sin_port)); + snoop_addr.sin_addr.s_addr = destination; + snoop_addr.sin_port = ntohs(port); + + log(5, 0, 0, 0, "Snooping packet at %p (%d bytes) to %s:%d\n", + packet, size, inet_toa(snoop_addr.sin_addr.s_addr), htons(snoop_addr.sin_port)); if (sendto(snoopfd, packet, size, MSG_DONTWAIT | MSG_NOSIGNAL, (void *) &snoop_addr, sizeof(snoop_addr)) < 0) log(0, 0, 0, 0, "Error sending intercept packet: %s\n", strerror(errno)); STAT(packets_snooped); @@ -2230,12 +2841,12 @@ void dump_acct_info() } log(4, 0, 0, 0, "Dumping accounting information for %s\n", session[i].user); - fprintf(f, "%s %s %d %lu %lu\n", + fprintf(f, "%s %s %d %u %u\n", session[i].user, // username inet_toa(htonl(session[i].ip)), // ip (session[i].throttle) ? 2 : 1, // qos - (unsigned long)session[i].cin, // uptxoctets - (unsigned long)session[i].cout); // downrxoctets + (u32)session[i].cin, // uptxoctets + (u32)session[i].cout); // downrxoctets session[i].pin = session[i].cin = 0; session[i].pout = session[i].cout = 0; @@ -2252,9 +2863,9 @@ int main(int argc, char *argv[]) _program_name = strdup(argv[0]); time(&basetime); // start clock - // scan args - while ((o = getopt(argc, argv, "vc:h:a:d")) >= 0) + // scan args + while ((o = getopt(argc, argv, "vc:h:a:")) >= 0) { switch (o) { @@ -2271,7 +2882,13 @@ int main(int argc, char *argv[]) break; case '?': default: - printf("Args are:\n\t-d\tDetach from terminal\n\t-c \tConfig file\n\t-h \tForce hostname\n\t-a
\tUse specific address\n\t-v\t\tDebug\n"); + printf("Args are:\n" + "\t-d\tDetach from terminal\n" + "\t-c \tConfig file\n" + "\t-h \tForce hostname\n" + "\t-a
\tUse specific address\n" + "\t-v\t\tDebug\n"); + return (0); break; } @@ -2286,9 +2903,13 @@ int main(int argc, char *argv[]) initiptables(); initplugins(); initdata(); + init_tbf(); init_cli(); read_config_file(); - log(0, 0, 0, 0, "$Id: l2tpns.c,v 1.7 2004/05/24 04:42:50 fred_nerk Exp $\n(c) Copyright 2002 FireBrick (Andrews & Arnold Ltd / Watchfront Ltd) - GPL licenced\n"); + + log(0, 0, 0, 0, "$Id: l2tpns.c,v 1.8 2004/06/23 03:52:24 fred_nerk Exp $\n" + "(c) Copyright 2003, 2004 Optus Internet Engineering\n" + "(c) Copyright 2002 FireBrick (Andrews & Arnold Ltd / Watchfront Ltd) - GPL licenced\n"); { struct rlimit rlim; rlim.rlim_cur = RLIM_INFINITY; @@ -2300,12 +2921,47 @@ int main(int argc, char *argv[]) chdir("/tmp"); } - /* Start up the cluster first, so that we don't have two machines with - * the same IP at once. - * This is still racy, but the second GARP should fix that - */ - cluster_init(config->bind_address, 0); - cluster_send_message(config->cluster_address, config->bind_address, C_HELLO, hostname, strlen(hostname)); + if (config->scheduler_fifo) + { + int ret; + struct sched_param params = {0}; + params.sched_priority = 1; + + if (get_nprocs() < 2) + { + log(0, 0, 0, 0, "Not using FIFO scheduler, there is only 1 processor in the system.\n"); + config->scheduler_fifo = 0; + } + else + { + if ((ret = sched_setscheduler(0, SCHED_FIFO, ¶ms)) == 0) + { + log(1, 0, 0, 0, "Using FIFO scheduler. Say goodbye to any other processes running\n"); + } + else + { + log(0, 0, 0, 0, "Error setting scheduler to FIFO: %s\n", strerror(errno)); + config->scheduler_fifo = 0; + } + } + } + + /* Set up the cluster communications port. */ + if (cluster_init(config->bind_address) < 0) + exit(1); + +#ifdef BGP + signal(SIGPIPE, SIG_IGN); + bgp_setup(config->as_number); + bgp_add_route(config->bind_address, 0xffffffff); + if (*config->bgp_peer[0]) + bgp_start(&bgp_peers[0], config->bgp_peer[0], + config->bgp_peer_as[0], 0); /* 0 = routing disabled */ + + if (*config->bgp_peer[1]) + bgp_start(&bgp_peers[1], config->bgp_peer[1], + config->bgp_peer_as[1], 0); +#endif /* BGP */ inittap(); log(1, 0, 0, 0, "Set up on interface %s\n", config->tapdevice); @@ -2313,13 +2969,6 @@ int main(int argc, char *argv[]) initudp(); initrad(); initippool(); - init_rl(); - if (config->bind_address) - send_garp(config->bind_address); - - // If NOSTATEFILE exists, we will ignore any updates from the cluster master for this execution - if (!unlink(NOSTATEFILE)) - config->ignore_cluster_updates = 1; read_state(); @@ -2336,6 +2985,24 @@ int main(int argc, char *argv[]) setuid(config->target_uid); mainloop(); + +#ifdef BGP + /* try to shut BGP down cleanly; with luck the sockets will be + writable since we're out of the select */ + { + int i; + for (i = 0; i < BGP_NUM_PEERS; i++) + if (bgp_peers[i].state == Established) + bgp_stop(&bgp_peers[i]); + } +#endif /* BGP */ + + /* remove plugins (so cleanup code gets run) */ + plugins_done(); + + /* kill CLI children */ + signal(SIGTERM, SIG_IGN); + kill(0, SIGTERM); return 0; } @@ -2355,7 +3022,7 @@ void sigalrm_handler(int junk) // Log current traffic stats snprintf(config->bandwidth, sizeof(config->bandwidth), - "UDP-ETH:%1.0f/%1.0f ETH-UDP:%1.0f/%1.0f TOTAL:%0.1f IN:%lu OUT:%lu", + "UDP-ETH:%1.0f/%1.0f ETH-UDP:%1.0f/%1.0f TOTAL:%0.1f IN:%u OUT:%u", (udp_rx / 1024.0 / 1024.0 * 8), (eth_tx / 1024.0 / 1024.0 * 8), (eth_rx / 1024.0 / 1024.0 * 8), @@ -2388,12 +3055,12 @@ void sigterm_handler(int junk) log(1, 0, 0, 0, "Shutting down cleanly\n"); if (config->save_state) dump_state(); + main_quit++; } void sigquit_handler(int junk) { - FILE *f; int i; log(1, 0, 0, 0, "Shutting down without saving sessions\n"); @@ -2408,12 +3075,6 @@ void sigquit_handler(int junk) tunnelshutdown(i, "L2TPNS Closing"); } - cluster_send_goodbye(); - - // Touch a file which says not to reload the state - f = fopen(NOSTATEFILE, "w"); - if (f) fclose(f); - main_quit++; } @@ -2433,17 +3094,16 @@ void read_state() u32 buf[2]; if (!config->save_state) - return; - - // Ignore saved state if NOSTATEFILE exists - if (config->ignore_cluster_updates) { unlink(STATEFILE); return; } if (stat(STATEFILE, &sb) < 0) + { + unlink(STATEFILE); return; + } if (sb.st_mtime < (time(NULL) - 60)) { @@ -2535,12 +3195,13 @@ void read_state() for (i = 0; i < MAXSESSION; i++) { - session[i].tbf = 0; + session[i].tbf_in = 0; + session[i].tbf_out = 0; if (session[i].opened) { log(2, 0, i, 0, "Loaded active session for user %s\n", session[i].user); - if (session[i].ip && session[i].ip != 0xFFFFFFFE) - sessionsetup(session[i].tunnel, i, 0); + if (session[i].ip) + sessionsetup(session[i].tunnel, i); } } @@ -2628,10 +3289,6 @@ void update_config() { int i; - snoop_addr.sin_family = AF_INET; - snoop_addr.sin_addr.s_addr = config->snoop_destination_host; - snoop_addr.sin_port = htons(config->snoop_destination_port); - // Update logging closelog(); syslog_log = 0; @@ -2703,6 +3360,10 @@ void update_config() memcpy(config->old_plugins, config->plugins, sizeof(config->plugins)); if (!config->cleanup_interval) config->cleanup_interval = 10; if (!config->multi_read_count) config->multi_read_count = 1; + if (!config->cluster_address) config->cluster_address = inet_addr(DEFAULT_MCAST_ADDR); + if (!*config->cluster_interface) + strncpy(config->cluster_interface, DEFAULT_MCAST_INTERFACE, sizeof(config->cluster_interface) - 1); + config->reload_config = 0; } @@ -2723,81 +3384,165 @@ void read_config_file() update_config(); } -int sessionsetup(tunnelidt t, sessionidt s, u8 routes) +int sessionsetup(tunnelidt t, sessionidt s) { // A session now exists, set it up ipt ip; char *user; sessionidt i; + int r; #ifdef STAT_CALLS STAT(call_sessionsetup); #endif + log(3, session[s].ip, s, t, "Doing session setup for session\n"); - if (!session[s].ip) { - log(0, session[s].ip, s, t, "VERY VERY BAD! sessionsetup() called with no session[s].ip\n"); - return 1; + + if (!session[s].ip || session[s].ip == 0xFFFFFFFE) + { + assign_ip_address(s); + if (session[s].ip) + log(3, 0, s, t, " No IP allocated. Assigned %s from pool\n", + inet_toa(htonl(session[s].ip))); + else + log(0, 0, s, t, " No IP allocated. The IP address pool is FULL!\n"); } + // Make sure this is right session[s].tunnel = t; + // zap old sessions with same IP and/or username // Don't kill gardened sessions - doing so leads to a DoS // from someone who doesn't need to know the password - ip = session[s].ip; - user = session[s].user; - for (i = 1; i < MAXSESSION; i++) - { - if (i == s) continue; - if (ip == session[i].ip) sessionkill(i, "Duplicate IP address"); - if (!session[s].walled_garden && !session[i].walled_garden && strcasecmp(user, session[i].user) == 0) - sessionkill(i, "Duplicate session for user"); - } - - if (routes) { - if (session[s].route[routes].ip && session[s].route[routes].mask) + ip = session[s].ip; + user = session[s].user; + for (i = 1; i <= config->cluster_highest_sessionid; i++) { - log(2, session[s].ip, s, t, "Routing session\n"); - routeset(session[s].ip, 0, 0, 1); - while (routes--) - routeset(session[s].route[routes].ip, session[s].route[routes].mask, - session[s].ip, 1); + if (i == s) continue; + if (ip == session[i].ip) sessionkill(i, "Duplicate IP address"); + if (!session[s].walled_garden && !session[i].walled_garden && strcasecmp(user, session[i].user) == 0) + sessionkill(i, "Duplicate session for users"); } } - sessionsendarp(s); - if (!session[s].sid) - sendipcp(t, s); - // Force throttling on or off - // This has the advantage of cleaning up after another throttled user who may have left - // firewall rules lying around - session[s].throttle = throttle_session(s, session[s].throttle); + // Add the route for this session. + // + // Static IPs need to be routed. Anything else + // is part of the IP address pool and is already routed, + // it just needs to be added to the IP cache. + if (session[s].ip_pool_index == -1) // static ip + routeset(s, session[s].ip, 0, 0, 1); + else + cache_ipmap(session[s].ip, s); + + for (r = 0; r < MAXROUTE && session[s].route[r].ip; r++) + routeset(s, session[s].route[r].ip, session[s].route[r].mask, session[s].ip, 1); + if (!session[s].sid) { // did this session just finish radius? + log(3, session[s].ip, s, t, "Sending initial IPCP to client\n"); + sendipcp(t, s); + session[s].sid = ++last_sid; + } + + // Run the plugin's against this new session. { struct param_new_session data = { &tunnel[t], &session[s] }; run_plugins(PLUGIN_NEW_SESSION, &data); } - if (!session[s].sid) - session[s].sid = ++last_sid; - - cache_sessionid(htonl(session[s].ip), s); + // Force throttling on or off (Actually : refresh the current throttling status) + // This has the advantage of cleaning up after another throttled user who may have left + // firewall rules lying around + throttle_session(s, session[s].throttle); - cluster_send_session(s); session[s].last_packet = time_now; + { char *sessionip, *tunnelip; - sessionip = strdup(inet_toa(ntohl(session[s].ip))); - tunnelip = strdup(inet_toa(ntohl(tunnel[t].ip))); + sessionip = strdup(inet_toa(htonl(session[s].ip))); + tunnelip = strdup(inet_toa(htonl(tunnel[t].ip))); log(2, session[s].ip, s, t, "Login by %s at %s from %s (%s)\n", session[s].user, sessionip, tunnelip, tunnel[t].hostname); if (sessionip) free(sessionip); if (tunnelip) free(tunnelip); } + cluster_send_session(s); // Mark it as dirty, and needing to the flooded to the cluster. + return 1; // RADIUS OK and IP allocated, done... } +// +// This session just got dropped on us by the master or something. +// Make sure our tables up up to date... +// +int load_session(sessionidt s, sessiont *new) +{ + int i; + + // Sanity checks. + if (new->ip_pool_index >= MAXIPPOOL || + new->tunnel >= MAXTUNNEL) { + log(0,0,s,0, "Strange session update received!\n"); + // FIXME! What to do here? + return 0; + } + + // + // Ok. All sanity checks passed. Now we're committed to + // loading the new session. + // + + session[s].tunnel = new->tunnel; // For logging in cache_ipmap + + + if (new->ip != session[s].ip) // Changed ip. fix up hash tables. + { + if (session[s].ip) // If there's an old one, remove it. + { + if (session[s].ip_pool_index == -1) // static IP + routeset(s, session[s].ip, 0, 0, 0); + else // It's part of the IP pool, add it manually. + uncache_ipmap(session[s].ip); + } + + if (new->ip) { // If there's a new one, add it. + if (new->ip_pool_index == -1) + routeset(s, new->ip, 0, 0, 1); + else + cache_ipmap(new->ip, s); + } + } + + // Add routes for the session if they're new. + for (i = 0; i < MAXROUTE && (session[s].route[i].ip || new->route[i].ip); i++) + { + if (new->route[i].ip == session[s].route[i].ip && + new->route[i].mask == session[s].route[i].mask) + continue; + + if (session[s].route[i].ip) // Remove the old one if it exists. + routeset(s, session[s].route[i].ip, session[s].route[i].mask, session[s].route[i].ip, 0); + + if (new->route[i].ip) // Add the new one if it exists. + routeset(s, new->route[i].ip, new->route[i].mask, new->ip, 1); + } + + + + if (new->tunnel && s > config->cluster_highest_sessionid) // Maintain this in the slave. It's used + // for walking the sessions to forward byte counts to the master. + config->cluster_highest_sessionid = s; + + memcpy(&session[s], new, sizeof(session[s])); // Copy over.. + + // Do fixups into address pool. + if (new->ip_pool_index != -1) + fix_address_pool(s); + return 1; +} + #ifdef RINGBUFFER void ringbuffer_dump(FILE *stream) { @@ -2823,28 +3568,33 @@ void initplugins() plugins[i] = ll_init(); } +static void *open_plugin(char *plugin_name, int load) +{ + char path[256] = ""; + + snprintf(path, 256, PLUGINDIR "/%s.so", plugin_name); + log(2, 0, 0, 0, "%soading plugin from %s\n", load ? "L" : "Un-l", path); + return dlopen(path, RTLD_NOW); +} + void add_plugin(char *plugin_name) { - void *p; + static struct pluginfuncs funcs = { + _log, + _log_hex, + inet_toa, + sessionbyuser, + sessiontbysessionidt, + sessionidtbysessiont, + sessionkill, + radiusnew, + radiussend, + }; + + void *p = open_plugin(plugin_name, 1); int (*initfunc)(struct pluginfuncs *); - char path[256] = {0}; int i; - struct pluginfuncs funcs; - - funcs._log = _log; - funcs._log_hex = _log_hex; - funcs.inet_toa = inet_toa; - funcs.get_session_by_username = sessionbyuser; - funcs.get_session_by_id = sessiontbysessionidt; - funcs.get_id_by_session = sessionidtbysessiont; - funcs.sessionkill = sessionkill; - funcs.radiusnew = radiusnew; - funcs.radiussend = radiussend; - - snprintf(path, 256, "%s/%s.so", LIBDIR, plugin_name); - - log(2, 0, 0, 0, "Loading plugin from %s\n", path); - p = dlopen(path, RTLD_NOW); + if (!p) { log(1, 0, 0, 0, " Plugin load failed: %s\n", dlerror()); @@ -2867,64 +3617,60 @@ void add_plugin(char *plugin_name) } } - initfunc = dlsym(p, "plugin_init"); - if (!initfunc) + if ((initfunc = dlsym(p, "plugin_init"))) { - log(1, 0, 0, 0, " Plugin load failed: function plugin_init() does not exist: %s\n", dlerror()); - dlclose(p); - return; + if (!initfunc(&funcs)) + { + log(1, 0, 0, 0, " Plugin load failed: plugin_init() returned FALSE: %s\n", dlerror()); + dlclose(p); + return; + } } - if (!initfunc(&funcs)) - { - log(1, 0, 0, 0, " Plugin load failed: plugin_init() returned FALSE: %s\n", dlerror()); - dlclose(p); - return; - } + ll_push(loaded_plugins, p); for (i = 0; i < max_plugin_functions; i++) { void *x; - if (!plugin_functions[i]) continue; - if ((x = dlsym(p, plugin_functions[i]))) + if (plugin_functions[i] && (x = dlsym(p, plugin_functions[i]))) { log(3, 0, 0, 0, " Supports function \"%s\"\n", plugin_functions[i]); ll_push(plugins[i], x); } } + log(2, 0, 0, 0, " Loaded plugin %s\n", plugin_name); } +static void run_plugin_done(void *plugin) +{ + int (*donefunc)(void) = dlsym(plugin, "plugin_done"); + + if (donefunc) + donefunc(); +} + void remove_plugin(char *plugin_name) { - void *p; - int (*donefunc)(); - char path[256] = {0}; + void *p = open_plugin(plugin_name, 0); int i; - snprintf(path, 256, "%s/%s.so", LIBDIR, plugin_name); - - log(2, 0, 0, 0, "Removing plugin %s\n", plugin_name); - // Get the existing pointer - p = dlopen(path, RTLD_LAZY); - if (!p) return; + if (!p) + return; for (i = 0; i < max_plugin_functions; i++) { void *x; - if (!plugin_functions[i]) continue; - if ((x = dlsym(p, plugin_functions[i]))) ll_delete(plugins[i], x); + if (plugin_functions[i] && (x = dlsym(p, plugin_functions[i]))) + ll_delete(plugins[i], x); } if (ll_contains(loaded_plugins, p)) { ll_delete(loaded_plugins, p); - - donefunc = dlsym(p, "plugin_done"); - if (donefunc) donefunc(); + run_plugin_done(p); } - dlclose(p); dlclose(p); log(2, 0, 0, 0, "Removed plugin %s\n", plugin_name); } @@ -2945,6 +3691,15 @@ int run_plugins(int plugin_type, void *data) return 1; } +void plugins_done() +{ + void *p; + + ll_reset(loaded_plugins); + while ((p = ll_next(loaded_plugins))) + run_plugin_done(p); +} + void processcontrol(u8 * buf, int len, struct sockaddr_in *addr) { char *resp; @@ -3013,6 +3768,8 @@ tunnelidt new_tunnel() if (tunnel[i].state == TUNNELFREE) { log(4, 0, 0, i, "Assigning tunnel ID %d\n", i); + if (i > config->cluster_highest_tunnelid) + config->cluster_highest_tunnelid = i; return i; } } @@ -3020,3 +3777,91 @@ tunnelidt new_tunnel() return 0; } +// +// We're becoming the master. Do any required setup.. +// +// This is principally telling all the plugins that we're +// now a master, and telling them about all the sessions +// that are active too.. +// +void become_master(void) +{ + int s; + run_plugins(PLUGIN_BECOME_MASTER, NULL); + + for (s = 0; s < config->cluster_highest_sessionid ; ++s) { + if (!session[s].tunnel) // Not an in-use session. + continue; + + run_plugins(PLUGIN_NEW_SESSION_MASTER, &session[s]); + } +} + + + +int cmd_show_hist_idle(struct cli_def *cli, char *command, char **argv, int argc) +{ + int s, i; + int count = 0; + int buckets[64]; + + time(&time_now); + for (i = 0; i < 64;++i) buckets[i] = 0; + + for (s = 0; s < config->cluster_highest_sessionid ; ++s) { + int idle; + if (!session[s].tunnel) + continue; + + idle = time_now - session[s].last_packet; + idle /= 5 ; // In multiples of 5 seconds. + if (idle < 0) + idle = 0; + if (idle > 63) + idle = 63; + + ++count; + ++buckets[idle]; + } + + for (i = 0; i < 63; ++i) { + cli_print(cli, "%3d seconds : %7.2f%% (%6d)", i * 5, (double) buckets[i] * 100.0 / count , buckets[i]); + } + cli_print(cli, "lots of secs : %7.2f%% (%6d)", (double) buckets[63] * 100.0 / count , buckets[i]); + cli_print(cli, "%d total sessions open.", count); + return CLI_OK; +} + +int cmd_show_hist_open(struct cli_def *cli, char *command, char **argv, int argc) +{ + int s, i; + int count = 0; + int buckets[64]; + + time(&time_now); + for (i = 0; i < 64;++i) buckets[i] = 0; + + for (s = 0; s < config->cluster_highest_sessionid ; ++s) { + int open = 0, d; + if (!session[s].tunnel) + continue; + + d = time_now - session[s].opened; + if (d < 0) + d = 0; + while (d > 1 && open < 32) { + ++open; + d >>= 1; // half. + } + ++count; + ++buckets[open]; + } + + s = 1; + for (i = 0; i < 30; ++i) { + cli_print(cli, " < %8d seconds : %7.2f%% (%6d)", s, (double) buckets[i] * 100.0 / count , buckets[i]); + s <<= 1; + } + cli_print(cli, "%d total sessions open.", count); + return CLI_OK; +} diff --git a/l2tpns.h b/l2tpns.h index b498fd1..fe19bc6 100644 --- a/l2tpns.h +++ b/l2tpns.h @@ -1,20 +1,35 @@ // L2TPNS Global Stuff -// $Id: l2tpns.h,v 1.6 2004/05/24 04:33:35 fred_nerk Exp $ +// $Id: l2tpns.h,v 1.7 2004/06/23 03:52:24 fred_nerk Exp $ + +#ifndef __L2TPNS_H__ +#define __L2TPNS_H__ #include -#include #include +#include +#include +#include +#include +#include +#include +#include +#include #include "config.h" -#define VERSION "1.2.0" +#define VERSION "2.0.0" // Limits #define MAXTUNNEL 500 // could be up to 65535 #define MAXSESSION 50000 // could be up to 65535 +#define MAXTBFS 6000 // Maximum token bucket filters. Might need up to 2 * session. + #define RADIUS_SHIFT 5 #define RADIUS_MASK ((unsigned short)(((unsigned short)~0) >> (16 - RADIUS_SHIFT))) #define MAXRADIUS ((2 << (RADIUS_SHIFT - 1)) * 255) +#define T_UNDEF (0xffff) // A tunnel ID that won't ever be used. Mark session as undefined. +#define T_FREE (0) // A tunnel ID that won't ever be used. Mark session as free. + #define MAXCONTROL 1000 // max length control message we ever send... #define MAXETHER (1500+18) // max packet we try sending to tap #define MAXTEL 96 // telephone number @@ -28,20 +43,28 @@ #define IDLE_TIMEOUT 240 // Time between last packet sent and LCP ECHO generation // Constants -#define STATISTICS -#define STAT_CALLS -#define RINGBUFFER +#ifndef PLUGINDIR +#define PLUGINDIR LIBDIR // Plugins +#endif + +#ifndef PLUGINCONF +#define PLUGINCONF ETCDIR // Plugin config dir +#endif + +#ifndef DATADIR +#define DATADIR "/tmp" +#endif + +#ifndef FLASHDIR +#define FLASHDIR ETCDIR +#endif + #define TAPDEVICE "/dev/net/tun" #define UDP 17 -#define HOMEDIR "/home/l2tpns/" // Base dir for data -#define STATEFILE "/tmp/l2tpns.dump" // State dump file -#define NOSTATEFILE "/tmp/l2tpns.no_state_reload" // If exists, state will not be reloaded -#define CONFIGFILE ETCDIR "/l2tpns.cfg" // Configuration file -#define CLIUSERS ETCDIR "/l2tpns.users" // CLI Users file -#define IPPOOLFILE ETCDIR "/l2tpns.ip_pool" // Address pool configuration -#ifndef LIBDIR -#define LIBDIR "/usr/lib/l2tpns" -#endif +#define STATEFILE DATADIR "/state.dump" // State dump file +#define CONFIGFILE FLASHDIR "/startup-config" // Configuration file +#define CLIUSERS FLASHDIR "/users" // CLI Users file +#define IPPOOLFILE FLASHDIR "/ip_pool" // Address pool configuration #define ACCT_TIME 3000 // 5 minute accounting interval #define L2TPPORT 1701 // L2TP port #define RADPORT 1645 // old radius port... @@ -56,7 +79,7 @@ #define PPPCCP 0x80FD #define PPPIP 0x0021 #define PPPMP 0x003D -#define MIN_IP_SIZE 0x20 +#define MIN_IP_SIZE 0x19 enum { ConfigReq = 1, @@ -102,20 +125,13 @@ typedef struct controls // control message } controlt; -typedef struct stbft -{ - char handle[10]; - char in_use; -} tbft; - - // 336 bytes per session typedef struct sessions { sessionidt next; // next session in linked list sessionidt far; // far end session ID tunnelidt tunnel; // tunnel ID - ipt ip; // IP of session set by RADIUS response + ipt ip; // IP of session set by RADIUS response (host byte order). int ip_pool_index; // index to IP pool unsigned long sid; // session id for hsddb u16 nr; // next receive @@ -126,28 +142,40 @@ typedef struct sessions u32 total_cin; // This counter is never reset while a session is open u32 total_cout; // This counter is never reset while a session is open u32 id; // session id + u32 throttle; // non-zero if this session is throttled. clockt opened; // when started clockt die; // being closed, when to finally free time_t last_packet; // Last packet from the user (used for idle timeouts) ipt dns1, dns2; // DNS servers routet route[MAXROUTE]; // static routes u16 radius; // which radius session is being used (0 for not waiting on authentication) - u8 flags; // various bit flags - u8 snoop; // are we snooping this session? - u8 throttle; // is this session throttled? - u8 walled_garden; // is this session gardened? u16 mru; // maximum receive unit - u16 tbf; // filter bucket for throttling + u16 tbf_in; // filter bucket for throttling in from the user. + u16 tbf_out; // filter bucket for throttling out to the user. + u8 l2tp_flags; // various bit flags from the ICCN on the l2tp tunnel. + u8 walled_garden; // is this session gardened? + u8 flags1; // additional flags (currently unused); char random_vector[MAXTEL]; int random_vector_length; - char user[129]; // user (needed in seesion for radius stop messages) + char user[129]; // user (needed in seesion for radius stop messages) (can we reduce this? --mo) char called[MAXTEL]; // called number char calling[MAXTEL]; // calling number - unsigned long tx_connect_speed; - unsigned long rx_connect_speed; + u32 tx_connect_speed; + u32 rx_connect_speed; + u32 flags; // Various session flags. + ipt snoop_ip; // Interception destination IP + u16 snoop_port; // Interception destination port + char reserved[28]; // Space to expand structure without changing HB_VERSION } sessiont; +#define SF_IPCP_ACKED (1<<0) // Has this session seen an IPCP Ack? + +typedef struct { + u32 cin; + u32 cout; +} sessioncountt; + #define SESSIONPFC 1 // PFC negotiated flags #define SESSIONACFC 2 // ACFC negotiated flags @@ -191,7 +219,7 @@ radiust; typedef struct { - ipt address; + ipt address; // Host byte order.. char assigned; // 1 if assigned, 0 if free sessionidt session; clockt last; // last used @@ -223,7 +251,8 @@ enum TUNNELFREE, // Not in use TUNNELOPEN, // Active tunnel TUNNELDIE, // Currently closing - TUNNELOPENING // Busy opening + TUNNELOPENING, // Busy opening + TUNNELUNDEF, // Undefined }; enum @@ -234,7 +263,8 @@ enum RADIUSIPCP, // sending IPCP to end user RADIUSSTART, // sending start accounting to RADIUS server RADIUSSTOP, // sending stop accounting to RADIUS server - RADIUSWAIT // waiting timeout before available, in case delayed replies + RADIUSWAIT, // waiting timeout before available, in case delayed replies + RADIUSDEAD, // errored while talking to radius server. }; struct Tstats @@ -278,6 +308,9 @@ struct Tstats unsigned long ip_allocated; unsigned long ip_freed; + + unsigned long c_forwarded; + unsigned long recv_forward; #ifdef STAT_CALLS unsigned long call_processtap; unsigned long call_processarp; @@ -326,6 +359,7 @@ struct configt int debug; // debugging level time_t start_time; // time when l2tpns was started char bandwidth[256]; // current bandwidth + clockt current_time; char config_file[128]; int reload_config; // flag to re-read config (set by cli) @@ -344,21 +378,43 @@ struct configt ipt default_dns1, default_dns2; - ipt snoop_destination_host; - u16 snoop_destination_port; - unsigned long rl_rate; int save_state; - uint32_t cluster_address; - int ignore_cluster_updates; char accounting_dir[128]; ipt bind_address; + int send_garp; // Set to true to garp for vip address on startup + int target_uid; int dump_speed; char plugins[64][MAXPLUGINS]; char old_plugins[64][MAXPLUGINS]; int next_tbf; // Next HTB id available to use + int scheduler_fifo; // If 1, will force scheduler to use SCHED_FIFO. + // Don't use this unless you have a dual processor machine! + int icmp_rate; // Max number of ICMP unreachable per second to send + + u32 cluster_address; // Multicast address of cluster. + // Send to this address to have everyone hear. + char cluster_interface[64]; // Which interface to listen for multicast on. + int cluster_iam_master; // Are we the cluster master??? + int cluster_iam_uptodate; // Set if we've got a full set of state from the master. + u32 cluster_master_address; // The network address of the cluster master. + // Zero if i am the cluster master. + int cluster_seq_number; // Sequence number of the next heartbeat we'll send out + // (or the seq number we're next expecting if we're a slave). + int cluster_undefined_sessions; // How many sessions we're yet to receive from the master. + int cluster_undefined_tunnels; // How many tunnels we're yet to receive from the master. + int cluster_highest_sessionid; + int cluster_highest_tunnelid; + clockt cluster_last_hb; // Last time we saw a heartbeat from the master. + int cluster_num_changes; // Number of changes queued. + +#ifdef BGP + u16 as_number; + char bgp_peer[2][64]; + u16 bgp_peer_as[2]; +#endif }; struct config_descriptt @@ -381,10 +437,10 @@ void processipcp(tunnelidt t, sessionidt s, u8 * p, u16 l); void processipin(tunnelidt t, sessionidt s, u8 * p, u16 l); void processccp(tunnelidt t, sessionidt s, u8 * p, u16 l); void sendchap(tunnelidt t, sessionidt s); -u8 *makeppp(u8 * b, u8 * p, int l, tunnelidt t, sessionidt s, u16 mtype); +u8 *makeppp(u8 * b, int size, u8 * p, int l, tunnelidt t, sessionidt s, u16 mtype); u8 *findppp(u8 * b, u8 mtype); void initlcp(tunnelidt t, sessionidt s); -void dumplcp(char *p, int l); +void dumplcp(u8 *p, int l); // radius.c @@ -410,7 +466,7 @@ void rl_destroy_tbf(u16 t); // l2tpns.c clockt now(void); clockt backoff(u8 try); -void routeset(ipt ip, ipt mask, ipt gw, u8 add); +void routeset(sessionidt, ipt ip, ipt mask, ipt gw, u8 add); void inittap(void); void initudp(void); void initdata(void); @@ -439,9 +495,13 @@ void processtap(u8 * buf, int len); void processcontrol(u8 * buf, int len, struct sockaddr_in *addr); int assign_ip_address(sessionidt s); void free_ip_address(sessionidt s); -void snoop_send_packet(char *packet, u16 size); +void snoop_send_packet(char *packet, u16 size, ipt destination, u16 port); void dump_acct_info(); void mainloop(void); +int cmd_show_ipcache(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_show_hist_idle(struct cli_def *cli, char *command, char **argv, int argc); +int cmd_show_hist_open(struct cli_def *cli, char *command, char **argv, int argc); + #define log _log #ifndef log_hex #define log_hex(a,b,c,d) do{if (a <= config->debug) _log_hex(a,0,0,0,b,c,d);}while (0) @@ -449,7 +509,7 @@ void mainloop(void); void _log(int level, ipt address, sessionidt s, tunnelidt t, const char *format, ...) __attribute__((format (printf, 5, 6))); void _log_hex(int level, ipt address, sessionidt s, tunnelidt t, const char *title, const char *data, int maxsize); void build_chap_response(char *challenge, u8 id, u16 challenge_length, char **challenge_response); -int sessionsetup(tunnelidt t, sessionidt s, u8 routes); +int sessionsetup(tunnelidt t, sessionidt s); int cluster_send_session(int s); int cluster_send_tunnel(int t); int cluster_send_goodbye(); @@ -459,15 +519,23 @@ void cli_do(int sockfd); #ifdef RINGBUFFER void ringbuffer_dump(FILE *stream); #endif -void initplugins(); +void initplugins(void); int run_plugins(int plugin_type, void *data); void add_plugin(char *plugin_name); void remove_plugin(char *plugin_name); +void plugins_done(void); void tunnelclear(tunnelidt t); void host_unreachable(ipt destination, u16 id, ipt source, char *packet, int packet_len); - +void fix_address_pool(int sid); +void rebuild_address_pool(void); +void send_ipin(sessionidt s, u8 * buf, int len); +int throttle_session(sessionidt s, int throttle); +int load_session(sessionidt, sessiont *); +void become_master(void); // We're the master; kick off any required master initializations. extern tunnelt *tunnel; extern sessiont *session; +extern sessioncountt *sess_count; +extern ippoolt *ip_address_pool; #define sessionfree (session[0].next) #define log_backtrace(count, max) \ @@ -485,3 +553,12 @@ if (count++ < max) { \ free(strings); \ } + +extern struct configt *config; +extern time_t basetime; // Time when this process started. +extern time_t time_now; // Seconds since EPOCH. +extern u32 last_sid; +extern struct Tstats *_statistics; +extern ipt my_address; +extern int tun_write(u8 *data, int size); +#endif /* __L2TPNS_H__ */ diff --git a/ll.c b/ll.c index 1b5a8ac..83a5b37 100644 --- a/ll.c +++ b/ll.c @@ -1,5 +1,5 @@ // L2TPNS Linked List Stuff -// $Id: ll.c,v 1.2 2004/03/05 00:09:03 fred_nerk Exp $ +// $Id: ll.c,v 1.3 2004/06/23 03:52:24 fred_nerk Exp $ #include #include diff --git a/ll.h b/ll.h index ad4d30c..f4d2d88 100644 --- a/ll.h +++ b/ll.h @@ -25,4 +25,4 @@ void *ll_next(linked_list *l); int ll_size(linked_list *l); int ll_contains(linked_list *l, void *search); -#endif +#endif /* __LL_H__ */ diff --git a/machines.cfg b/machines.cfg deleted file mode 100644 index a10a694..0000000 --- a/machines.cfg +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/perl -w -# vim:ft=perl - -$m{$_class} = { - IPfilter => "iptables", - - File_append => [ - [ "/etc/modules.conf", "alias char-major-10-200 tun\n", "char-major-10-200", "depmod -a" ], - ], - inittab_include => [ - "$_path/src/l2tpns", - ], - rpm_check => [ - "$_path/rpm/libcli-1.2.0-1.i386.rpm", - 'iproute', - 'perl-Compress-Zlib', - 'perl-MLDBM', - 'perl-Storable', - ], - Firewall => { - 'all' => '1701:udp', - }, - F_Firewall => { - all => 'all', - }, - Sysctl => { - 'net.ipv4.ip_forward' => 1, - 'net.ipv4.conf.all.proxy_arp' => 1, - 'net.core.rmem_max' => 8388608, - 'net.core.wmem_max' => 8388608, - 'net.core.rmem_default' => 8388608, - 'net.core.wmem_default' => 8388608, - 'net.ipv4.tcp_rmem' => '4096 65530 128388607', - 'net.ipv4.tcp_wmem' => '4096 65530 128388607', - }, - File_install => [ - [ "/etc/logrotate.d/l2tpns", "$_path/etc/l2tpns.logrotate", undef, { mode => 0755 } ], - ], -}; diff --git a/md5.h b/md5.h index cf6eee8..5685fdb 100644 --- a/md5.h +++ b/md5.h @@ -1,6 +1,9 @@ -/* GLOBAL.H - RSAREF types and constants +/* RSAREF types and constants */ +#ifndef __MD5_H__ +#define __MD5_H__ + /* PROTOTYPES should be set to one if and only if the compiler supports function argument prototyping. The following makes PROTOTYPES default to 0 if it has not already @@ -68,3 +71,4 @@ void MD5Update PROTO_LIST ((MD5_CTX *, unsigned char *, unsigned int)); void MD5Final PROTO_LIST ((unsigned char [16], MD5_CTX *)); +#endif /* __MD5_H__ */ diff --git a/plugin.h b/plugin.h index e980579..896f08e 100644 --- a/plugin.h +++ b/plugin.h @@ -14,7 +14,9 @@ enum PLUGIN_NEW_SESSION, PLUGIN_KILL_SESSION, PLUGIN_CONTROL, - PLUGIN_RADIUS_RESPONSE + PLUGIN_RADIUS_RESPONSE, + PLUGIN_BECOME_MASTER, + PLUGIN_NEW_SESSION_MASTER, }; #define PLUGIN_RET_ERROR 0 @@ -115,4 +117,4 @@ struct param_radius_response char *value; }; -#endif +#endif /* __PLUGIN_H__ */ diff --git a/ppp.c b/ppp.c index e1edba1..9621841 100644 --- a/ppp.c +++ b/ppp.c @@ -1,5 +1,5 @@ // L2TPNS PPP Stuff -// $Id: ppp.c,v 1.4 2004/05/24 04:26:01 fred_nerk Exp $ +// $Id: ppp.c,v 1.5 2004/06/23 03:52:24 fred_nerk Exp $ #include #include @@ -10,14 +10,15 @@ #include "constants.h" #include "plugin.h" #include "util.h" +#include "tbf.h" +#include "cluster.h" extern tunnelt *tunnel; extern sessiont *session; extern radiust *radius; extern int tapfd; -extern char hostname[1000]; -extern struct Tstats *_statistics; -extern unsigned long eth_tx; +extern char hostname[]; +extern u32 eth_tx; extern time_t time_now; extern struct configt *config; @@ -66,7 +67,11 @@ void processpap(tunnelidt t, sessionidt s, u8 * p, u16 l) // respond now, either no RADIUS available or already authenticated u8 b[MAXCONTROL]; u8 id = p[1]; - u8 *p = makeppp(b, 0, 0, t, s, PPPPAP); + u8 *p = makeppp(b, sizeof(b), 0, 0, t, s, PPPPAP); + if (!p) { // Failed to make ppp header! + log(1,0,0,0, "Failed to make PPP header in process pap!\n"); + return; + } if (session[s].ip) *p = 2; // ACK else @@ -126,6 +131,9 @@ void processchap(tunnelidt t, sessionidt s, u8 * p, u16 l) if (!r) { log(1, 0, s, t, "Unexpected CHAP message\n"); + +// FIXME: Need to drop the session here. + STAT(tunnel_rx_errors); return; } @@ -206,21 +214,21 @@ char *ppp_lcp_types[] = { "DiscardRequest", }; -void dumplcp(char *p, int l) +void dumplcp(u8 *p, int l) { - signed int x = l - 3; - char *o = (p + 3); + signed int x = l - 4; + u8 *o = (p + 4); log_hex(5, "PPP LCP Packet", p, l); - log(4, 0, 0, 0, "PPP LCP Packet type %d (%s)\n", *p, ppp_lcp_types[(int)*p]); + log(4, 0, 0, 0, "PPP LCP Packet type %d (%s len %d)\n", *p, ppp_lcp_types[(int)*p], ntohs( ((u16 *) p)[1]) ); log(4, 0, 0, 0, "Length: %d\n", l); if (*p != ConfigReq && *p != ConfigRej && *p != ConfigAck) return; while (x > 2) { - int type = *(u8 *)(o); - int length = *(u8 *)(o + 1); + int type = o[0]; + int length = o[1]; if (length == 0) { log(4, 0, 0, 0, " Option length is 0...\n"); @@ -245,18 +253,18 @@ void dumplcp(char *p, int l) proto == 0xC223 ? "CHAP" : "PAP"); break; } - case 5: // Magic-Number - { - u32 magicno = ntohl(*(u32 *)(o + 2)); - log(4, 0, 0, 0, " %s %x\n", lcp_types[type], magicno); - break; - } case 4: // Quality-Protocol { u32 qp = ntohl(*(u32 *)(o + 2)); log(4, 0, 0, 0, " %s %x\n", lcp_types[type], qp); break; } + case 5: // Magic-Number + { + u32 magicno = ntohl(*(u32 *)(o + 2)); + log(4, 0, 0, 0, " %s %x\n", lcp_types[type], magicno); + break; + } case 7: // Protocol-Field-Compression { u32 pfc = ntohl(*(u32 *)(o + 2)); @@ -300,15 +308,16 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) } else if (*p == ConfigReq) { - signed int x = l - 1; - char *o = (p + 1); + signed int x = l - 4; + u8 *o = (p + 4); log(3, session[s].ip, s, t, "LCP: ConfigReq (%d bytes)...\n", l); + dumplcp(p, l); while (x > 2) { - int type = *(u8 *)(o); - int length = *(u8 *)(o + 1); + int type = o[0]; + int length = o[1]; if (length == 0 || type == 0) break; switch (type) { @@ -324,7 +333,11 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) if (!q) { - q = makeppp(b, p, l, t, s, PPPLCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPLCP); + if (!q) { + log(2, session[s].ip, s, t, " Failed to send packet.\n"); + break; + } *q++ = ConfigNak; } memcpy(q, o, length); @@ -365,7 +378,11 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) { // Send back a ConfigAck log(3, session[s].ip, s, t, "ConfigReq accepted, sending as Ack\n"); - q = makeppp(b, p, l, t, s, PPPLCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPLCP); + if (!q) { + log(3, session[s].ip, s, t, " failed to create packet.\n"); + return; + } *q = ConfigAck; tunnelsend(b, l + (q - b), t); } @@ -376,7 +393,7 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) tunnelsend(b, l + (q - b), t); log(3, session[s].ip, s, t, "Sending ConfigReq, requesting PAP login\n"); - q = makeppp(b, NULL, 0, t, s, PPPLCP); + q = makeppp(b, sizeof(b), NULL, 0, t, s, PPPLCP); *q++ = ConfigReq; *(u8 *)(q++) = 3; *(u8 *)(q++) = 4; @@ -393,7 +410,11 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) else if (*p == TerminateReq) { *p = TerminateAck; // close - q = makeppp(b, p, l, t, s, PPPLCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPLCP); + if (!q) { + log(3, session[s].ip, s, t, "Failed to create PPP packet in processlcp.\n"); + return; + } log(3, session[s].ip, s, t, "LCP: Received TerminateReq. Sending TerminateAck\n"); sessionshutdown(s, "Remote end closed connection."); tunnelsend(b, l + (q - b), t); // send it @@ -406,7 +427,11 @@ void processlcp(tunnelidt t, sessionidt s, u8 * p, u16 l) { *p = EchoReply; // reply *(u32 *) (p + 4) = htonl(session[s].magic); // our magic number - q = makeppp(b, p, l, t, s, PPPLCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPLCP); + if (!q) { + log(3, session[s].ip, s, t, " failed to send EchoReply.\n"); + return; + } log(5, session[s].ip, s, t, "LCP: Received EchoReq. Sending EchoReply\n"); tunnelsend(b, l + (q - b), t); // send it } @@ -438,11 +463,14 @@ void processipcp(tunnelidt t, sessionidt s, u8 * p, u16 l) if (*p == ConfigAck) { // happy with our IPCP u16 r = session[s].radius; - if ((!r || radius[r].state == RADIUSIPCP) && !session[s].walled_garden) + if ((!r || radius[r].state == RADIUSIPCP) && !session[s].walled_garden) { if (!r) r = radiusnew(s); if (r) radiussend(r, RADIUSSTART); // send radius start, having got IPCP at last + } + session[s].flags |= SF_IPCP_ACKED; + return ; // done } if (*p != ConfigReq) @@ -460,7 +488,7 @@ void processipcp(tunnelidt t, sessionidt s, u8 * p, u16 l) if (!session[s].ip) { log(3, 0, s, t, "Waiting on radius reply\n"); - return ; // have to wait on RADIUS eply + return ; // have to wait on RADIUS reply } // form a config reply quoting the IP in the session { @@ -480,7 +508,11 @@ void processipcp(tunnelidt t, sessionidt s, u8 * p, u16 l) { // reject u16 n = 4; i = p + l; - q = makeppp(b, p, l, t, s, PPPIPCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPIPCP); + if (!q) { + log(2, 0, s, t, "Failed to send IPCP.\n"); + return; + } *q = ConfigRej; p += 4; while (p < i && p[1]) @@ -529,54 +561,109 @@ void processipcp(tunnelidt t, sessionidt s, u8 * p, u16 l) *(u32 *) (i + 2) = htonl(session[s].ip); *p = ConfigNak; } - q = makeppp(b, p, l, t, s, PPPIPCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPIPCP); + if (!q) { + log(2, 0, s, t, " Failed to send IPCP packet.\n"); + return; + } tunnelsend(b, l + (q - b), t); // send it } } } // process IP packet received +// +// This MUST be called with at least 4 byte behind 'p'. +// (i.e. this routine writes to p[-4]). void processipin(tunnelidt t, sessionidt s, u8 * p, u16 l) { + ipt ip; + #ifdef STAT_CALLS STAT(call_processipin); #endif log_hex(5, "IP", p, l); + ip = ntohl(*(u32 *)(p + 12)); + if (l > MAXETHER) { - log(1, *(u32 *)(p + 12), s, t, "IP packet too long %d\n", l); + log(1, ip, s, t, "IP packet too long %d\n", l); STAT(tunnel_rx_errors); return ; } - session[s].cin += l; - session[s].total_cin += l; - session[s].pin++; - eth_tx += l; + // no spoof (do sessionbyip to handled statically routed subnets) + if (ip != session[s].ip && sessionbyip(htonl(ip)) != s) + { + log(5, ip, s, t, "Dropping packet with spoofed IP %s\n", inet_toa(htonl(ip))); + return; + } // Add on the tun header p -= 4; *(u32 *)p = htonl(0x00000800); l += 4; + if (session[s].tbf_in && !config->cluster_iam_master) { // Are we throttled and a slave? + master_throttle_packet(session[s].tbf_in, p, l); // Pass it to the master for handling. + return; + } + + session[s].cin += l - 4; + session[s].total_cin += l - 4; + sess_count[s].cin += l - 4; + + session[s].pin++; + eth_tx += l - 4; + + if (session[s].snoop_ip && session[s].snoop_port) + { + // Snooping this session, send it to ASIO + snoop_send_packet(p, l, session[s].snoop_ip, session[s].snoop_port); + } + STAT(tap_tx_packets); + INC_STAT(tap_tx_bytes, l); + + if (session[s].tbf_in && config->cluster_iam_master) { // Are we throttled and a master?? actually handle the throttled packets. + tbf_queue_packet(session[s].tbf_in, p, l); + return; + } + // send to ethernet - if (write(tapfd, p, l) < 0) + if (tun_write(p, l) < 0) { STAT(tap_tx_errors); log(0, 0, s, t, "Error writing %d bytes to TAP device: %s (tapfd=%d, p=%p)\n", l, strerror(errno), tapfd, p); } - if (session[s].snoop) +} + +// +// Helper routine for the TBF filters. +// Used to send queued data in from the user. +// +void send_ipin(sessionidt s, u8 *buf, int len) +{ + log_hex(5, "IP in throttled", buf, len); + if (write(tapfd, buf, len) < 0) { - // Snooping this session, send it to ASIO - snoop_send_packet(p, l); + STAT(tap_tx_errors); + log(0, 0, 0, 0, "Error writing %d bytes to TAP device: %s (tapfd=%d, p=%p)\n", + len, strerror(errno), tapfd, buf); } - STAT(tap_tx_packets); - INC_STAT(tap_tx_bytes, l); + + // Increment packet counters + session[s].cin += len - 4; + session[s].total_cin += len - 4; + sess_count[s].cin += len - 4; + + session[s].pin++; + eth_tx += len - 4; } + // Process LCP messages void processccp(tunnelidt t, sessionidt s, u8 * p, u16 l) { @@ -607,7 +694,11 @@ void processccp(tunnelidt t, sessionidt s, u8 * p, u16 l) } else *p = TerminateAck; // close - q = makeppp(b, p, l, t, s, PPPCCP); + q = makeppp(b, sizeof(b), p, l, t, s, PPPCCP); + if (!q) { + log(1,0,0,0, "Failed to send CCP packet.\n"); + return; + } tunnelsend(b, l + (q - b), t); // send it } } @@ -645,7 +736,11 @@ void sendchap(tunnelidt t, sessionidt s) STAT(tunnel_tx_errors); return ; } - q = makeppp(b, 0, 0, t, s, PPPCHAP); + q = makeppp(b, sizeof(b), 0, 0, t, s, PPPCHAP); + if (!q) { + log(1, 0, s, t, "failed to send CHAP challenge.\n"); + return; + } *q = 1; // challenhe q[1] = radius[r].id; // ID q[4] = 16; // length @@ -658,24 +753,33 @@ void sendchap(tunnelidt t, sessionidt s) // fill in a L2TP message with a PPP frame, // copies existing PPP message and changes magic number if seen // returns start of PPP frame -u8 *makeppp(u8 * b, u8 * p, int l, tunnelidt t, sessionidt s, u16 mtype) +u8 *makeppp(u8 * b, int size, u8 * p, int l, tunnelidt t, sessionidt s, u16 mtype) { + + if (size < 12) + return NULL; // Need more space than this!! + *(u16 *) (b + 0) = htons(0x0002); // L2TP with no options *(u16 *) (b + 2) = htons(tunnel[t].far); // tunnel *(u16 *) (b + 4) = htons(session[s].far); // session b += 6; - if (mtype != PPPLCP && !(session[s].flags & SESSIONACFC)) + if (mtype != PPPLCP && !(session[s].l2tp_flags & SESSIONACFC)) { *(u16 *) b = htons(0xFF03); // HDLC header b += 2; } - if (mtype < 0x100 && session[s].flags & SESSIONPFC) + if (mtype < 0x100 && session[s].l2tp_flags & SESSIONPFC) *b++ = mtype; else { *(u16 *) b = htons(mtype); b += 2; } + + if (l + 12 > size) { + log(3,0,0,0, "Would have overflowed the buffer in makeppp: size %d, len %d.\n", size, l); + return NULL; // Run out of room to hold the packet! + } if (p && l) memcpy(b, p, l); return b; @@ -706,7 +810,11 @@ void initlcp(tunnelidt t, sessionidt s) { char b[500] = {0}, *q; - q = makeppp(b, NULL, 0, t, s, PPPLCP); + q = makeppp(b, sizeof(b), NULL, 0, t, s, PPPLCP); + if (!q) { + log(1, 0, s, t, "Failed to send LCP ConfigReq.\n"); + return; + } log(4, 0, s, t, "Sending LCP ConfigReq for PAP\n"); *q = ConfigReq; *(u8 *)(q + 1) = (time_now % 255) + 1; // ID diff --git a/radius.c b/radius.c index 18bdf1e..2a2db48 100644 --- a/radius.c +++ b/radius.c @@ -1,5 +1,5 @@ // L2TPNS Radius Stuff -// $Id: radius.c,v 1.3 2004/05/24 04:27:11 fred_nerk Exp $ +// $Id: radius.c,v 1.4 2004/06/23 03:52:24 fred_nerk Exp $ #include #include @@ -21,7 +21,6 @@ extern radiust *radius; extern sessiont *session; extern tunnelt *tunnel; extern u32 sessionid; -extern struct Tstats *_statistics; extern struct configt *config; extern int *radfds; @@ -95,6 +94,9 @@ u16 radiusnew(sessionidt s) session[s].radius = r; radius[r].session = s; radius[r].state = RADIUSWAIT; + radius[r].retry = config->current_time + 1200; // Wait at least 120 seconds to re-claim this. + + log(3,0,s, session[s].tunnel, "Allocated radius %d\n", r); return r; } @@ -352,6 +354,11 @@ void processrad(u8 *buf, int len, char socket_index) hasht hash; u8 routes = 0; + int r_code, r_id ; // Radius code. + + r_code = buf[0]; // First byte in radius packet. + r_id = buf[1]; // radius reply indentifier. + #ifdef STAT_CALLS STAT(call_processrad); #endif @@ -362,10 +369,10 @@ void processrad(u8 *buf, int len, char socket_index) return ; } len = ntohs(*(u16 *) (buf + 2)); - r = socket_index | (buf[1] << RADIUS_SHIFT); + r = socket_index | (r_id << RADIUS_SHIFT); s = radius[r].session; - log(3, 0, s, session[s].tunnel, "Received %s, radius %d response for session %u\n", - radius_states[radius[r].state], r, s); + log(3, 0, s, session[s].tunnel, "Received %s, radius %d response for session %u (code %d, id %d)\n", + radius_states[radius[r].state], r, s, r_code, r_id); if (!s && radius[r].state != RADIUSSTOP) { log(1, 0, s, session[s].tunnel, " Unexpected RADIUS response\n"); @@ -386,16 +393,21 @@ void processrad(u8 *buf, int len, char socket_index) do { if (memcmp(hash, buf + 4, 16)) { - log(0, 0, s, session[s].tunnel, " Incorrect auth on RADIUS response\n"); - radius[r].state = RADIUSWAIT; - break; + log(0, 0, s, session[s].tunnel, " Incorrect auth on RADIUS response!! (wrong secret in radius config?)\n"); +// radius[r].state = RADIUSWAIT; + + return; // Do nothing. On timeout, it will try the next radius server. } if ((radius[r].state == RADIUSAUTH && *buf != 2 && *buf != 3) || ((radius[r].state == RADIUSSTART || radius[r].state == RADIUSSTOP) && *buf != 5)) { log(1, 0, s, session[s].tunnel, " Unexpected RADIUS response %d\n", *buf); - radius[r].state = RADIUSWAIT; - break; + + return; // We got something we didn't expect. Let the timeouts take + // care off finishing the radius session if that's really correct. +// old code. I think incorrect. --mo +// radius[r].state = RADIUSWAIT; +// break; // Finish the radius sesssion. } if (radius[r].state == RADIUSAUTH) { @@ -404,8 +416,10 @@ void processrad(u8 *buf, int len, char socket_index) if (radius[r].chap) { // CHAP - u8 *p = makeppp(b, 0, 0, t, s, PPPCHAP); - + u8 *p = makeppp(b, sizeof(b), 0, 0, t, s, PPPCHAP); + if (!p) { + return; // Abort! + } { struct param_post_auth packet = { &tunnel[t], &session[s], session[s].user, (*buf == 2), PPPCHAP }; run_plugins(PLUGIN_POST_AUTH, &packet); @@ -422,7 +436,9 @@ void processrad(u8 *buf, int len, char socket_index) else { // PAP - u8 *p = makeppp(b, 0, 0, t, s, PPPPAP); + u8 *p = makeppp(b, sizeof(b), 0, 0, t, s, PPPPAP); + if (!p) + return; // Abort! { struct param_post_auth packet = { &tunnel[t], &session[s], session[s].user, (*buf == 2), PPPPAP }; @@ -453,17 +469,18 @@ void processrad(u8 *buf, int len, char socket_index) // Statically assigned address log(3, 0, s, session[s].tunnel, " Radius reply contains IP address %s\n", inet_toa(*(u32 *) (p + 2))); session[s].ip = ntohl(*(u32 *) (p + 2)); + session[s].ip_pool_index = -1; } else if (*p == 135) { // DNS address - log(3, 0, s, session[s].tunnel, " Radius reply contains primary DNS address %s\n", inet_toa(ntohl(*(u32 *) (p + 2)))); + log(3, 0, s, session[s].tunnel, " Radius reply contains primary DNS address %s\n", inet_toa(*(u32 *) (p + 2))); session[s].dns1 = ntohl(*(u32 *) (p + 2)); } else if (*p == 136) { // DNS address - log(3, 0, s, session[s].tunnel, " Radius reply contains secondary DNS address %s\n", inet_toa(ntohl(*(u32 *) (p + 2)))); + log(3, 0, s, session[s].tunnel, " Radius reply contains secondary DNS address %s\n", inet_toa(*(u32 *) (p + 2))); session[s].dns2 = ntohl(*(u32 *) (p + 2)); } else if (*p == 22) @@ -503,11 +520,11 @@ void processrad(u8 *buf, int len, char socket_index) { log(1, 0, s, session[s].tunnel, " Too many routes\n"); } - else + else if (ip) { char *ips, *masks; - ips = strdup(inet_toa(ip)); - masks = strdup(inet_toa(mask)); + ips = strdup(inet_toa(htonl(ip))); + masks = strdup(inet_toa(htonl(mask))); log(3, 0, s, session[s].tunnel, " Radius reply contains route for %s/%s\n", ips, masks); free(ips); free(masks); @@ -564,19 +581,10 @@ void processrad(u8 *buf, int len, char socket_index) else if (*buf == 3) { log(2, 0, s, session[s].tunnel, " Authentication denied for %s\n", session[s].user); +//FIXME: We should tear down the session here! break; } - // Check for Assign-IP-Address - if (!session[s].ip || session[s].ip == 0xFFFFFFFE) - { - assign_ip_address(s); - if (session[s].ip) - log(3, 0, s, t, " No IP allocated by radius. Assigned %s from pool\n", - inet_toa(htonl(session[s].ip))); - else - log(0, 0, s, t, " No IP allocated by radius. The IP address pool is FULL!\n"); - } if (!session[s].dns1 && config->default_dns1) { session[s].dns1 = htonl(config->default_dns1); @@ -588,21 +596,15 @@ void processrad(u8 *buf, int len, char socket_index) log(3, 0, s, t, " Sending dns2 = %s\n", inet_toa(config->default_dns2)); } - if (session[s].ip) - { - // Valid Session, set it up - session[s].sid = 0; - sessionsetup(t, s, routes); - } - else - { - log(0, 0, s, t, " End of processrad(), but no valid session exists.\n"); - sessionkill(s, "Can't create valid session"); - } + // Valid Session, set it up + session[s].sid = 0; + sessionsetup(t, s); } else { - log(3, 0, s, t, " RADIUS response in state %s\n", radius_states[radius[r].state]); + // An ack for a stop or start record. + log(3, 0, s, t, " RADIUS accounting ack recv in state %s\n", radius_states[radius[r].state]); + break; } } while (0); diff --git a/rl.c b/rl.c deleted file mode 100644 index 1692944..0000000 --- a/rl.c +++ /dev/null @@ -1,121 +0,0 @@ -// L2TPNS Rate Limiting Stuff -// $Id: rl.c,v 1.4 2004/05/24 04:28:41 fred_nerk Exp $ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "l2tpns.h" - -extern radiust *radius; -extern sessiont *session; -extern u32 sessionid; -extern tbft *filter_buckets; -extern struct configt *config; - -#define DEVICE "tun0" - -void init_rl() -{ - char *commands[] = { - "tc qdisc add dev " DEVICE " root handle 1: htb", - "tc filter del dev " DEVICE " protocol ip pref 1 fw", - "iptables -t mangle -N throttle 2>&1 >/dev/null", - "iptables -t mangle -F throttle 2>&1 >/dev/null", - "iptables -t mangle -A l2tpns -j throttle 2>&1 >/dev/null", - NULL - }; - int i; - - log(2, 0, 0, 0, "Initializing HTB\n"); - for (i = 0; commands[i] && *commands[i]; i++) - { - log(3, 0, 0, 0, "Running \"%s\"\n", commands[i]); - system(commands[i]); - } - log(2, 0, 0, 0, "Done initializing HTB\n"); -} - -u16 rl_create_tbf() -{ - u16 t; - char cmd[2048]; - if (!config->rl_rate) return 0; - - t = ++config->next_tbf; - if (config->next_tbf >= MAXSESSION) return 0; - snprintf(filter_buckets[t].handle, 9, "1:%d0", t); - - log(2, 0, 0, 0, "Creating new htb %s\n", filter_buckets[t].handle); - snprintf(cmd, 2048, "tc class add dev " DEVICE " parent 1: classid %s htb rate %lukbit burst 15k", - filter_buckets[t].handle, config->rl_rate); - log(3, 0, 0, 0, "%s\n", cmd); - if (WEXITSTATUS(system(cmd)) != 0) - { - memset(filter_buckets[t].handle, 0, sizeof(filter_buckets[t].handle)); - log(0, 0, 0, 0, "tc returned an error creating a token bucket\n"); - return 0; - } - - snprintf(cmd, 2048, "tc filter add dev " DEVICE " protocol ip parent 1:0 prio 1 handle %d fw flowid %s", - t, filter_buckets[t].handle); - log(3, 0, 0, 0, "%s\n", cmd); - if (WEXITSTATUS(system(cmd)) != 0) - { - memset(filter_buckets[t].handle, 0, sizeof(filter_buckets[t].handle)); - log(0, 0, 0, 0, "tc returned an error creating a filter\n"); - return 0; - } - - return t; -} - -u16 rl_get_tbf() -{ - int i; - if (!config->rl_rate) return 0; - - for (i = 1; i < MAXSESSION; i++) - { - if (!*filter_buckets[i].handle) continue; - if (filter_buckets[i].in_use) continue; - - filter_buckets[i].in_use = 1; - log(2, 0, 0, 0, "Returning tbf %s\n", filter_buckets[i].handle); - return i; - } - i = rl_create_tbf(); - if (i) filter_buckets[i].in_use = 1; - return i; -} - -void rl_done_tbf(u16 t) -{ - if (!t) return; - log(2, 0, 0, 0, "Freeing up HTB %s\n", filter_buckets[t].handle); - filter_buckets[t].in_use = 0; -} - -void rl_destroy_tbf(u16 t) -{ - char cmd[2048]; - if (!config->rl_rate) return; - if (filter_buckets[t].in_use) - { - log(0, 0, 0, 0, "Trying to destroy an in-use HTB %s\n", filter_buckets[t].handle); - return; - } - snprintf(cmd, 2048, "tc qdisc del dev " DEVICE " handle %s", filter_buckets[t].handle); - if (WEXITSTATUS(system(cmd)) != 0) - log(0, 0, 0, 0, "tc returned an error deleting a token bucket\n"); - memset(filter_buckets[t].handle, 0, sizeof(filter_buckets[t].handle)); -} - diff --git a/tbf.c b/tbf.c new file mode 100644 index 0000000..c024c67 --- /dev/null +++ b/tbf.c @@ -0,0 +1,400 @@ +#include +#include +#include +#include + +#include "l2tpns.h" +#include "tbf.h" + +// Need a time interval. + +#define TBF_MAX_QUEUE 2 // Maximum of 2 queued packet per +#define TBF_MAX_SIZE 3000 // Maxiumum queued packet size is 2048. + +#define TBF_MAX_CREDIT 6000 // Maximum 6000 bytes of credit. +#define TBF_RATE 360 // 360 bytes per 1/10th of a second. + +typedef struct { + int credit; + int lasttime; + int queued; + int oldest; // Position of packet in the ring buffer. + sessionidt sid; // associated session ID. + int max_credit; // Maximum amount of credit available (burst size). + int rate; // How many bytes of credit per second we get? (sustained rate) + void (*send)(sessionidt s, u8 *, int); // Routine to actually send out the data. + int prev; // Timer chain position. + int next; // Timer chain position. + + u32 b_queued; // Total bytes sent through this TBF + u32 b_sent; // Total bytes sucessfully made it to the network. + u32 p_queued; // ditto packets. + u32 p_sent; // ditto packets. + u32 b_dropped; // Total bytes dropped. + u32 p_dropped; // Total packets dropped. + u32 p_delayed; // Total packets not sent immediately. + + int sizes[TBF_MAX_QUEUE]; + char packets[TBF_MAX_QUEUE][TBF_MAX_SIZE]; +} tbft; + + +tbft * filter_list = NULL; +int filter_list_size = 0; + +static int timer_chain = -1; // Head of timer chain. + +static void tbf_run_queue(int tbf_id); + +void init_tbf(void) +{ + filter_list = mmap(NULL, sizeof(*filter_list) * MAXTBFS, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0); + if (!filter_list) + return; + + filter_list_size = MAXTBFS; + filter_list[0].sid = -1; // Reserved. +} +// +// Put a TBF on the timer list. +// This is a doubly linked list.. +// We put ourselves on the tail of the list. +// +static void add_to_timer(int id) +{ + if (!filter_list) + return; + + if (timer_chain == -1) { + filter_list[id].next = filter_list[id].prev = id; + timer_chain = id; + return; + } + + filter_list[id].next = timer_chain; + filter_list[id].prev = filter_list[timer_chain].prev; + filter_list[filter_list[timer_chain].prev].next = id; + filter_list[timer_chain].prev = id; +} + +// +// Remove a TBF from the timer list. +// This is a doubly linked list. +static void del_from_timer(int id) +{ + if (!filter_list) + return; + + if (filter_list[id].next == id) { // Last element in chain? + if (timer_chain != id) { // WTF? + log(0,0,0,0, "Removed a singleton element from TBF, but tc didn't point to it!\n"); + } else + timer_chain = -1; + filter_list[id].next = filter_list[id].prev = 0; + return; + } + + filter_list[filter_list[id].next].prev = filter_list[id].prev; + filter_list[filter_list[id].prev].next = filter_list[id].next; + if (timer_chain == id) + timer_chain = filter_list[id].next; + + filter_list[id].next = filter_list[id].prev = 0; // Mark as off the timer chain. +} + +// +// Free a token bucket filter structure for re-use. +// + +int free_tbf(int tid) +{ + if (tid < 1) // Make sure we don't free id # 0 + return -1; + + if (!filter_list) // WTF? + return -1; + + if (filter_list[tid].next) + del_from_timer(tid); + filter_list[tid].sid = 0; + + return 0; // Done! +} + +// +// Allocate a new token bucket filter. +// +int new_tbf(int sid, int max_credit, int rate, void (*f)(sessionidt, u8 *, int)) +{ + int i; + static int p = 0; + + log(3,0,0,0, "Allocating new TBF (sess %d, rate %d, helper %p)\n", sid, rate, f); + + if (!filter_list) + return 0; // Couldn't alloc memory! + +// again: + + for (i = 0 ; i < filter_list_size ; ++i, p = (p+1)%filter_list_size ) { + if (filter_list[p].sid) + continue; + + memset((void*) &filter_list[p], 0, sizeof(filter_list[p]) ); // Clear counters and data. + filter_list[p].sid = sid; + filter_list[p].credit = max_credit; + filter_list[p].queued = 0; + filter_list[p].max_credit = max_credit; + filter_list[p].rate = rate; + filter_list[p].oldest = 0; + filter_list[p].send = f; + return p; + } + + log(0,0,0,0, "Ran out of token bucket filters! Sess %d will be un-throttled\n", sid); + return 0; + +#if 0 + // Not using. Disasterous if called via the CLI! :) + // All allocated filters are used! Increase the size of the allocated + // filters. + + i = filter_list_size; + filter_list_size = filter_list_size * 2 + 1; + + filter_list = realloc(filter_list, filter_list_size * sizeof(*filter_list) ); + + for (; i < filter_list_size; ++i) + filter_list[i].sid = 0; + + goto again; +#endif +} + +// +// Sanity check all the TBF records. This is +// typically done when we become a master.. +// +void fsck_tbfs(void) +{ + int i , sid; + + if (!filter_list) + return; + + for (i = 1; i < filter_list_size; ++i) { + if (!filter_list[i].sid) // Is it used?? + continue; + + sid = filter_list[i].sid; + if (i != session[sid].tbf_in && + i != session[sid].tbf_out) { // Ooops. + + free_tbf(i); // Mark it as free... + } + } + + for (i = 0; i < config->cluster_highest_sessionid ; ++i) { + if (session[i].tbf_in && filter_list[session[i].tbf_in].sid != i) { + filter_list[session[i].tbf_in].sid = i; // Ouch!? FIXME. What to do here? + } + if (session[i].tbf_out && filter_list[session[i].tbf_out].sid != i) { + filter_list[session[i].tbf_out].sid = i; // Ouch!? FIXME. What to do here? + } + } +} + + +// +// Run a packet through a token bucket filter. +// If we can send it right away, we do. Else we +// try and queue it to send later. Else we drop it. +// +int tbf_queue_packet(int tbf_id, char * data, int size) +{ + int i; + tbft * f; + + if (!filter_list) + return -1; + + if (tbf_id > filter_list_size || tbf_id < 1) { // Out of range ID?? + // Very bad. Just drop it. + return -1; + } + + f = &filter_list[tbf_id]; + + if (!f->sid) // Is this a real structure?? + return -1; + + tbf_run_queue(tbf_id); // Caculate credit and send any queued packets if possible.. + + f->b_queued += size; + f->p_queued ++; + + if (!f->queued && f->credit > size) { // If the queue is empty, and we have + // enough credit, just send it now. + f->credit -= size; + if (f->send) { + f->send(f->sid, data, size); + f->b_sent += size; + f->p_sent ++; + } else { + f->b_dropped += size; + f->p_dropped ++; + } + return size; + } + + // Not enough credit. Can we have room in the queue? + if (f->queued >= TBF_MAX_QUEUE) { + f->p_dropped ++; + f->b_dropped += size; + return -1; // No, just drop it. + } + + // Is it too big to fit into a queue slot? + if (size >= TBF_MAX_SIZE) { + f->p_dropped ++; + f->b_dropped += size; + return -1; // Yes, just drop it. + } + + // Ok. We have a slot, and it's big enough to + // contain the packet, so queue the packet! + i = ( f->oldest + f->queued ) % TBF_MAX_QUEUE; + memcpy(f->packets[i], data, size); + + f->sizes[i] = size; + f->queued ++; + f->p_delayed ++; + + if (!f->next) // Are we off the timer chain? + add_to_timer(tbf_id); // Put ourselves on the timer chain. + + return 0; // All done. +} + +// +// Send queued packets from the filter if possible. +// (We're normally only called if this is possible.. ) +static void tbf_run_queue(int tbf_id) +{ + tbft * f; + + if (!filter_list) + return; + + f = &filter_list[tbf_id]; + + // Calculate available credit... + f->credit += (config->current_time - f->lasttime) * f->rate / 10; // current time is 1/10th of a second. + if (f->credit > f->max_credit) + f->credit = f->max_credit; + f->lasttime = config->current_time; + + while (f->queued > 0 && f->credit >= f->sizes[f->oldest]) { // While we have enough credit.. + + if (f->send) { + f->send(f->sid, f->packets[f->oldest], f->sizes[f->oldest]); + f->b_sent += f->sizes[f->oldest]; + f->p_sent ++; + } else { + f->b_dropped += f->sizes[f->oldest]; + f->p_dropped ++; + } + + f->credit -= f->sizes[f->oldest]; + + f->oldest = (f->oldest + 1 ) % TBF_MAX_QUEUE; + f->queued--; // One less queued packet.. + } + + if (f->queued) // Still more to do. Hang around on the timer list. + return; + + if (f->next) // Are we on the timer list?? + del_from_timer(tbf_id); // Nothing more to do. Get off the timer list. +} + +// +// Periodically walk the timer list.. +// +int tbf_run_timer(void) +{ + int i = timer_chain; + int count = filter_list_size + 1; // Safety check. + int last = -1; + int tbf_id; // structure being processed. + + if (timer_chain < 0) + return 0; // Nothing to do... + + if (!filter_list) // No structures built yet. + return 0; + + last = filter_list[i].prev; // last element to process. + + do { + tbf_id = i; + i = filter_list[i].next; // Get the next in the queue. + + tbf_run_queue(tbf_id); // Run the timer queue.. + } while ( timer_chain > 0 && i && tbf_id != last && --count > 0); + + +#if 0 // Debugging. + for (i = 0; i < filter_list_size; ++i) { + if (!filter_list[i].next) + continue; + if (filter_list[i].lasttime == config->current_time) // Did we just run it? + continue; + + log(1,0,0,0, "Missed tbf %d! Not on the timer chain?(n %d, p %d, tc %d)\n", i, + filter_list[i].next, filter_list[i].prev, timer_chain); + tbf_run_queue(i); + } +#endif + + return 1; +} + +int cmd_show_tbf(struct cli_def *cli, char *command, char **argv, int argc) +{ + int i; + int count = 0; + + if (!config->cluster_iam_master) { + cli_print(cli, "Command can't be run on a slave."); + return CLI_OK; + } + if (!filter_list) + return CLI_OK; + + cli_print(cli,"%6s %5s %5s %6s %6s | %7s %7s %8s %8s %8s %8s", "TBF#", "Sid", "Rate", "Credit", "Queued", + "ByteIn","PackIn","ByteSent","PackSent", "PackDrop", "PackDelay"); + + for (i = 1; i < filter_list_size; ++i) { + if (!filter_list[i].sid) // Is it used? + continue; // No. + + cli_print(cli, "%5d%1s %5d %5d %6d %6d | %7d %7d %8d %8d %8d %8d", + i, (filter_list[i].next ? "*" : " "), + filter_list[i].sid, + filter_list[i].rate * 8, + filter_list[i].credit, + filter_list[i].queued, + + filter_list[i].b_queued, + filter_list[i].p_queued, + filter_list[i].b_sent, + filter_list[i].p_sent, + filter_list[i].p_dropped, + filter_list[i].p_delayed); + ++count; + } + cli_print(cli, "%d tbf entries used, %d total", count, filter_list_size); + return CLI_OK; +} + diff --git a/tbf.h b/tbf.h new file mode 100644 index 0000000..3d98c1b --- /dev/null +++ b/tbf.h @@ -0,0 +1,13 @@ +#ifndef __TBF_H__ +#define __TBF_H__ + +void init_tbf(void); +int tbf_run_timer(void); +int tbf_queue_packet(int tbf_id, char * data, int size); +int new_tbf(int sid, int max_credit, int rate, void (*f)(sessionidt, u8 *, int)); +int free_tbf(int tid); +void fsck_tbfs(void); + +int cmd_show_tbf(struct cli_def *cli, char *command, char **argv, int argc); + +#endif /* __TBF_H__ */ diff --git a/throttle.c b/throttle.c deleted file mode 100644 index 7244cfa..0000000 --- a/throttle.c +++ /dev/null @@ -1,77 +0,0 @@ -// L2TPNS Throttle Stuff -// $Id: throttle.c,v 1.3 2004/05/24 04:29:21 fred_nerk Exp $ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "l2tpns.h" -#include "util.h" - -extern radiust *radius; -extern sessiont *session; -extern u32 sessionid; -extern tbft *filter_buckets; -extern struct configt *config; - -// Throttle or Unthrottle a session -int throttle_session(sessionidt s, int throttle) -{ - if (!config->rl_rate) return 0; - - if (!*session[s].user) - return 0; // User not logged in - - if (throttle) - { - // Throttle them - char cmd[2048] = {0}; - if (!session[s].tbf) session[s].tbf = rl_get_tbf(); - if (!session[s].tbf) - { - log(1, 0, s, session[s].tunnel, "Error creating a filtering bucket for user %s\n", session[s].user); - return 0; - } - log(2, 0, s, session[s].tunnel, "Throttling session %d for user %s (bucket %s)\n", s, session[s].user, filter_buckets[session[s].tbf].handle); - snprintf(cmd, 2048, "iptables -t mangle -A throttle -d %s -j MARK --set-mark %d", - inet_toa(ntohl(session[s].ip)), - session[s].tbf); - log(4, 0, s, session[s].tunnel, "Running %s\n", cmd); - if (WEXITSTATUS(system(cmd)) != 0) - { - log(2, 0, s, session[s].tunnel, "iptables returned an error. Session is not throttled\n"); - return 0; - } - } - else - { - char cmd[2048] = {0}; - log(2, 0, s, session[s].tunnel, "Unthrottling session %d for user %s\n", s, session[s].user); - if (session[s].tbf) - { - int count = 10; - snprintf(cmd, 2048, "iptables -t mangle -D throttle -d %s -j MARK --set-mark %d", inet_toa(ntohl(session[s].ip)), session[s].tbf); - log(4, 0, s, session[s].tunnel, "Running %s\n", cmd); - while (--count) - { - int status = system(cmd); - if (WEXITSTATUS(status) != 0) break; - } - system(cmd); - - rl_done_tbf(session[s].tbf); - session[s].tbf = 0; - } - } - session[s].throttle = throttle; - return session[s].throttle; -} - diff --git a/util.h b/util.h index ec2c017..be797f8 100644 --- a/util.h +++ b/util.h @@ -1 +1,6 @@ +#ifndef __UTIL_H__ +#define __UTIL_H__ + char *inet_toa(unsigned long addr); + +#endif /* __UTIL_H__ */