File: | blib/lib/OpenSRF/MultiSession.pm |
Coverage: | 4.2% |
line | stmt | bran | cond | sub | pod | time | code |
---|---|---|---|---|---|---|---|
1 | package OpenSRF::MultiSession; | ||||||
2 | 1 1 1 | 9 4 10 | use OpenSRF::AppSession; | ||||
3 | 1 1 1 | 9 3 7 | use OpenSRF::Utils::Logger; | ||||
4 | 1 1 1 | 7 3 37 | use Time::HiRes qw/time usleep/; | ||||
5 | |||||||
6 | my $log = 'OpenSRF::Utils::Logger'; | ||||||
7 | |||||||
8 | sub new { | ||||||
9 | 0 | 0 | my $class = shift; | ||||
10 | 0 | $class = ref($class) || $class; | |||||
11 | |||||||
12 | 0 | my $self = bless {@_} => $class; | |||||
13 | |||||||
14 | 0 | $self->{api_level} = 1 if (!defined($self->{api_level})); | |||||
15 | 0 | $self->{session_hash_function} = \&_dummy_session_hash_function | |||||
16 | if (!defined($self->{session_hash_function})); | ||||||
17 | |||||||
18 | 0 | if ($self->{cap}) { | |||||
19 | 0 | $self->session_cap($self->{cap}) if (!$self->session_cap); | |||||
20 | 0 | $self->request_cap($self->{cap}) if (!$self->request_cap); | |||||
21 | } | ||||||
22 | |||||||
23 | 0 | if (!$self->session_cap) { | |||||
24 | # XXX make adaptive the default once the logic is in place | ||||||
25 | #$self->adaptive(1); | ||||||
26 | |||||||
27 | 0 | $self->session_cap(10); | |||||
28 | } | ||||||
29 | 0 | if (!$self->request_cap) { | |||||
30 | # XXX make adaptive the default once the logic is in place | ||||||
31 | #$self->adaptive(1); | ||||||
32 | |||||||
33 | 0 | $self->request_cap(10); | |||||
34 | } | ||||||
35 | |||||||
36 | 0 | $self->{sessions} = []; | |||||
37 | 0 | $self->{running} = []; | |||||
38 | 0 | $self->{completed} = []; | |||||
39 | 0 | $self->{failed} = []; | |||||
40 | |||||||
41 | 0 | for ( 1 .. $self->session_cap) { | |||||
42 | 0 0 | push @{ $self->{sessions} }, | |||||
43 | OpenSRF::AppSession->create( | ||||||
44 | $self->{app}, | ||||||
45 | $self->{api_level}, | ||||||
46 | 1 | ||||||
47 | ); | ||||||
48 | #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n"; | ||||||
49 | 0 | $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ..."); | |||||
50 | } | ||||||
51 | |||||||
52 | 0 | return $self; | |||||
53 | } | ||||||
54 | |||||||
55 | sub _dummy_session_hash_function { | ||||||
56 | 0 | my $self = shift; | |||||
57 | 0 | $self->{_dummy_hash_counter} = 1 if (!exists($self->{_dummy_hash_counter})); | |||||
58 | 0 | return $self->{_dummy_hash_counter}++; | |||||
59 | } | ||||||
60 | |||||||
61 | sub connect { | ||||||
62 | 0 | 0 | my $self = shift; | ||||
63 | 0 0 | for my $ses (@{$self->{sessions}}) { | |||||
64 | 0 | $ses->connect unless ($ses->connected); | |||||
65 | } | ||||||
66 | } | ||||||
67 | |||||||
68 | sub finish { | ||||||
69 | 0 | 0 | my $self = shift; | ||||
70 | 0 0 0 | $_->finish for (@{$self->{sessions}}); | |||||
71 | } | ||||||
72 | |||||||
73 | sub disconnect { | ||||||
74 | 0 | 0 | my $self = shift; | ||||
75 | 0 0 0 | $_->disconnect for (@{$self->{sessions}}); | |||||
76 | } | ||||||
77 | |||||||
78 | sub session_hash_function { | ||||||
79 | 0 | 0 | my $self = shift; | ||||
80 | 0 | my $session_hash_function = shift; | |||||
81 | 0 | return unless (ref $self); | |||||
82 | |||||||
83 | 0 | $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function); | |||||
84 | 0 | return $self->{session_hash_function}; | |||||
85 | } | ||||||
86 | |||||||
87 | sub failure_handler { | ||||||
88 | 0 | 0 | my $self = shift; | ||||
89 | 0 | my $failure_handler = shift; | |||||
90 | 0 | return unless (ref $self); | |||||
91 | |||||||
92 | 0 | $self->{failure_handler} = $failure_handler if (defined $failure_handler); | |||||
93 | 0 | return $self->{failure_handler}; | |||||
94 | } | ||||||
95 | |||||||
96 | sub success_handler { | ||||||
97 | 0 | 0 | my $self = shift; | ||||
98 | 0 | my $success_handler = shift; | |||||
99 | 0 | return unless (ref $self); | |||||
100 | |||||||
101 | 0 | $self->{success_handler} = $success_handler if (defined $success_handler); | |||||
102 | 0 | return $self->{success_handler}; | |||||
103 | } | ||||||
104 | |||||||
105 | sub session_cap { | ||||||
106 | 0 | 0 | my $self = shift; | ||||
107 | 0 | my $cap = shift; | |||||
108 | 0 | return unless (ref $self); | |||||
109 | |||||||
110 | 0 | $self->{session_cap} = $cap if (defined $cap); | |||||
111 | 0 | return $self->{session_cap}; | |||||
112 | } | ||||||
113 | |||||||
114 | sub request_cap { | ||||||
115 | 0 | 0 | my $self = shift; | ||||
116 | 0 | my $cap = shift; | |||||
117 | 0 | return unless (ref $self); | |||||
118 | |||||||
119 | 0 | $self->{request_cap} = $cap if (defined $cap); | |||||
120 | 0 | return $self->{request_cap}; | |||||
121 | } | ||||||
122 | |||||||
123 | sub adaptive { | ||||||
124 | 0 | 0 | my $self = shift; | ||||
125 | 0 | my $adapt = shift; | |||||
126 | 0 | return unless (ref $self); | |||||
127 | |||||||
128 | 0 | $self->{adaptive} = $adapt if (defined $adapt); | |||||
129 | 0 | return $self->{adaptive}; | |||||
130 | } | ||||||
131 | |||||||
132 | sub completed { | ||||||
133 | 0 | 0 | my $self = shift; | ||||
134 | 0 | my $count = shift; | |||||
135 | 0 | return unless (ref $self); | |||||
136 | |||||||
137 | |||||||
138 | 0 | if (wantarray) { | |||||
139 | 0 0 | $count ||= scalar @{$self->{completed}}; | |||||
140 | } | ||||||
141 | |||||||
142 | 0 | if (defined $count) { | |||||
143 | 0 0 | return () unless (@{$self->{completed}}); | |||||
144 | 0 0 | return splice @{$self->{completed}}, 0, $count; | |||||
145 | } | ||||||
146 | |||||||
147 | 0 0 | return scalar @{$self->{completed}}; | |||||
148 | } | ||||||
149 | |||||||
150 | sub failed { | ||||||
151 | 0 | 0 | my $self = shift; | ||||
152 | 0 | my $count = shift; | |||||
153 | 0 | return unless (ref $self); | |||||
154 | |||||||
155 | |||||||
156 | 0 | if (wantarray) { | |||||
157 | 0 0 | $count ||= scalar @{$self->{failed}}; | |||||
158 | } | ||||||
159 | |||||||
160 | 0 | if (defined $count) { | |||||
161 | 0 0 | return () unless (@{$self->{failed}}); | |||||
162 | 0 0 | return splice @{$self->{failed}}, 0, $count; | |||||
163 | } | ||||||
164 | |||||||
165 | 0 0 | return scalar @{$self->{failed}}; | |||||
166 | } | ||||||
167 | |||||||
168 | sub running { | ||||||
169 | 0 | 0 | my $self = shift; | ||||
170 | 0 | return unless (ref $self); | |||||
171 | 0 0 | return scalar(@{ $self->{running} }); | |||||
172 | } | ||||||
173 | |||||||
174 | |||||||
175 | sub request { | ||||||
176 | 0 | 0 | my $self = shift; | ||||
177 | 0 | my $hash_param; | |||||
178 | |||||||
179 | 0 | my $method = shift; | |||||
180 | 0 | if (ref $method) { | |||||
181 | 0 | $hash_param = $method; | |||||
182 | 0 | $method = shift; | |||||
183 | } | ||||||
184 | |||||||
185 | 0 | my @params = @_; | |||||
186 | |||||||
187 | 0 | $self->session_reap; | |||||
188 | 0 | if ($self->running < $self->request_cap ) { | |||||
189 | 0 | my $index = $self->session_hash_function->($self, (defined $hash_param ? $hash_param : ()), $method, @params); | |||||
190 | 0 | my $ses = $self->{sessions}->[$index % $self->session_cap]; | |||||
191 | |||||||
192 | #print "Running $method using session ".$ses->session_id."\n"; | ||||||
193 | |||||||
194 | 0 | my $req = $ses->request( $method, @params ); | |||||
195 | |||||||
196 | 0 0 | push @{ $self->{running} }, | |||||
197 | { req => $req, | ||||||
198 | meth => $method, | ||||||
199 | hash => $hash_param, | ||||||
200 | params => [@params] | ||||||
201 | }; | ||||||
202 | |||||||
203 | 0 | $log->debug("Making request [$method] ".$self->running."..."); | |||||
204 | |||||||
205 | 0 | return $req; | |||||
206 | } elsif (!$self->adaptive) { | ||||||
207 | #print "Oops. Too many running: ".$self->running."\n"; | ||||||
208 | 0 | $self->session_wait; | |||||
209 | 0 | return $self->request((defined $hash_param ? $hash_param : ()), $method => @params); | |||||
210 | } else { | ||||||
211 | # XXX do addaptive stuff ... | ||||||
212 | } | ||||||
213 | } | ||||||
214 | |||||||
215 | sub session_wait { | ||||||
216 | 0 | 0 | my $self = shift; | ||||
217 | 0 | my $all = shift; | |||||
218 | |||||||
219 | 0 | my $count; | |||||
220 | 0 | if ($all) { | |||||
221 | 0 | $count = $self->running; | |||||
222 | 0 | while ($self->running) { | |||||
223 | 0 | $self->session_reap; | |||||
224 | } | ||||||
225 | 0 | return $count; | |||||
226 | } else { | ||||||
227 | 0 | while(($count = $self->session_reap) == 0 && $self->running) { | |||||
228 | 0 | usleep 100; | |||||
229 | } | ||||||
230 | 0 | return $count; | |||||
231 | } | ||||||
232 | } | ||||||
233 | |||||||
234 | sub session_reap { | ||||||
235 | 0 | 0 | my $self = shift; | ||||
236 | |||||||
237 | 0 | my @done; | |||||
238 | 0 | my @running; | |||||
239 | 0 0 | while ( my $req = shift @{ $self->{running} } ) { | |||||
240 | 0 | if ($req->{req}->complete) { | |||||
241 | #print "Currently running: ".$self->running."\n"; | ||||||
242 | |||||||
243 | 0 | $req->{response} = [ $req->{req}->recv ]; | |||||
244 | 0 | $req->{duration} = $req->{req}->duration; | |||||
245 | |||||||
246 | #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n"; | ||||||
247 | |||||||
248 | 0 | if ($req->{req}->failed) { | |||||
249 | #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n"; | ||||||
250 | 0 | $req->{error} = $req->{req}->failed; | |||||
251 | 0 0 | push @{ $self->{failed} }, $req; | |||||
252 | } else { | ||||||
253 | 0 0 | push @{ $self->{completed} }, $req; | |||||
254 | } | ||||||
255 | |||||||
256 | 0 | push @done, $req; | |||||
257 | |||||||
258 | } else { | ||||||
259 | #$log->debug("Still running ".$req->{meth}." in session ".$req->{req}->session->session_id); | ||||||
260 | 0 | push @running, $req; | |||||
261 | } | ||||||
262 | } | ||||||
263 | 0 0 | push @{ $self->{running} }, @running; | |||||
264 | |||||||
265 | 0 | for my $req ( @done ) { | |||||
266 | 0 | my $handler = $req->{error} ? $self->failure_handler : $self->success_handler; | |||||
267 | 0 | $handler->($self, $req) if ($handler); | |||||
268 | |||||||
269 | 0 | $req->{req}->finish; | |||||
270 | 0 0 | delete $$req{$_} for (keys %$req); | |||||
271 | |||||||
272 | } | ||||||
273 | |||||||
274 | 0 | my $complete = scalar @done; | |||||
275 | 0 | my $incomplete = scalar @running; | |||||
276 | |||||||
277 | #$log->debug("Still running $incomplete, completed $complete"); | ||||||
278 | |||||||
279 | 0 | return $complete; | |||||
280 | } | ||||||
281 | |||||||
282 | 1; | ||||||
283 |