#include <errno.h>
#include <libcli.h>
+#include "dhcp6.h"
#include "l2tpns.h"
#include "cluster.h"
#include "util.h"
#include "tbf.h"
+#include "pppoe.h"
#ifdef BGP
#include "bgp.h"
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
- addr.sin_port = htons(CLUSTERPORT);
+ addr.sin_port = htons(config->cluster_port);
addr.sin_addr.s_addr = INADDR_ANY;
setsockopt(cluster_sockfd, SOL_SOCKET, SO_REUSEADDR, &addr, sizeof(addr));
if (!config->cluster_address) return 0;
addr.sin_addr.s_addr = config->cluster_address;
- addr.sin_port = htons(CLUSTERPORT);
+ addr.sin_port = htons(config->cluster_port);
addr.sin_family = AF_INET;
LOG(5, 0, 0, "Cluster send data: %d bytes\n", datalen);
return -1;
addr.sin_addr.s_addr = peer;
- addr.sin_port = htons(CLUSTERPORT);
+ addr.sin_port = htons(config->cluster_port);
addr.sin_family = AF_INET;
LOG_HEX(5, "Peer send", data, size);
//
// The master just processes the payload as if it had
// received it off the tun device.
-//
-int master_forward_packet(uint8_t *data, int size, in_addr_t addr, int port)
+// The cluster header is built in place in front of the payload: this routine
+// writes to data[-12] .. data[-1] (3 x uint32_t), so the caller must leave
+// that much headroom before data.
+//
+// port and indexudp travel in one 32-bit word (port in the low 16 bits,
+// indexudp in the high 16 bits); the receiver splits them apart again with
+// "& 0xFFFF" and ">> 16".
+//
+// Returns -1 if no master has been elected yet, otherwise whatever
+// peer_send_data() returns.
+int master_forward_packet(uint8_t *data, int size, in_addr_t addr, uint16_t port, uint16_t indexudp)
{
- return _forward_packet(data, size, addr, port, C_FORWARD);
+	uint8_t *p = data - (3 * sizeof(uint32_t));
+	uint8_t *psave = p;	// p is handed to add_type() by address; keep the start for sending
+	// Widen before shifting: a uint16_t promotes to (signed) int, so shifting
+	// an index >= 0x8000 left by 16 would overflow int, which is undefined.
+	uint32_t indexandport = (uint32_t) port | ((uint32_t) indexudp << 16);
+
+	if (!config->cluster_master_address) // No election has been held yet. Just skip it.
+		return -1;
+
+	LOG(4, 0, 0, "Forwarding packet from %s to master (size %d)\n", fmtaddr(addr, 0), size);
+
+	STAT(c_forwarded);
+	add_type(&p, C_FORWARD, addr, (uint8_t *) &indexandport, sizeof(indexandport));
+
+	return peer_send_data(config->cluster_master_address, psave, size + (3 * sizeof(uint32_t)));
+}
+
+// Relay a PPPoE packet, tagged with its code, to the cluster master.
+// The C_PPPOE_FORWARD header (2 x uint32_t) is built in place in front of
+// the payload, i.e. this routine writes to data[-8] .. data[-1]; the caller
+// must leave that much headroom before data.
+// Returns -1 when no master is known yet, else the peer_send_data() result.
+int master_forward_pppoe_packet(uint8_t *data, int size, uint8_t codepad)
+{
+	const size_t hdrlen = 2 * sizeof(uint32_t);
+	uint8_t *hdr = data - hdrlen;	// first byte that goes on the wire
+	uint8_t *cursor = hdr;		// passed by address to add_type()
+
+	if (!config->cluster_master_address)
+		return -1;	// No election has been held yet. Just skip it.
+
+	LOG(4, 0, 0, "Forward PPPOE packet to master, code %s (size %d)\n", get_string_codepad(codepad), size);
+	STAT(c_forwarded);
+
+	add_type(&cursor, C_PPPOE_FORWARD, codepad, NULL, 0);
+	return peer_send_data(config->cluster_master_address, hdr, size + hdrlen);
}
// Forward a DAE RADIUS packet to the master.
}
+//
+// Hand an MPPP packet over to the master, tagged with its session id.
+//
+// data must point at the tun header, with 8 bytes of headroom in front of
+// it: the C_MPPP_FORWARD header (2 x uint32_t) is built in place, so this
+// routine writes to data[-8] .. data[-1].
+// Returns -1 when no master is known yet, else the peer_send_data() result.
+//
+// NOTE(review): unlike the other master_forward_* routines this one does not
+// bump STAT(c_forwarded) - confirm that is intentional.
+int master_forward_mppp_packet(sessionidt s, uint8_t *data, int size)
+{
+	const size_t hdrlen = 2 * sizeof(uint32_t);
+	uint8_t *hdr = data - hdrlen;	// first byte that goes on the wire
+	uint8_t *cursor = hdr;		// passed by address to add_type()
+
+	if (!config->cluster_master_address)
+		return -1;	// No election has been held yet. Just skip it.
+
+	LOG(4, 0, 0, "Forward MPPP packet to master (size %d)\n", size);
+	add_type(&cursor, C_MPPP_FORWARD, s, NULL, 0);
+
+	return peer_send_data(config->cluster_master_address, hdr, size + hdrlen);
+}
+
//
// Send a chunk of data as a heartbeat..
// We save it in the history buffer as we do so.
// to become a master!!!
config->cluster_iam_master = 1;
+ pppoe_send_garp(); // gratuitous arp of the pppoe interface
LOG(0, 0, 0, "I am declaring myself the master!\n");
// Failed to compress : Fall through.
}
case C_SESSION:
- add_type(p, C_SESSION, id, (uint8_t *) &session[id], sizeof(sessiont));
+ add_type(p, C_SESSION, id, (uint8_t *) &session[id], sizeof(sessiont));
break;
case C_CBUNDLE: { // Compressed C_BUNDLE
// Failed to compress : Fall through.
}
case C_TUNNEL:
- add_type(p, C_TUNNEL, id, (uint8_t *) &tunnel[id], sizeof(tunnelt));
+ add_type(p, C_TUNNEL, id, (uint8_t *) &tunnel[id], sizeof(tunnelt));
break;
default:
LOG(0, 0, 0, "Found an invalid type in heart queue! (%d)\n", type);
exit(1);
}
- LOG(3, 0, 0, "Sending v%d heartbeat #%d, change #%" PRIu64 " with %d changes "
+ LOG(4, 0, 0, "Sending v%d heartbeat #%d, change #%" PRIu64 " with %d changes "
"(%d x-sess, %d x-bundles, %d x-tunnels, %d highsess, %d highbund, %d hightun, size %d)\n",
HB_VERSION, h.seq, h.table_version, config->cluster_num_changes,
count, bcount, tcount, config->cluster_highest_sessionid, config->cluster_highest_bundleid,
int i;
for (i = 0 ; i < config->cluster_num_changes ; ++i)
- if ( cluster_changes[i].id == id &&
- cluster_changes[i].type == type)
- return 0; // Already marked for change.
+ {
+ if ( cluster_changes[i].id == id && cluster_changes[i].type == type)
+ {
+ // Already marked for change, remove it
+ --config->cluster_num_changes;
+ memmove(&cluster_changes[i],
+ &cluster_changes[i+1],
+ (config->cluster_num_changes - i) * sizeof(cluster_changes[i]));
+ break;
+ }
+ }
- cluster_changes[i].type = type;
- cluster_changes[i].id = id;
+ cluster_changes[config->cluster_num_changes].type = type;
+ cluster_changes[config->cluster_num_changes].id = id;
++config->cluster_num_changes;
if (config->cluster_num_changes > MAX_CHANGES)
return 1;
}
-
// A particular session has been changed!
int cluster_send_session(int sid)
{
uint32_t tx_connect_speed;
uint32_t rx_connect_speed;
clockt timeout;
- uint32_t mrru;
- uint8_t mssf;
- epdist epdis;
- bundleidt bundle;
+ uint32_t mrru;
+ uint8_t mssf;
+ epdist epdis;
+ bundleidt bundle;
in_addr_t snoop_ip;
uint16_t snoop_port;
uint8_t walled_garden;
char reserved_3[11];
};
+struct oldsessionV7 { // session record layout accepted from HB_VERSION 7 peers; frozen, because sizeof() is matched against the decompressed CSESSION payload
+ sessionidt next; // next session in linked list
+ sessionidt far; // far end session ID
+ tunnelidt tunnel; // near end tunnel ID
+ uint8_t flags; // session flags: see SESSION_*
+ struct { // per-protocol PPP state
+ uint8_t phase; // PPP phase
+ uint8_t lcp:4; // LCP state
+ uint8_t ipcp:4; // IPCP state
+ uint8_t ipv6cp:4; // IPV6CP state
+ uint8_t ccp:4; // CCP state
+ } ppp;
+ uint16_t mru; // maximum receive unit
+ in_addr_t ip; // IP of session set by RADIUS response (host byte order).
+ int ip_pool_index; // index to IP pool
+ uint32_t unique_id; // unique session id
+ uint32_t magic; // ppp magic number
+ uint32_t pin, pout; // packet counts
+ uint32_t cin, cout; // byte counts
+ uint32_t cin_wrap, cout_wrap; // byte counter wrap count (RADIUS accounting gigawords)
+ uint32_t cin_delta, cout_delta; // byte count changes (for dump_session())
+ uint16_t throttle_in; // upstream throttle rate (kbps)
+ uint16_t throttle_out; // downstream throttle rate
+ uint8_t filter_in; // input filter index (to ip_filters[N-1]; 0 if none)
+ uint8_t filter_out; // output filter index
+ uint16_t snoop_port; // Interception destination port
+ in_addr_t snoop_ip; // Interception destination IP
+ clockt opened; // when started
+ clockt die; // being closed, when to finally free
+ uint32_t session_timeout; // Maximum session time in seconds
+ uint32_t idle_timeout; // Maximum idle time in seconds
+ time_t last_packet; // Last packet from the user (used for idle timeouts)
+ time_t last_data; // Last data packet to/from the user (used for idle timeouts)
+ in_addr_t dns1, dns2; // DNS servers
+ routet route[MAXROUTE]; // static routes
+ uint16_t tbf_in; // filter bucket for throttling in from the user.
+ uint16_t tbf_out; // filter bucket for throttling out to the user.
+ int random_vector_length;
+ uint8_t random_vector[MAXTEL];
+ char user[MAXUSER]; // user (needed in session for radius stop messages)
+ char called[MAXTEL]; // called number
+ char calling[MAXTEL]; // calling number
+ uint32_t tx_connect_speed;
+ uint32_t rx_connect_speed;
+ clockt timeout; // Session timeout
+ uint32_t mrru; // Multilink Max-Receive-Reconstructed-Unit
+ epdist epdis; // Multilink Endpoint Discriminator
+ bundleidt bundle; // Multilink Bundle Identifier
+ uint8_t mssf; // Multilink Short Sequence Number Header Format
+ uint8_t walled_garden; // is this session gardened?
+ uint8_t classlen; // class (needed for radius accounting messages)
+ char class[MAXCLASS];
+ uint8_t ipv6prefixlen; // IPv6 route prefix length
+ struct in6_addr ipv6route; // Static IPv6 route
+ sessionidt forwardtosession; // LNS id_session to forward
+ uint8_t src_hwaddr[ETH_ALEN]; // MAC addr source (for pppoe sessions 6 bytes)
+ char reserved[4]; // Space to expand structure without changing HB_VERSION
+};
+
+struct oldsessionV8 { // older session record layout, selected when an older-version CSESSION payload has exactly this size; frozen for that reason
+ sessionidt next; // next session in linked list
+ sessionidt far; // far end session ID
+ tunnelidt tunnel; // near end tunnel ID
+ uint8_t flags; // session flags: see SESSION_*
+ struct { // per-protocol PPP state
+ uint8_t phase; // PPP phase
+ uint8_t lcp:4; // LCP state
+ uint8_t ipcp:4; // IPCP state
+ uint8_t ipv6cp:4; // IPV6CP state
+ uint8_t ccp:4; // CCP state
+ } ppp;
+ uint16_t mru; // maximum receive unit
+ in_addr_t ip; // IP of session set by RADIUS response (host byte order).
+ int ip_pool_index; // index to IP pool
+ uint32_t unique_id; // unique session id
+ uint32_t magic; // ppp magic number
+ uint32_t pin, pout; // packet counts
+ uint32_t cin, cout; // byte counts
+ uint32_t cin_wrap, cout_wrap; // byte counter wrap count (RADIUS accounting gigawords)
+ uint32_t cin_delta, cout_delta; // byte count changes (for dump_session())
+ uint16_t throttle_in; // upstream throttle rate (kbps)
+ uint16_t throttle_out; // downstream throttle rate
+ uint8_t filter_in; // input filter index (to ip_filters[N-1]; 0 if none)
+ uint8_t filter_out; // output filter index
+ uint16_t snoop_port; // Interception destination port
+ in_addr_t snoop_ip; // Interception destination IP
+ clockt opened; // when started
+ clockt die; // being closed, when to finally free
+ uint32_t session_timeout; // Maximum session time in seconds
+ uint32_t idle_timeout; // Maximum idle time in seconds
+ time_t last_packet; // Last packet from the user (used for idle timeouts)
+ time_t last_data; // Last data packet to/from the user (used for idle timeouts)
+ in_addr_t dns1, dns2; // DNS servers
+ routet route[MAXROUTE]; // static routes
+ uint16_t tbf_in; // filter bucket for throttling in from the user.
+ uint16_t tbf_out; // filter bucket for throttling out to the user.
+ int random_vector_length;
+ uint8_t random_vector[MAXTEL];
+ char user[MAXUSER]; // user (needed in session for radius stop messages)
+ char called[MAXTEL]; // called number
+ char calling[MAXTEL]; // calling number
+ uint32_t tx_connect_speed;
+ uint32_t rx_connect_speed;
+ clockt timeout; // Session timeout
+ uint32_t mrru; // Multilink Max-Receive-Reconstructed-Unit
+ epdist epdis; // Multilink Endpoint Discriminator
+ bundleidt bundle; // Multilink Bundle Identifier
+ uint8_t mssf; // Multilink Short Sequence Number Header Format
+ uint8_t walled_garden; // is this session gardened?
+ uint8_t classlen; // class (needed for radius accounting messages)
+ char class[MAXCLASS];
+ uint8_t ipv6prefixlen; // IPv6 route prefix length
+ struct in6_addr ipv6route; // Static IPv6 route
+ sessionidt forwardtosession; // LNS id_session to forward
+ uint8_t src_hwaddr[ETH_ALEN]; // MAC addr source (for pppoe sessions 6 bytes)
+ uint32_t dhcpv6_prefix_iaid; // prefix iaid requested by client (this and the next three fields are absent from oldsessionV7)
+ uint32_t dhcpv6_iana_iaid; // iaid of iana requested by client
+ struct in6_addr ipv6address; // Framed IPv6 address
+ struct dhcp6_opt_clientid dhcpv6_client_id; // Size max (headers + DUID)
+ char reserved[4]; // Space to expand structure without changing HB_VERSION
+};
+
static uint8_t *convert_session(struct oldsession *old)
{
static sessiont new;
new.snoop_ip = old->snoop_ip;
new.snoop_port = old->snoop_port;
new.walled_garden = old->walled_garden;
- new.ipv6prefixlen = old->ipv6prefixlen;
- new.ipv6route = old->ipv6route;
+ new.route6[0].ipv6prefixlen = old->ipv6prefixlen;
+ new.route6[0].ipv6route = old->ipv6route;
memcpy(new.random_vector, old->random_vector, sizeof(new.random_vector));
memcpy(new.user, old->user, sizeof(new.user));
return (uint8_t *) &new;
}
+// Upgrade a session record in the HB_VERSION 7 layout (struct oldsessionV7)
+// to the current sessiont layout.  Every field present in the old layout is
+// copied across; anything sessiont has beyond that stays zeroed.
+// Returns a pointer to a function-local static buffer, so the result is
+// overwritten by the next call.
+static uint8_t *convert_sessionV7(struct oldsessionV7 *old)
+{
+	static sessiont out;
+	int r;
+
+	memset(&out, 0, sizeof(out));
+
+	// Identity and PPP state.
+	out.next = old->next;
+	out.far = old->far;
+	out.tunnel = old->tunnel;
+	out.flags = old->flags;
+	out.ppp.phase = old->ppp.phase;
+	out.ppp.lcp = old->ppp.lcp;
+	out.ppp.ipcp = old->ppp.ipcp;
+	out.ppp.ipv6cp = old->ppp.ipv6cp;
+	out.ppp.ccp = old->ppp.ccp;
+	out.mru = old->mru;
+	out.ip = old->ip;
+	out.ip_pool_index = old->ip_pool_index;
+	out.unique_id = old->unique_id;
+	out.magic = old->magic;
+
+	// Traffic counters.
+	out.pin = old->pin;
+	out.pout = old->pout;
+	out.cin = old->cin;
+	out.cout = old->cout;
+	out.cin_wrap = old->cin_wrap;
+	out.cout_wrap = old->cout_wrap;
+	out.cin_delta = old->cin_delta;
+	out.cout_delta = old->cout_delta;
+
+	// Throttling, filtering and interception.
+	out.throttle_in = old->throttle_in;
+	out.throttle_out = old->throttle_out;
+	out.filter_in = old->filter_in;
+	out.filter_out = old->filter_out;
+	out.snoop_port = old->snoop_port;
+	out.snoop_ip = old->snoop_ip;
+
+	// Lifetime and timers.
+	out.opened = old->opened;
+	out.die = old->die;
+	out.session_timeout = old->session_timeout;
+	out.idle_timeout = old->idle_timeout;
+	out.last_packet = old->last_packet;
+	out.last_data = old->last_data;
+
+	// DNS servers and static IPv4 routes.
+	out.dns1 = old->dns1;
+	out.dns2 = old->dns2;
+	for (r = 0; r < MAXROUTE; r++)
+	{
+		memcpy(&out.route[r], &old->route[r], sizeof(out.route[r]));
+	}
+	out.tbf_in = old->tbf_in;
+	out.tbf_out = old->tbf_out;
+
+	// Authentication data and call details.
+	out.random_vector_length = old->random_vector_length;
+	memcpy(out.random_vector, old->random_vector, sizeof(out.random_vector));
+	memcpy(out.user, old->user, sizeof(out.user));
+	memcpy(out.called, old->called, sizeof(out.called));
+	memcpy(out.calling, old->calling, sizeof(out.calling));
+	out.tx_connect_speed = old->tx_connect_speed;
+	out.rx_connect_speed = old->rx_connect_speed;
+	out.timeout = old->timeout;
+
+	// Multilink.
+	out.mrru = old->mrru;
+	out.epdis = old->epdis;
+	out.bundle = old->bundle;
+	out.mssf = old->mssf;
+
+	out.walled_garden = old->walled_garden;
+	out.classlen = old->classlen;
+	memcpy(out.class, old->class, sizeof(out.class));
+
+	// The old layout carries a single IPv6 route; it lands in slot 0.
+	out.route6[0].ipv6prefixlen = old->ipv6prefixlen;
+	out.route6[0].ipv6route = old->ipv6route;
+
+	out.forwardtosession = old->forwardtosession;
+	memcpy(out.src_hwaddr, old->src_hwaddr, sizeof(out.src_hwaddr));
+
+	return (uint8_t *) &out;
+}
+
+// Upgrade a session record in the struct oldsessionV8 layout to the current
+// sessiont layout.  Every field present in the old layout is copied across;
+// anything sessiont has beyond that stays zeroed.
+// Returns a pointer to a function-local static buffer, so the result is
+// overwritten by the next call.
+static uint8_t *convert_sessionV8(struct oldsessionV8 *old)
+{
+	static sessiont out;
+	int r;
+
+	memset(&out, 0, sizeof(out));
+
+	// Identity and PPP state.
+	out.next = old->next;
+	out.far = old->far;
+	out.tunnel = old->tunnel;
+	out.flags = old->flags;
+	out.ppp.phase = old->ppp.phase;
+	out.ppp.lcp = old->ppp.lcp;
+	out.ppp.ipcp = old->ppp.ipcp;
+	out.ppp.ipv6cp = old->ppp.ipv6cp;
+	out.ppp.ccp = old->ppp.ccp;
+	out.mru = old->mru;
+	out.ip = old->ip;
+	out.ip_pool_index = old->ip_pool_index;
+	out.unique_id = old->unique_id;
+	out.magic = old->magic;
+
+	// Traffic counters.
+	out.pin = old->pin;
+	out.pout = old->pout;
+	out.cin = old->cin;
+	out.cout = old->cout;
+	out.cin_wrap = old->cin_wrap;
+	out.cout_wrap = old->cout_wrap;
+	out.cin_delta = old->cin_delta;
+	out.cout_delta = old->cout_delta;
+
+	// Throttling, filtering and interception.
+	out.throttle_in = old->throttle_in;
+	out.throttle_out = old->throttle_out;
+	out.filter_in = old->filter_in;
+	out.filter_out = old->filter_out;
+	out.snoop_port = old->snoop_port;
+	out.snoop_ip = old->snoop_ip;
+
+	// Lifetime and timers.
+	out.opened = old->opened;
+	out.die = old->die;
+	out.session_timeout = old->session_timeout;
+	out.idle_timeout = old->idle_timeout;
+	out.last_packet = old->last_packet;
+	out.last_data = old->last_data;
+
+	// DNS servers and static IPv4 routes.
+	out.dns1 = old->dns1;
+	out.dns2 = old->dns2;
+	for (r = 0; r < MAXROUTE; r++)
+	{
+		memcpy(&out.route[r], &old->route[r], sizeof(out.route[r]));
+	}
+	out.tbf_in = old->tbf_in;
+	out.tbf_out = old->tbf_out;
+
+	// Authentication data and call details.
+	out.random_vector_length = old->random_vector_length;
+	memcpy(out.random_vector, old->random_vector, sizeof(out.random_vector));
+	memcpy(out.user, old->user, sizeof(out.user));
+	memcpy(out.called, old->called, sizeof(out.called));
+	memcpy(out.calling, old->calling, sizeof(out.calling));
+	out.tx_connect_speed = old->tx_connect_speed;
+	out.rx_connect_speed = old->rx_connect_speed;
+	out.timeout = old->timeout;
+
+	// Multilink.
+	out.mrru = old->mrru;
+	out.epdis = old->epdis;
+	out.bundle = old->bundle;
+	out.mssf = old->mssf;
+
+	out.walled_garden = old->walled_garden;
+	out.classlen = old->classlen;
+	memcpy(out.class, old->class, sizeof(out.class));
+
+	// The old layout carries a single IPv6 route; it lands in slot 0.
+	out.route6[0].ipv6prefixlen = old->ipv6prefixlen;
+	out.route6[0].ipv6route = old->ipv6route;
+
+	out.forwardtosession = old->forwardtosession;
+	memcpy(out.src_hwaddr, old->src_hwaddr, sizeof(out.src_hwaddr));
+
+	// DHCPv6 / framed IPv6 address state.
+	out.dhcpv6_prefix_iaid = old->dhcpv6_prefix_iaid;
+	out.dhcpv6_iana_iaid = old->dhcpv6_iana_iaid;
+	out.ipv6address = old->ipv6address;
+	out.dhcpv6_client_id = old->dhcpv6_client_id;
+
+	return (uint8_t *) &out;
+}
+
//
// Process a heartbeat..
//
// v6: added RADIUS class attribute, re-ordered session structure
+// v7: added tunnelt attribute at the end of struct (tunnelt size change)
static int cluster_process_heartbeat(uint8_t *data, int size, int more, uint8_t *p, in_addr_t addr)
{
heartt *h;
int i, type;
int hb_ver = more;
-#if HB_VERSION != 6
+#if HB_VERSION != 9
# error "need to update cluster_process_heartbeat()"
#endif
- // we handle versions 5 through 6
+ // we handle versions 5 through HB_VERSION (currently 9)
if (hb_ver < 5 || hb_ver > HB_VERSION) {
LOG(0, 0, 0, "Received a heartbeat version that I don't support (%d)!\n", hb_ver);
return -1; // Ignore it??
break;
}
- if (size != sizeof(sessiont) ) { // Ouch! Very very bad!
- LOG(0, 0, 0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
- // Now what? Should exit! No-longer up to date!
- break;
+ if (size != sizeof(sessiont)) { // Ouch! Very very bad!
+ if ((hb_ver < HB_VERSION) && (size < sizeof(sessiont)))
+ {
+ if ((hb_ver == 7) && (size == sizeof(struct oldsessionV7)))
+ cluster_recv_session(more, convert_sessionV7((struct oldsessionV7 *) c));
+ else if (size == sizeof(struct oldsessionV8))
+ cluster_recv_session(more, convert_sessionV8((struct oldsessionV8 *) c));
+ else
+ LOG(0, 0, 0, "DANGER: Received a CSESSION version=%d that didn't decompress correctly!\n", hb_ver);
+ break;
+ }
+ else
+ {
+ LOG(0, 0, 0, "DANGER: Received a CSESSION that didn't decompress correctly!\n");
+ // Now what? Should exit! No-longer up to date!
+ break;
+ }
}
cluster_recv_session(more, c);
size = rle_decompress((uint8_t **) &p, s, c, sizeof(c));
s -= (p - orig_p);
- if (size != sizeof(tunnelt) ) { // Ouch! Very very bad!
+ if ( ((hb_ver >= HB_VERSION) && (size != sizeof(tunnelt))) ||
+ ((hb_ver < HB_VERSION) && (size > sizeof(tunnelt))) )
+ { // Ouch! Very very bad!
LOG(0, 0, 0, "DANGER: Received a CTUNNEL that didn't decompress correctly!\n");
// Now what? Should exit! No-longer up to date!
break;
else
{
struct sockaddr_in a;
+ uint16_t indexudp;
a.sin_addr.s_addr = more;
- a.sin_port = *(int *) p;
+ a.sin_port = (*(int *) p) & 0xFFFF;
+ indexudp = ((*(int *) p) >> 16) & 0xFFFF;
s -= sizeof(int);
p += sizeof(int);
processdae(p, s, &a, sizeof(a), &local);
}
else
- processudp(p, s, &a);
+ processudp(p, s, &a, indexudp);
return 0;
}
+ case C_PPPOE_FORWARD:
+ if (!config->cluster_iam_master)
+ {
+ LOG(0, 0, 0, "I'm not the master, but I got a C_PPPOE_FORWARD from %s?\n", fmtaddr(addr, 0));
+ return -1;
+ }
+ else
+ {
+ pppoe_process_forward(p, s, addr);
+ return 0;
+ }
+
+ case C_MPPP_FORWARD:
+ // Receive a MPPP packet from a slave.
+ if (!config->cluster_iam_master) {
+ LOG(0, 0, 0, "I'm not the master, but I got a C_MPPP_FORWARD from %s?\n", fmtaddr(addr, 0));
+ return -1;
+ }
+
+ processipout(p, s);
+ return 0;
case C_THROTTLE: { // Receive a forwarded packet from a slave.
if (!config->cluster_iam_master) {
//====================================================================================================
-int cmd_show_cluster(struct cli_def *cli, char *command, char **argv, int argc)
+int cmd_show_cluster(struct cli_def *cli, const char *command, char **argv, int argc)
{
int i;
cli_print(cli, "My address : %s", fmtaddr(my_address, 0));
cli_print(cli, "VIP address : %s", fmtaddr(config->bind_address, 0));
cli_print(cli, "Multicast address: %s", fmtaddr(config->cluster_address, 0));
+ cli_print(cli, "UDP port : %u", config->cluster_port);
cli_print(cli, "Multicast i'face : %s", config->cluster_interface);
if (!config->cluster_iam_master) {