Fix cluster group update
[l2tpns.git] / tbf.c
1 // L2TPNS: token bucket filters
2
3 #include <string.h>
4 #include <sys/socket.h>
5 #include <linux/rtnetlink.h>
6
7 #include "l2tpns.h"
8 #include "util.h"
9 #include "tbf.h"
10
11 tbft *filter_list = NULL;
12 static int filter_list_size = 0;
13
14 static int timer_chain = -1; // Head of timer chain.
15
16 static void tbf_run_queue(int tbf_id);
17
18 void init_tbf(int num_tbfs)
19 {
20 if (!(filter_list = shared_malloc(sizeof(*filter_list) * num_tbfs)))
21 return;
22
23 filter_list_size = num_tbfs;
24 filter_list[0].sid = -1; // Reserved.
25 }
26 //
27 // Put a TBF on the timer list.
28 // This is a doubly linked list..
29 // We put ourselves on the tail of the list.
30 //
31 static void add_to_timer(int id)
32 {
33 if (!filter_list)
34 return;
35
36 if (timer_chain == -1) {
37 filter_list[id].next = filter_list[id].prev = id;
38 timer_chain = id;
39 return;
40 }
41
42 filter_list[id].next = timer_chain;
43 filter_list[id].prev = filter_list[timer_chain].prev;
44 filter_list[filter_list[timer_chain].prev].next = id;
45 filter_list[timer_chain].prev = id;
46 }
47
48 //
49 // Remove a TBF from the timer list.
50 // This is a doubly linked list.
51 static void del_from_timer(int id)
52 {
53 if (!filter_list)
54 return;
55
56 if (filter_list[id].next == id) { // Last element in chain?
57 if (timer_chain != id) { // WTF?
58 LOG(0, 0, 0, "Removed a singleton element from TBF, but tc didn't point to it!\n");
59 } else
60 timer_chain = -1;
61 filter_list[id].next = filter_list[id].prev = 0;
62 return;
63 }
64
65 filter_list[filter_list[id].next].prev = filter_list[id].prev;
66 filter_list[filter_list[id].prev].next = filter_list[id].next;
67 if (timer_chain == id)
68 timer_chain = filter_list[id].next;
69
70 filter_list[id].next = filter_list[id].prev = 0; // Mark as off the timer chain.
71 }
72
73 //
74 // Free a token bucket filter structure for re-use.
75 //
76
77 int free_tbf(int tid)
78 {
79 if (tid < 1) // Make sure we don't free id # 0
80 return -1;
81
82 if (!filter_list) // WTF?
83 return -1;
84
85 if (filter_list[tid].next)
86 del_from_timer(tid);
87 filter_list[tid].sid = 0;
88
89 return 0; // Done!
90 }
91
92 //
93 // Allocate a new token bucket filter.
94 //
95 int new_tbf(int sid, int max_credit, int rate, void (*f)(sessionidt, uint8_t *, int))
96 {
97 int i;
98 static int p = 0;
99
100 LOG(4, 0, 0, "Allocating new TBF (sess %d, rate %d, helper %p)\n", sid, rate, f);
101
102 if (!filter_list)
103 return 0; // Couldn't alloc memory!
104
105 for (i = 0 ; i < filter_list_size ; ++i, p = (p+1)%filter_list_size ) {
106 if (filter_list[p].sid)
107 continue;
108
109 memset((void*) &filter_list[p], 0, sizeof(filter_list[p]) ); // Clear counters and data.
110 filter_list[p].sid = sid;
111 filter_list[p].credit = max_credit;
112 filter_list[p].queued = 0;
113 filter_list[p].max_credit = max_credit;
114 filter_list[p].rate = rate;
115 filter_list[p].oldest = 0;
116 filter_list[p].send = f;
117 return p;
118 }
119
120 LOG(0, 0, 0, "Ran out of token bucket filters! Sess %d will be un-throttled\n", sid);
121 return 0;
122 }
123
124 //
125 // Sanity check all the TBF records. This is
126 // typically done when we become a master..
127 //
128 void fsck_tbfs(void)
129 {
130 int i , sid;
131
132 if (!filter_list)
133 return;
134
135 for (i = 1; i < filter_list_size; ++i) {
136 if (!filter_list[i].sid) // Is it used??
137 continue;
138
139 sid = filter_list[i].sid;
140 if (i != session[sid].tbf_in &&
141 i != session[sid].tbf_out) { // Ooops.
142
143 free_tbf(i); // Mark it as free...
144 }
145 }
146
147 for (i = 0; i < config->cluster_highest_sessionid ; ++i) {
148 if (session[i].tbf_in && filter_list[session[i].tbf_in].sid != i) {
149 filter_list[session[i].tbf_in].sid = i; // Ouch!? FIXME. What to do here?
150 }
151 if (session[i].tbf_out && filter_list[session[i].tbf_out].sid != i) {
152 filter_list[session[i].tbf_out].sid = i; // Ouch!? FIXME. What to do here?
153 }
154 }
155 }
156
157
158 //
159 // Run a packet through a token bucket filter.
160 // If we can send it right away, we do. Else we
161 // try and queue it to send later. Else we drop it.
162 //
163 int tbf_queue_packet(int tbf_id, uint8_t *data, int size)
164 {
165 int i;
166 tbft *f;
167
168 if (!filter_list)
169 return -1;
170
171 if (tbf_id > filter_list_size || tbf_id < 1) { // Out of range ID??
172 // Very bad. Just drop it.
173 return -1;
174 }
175
176 f = &filter_list[tbf_id];
177
178 if (!f->sid) // Is this a real structure??
179 return -1;
180
181 tbf_run_queue(tbf_id); // Caculate credit and send any queued packets if possible..
182
183 f->b_queued += size;
184 f->p_queued ++;
185
186 if (!f->queued && f->credit > size) { // If the queue is empty, and we have
187 // enough credit, just send it now.
188 f->credit -= size;
189 if (f->send) {
190 f->send(f->sid, data, size);
191 f->b_sent += size;
192 f->p_sent ++;
193 } else {
194 f->b_dropped += size;
195 f->p_dropped ++;
196 }
197 return size;
198 }
199
200 // Not enough credit. Can we have room in the queue?
201 if (f->queued >= TBF_MAX_QUEUE) {
202 f->p_dropped ++;
203 f->b_dropped += size;
204 return -1; // No, just drop it.
205 }
206
207 // Is it too big to fit into a queue slot?
208 if (size >= TBF_MAX_SIZE) {
209 f->p_dropped ++;
210 f->b_dropped += size;
211 return -1; // Yes, just drop it.
212 }
213
214 // Ok. We have a slot, and it's big enough to
215 // contain the packet, so queue the packet!
216 i = ( f->oldest + f->queued ) % TBF_MAX_QUEUE;
217 memcpy(f->packets[i], data, size);
218
219 f->sizes[i] = size;
220 f->queued ++;
221 f->p_delayed ++;
222
223 if (!f->next) // Are we off the timer chain?
224 add_to_timer(tbf_id); // Put ourselves on the timer chain.
225
226 return 0; // All done.
227 }
228
229 //
230 // Send queued packets from the filter if possible.
231 // (We're normally only called if this is possible.. )
232 static void tbf_run_queue(int tbf_id)
233 {
234 tbft * f;
235
236 if (!filter_list)
237 return;
238
239 f = &filter_list[tbf_id];
240
241 // Calculate available credit...
242 f->credit += (TIME - f->lasttime) * f->rate / 10; // current time is 1/10th of a second.
243 if (f->credit > f->max_credit)
244 f->credit = f->max_credit;
245 f->lasttime = TIME;
246
247 while (f->queued > 0 && f->credit >= f->sizes[f->oldest]) { // While we have enough credit..
248
249 if (f->send) {
250 f->send(f->sid, f->packets[f->oldest], f->sizes[f->oldest]);
251 f->b_sent += f->sizes[f->oldest];
252 f->p_sent ++;
253 } else {
254 f->b_dropped += f->sizes[f->oldest];
255 f->p_dropped ++;
256 }
257
258 f->credit -= f->sizes[f->oldest];
259
260 f->oldest = (f->oldest + 1 ) % TBF_MAX_QUEUE;
261 f->queued--; // One less queued packet..
262 }
263
264 if (f->queued) // Still more to do. Hang around on the timer list.
265 return;
266
267 if (f->next) // Are we on the timer list??
268 del_from_timer(tbf_id); // Nothing more to do. Get off the timer list.
269 }
270
271 //
272 // Periodically walk the timer list..
273 //
274 int tbf_run_timer(void)
275 {
276 int i = timer_chain;
277 int count = filter_list_size + 1; // Safety check.
278 int last = -1;
279 int tbf_id; // structure being processed.
280
281 if (timer_chain < 0)
282 return 0; // Nothing to do...
283
284 if (!filter_list) // No structures built yet.
285 return 0;
286
287 last = filter_list[i].prev; // last element to process.
288
289 do {
290 tbf_id = i;
291 i = filter_list[i].next; // Get the next in the queue.
292
293 tbf_run_queue(tbf_id); // Run the timer queue..
294 } while ( timer_chain > 0 && i && tbf_id != last && --count > 0);
295
296
297 #if 0 // Debugging.
298 for (i = 0; i < filter_list_size; ++i) {
299 if (!filter_list[i].next)
300 continue;
301 if (filter_list[i].lasttime == TIME) // Did we just run it?
302 continue;
303
304 LOG(1, 0, 0, "Missed tbf %d! Not on the timer chain?(n %d, p %d, tc %d)\n", i,
305 filter_list[i].next, filter_list[i].prev, timer_chain);
306 tbf_run_queue(i);
307 }
308 #endif
309
310 return 1;
311 }
312
313 int cmd_show_tbf(struct cli_def *cli, char *command, char **argv, int argc)
314 {
315 int i;
316 int count = 0;
317
318 if (CLI_HELP_REQUESTED)
319 return CLI_HELP_NO_ARGS;
320
321 if (!config->cluster_iam_master) {
322 cli_error(cli, "Can't do this on a slave. Do it on %s",
323 fmtaddr(config->cluster_master_address, 0));
324
325 return CLI_OK;
326 }
327
328 if (!filter_list)
329 return CLI_OK;
330
331 cli_print(cli,"%6s %5s %5s %6s %6s | %7s %7s %8s %8s %8s %8s", "TBF#", "Sid", "Rate", "Credit", "Queued",
332 "ByteIn","PackIn","ByteSent","PackSent", "PackDrop", "PackDelay");
333
334 for (i = 1; i < filter_list_size; ++i) {
335 if (!filter_list[i].sid) // Is it used?
336 continue; // No.
337
338 cli_print(cli, "%5d%1s %5d %5d %6d %6d | %7d %7d %8d %8d %8d %8d",
339 i, (filter_list[i].next ? "*" : " "),
340 filter_list[i].sid,
341 filter_list[i].rate * 8,
342 filter_list[i].credit,
343 filter_list[i].queued,
344
345 filter_list[i].b_queued,
346 filter_list[i].p_queued,
347 filter_list[i].b_sent,
348 filter_list[i].p_sent,
349 filter_list[i].p_dropped,
350 filter_list[i].p_delayed);
351 ++count;
352 }
353 cli_print(cli, "%d tbf entries used, %d total", count, filter_list_size);
354 return CLI_OK;
355 }