File Coverage

File:blib/lib/OpenSRF/MultiSession.pm
Coverage:4.2%

linestmtbrancondsubpodtimecode
1package OpenSRF::MultiSession;
2
1
1
1
9
4
10
use OpenSRF::AppSession;
3
1
1
1
9
3
7
use OpenSRF::Utils::Logger;
4
1
1
1
7
3
37
use Time::HiRes qw/time usleep/;
5
6my $log = 'OpenSRF::Utils::Logger';
7
8sub new {
9
0
0
        my $class = shift;
10
0
        $class = ref($class) || $class;
11
12
0
        my $self = bless {@_} => $class;
13
14
0
        $self->{api_level} = 1 if (!defined($self->{api_level}));
15
0
        $self->{session_hash_function} = \&_dummy_session_hash_function
16                if (!defined($self->{session_hash_function}));
17
18
0
        if ($self->{cap}) {
19
0
                $self->session_cap($self->{cap}) if (!$self->session_cap);
20
0
                $self->request_cap($self->{cap}) if (!$self->request_cap);
21        }
22
23
0
        if (!$self->session_cap) {
24                # XXX make adaptive the default once the logic is in place
25                #$self->adaptive(1);
26
27
0
                $self->session_cap(10);
28        }
29
0
        if (!$self->request_cap) {
30                # XXX make adaptive the default once the logic is in place
31                #$self->adaptive(1);
32
33
0
                $self->request_cap(10);
34        }
35
36
0
        $self->{sessions} = [];
37
0
        $self->{running} = [];
38
0
        $self->{completed} = [];
39
0
        $self->{failed} = [];
40
41
0
        for ( 1 .. $self->session_cap) {
42
0
0
                push @{ $self->{sessions} },
43                        OpenSRF::AppSession->create(
44                                $self->{app},
45                                $self->{api_level},
46                                1
47                        );
48                #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
49
0
                $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
50        }
51
52
0
        return $self;
53}
54
55sub _dummy_session_hash_function {
56
0
        my $self = shift;
57
0
        $self->{_dummy_hash_counter} = 1 if (!exists($self->{_dummy_hash_counter}));
58
0
        return $self->{_dummy_hash_counter}++;
59}
60
61sub connect {
62
0
0
        my $self = shift;
63
0
0
        for my $ses (@{$self->{sessions}}) {
64
0
                $ses->connect unless ($ses->connected);
65        }
66}
67
68sub finish {
69
0
0
        my $self = shift;
70
0
0
0
        $_->finish for (@{$self->{sessions}});
71}
72
73sub disconnect {
74
0
0
        my $self = shift;
75
0
0
0
        $_->disconnect for (@{$self->{sessions}});
76}
77
78sub session_hash_function {
79
0
0
        my $self = shift;
80
0
        my $session_hash_function = shift;
81
0
        return unless (ref $self);
82
83
0
        $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
84
0
        return $self->{session_hash_function};
85}
86
87sub failure_handler {
88
0
0
        my $self = shift;
89
0
        my $failure_handler = shift;
90
0
        return unless (ref $self);
91
92
0
        $self->{failure_handler} = $failure_handler if (defined $failure_handler);
93
0
        return $self->{failure_handler};
94}
95
96sub success_handler {
97
0
0
        my $self = shift;
98
0
        my $success_handler = shift;
99
0
        return unless (ref $self);
100
101
0
        $self->{success_handler} = $success_handler if (defined $success_handler);
102
0
        return $self->{success_handler};
103}
104
105sub session_cap {
106
0
0
        my $self = shift;
107
0
        my $cap = shift;
108
0
        return unless (ref $self);
109
110
0
        $self->{session_cap} = $cap if (defined $cap);
111
0
        return $self->{session_cap};
112}
113
114sub request_cap {
115
0
0
        my $self = shift;
116
0
        my $cap = shift;
117
0
        return unless (ref $self);
118
119
0
        $self->{request_cap} = $cap if (defined $cap);
120
0
        return $self->{request_cap};
121}
122
123sub adaptive {
124
0
0
        my $self = shift;
125
0
        my $adapt = shift;
126
0
        return unless (ref $self);
127
128
0
        $self->{adaptive} = $adapt if (defined $adapt);
129
0
        return $self->{adaptive};
130}
131
132sub completed {
133
0
0
        my $self = shift;
134
0
        my $count = shift;
135
0
        return unless (ref $self);
136
137
138
0
        if (wantarray) {
139
0
0
                $count ||= scalar @{$self->{completed}};
140        }
141
142
0
        if (defined $count) {
143
0
0
                return () unless (@{$self->{completed}});
144
0
0
                return splice @{$self->{completed}}, 0, $count;
145        }
146
147
0
0
        return scalar @{$self->{completed}};
148}
149
150sub failed {
151
0
0
        my $self = shift;
152
0
        my $count = shift;
153
0
        return unless (ref $self);
154
155
156
0
        if (wantarray) {
157
0
0
                $count ||= scalar @{$self->{failed}};
158        }
159
160
0
        if (defined $count) {
161
0
0
                return () unless (@{$self->{failed}});
162
0
0
                return splice @{$self->{failed}}, 0, $count;
163        }
164
165
0
0
        return scalar @{$self->{failed}};
166}
167
168sub running {
169
0
0
        my $self = shift;
170
0
        return unless (ref $self);
171
0
0
        return scalar(@{ $self->{running} });
172}
173
174
175sub request {
176
0
0
        my $self = shift;
177
0
        my $hash_param;
178
179
0
        my $method = shift;
180
0
        if (ref $method) {
181
0
                $hash_param = $method;
182
0
                $method = shift;
183        }
184
185
0
        my @params = @_;
186
187
0
        $self->session_reap;
188
0
        if ($self->running < $self->request_cap ) {
189
0
                my $index = $self->session_hash_function->($self, (defined $hash_param ? $hash_param : ()), $method, @params);
190
0
                my $ses = $self->{sessions}->[$index % $self->session_cap];
191
192                #print "Running $method using session ".$ses->session_id."\n";
193
194
0
                my $req = $ses->request( $method, @params );
195
196
0
0
                push @{ $self->{running} },
197                        { req => $req,
198                          meth => $method,
199                          hash => $hash_param,
200                          params => [@params]
201                        };
202
203
0
                $log->debug("Making request [$method] ".$self->running."...");
204
205
0
                return $req;
206        } elsif (!$self->adaptive) {
207                #print "Oops. Too many running: ".$self->running."\n";
208
0
                $self->session_wait;
209
0
                return $self->request((defined $hash_param ? $hash_param : ()), $method => @params);
210        } else {
211                # XXX do addaptive stuff ...
212        }
213}
214
215sub session_wait {
216
0
0
        my $self = shift;
217
0
        my $all = shift;
218
219
0
        my $count;
220
0
        if ($all) {
221
0
                $count = $self->running;
222
0
                while ($self->running) {
223
0
                        $self->session_reap;
224                }
225
0
                return $count;
226        } else {
227
0
                while(($count = $self->session_reap) == 0 && $self->running) {
228
0
                        usleep 100;
229                }
230
0
                return $count;
231        }
232}
233
234sub session_reap {
235
0
0
        my $self = shift;
236
237
0
        my @done;
238
0
        my @running;
239
0
0
        while ( my $req = shift @{ $self->{running} } ) {
240
0
                if ($req->{req}->complete) {
241                        #print "Currently running: ".$self->running."\n";
242
243
0
                        $req->{response} = [ $req->{req}->recv ];
244
0
                        $req->{duration} = $req->{req}->duration;
245
246                        #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
247
248
0
                        if ($req->{req}->failed) {
249                                #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
250
0
                                $req->{error} = $req->{req}->failed;
251
0
0
                                push @{ $self->{failed} }, $req;
252                        } else {
253
0
0
                                push @{ $self->{completed} }, $req;
254                        }
255
256
0
                        push @done, $req;
257
258                } else {
259                        #$log->debug("Still running ".$req->{meth}." in session ".$req->{req}->session->session_id);
260
0
                        push @running, $req;
261                }
262        }
263
0
0
        push @{ $self->{running} }, @running;
264
265
0
        for my $req ( @done ) {
266
0
                my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
267
0
                $handler->($self, $req) if ($handler);
268
269
0
                $req->{req}->finish;
270
0
0
                delete $$req{$_} for (keys %$req);
271
272        }
273
274
0
        my $complete = scalar @done;
275
0
        my $incomplete = scalar @running;
276
277        #$log->debug("Still running $incomplete, completed $complete");
278
279
0
        return $complete;
280}
281
2821;
283