Kannel: Open Source WAP and SMS gateway  svn-r5335
dlr_cass.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * dlr_cass.c
59  *
60  * Implementation of handling delivery reports (DLRs)
61  * for Cassandra NoSQL database
62  *
63  * Stipe Tolj <stolj at kannel.org>, 01.10.2015
64 */
65 
66 #include "gwlib/gwlib.h"
67 #include "gwlib/dbpool.h"
68 #include "dlr_p.h"
69 
70 
71 #ifdef HAVE_CASS
72 
73 /*
74  * Our connection pool to Cassandra.
75  */
76 static DBPool *pool = NULL;
77 
78 /*
79  * Database fields which we use.
80  */
81 static struct dlr_db_fields *fields = NULL;
82 
83 
84 static void dlr_cass_shutdown()
85 {
86  dbpool_destroy(pool);
87  dlr_db_fields_destroy(fields);
88 }
89 
90 static void dlr_cass_add(struct dlr_entry *entry)
91 {
92  Octstr *sql, *os_mask;
93  DBPoolConn *pconn;
94  List *binds = gwlist_create();
95  int res;
96 
97  debug("dlr.cass", 0, "adding DLR entry into database");
98 
99  pconn = dbpool_conn_consume(pool);
100  /* just for sure */
101  if (pconn == NULL) {
102  dlr_entry_destroy(entry);
103  return;
104  }
105 
106  sql = octstr_format("INSERT INTO %S (%S, %S, %S, %S, %S, %S, %S, %S, %S) VALUES "
107  "(?, ?, ?, ?, ?, ?, ?, ?, 0) ",
108  fields->table, fields->field_smsc, fields->field_ts,
109  fields->field_src, fields->field_dst, fields->field_serv,
110  fields->field_url, fields->field_mask, fields->field_boxc,
111  fields->field_status);
112  os_mask = octstr_format("%d", entry->mask);
113  gwlist_append(binds, entry->smsc);
114  gwlist_append(binds, entry->timestamp);
115  gwlist_append(binds, entry->source);
116  gwlist_append(binds, entry->destination);
117  gwlist_append(binds, entry->service);
118  gwlist_append(binds, entry->url);
119  gwlist_append(binds, os_mask);
120  gwlist_append(binds, entry->boxc_id);
121 
122  /* add TTL value */
123  if (fields->ttl) {
124  octstr_format_append(sql, "USING TTL %ld;", fields->ttl);
125  } else {
126  /* final semicolon */
127  octstr_append_char(sql, ';');
128  }
129 
130 
131 #if defined(DLR_TRACE)
132  debug("dlr.cass", 0, "cql: %s", octstr_get_cstr(sql));
133 #endif
134  if ((res = dbpool_conn_update(pconn, sql, binds)) == -1)
135  error(0, "DLR: Cassandra: Error while adding dlr entry for DST<%s>", octstr_get_cstr(entry->destination));
136 
137  dbpool_conn_produce(pconn);
138  octstr_destroy(sql);
139  gwlist_destroy(binds, NULL);
140  octstr_destroy(os_mask);
141  dlr_entry_destroy(entry);
142 }
143 
144 static struct dlr_entry* dlr_cass_get(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
145 {
146  Octstr *sql;
147  DBPoolConn *pconn;
148  List *result = NULL, *row;
149  struct dlr_entry *res = NULL;
150  List *binds = gwlist_create();
151 
152  pconn = dbpool_conn_consume(pool);
153  if (pconn == NULL) /* sanity check */
154  return NULL;
155 
156  sql = octstr_format("SELECT %S, %S, %S, %S, %S, %S FROM %S WHERE %S=? AND %S=? LIMIT 1;",
157  fields->field_mask, fields->field_serv,
158  fields->field_url, fields->field_src,
159  fields->field_dst, fields->field_boxc,
160  fields->table, fields->field_smsc,
161  fields->field_ts);
162 
163  gwlist_append(binds, (Octstr*) smsc);
164  gwlist_append(binds, (Octstr*) ts);
165 
166 #if defined(DLR_TRACE)
167  debug("dlr.cass", 0, "cql: %s", octstr_get_cstr(sql));
168 #endif
169 
170  if (dbpool_conn_select(pconn, sql, binds, &result) != 0) {
171  octstr_destroy(sql);
172  gwlist_destroy(binds, NULL);
173  dbpool_conn_produce(pconn);
174  return NULL;
175  }
176  octstr_destroy(sql);
177  gwlist_destroy(binds, NULL);
178  dbpool_conn_produce(pconn);
179 
180 #define LO2CSTR(r, i) octstr_get_cstr(gwlist_get(r, i))
181 
182  if (gwlist_len(result) > 0) {
183  row = gwlist_extract_first(result);
184  res = dlr_entry_create();
185  gw_assert(res != NULL);
186  res->mask = atoi(LO2CSTR(row,0));
187  res->service = octstr_create(LO2CSTR(row, 1));
188  res->url = octstr_create(LO2CSTR(row,2));
189  res->source = octstr_create(LO2CSTR(row, 3));
190  res->destination = octstr_create(LO2CSTR(row, 4));
191  res->boxc_id = octstr_create(LO2CSTR(row, 5));
193  res->smsc = octstr_duplicate(smsc);
194  }
195  gwlist_destroy(result, NULL);
196 
197 #undef LO2CSTR
198 
199  return res;
200 }
201 
202 static void dlr_cass_remove(const Octstr *smsc, const Octstr *ts, const Octstr *dst)
203 {
204  Octstr *sql;
205  DBPoolConn *pconn;
206  List *binds = gwlist_create();
207  int res;
208 
209  debug("dlr.cass", 0, "removing DLR from database");
210 
211  pconn = dbpool_conn_consume(pool);
212  /* just for sure */
213  if (pconn == NULL)
214  return;
215 
216  sql = octstr_format("DELETE FROM %S WHERE %S=? AND %S=?;",
217  fields->table, fields->field_smsc,
218  fields->field_ts);
219 
220  gwlist_append(binds, (Octstr*) smsc);
221  gwlist_append(binds, (Octstr*) ts);
222 
223 #if defined(DLR_TRACE)
224  debug("dlr.cass", 0, "cql: %s", octstr_get_cstr(sql));
225 #endif
226 
227  if ((res = dbpool_conn_update(pconn, sql, binds)) == -1)
228  error(0, "DLR: Cassandra: Error while removing dlr entry for DST<%s>", octstr_get_cstr(dst));
229 
230  dbpool_conn_produce(pconn);
231  gwlist_destroy(binds, NULL);
232  octstr_destroy(sql);
233 }
234 
235 static void dlr_cass_update(const Octstr *smsc, const Octstr *ts, const Octstr *dst, int status)
236 {
237  Octstr *sql, *os_status;
238  DBPoolConn *pconn;
239  List *binds = gwlist_create();
240  int res;
241 
242  debug("dlr.cass", 0, "updating DLR status in database");
243 
244  pconn = dbpool_conn_consume(pool);
245  /* just for sure */
246  if (pconn == NULL)
247  return;
248 
249  sql = octstr_format("UPDATE %S SET %S=? WHERE %S=? AND %S=? LIMIT 1;",
250  fields->table, fields->field_status,
251  fields->field_smsc, fields->field_ts);
252 
253  os_status = octstr_format("%d", status);
254  gwlist_append(binds, (Octstr*) os_status);
255  gwlist_append(binds, (Octstr*) smsc);
256  gwlist_append(binds, (Octstr*) ts);
257 
258 #if defined(DLR_TRACE)
259  debug("dlr.cass", 0, "cql: %s", octstr_get_cstr(sql));
260 #endif
261  if ((res = dbpool_conn_update(pconn, sql, binds)) == -1)
262  error(0, "DLR: Cassandra: Error while updating dlr entry for DST<%s>", octstr_get_cstr(dst));
263 
264  dbpool_conn_produce(pconn);
265  gwlist_destroy(binds, NULL);
266  octstr_destroy(os_status);
267  octstr_destroy(sql);
268 }
269 
270 static long dlr_cass_messages(void)
271 {
272  List *result, *row;
273  Octstr *sql;
274  DBPoolConn *conn;
275  long msgs = -1;
276 
277  conn = dbpool_conn_consume(pool);
278  if (conn == NULL)
279  return -1;
280 
281  sql = octstr_format("SELECT count(*) FROM %s", octstr_get_cstr(fields->table));
282 #if defined(DLR_TRACE)
283  debug("dlr.cass", 0, "cql: %s", octstr_get_cstr(sql));
284 #endif
285 
286  if (dbpool_conn_select(conn, sql, NULL, &result) != 0) {
287  octstr_destroy(sql);
288  dbpool_conn_produce(conn);
289  return -1;
290  }
291  dbpool_conn_produce(conn);
292  octstr_destroy(sql);
293 
294  if (gwlist_len(result) > 0) {
295  row = gwlist_extract_first(result);
296  msgs = strtol(octstr_get_cstr(gwlist_get(row,0)), NULL, 10);
298  }
299  gwlist_destroy(result, NULL);
300 
301  return msgs;
302 }
303 
304 static void dlr_cass_flush(void)
305 {
306  Octstr *sql;
307  DBPoolConn *pconn;
308  int rows;
309 
310  pconn = dbpool_conn_consume(pool);
311  /* just for sure */
312  if (pconn == NULL)
313  return;
314 
315  sql = octstr_format("DELETE FROM %S", fields->table);
316 #if defined(DLR_TRACE)
317  debug("dlr.mysql", 0, "cql: %s", octstr_get_cstr(sql));
318 #endif
319  rows = dbpool_conn_update(pconn, sql, NULL);
320  if (rows == -1)
321  error(0, "DLR: Cassandra: Error while flushing dlr entries from database");
322  dbpool_conn_produce(pconn);
323  octstr_destroy(sql);
324 }
325 
326 static struct dlr_storage handles = {
327  .type = "cassandra",
328  .dlr_add = dlr_cass_add,
329  .dlr_get = dlr_cass_get,
330  .dlr_update = dlr_cass_update,
331  .dlr_remove = dlr_cass_remove,
332  .dlr_shutdown = dlr_cass_shutdown,
333  .dlr_messages = dlr_cass_messages,
334  .dlr_flush = dlr_cass_flush
335 };
336 
338 {
339  CfgGroup *grp;
340  List *grplist;
341  Octstr *host, *user, *pass, *db, *id;
342  long port = 0;
343  Octstr *p = NULL;
344  long pool_size;
345  DBConf *db_conf = NULL;
346 
347  /*
348  * check for all mandatory directives that specify the field names
349  * of the used Cassandra table
350  */
351  if (!(grp = cfg_get_single_group(cfg, octstr_imm("dlr-db"))))
352  panic(0, "DLR: Cassandra: group 'dlr-db' is not specified!");
353 
354  if (!(id = cfg_get(grp, octstr_imm("id"))))
355  panic(0, "DLR: Cassandra: directive 'id' is not specified!");
356 
357  fields = dlr_db_fields_create(grp);
358  gw_assert(fields != NULL);
359 
360  /*
361  * now grap the required information from the 'cassandra-connection' group
362  * with the id we just obtained
363  *
364  * we have to loop through all available Cassandra connection definitions
365  * and search for the one we are looking for
366  */
367 
368  grplist = cfg_get_multi_group(cfg, octstr_imm("cassandra-connection"));
369  while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) {
370  p = cfg_get(grp, octstr_imm("id"));
371  if (p != NULL && octstr_compare(p, id) == 0) {
372  goto found;
373  }
374  if (p != NULL) octstr_destroy(p);
375  }
376  panic(0, "DLR: Cassandra: connection settings for id '%s' are not specified!",
377  octstr_get_cstr(id));
378 
379 found:
380  octstr_destroy(p);
381  gwlist_destroy(grplist, NULL);
382 
383  if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0)
384  pool_size = 1;
385 
386  if (!(host = cfg_get(grp, octstr_imm("host"))))
387  panic(0, "DLR: Cassandra: directive 'host' is not specified!");
388  /* optional(s) */
389  user = cfg_get(grp, octstr_imm("username"));
390  pass = cfg_get(grp, octstr_imm("password"));
391  db = cfg_get(grp, octstr_imm("database"));
392  cfg_get_integer(&port, grp, octstr_imm("port"));
393 
394  /*
395  * ok, ready to connect to Cassandra
396  */
397  db_conf = gw_malloc(sizeof(DBConf));
398  gw_assert(db_conf != NULL);
399 
400  db_conf->cass = gw_malloc(sizeof(CassConf));
401  gw_assert(db_conf->cass != NULL);
402 
403  db_conf->cass->host = host;
404  db_conf->cass->port = port;
405  db_conf->cass->username = user;
406  db_conf->cass->password = pass;
407  db_conf->cass->database = db;
408 
409  pool = dbpool_create(DBPOOL_CASS, db_conf, pool_size);
410  gw_assert(pool != NULL);
411 
412  /*
413  * XXX should a failing connect throw panic?!
414  */
415  if (dbpool_conn_count(pool) == 0)
416  panic(0,"DLR: Cassandra: database pool has no connections!");
417 
418  octstr_destroy(id);
419 
420  return &handles;
421 }
422 #else
423 /*
424  * Return NULL , so we point dlr-core that we were
425  * not compiled in.
426  */
428 {
429  return NULL;
430 }
431 #endif /* HAVE_CASS */
void error(int err, const char *fmt,...)
Definition: log.c:648
const char * type
Definition: dlr_p.h:112
Octstr * url
Definition: dlr_p.h:84
Definition: http.c:2014
long dbpool_conn_count(DBPool *p)
DBPool * dbpool_create(enum db_type db_type, DBConf *conf, unsigned int connections)
gw_assert(wtls_machine->packet_to_send !=NULL)
Octstr * database
Definition: dbpool.h:160
Octstr * field_boxc
Definition: dlr_p.h:160
void gwlist_append(List *list, void *item)
Definition: list.c:179
Octstr * service
Definition: dlr_p.h:83
void dlr_db_fields_destroy(struct dlr_db_fields *fields)
Definition: dlr.c:204
struct dlr_entry * dlr_entry_create(void)
Definition: dlr.c:103
long gwlist_len(List *list)
Definition: list.c:166
Octstr * boxc_id
Definition: dlr_p.h:85
void * gwlist_get(List *list, long pos)
Definition: list.c:292
void octstr_append_char(Octstr *ostr, int ch)
Definition: octstr.c:1517
static Octstr * host
Definition: fakesmsc.c:122
#define cfg_get(grp, varname)
Definition: cfg.h:86
Octstr * field_src
Definition: dlr_p.h:154
Octstr * field_url
Definition: dlr_p.h:157
static Cfg * cfg
Definition: opensmppbox.c:95
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
static struct pid_list * found
Octstr * field_status
Definition: dlr_p.h:159
void dbpool_conn_produce(DBPoolConn *conn)
static struct dlr_storage * handles
Definition: dlr.c:97
static int port
Definition: fakesmsc.c:121
Octstr * table
Definition: dlr_p.h:150
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:283
Definition: cfg.c:164
void * gwlist_extract_first(List *list)
Definition: list.c:305
CassConf * cass
Definition: dbpool.h:173
Octstr * source
Definition: dlr_p.h:81
#define octstr_duplicate(ostr)
Definition: octstr.h:187
List * cfg_get_multi_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:645
Octstr * timestamp
Definition: dlr_p.h:80
Octstr * field_serv
Definition: dlr_p.h:156
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:336
long port
Definition: dbpool.h:157
Definition: dbpool.h:164
Octstr * field_smsc
Definition: dlr_p.h:152
Octstr * destination
Definition: dlr_p.h:82
void dbpool_destroy(DBPool *p)
int dbpool_conn_update(DBPoolConn *conn, const Octstr *sql, List *binds)
Octstr * host
Definition: dbpool.h:156
Definition: octstr.c:118
struct dlr_storage * dlr_init_cass(Cfg *cfg)
Definition: dlr_cass.c:427
void dlr_entry_destroy(struct dlr_entry *dlr)
Definition: dlr.c:142
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
int cfg_get_integer(long *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:742
#define panic
Definition: log.h:87
Definition: cfg.c:73
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2507
int dbpool_conn_select(DBPoolConn *conn, const Octstr *sql, List *binds, List **result)
#define gwlist_create()
Definition: list.h:136
Definition: dlr_p.h:78
DBPoolConn * dbpool_conn_consume(DBPool *p)
Octstr * smsc
Definition: dlr_p.h:79
Octstr * password
Definition: dbpool.h:159
int mask
Definition: dlr_p.h:86
CfgGroup * cfg_get_single_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:639
struct dlr_db_fields * dlr_db_fields_create(CfgGroup *grp)
Definition: dlr.c:169
Octstr * username
Definition: dbpool.h:158
Octstr * field_ts
Definition: dlr_p.h:153
Definition: list.c:102
Octstr * field_dst
Definition: dlr_p.h:155
Octstr * field_mask
Definition: dlr_p.h:158
long ttl
Definition: dlr_p.h:151
int octstr_compare(const Octstr *ostr1, const Octstr *ostr2)
Definition: octstr.c:871
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.