File Coverage

File:blib/lib/OpenSRF/Application/Persist.pm
Coverage:10.1%

line  stmt  bran  cond  sub  pod  time  code
1package OpenSRF::Application::Persist;
2
1
1
1
7
3
8
use base qw/OpenSRF::Application/;
3
1
1
1
7
4
9
use OpenSRF::Application;
4
5
1
1
1
6
3
8
use OpenSRF::Utils::SettingsClient;
6
1
1
1
6
4
10
use OpenSRF::EX qw/:try/;
7
1
1
1
7
4
13
use OpenSRF::Utils qw/:common/;
8
1
1
1
8
4
6
use OpenSRF::Utils::Logger;
9
1
1
1
8
4
7
use OpenSRF::Utils::JSON;
10
1
1
1
13
5
11
use DBI;
11
12
1
1
1
8
3
11
use vars qw/$dbh $log $default_expire_time/;
13
# One-time service initialisation: make sure the SQLite schema exists.
#
# Reads the 'dbfile' application setting for opensrf.persist, opens that
# SQLite database and creates the three tables the service works with:
#   storage      - stored values, tied to a slot through name_id
#   store_name   - slot id and its unique name
#   store_expire - per-slot access time (atime) and expire_interval
#
# Throws OpenSRF::EX::PANIC when no dbfile is configured or when the
# database cannot be opened.
sub initialize {

    $log = 'OpenSRF::Utils::Logger';

    # Lexical on purpose: this used to assign to an undeclared package
    # global named $sc.
    my $sc = OpenSRF::Utils::SettingsClient->new;

    my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
    unless ($dbfile) {
        OpenSRF::EX::PANIC->throw("Can't find my dbfile for SQLite!");
    }

    my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");

    # DBI->connect returns undef on failure.  Without this check the
    # attribute assignments below would autovivify $init_dbh into a plain
    # hash and the failure would only surface at the first method call.
    unless ($init_dbh) {
        OpenSRF::EX::PANIC->throw(
            "Can't open SQLite database [$dbfile]: " . ($DBI::errstr || 'unknown error')
        );
    }

    $init_dbh->{AutoCommit} = 1;
    $init_dbh->{RaiseError} = 0;

    # IF NOT EXISTS keeps a restart against an already populated dbfile
    # from failing (and logging a DBI error) on every CREATE TABLE.
    my @schema = (
        q{CREATE TABLE IF NOT EXISTS storage (
            id INTEGER PRIMARY KEY,
            name_id INTEGER,
            value TEXT
        );},
        q{CREATE TABLE IF NOT EXISTS store_name (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE
        );},
        q{CREATE TABLE IF NOT EXISTS store_expire (
            id INTEGER PRIMARY KEY,
            atime INTEGER,
            expire_interval INTEGER
        );},
    );

    for my $statement (@schema) {
        $init_dbh->do($statement);
    }

    # This handle is only needed for schema setup.
    $init_dbh->disconnect;

    return 1;
}
52
# Per-child initialisation: load settings and open the shared database handle.
#
# Sets the file-level $default_expire_time from the 'default_expire_time'
# application setting (falling back to 300 when it is unset or false) and
# stores a connected SQLite handle in the file-level $dbh.
#
# Throws OpenSRF::EX::PANIC when no dbfile is configured or when the
# database cannot be opened.
sub child_init {

    my $sc = OpenSRF::Utils::SettingsClient->new;

    $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
    $default_expire_time ||= 300;

    my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
    unless ($dbfile) {
        OpenSRF::EX::PANIC->throw("Can't find my dbfile for SQLite!");
    }

    $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");

    # DBI->connect returns undef on failure.  Without this check the
    # attribute assignments below would autovivify $dbh into a plain hash
    # and every later $dbh->do(...) would die with a confusing message.
    unless ($dbh) {
        OpenSRF::EX::PANIC->throw(
            "Can't open SQLite database [$dbfile]: " . ($DBI::errstr || 'unknown error')
        );
    }

    $dbh->{AutoCommit} = 1;
    $dbh->{RaiseError} = 0;
}
69
# API: opensrf.persist.slot.create
#
# Creates a new named slot and returns its name.  When no name (or a false
# name) is supplied, the slot is named 'AUTOGENERATED!!' followed by the id
# of the freshly inserted store_name row.
#
# Returns undef when the slot already exists or when anything else inside
# the outer try block throws.
sub create_store {
    my $self = shift;
    my $client = shift;

    my $name = shift || '';

    # The try/catch expression is the last statement of the sub, so the
    # value returned from inside either block is what the caller receives.
    try {

        # _get_name_id() throws for an unknown (or empty) name; that is the
        # only situation in which we may go on and create the slot.
        my $continue = 0;
        try {
            _get_name_id($name);
        } catch Error with {
            $continue++;
        };

        # errstr is normally undef here (the lookup succeeded), so guard the
        # concatenation instead of appending an undefined value.
        OpenSRF::EX::WARN->throw(
            "Duplicate key: object name [$name] already exists! " . ($dbh->errstr || '')
        ) unless ($continue);

        my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
        $sth->execute($name);
        $sth->finish;

        unless ($name) {
            my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
            $name = 'AUTOGENERATED!!'.$last_id;

            # Bound parameters instead of interpolating values into the SQL.
            $dbh->do('UPDATE store_name SET name = ? WHERE id = ?;', {}, $name, $last_id);
        }

        _flush_by_name($name);
        return $name;
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.create',
    method => 'create_store',
    argc => 1,
);
110
111
# API: opensrf.persist.slot.create_expirable
#
# Creates a named slot through 'opensrf.persist.slot.create' and then gives
# it an expire interval through 'opensrf.persist.slot.set_expire'.
#
# Arguments: slot name (required) and expire interval (optional; the
# file-level $default_expire_time is used when it is missing or false).
#
# Throws OpenSRF::EX::InvalidArg when no name is given.  Returns the slot
# name on success, undef when creation fails or anything in the try block
# throws.
sub create_expirable_store {
    my ($self, $client, $name, $time) = @_;

    OpenSRF::EX::InvalidArg->throw("Expirable slots must be given a name!")
        unless $name;

    $time ||= $default_expire_time;

    # The try/catch expression is the sub's final statement; its value is
    # the sub's return value.
    try {
        my $create = $self->method_lookup( 'opensrf.persist.slot.create' );
        ($name) = $create->run( $name );
        return undef unless $name;

        my $set_expire = $self->method_lookup('opensrf.persist.slot.set_expire');
        $set_expire->run($name, $time);

        return $name;
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.create_expirable',
    method   => 'create_expirable_store',
    argc     => 2,
);
134
# Stamps the store_expire row whose id is the given slot id with the
# current time.  Returns whatever $dbh->do returns.
sub _update_expire_atime {
    my ($slot_id) = @_;

    my $now = time();
    return $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, $now, $slot_id);
}
139
# API: opensrf.persist.slot.set_expire
#
# Replaces the expire interval of a slot.  The interval argument is
# converted with interval_to_seconds(); any existing store_expire row for
# the slot is removed first.  An interval of zero seconds just removes the
# row and returns 0; otherwise a new row is inserted with the current time
# as atime and the interval in seconds is returned.
#
# There is no catch clause on the try block, so errors raised inside it
# (for example by _get_name_id) are not turned into undef here.
sub set_expire_interval {
    my ($self, $client, $slot, $new_interval) = @_;

    try {
        my $seconds = interval_to_seconds($new_interval);
        my $slot_id = _get_name_id($slot);

        $dbh->do('DELETE FROM store_expire where id = ?', {}, $slot_id);

        if ($seconds != 0) {
            $dbh->do(
                'INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',
                {},
                $slot_id, time(), $seconds,
            );
            return $seconds;
        }

        return 0;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.set_expire',
    method   => 'set_expire_interval',
    argc     => 2,
);
162
# API: opensrf.persist.slot.find
#
# Returns the given slot name when a slot of that name exists, undef
# otherwise.
#
# _get_name_id() throws for a missing or unknown name, which used to make
# the trailing "return undef" unreachable; the lookup is now wrapped so a
# failed lookup yields undef as the code always intended.
sub find_slot {
    my $self = shift;
    my $client = shift;
    my $slot = shift;

    my $sid;
    try {
        $sid = _get_name_id($slot);
    } catch Error with {
        $sid = undef;
    };

    return $slot if ($sid);
    return undef;
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.find',
    method => 'find_slot',
    argc => 2,
);
177
# API: opensrf.persist.slot.get_expire
#
# Returns the number of seconds left before the slot expires, computed as
# (atime + expire_interval) minus the current time.  Returns undef when the
# slot has no store_expire row or its expire_interval is false.
# _get_name_id() throws when the slot name is missing or unknown.
sub get_expire_interval {
    my ($self, $client, $slot) = @_;

    my $slot_id = _get_name_id($slot);

    my @interval_row = $dbh->selectrow_array(
        'SELECT expire_interval FROM store_expire WHERE id = ?;', {}, $slot_id
    );
    return undef unless ($interval_row[0]);

    my @expiry_row = $dbh->selectrow_array(
        'SELECT atime + expire_interval FROM store_expire WHERE id = ?;', {}, $slot_id
    );
    my $expires_at = $expiry_row[0];

    return $expires_at - time();
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.get_expire',
    method   => 'get_expire_interval',
    argc     => 2,
);
195
196
# Removes the contents and expiry records of every slot whose
# (atime + expire_interval) is not later than the current time, then runs
# _flush_by_name(..., 1) on each of them (the second argument stops that
# call from sweeping again).
#
# A true first argument turns the whole call into a no-op.
sub _sweep_expired_slots {
    return if (shift());

    my $expired_slots = $dbh->selectcol_arrayref(
        'SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;', {}, time()
    );

    # selectcol_arrayref yields a (true) empty array ref when nothing
    # matched, so the list itself must be checked as well; otherwise two
    # pointless "IN ()" deletes are issued on every sweep.
    return unless ($expired_slots && @$expired_slots);

    # One '?' per expired id, shared by both DELETE statements.
    my $placeholders = join(',', ('?') x scalar(@$expired_slots));

    $dbh->do("DELETE FROM storage WHERE name_id IN ($placeholders);", {}, @$expired_slots);
    $dbh->do("DELETE FROM store_expire WHERE id IN ($placeholders);", {}, @$expired_slots);

    for my $id (@$expired_slots) {
        _flush_by_name(_get_id_name($id), 1);
    }
}
212
# API: opensrf.persist.object.set, opensrf.persist.queue.push,
#      opensrf.persist.stack.push
#
# Stores a value in the named slot.  The value is stored as the output of
# OpenSRF::Utils::JSON->perl2JSON.  When the API name contains "object",
# everything already stored in the slot is deleted first, so the slot holds
# only the new value.
#
# Throws OpenSRF::EX::WARN when no name is given.  Returns the slot name on
# success, undef when anything inside the try block throws (for example an
# unknown slot name).
sub add_item {
    my $self = shift;
    my $client = shift;

    my $name = shift or do {
        throw OpenSRF::EX::WARN ("No name specified!");
    };

    # Only a missing value defaults to ''.  The previous "shift || ''"
    # also replaced legitimate false values such as 0.
    my $value = shift;
    $value = '' unless defined($value);

    # The try/catch expression is the last statement of the sub, so the
    # value returned from inside either block is what the caller receives.
    try {
        my $name_id = _get_name_id($name);

        if ($self->api_name =~ /object/) {
            $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
        }

        $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, OpenSRF::Utils::JSON->perl2JSON($value));

        _flush_by_name($name);

        return $name;
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.object.set',
    method => 'add_item',
    argc => 2,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.push',
    method => 'add_item',
    argc => 2,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.push',
    method => 'add_item',
    argc => 2,
);
254
# Maps a slot id to its slot name via the store_name table.
#
# Throws OpenSRF::EX::WARN when no (or a false) id is passed, or when no
# name is found for the id.
sub _get_id_name {
    # NOTE: despite its name, $name holds the slot *id* being looked up.
    my ($name) = @_;

    OpenSRF::EX::WARN->throw("No slot id specified!") unless $name;

    my $found = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);

    unless (ref($found) && defined($found->[0])) {
        OpenSRF::EX::WARN->throw("Slot id [$name] does not exist!");
    }

    return $found->[0];
}
269
# Maps a slot name to its numeric id via the store_name table.
#
# Throws OpenSRF::EX::WARN when no (or a false) name is passed, or when no
# slot of that name is found.
sub _get_name_id {
    my ($name) = @_;

    OpenSRF::EX::WARN->throw("No slot name specified!") unless $name;

    my $row = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);

    unless (ref($row) && defined($row->[0])) {
        OpenSRF::EX::WARN->throw("Slot name [$name] does not exist!");
    }

    return $row->[0];
}
284
# API: opensrf.persist.slot.destroy
#
# Deletes a slot completely: its stored values, its store_name row and its
# store_expire row, in that order, and then sweeps expired slots.
#
# Returns the slot name on success, undef when anything inside the try
# block throws (for example when the slot does not exist).
sub destroy_store {
    my ($self, $client, $name) = @_;

    # The try/catch expression is the sub's final statement; its value is
    # the sub's return value.
    try {
        my $slot_id = _get_name_id($name);

        my @cleanup = (
            "DELETE FROM storage WHERE name_id = ?;",
            "DELETE FROM store_name WHERE id = ?;",
            "DELETE FROM store_expire WHERE id = ?;",
        );
        for my $sql (@cleanup) {
            $dbh->do($sql, {}, $slot_id);
        }

        _sweep_expired_slots();
        return $name;
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.slot.destroy',
    method   => 'destroy_store',
    argc     => 1,
);
311
# Housekeeping run after a slot has been touched.
#
# Arguments:
#   $name     - slot name (_get_name_id() throws if it is missing/unknown)
#   $no_sweep - when true, skip the access-time update and the expiry sweep
#
# Unless $no_sweep is set, the slot's access time is refreshed and expired
# slots are swept.  In every case, a slot whose name starts with
# 'AUTOGENERATED!!' and which holds no values any more has its store_name
# row deleted.
sub _flush_by_name {
    my $name = shift;
    my $no_sweep = shift;

    my $name_id = _get_name_id($name);

    unless ($no_sweep) {
        # store_expire is keyed by the slot id, so the id (not the name, as
        # was passed before) is what _update_expire_atime() needs.
        _update_expire_atime($name_id);
        _sweep_expired_slots();
    }

    if ($name =~ /^AUTOGENERATED!!/) {
        my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
        if (!ref($count) || $$count[0] == 0) {
            $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
        }
    }
}
330
# API: opensrf.persist.queue.peek, opensrf.persist.queue.pop
#
# Returns the value with the lowest storage id in the named slot, passed
# through OpenSRF::Utils::JSON->JSON2perl.  The row is deleted unless the
# API name ends in "peek".
#
# Throws OpenSRF::EX::WARN when no name is given.  Returns undef when
# anything inside the try block throws.
sub pop_queue {
    my ($self, $client, $name) = @_;

    OpenSRF::EX::WARN->throw("No queue name specified!") unless $name;

    # The try/catch expression is the sub's final statement; its value is
    # the sub's return value.
    try {
        my $slot_id = _get_name_id($name);

        my $row = $dbh->selectrow_arrayref(
            'SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;',
            {},
            $slot_id,
        );

        if ($self->api_name !~ /peek$/) {
            $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $row->[0]);
        }

        _flush_by_name($name);

        return OpenSRF::Utils::JSON->JSON2perl( $row->[1] );
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.peek',
    method   => 'pop_queue',
    argc     => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.pop',
    method   => 'pop_queue',
    argc     => 1,
);
364
365
# API: opensrf.persist.queue.peek.all, opensrf.persist.stack.peek.all
#
# Sends every value stored in the named slot to the client, one respond()
# call per value, each passed through OpenSRF::Utils::JSON->JSON2perl.
# Values go out in ascending storage-id order, or descending order when the
# API name contains "stack".  Nothing is deleted.  Always returns undef.
#
# Throws OpenSRF::EX::WARN when no name is given; _get_name_id() throws when
# the slot is unknown.
sub peek_slot {
    my ($self, $client, $name) = @_;

    OpenSRF::EX::WARN->throw("No slot name specified!") unless $name;

    my $slot_id = _get_name_id($name);

    my $order = ($self->api_name =~ /stack/o) ? 'DESC' : 'ASC';

    my $rows = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $slot_id);

    for my $row (@$rows) {
        $client->respond( OpenSRF::Utils::JSON->JSON2perl( $row->[0] ) );
    }

    _flush_by_name($name);
    return undef;
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.peek.all',
    method   => 'peek_slot',
    argc     => 1,
    stream   => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.peek.all',
    method   => 'peek_slot',
    argc     => 1,
    stream   => 1,
);
397
398
# API: opensrf.persist.queue.size, opensrf.persist.stack.size,
#      opensrf.persist.object.size
#
# Returns SUM(LENGTH(value)) over everything stored in the named slot,
# passed through OpenSRF::Utils::JSON->JSON2perl.  Nothing is modified.
#
# Throws OpenSRF::EX::WARN when no name is given; _get_name_id() throws when
# the slot is unknown.
sub store_size {
    my $self = shift;
    my $client = shift;

    my $name = shift or do {
        throw OpenSRF::EX::WARN ("No queue name specified!");
    };
    my $name_id = _get_name_id($name);

    my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);

    return OpenSRF::Utils::JSON->JSON2perl( $value->[0] );
}
# These API names were registered with method => 'shift_stack', which made a
# size query pop (delete) the newest item instead of reporting a size.
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.size',
    method => 'store_size',
    argc => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.size',
    method => 'store_size',
    argc => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.object.size',
    method => 'store_size',
    argc => 1,
);
427
# API: opensrf.persist.queue.length, opensrf.persist.stack.depth
#
# Returns the number of values stored in the named slot (COUNT(*)), passed
# through OpenSRF::Utils::JSON->JSON2perl.  Nothing is modified.
#
# Throws OpenSRF::EX::WARN when no name is given; _get_name_id() throws when
# the slot is unknown.
sub store_depth {
    my $self = shift;
    my $client = shift;

    my $name = shift or do {
        throw OpenSRF::EX::WARN ("No queue name specified!");
    };
    my $name_id = _get_name_id($name);

    my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);

    return OpenSRF::Utils::JSON->JSON2perl( $value->[0] );
}
# These API names were registered with method => 'shift_stack', which made a
# length/depth query pop (delete) the newest item instead of counting.
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.queue.length',
    method => 'store_depth',
    argc => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.depth',
    method => 'store_depth',
    argc => 1,
);
451
# API: opensrf.persist.stack.peek, opensrf.persist.stack.pop
#
# Returns the value with the highest storage id in the named slot, passed
# through OpenSRF::Utils::JSON->JSON2perl.  The row is deleted unless the
# API name ends in "peek".
#
# Throws OpenSRF::EX::WARN when no name is given.  Returns undef when
# anything inside the try block throws.
sub shift_stack {
    my ($self, $client, $name) = @_;

    OpenSRF::EX::WARN->throw("No slot name specified!") unless $name;

    # The try/catch expression is the sub's final statement; its value is
    # the sub's return value.
    try {
        my $slot_id = _get_name_id($name);

        my $row = $dbh->selectrow_arrayref(
            'SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;',
            {},
            $slot_id,
        );

        if ($self->api_name !~ /peek$/) {
            $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $row->[0]);
        }

        _flush_by_name($name);

        return OpenSRF::Utils::JSON->JSON2perl( $row->[1] );
    } catch Error with {
        return undef;
    };
}
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.peek',
    method   => 'shift_stack',
    argc     => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.stack.pop',
    method   => 'shift_stack',
    argc     => 1,
);
484
# API: opensrf.persist.object.peek, opensrf.persist.object.get
#
# Returns the most recently stored value of the named slot, passed through
# OpenSRF::Utils::JSON->JSON2perl.  Unless the API name ends in "peek",
# every value stored under the slot is deleted afterwards.
#
# Throws OpenSRF::EX::WARN when no name is given.  Returns undef when
# anything inside the try block throws.
sub get_object {
    my $self = shift;
    my $client = shift;

    my $name = shift or do {
        throw OpenSRF::EX::WARN ("No object name specified!");
    };

    # The try/catch expression is the last statement of the sub, so the
    # value returned from inside either block is what the caller receives.
    try {
        my $name_id = _get_name_id($name);

        # $value->[0] is the slot's name_id, so the DELETE below clears the
        # whole slot rather than a single row.
        my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
        $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);

        _flush_by_name($name);

        return OpenSRF::Utils::JSON->JSON2perl( $value->[1] );
    } catch Error with {
        return undef;
    };
}
# These API names were registered with method => 'shift_stack', which left
# get_object unreachable.
# NOTE(review): for a slot that holds several values, object.get now clears
# the whole slot instead of removing only the newest row - confirm no caller
# relies on the old behaviour.
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.object.peek',
    method => 'get_object',
    argc => 1,
);
__PACKAGE__->register_method(
    api_name => 'opensrf.persist.object.get',
    method => 'get_object',
    argc => 1,
);
516
5171;