Kannel: Open Source WAP and SMS gateway  svn-r5335
gw-prioqueue.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * gw-prioqueue.c - generic priority queue with guaranteed order.
59  *
60  * Algorithm ala Robert Sedgewick.
61  *
62  * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
63  */
64 
65 #include "gw-config.h"
66 #include <pthread.h>
67 #include "thread.h"
68 #include "gwmem.h"
69 #include "gwassert.h"
70 #include "gwthread.h"
71 #include "gw-prioqueue.h"
72 
73 
74 struct element {
75  void *item;
76  long long seq;
77 };
78 
79 struct gw_prioqueue {
81  struct element **tab;
82  size_t size;
83  long len;
84  long producers;
85  long long seq;
86  pthread_cond_t nonempty;
87  int (*cmp)(const void*, const void *);
88 };
89 
90 
91 static void inline queue_lock(gw_prioqueue_t *queue)
92 {
93  mutex_lock(queue->mutex);
94 }
95 
96 
97 static void inline queue_unlock(gw_prioqueue_t *queue)
98 {
99  mutex_unlock(queue->mutex);
100 }
101 
102 
103 static void make_bigger(gw_prioqueue_t *queue, long items)
104 {
105  size_t size = queue->size;
106  size_t new_size = sizeof(*queue->tab) * (queue->len + items);
107 
108  if (size >= new_size)
109  return;
110 
111  queue->tab = gw_realloc(queue->tab, new_size);
112  queue->size = new_size;
113 }
114 
115 
116 static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
117 {
118  int rc;
119 
120  rc = cmp(a->item, b->item);
121  if (rc == 0) {
122  /* check sequence to guarantee order */
123  if (a->seq < b->seq)
124  rc = 1;
125  else if (a->seq > b->seq)
126  rc = -1;
127  }
128 
129  return rc;
130 }
131 
132 
138 static void upheap(gw_prioqueue_t *queue, register long index)
139 {
140  struct element *v = queue->tab[index];
141  while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
142  queue->tab[index] = queue->tab[index / 2];
143  index /= 2;
144  }
145  queue->tab[index] = v;
146 }
147 
148 
154 static void downheap(gw_prioqueue_t *queue, register long index)
155 {
156  struct element *v = queue->tab[index];
157  register long j;
158 
159  while (index <= queue->len / 2) {
160  j = 2 * index;
161  /* take the biggest child item */
162  if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
163  j++;
164  /* break if our item bigger */
165  if (compare(v, queue->tab[j], queue->cmp) >= 0)
166  break;
167  queue->tab[index] = queue->tab[j];
168  index = j;
169  }
170  queue->tab[index] = v;
171 }
172 
173 
174 gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *))
175 {
176  gw_prioqueue_t *ret;
177 
178  gw_assert(cmp != NULL);
179 
180  ret = gw_malloc(sizeof(*ret));
181  ret->producers = 0;
182  pthread_cond_init(&ret->nonempty, NULL);
183  ret->mutex = mutex_create();
184  ret->tab = NULL;
185  ret->size = 0;
186  ret->len = 0;
187  ret->seq = 0;
188  ret->cmp = cmp;
189 
190  /* put NULL item at pos 0 that is our stop marker */
191  make_bigger(ret, 1);
192  ret->tab[0] = gw_malloc(sizeof(**ret->tab));
193  ret->tab[0]->item = NULL;
194  ret->tab[0]->seq = ret->seq++;
195  ret->len++;
196 
197  return ret;
198 }
199 
200 
202 {
203  long i;
204 
205  if (queue == NULL)
206  return;
207 
208  for (i = 0; i < queue->len; i++) {
209  if (item_destroy != NULL && queue->tab[i]->item != NULL)
210  item_destroy(queue->tab[i]->item);
211  gw_free(queue->tab[i]);
212  }
213  mutex_destroy(queue->mutex);
214  pthread_cond_destroy(&queue->nonempty);
215  gw_free(queue->tab);
216  gw_free(queue);
217 }
218 
219 
221 {
222  long len;
223 
224  if (queue == NULL)
225  return 0;
226 
227  queue_lock(queue);
228  len = queue->len - 1;
230 
231  return len;
232 }
233 
234 
236 {
237  gw_assert(queue != NULL);
238  gw_assert(item != NULL);
239 
240  queue_lock(queue);
241  make_bigger(queue, 1);
242  queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
243  queue->tab[queue->len]->item = item;
244  queue->tab[queue->len]->seq = queue->seq++;
245  upheap(queue, queue->len);
246  queue->len++;
247  pthread_cond_signal(&queue->nonempty);
249 }
250 
251 
252 void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
253 {
254  register long i;
255 
256  gw_assert(queue != NULL && fn != NULL);
257 
258  queue_lock(queue);
259  for (i = 1; i < queue->len; i++)
260  fn(queue->tab[i]->item, i - 1);
262 }
263 
264 
266 {
267  void *ret;
268 
269  gw_assert(queue != NULL);
270 
271  queue_lock(queue);
272  if (queue->len <= 1) {
274  return NULL;
275  }
276  ret = queue->tab[1]->item;
277  gw_free(queue->tab[1]);
278  queue->tab[1] = queue->tab[--queue->len];
279  downheap(queue, 1);
281 
282  return ret;
283 }
284 
285 
287 {
288  void *ret;
289 
290  gw_assert(queue != NULL);
291 
292  queue_lock(queue);
293  if (queue->len > 1)
294  ret = queue->tab[1]->item;
295  else
296  ret = NULL;
298 
299  return ret;
300 }
301 
302 
304 {
305  void *ret;
306 
307  gw_assert(queue != NULL);
308 
309  queue_lock(queue);
310  while (queue->len == 1 && queue->producers > 0) {
311  queue->mutex->owner = -1;
312  pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, &queue->mutex->mutex);
313  pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
314  pthread_cleanup_pop(0);
315  queue->mutex->owner = gwthread_self();
316  }
317  if (queue->len > 1) {
318  ret = queue->tab[1]->item;
319  gw_free(queue->tab[1]);
320  queue->tab[1] = queue->tab[--queue->len];
321  downheap(queue, 1);
322  } else {
323  ret = NULL;
324  }
326 
327  return ret;
328 }
329 
330 
332 {
333  gw_assert(queue != NULL);
334 
335  queue_lock(queue);
336  queue->producers++;
338 }
339 
340 
342 {
343  gw_assert(queue != NULL);
344 
345  queue_lock(queue);
346  gw_assert(queue->producers > 0);
347  queue->producers--;
348  pthread_cond_broadcast(&queue->nonempty);
350 }
351 
352 
354 {
355  long ret;
356 
357  gw_assert(queue != NULL);
358 
359  queue_lock(queue);
360  ret = queue->producers;
362 
363  return ret;
364 }
365 
int size
Definition: wsasm.c:84
long gwthread_self(void)
struct element ** tab
Definition: gw-prioqueue.c:81
gw_assert(wtls_machine->packet_to_send !=NULL)
int(* cmp)(const void *, const void *)
Definition: gw-prioqueue.c:87
#define mutex_unlock(m)
Definition: thread.h:136
static void make_bigger(gw_prioqueue_t *queue, long items)
Definition: gw-prioqueue.c:103
static void downheap(gw_prioqueue_t *queue, register long index)
Definition: gw-prioqueue.c:154
#define mutex_create()
Definition: thread.h:96
void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
Definition: gw-prioqueue.c:252
void * gw_prioqueue_consume(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:303
void * gw_prioqueue_get(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:286
void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item)
Definition: gw-prioqueue.c:235
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
long len
Definition: list.c:107
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
Mutex * mutex
Definition: gw-prioqueue.c:80
void * gw_prioqueue_remove(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:265
long gw_prioqueue_len(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:220
gw_prioqueue_t * gw_prioqueue_create(int(*cmp)(const void *, const void *))
Definition: gw-prioqueue.c:174
void mutex_destroy(Mutex *mutex)
Definition: thread.c:97
long long seq
Definition: gw-prioqueue.c:85
void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void *))
Definition: gw-prioqueue.c:201
pthread_cond_t nonempty
Definition: list.c:110
long long seq
Definition: gw-prioqueue.c:76
void gw_prioqueue_remove_producer(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:341
static int compare(struct element *a, struct element *b, int(*cmp)(const void *, const void *))
Definition: gw-prioqueue.c:116
static void upheap(gw_prioqueue_t *queue, register long index)
Definition: gw-prioqueue.c:138
Definition: thread.h:76
static List * queue
Definition: wap-appl.c:123
void ** tab
Definition: list.c:104
static void item_destroy(void *item)
Definition: dict.c:91
#define mutex_lock(m)
Definition: thread.h:130
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
long gw_prioqueue_producer_count(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:353
void gw_prioqueue_add_producer(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:331
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.