File: | blib/lib/OpenSRF/AppSession.pm |
Coverage: | 7.7% |
line | stmt | bran | cond | sub | pod | time | code |
---|---|---|---|---|---|---|---|
1 | package OpenSRF::AppSession; | ||||||
2 | 9 9 9 | 85 35 168 | use OpenSRF::DomainObject::oilsMessage; | ||||
3 | 9 9 9 | 152 45 101 | use OpenSRF::DomainObject::oilsMethod; | ||||
4 | 9 9 9 | 73 35 67 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
5 | 9 9 9 | 114 38 120 | use OpenSRF::Transport::PeerHandle; | ||||
6 | 9 9 9 | 72 30 65 | use OpenSRF::Utils::JSON; | ||||
7 | 9 9 9 | 63 32 57 | use OpenSRF::Utils::Logger qw(:level); | ||||
8 | 9 9 9 | 107 32 85 | use OpenSRF::Utils::SettingsClient; | ||||
9 | 9 9 9 | 66 33 59 | use OpenSRF::Utils::Config; | ||||
10 | 9 9 9 | 61 31 53 | use OpenSRF::EX; | ||||
11 | 9 9 9 | 64 31 71 | use OpenSRF; | ||||
12 | 9 9 9 | 60 35 57 | use Exporter; | ||||
13 | 9 9 9 | 61 32 79 | use base qw/Exporter OpenSRF/; | ||||
14 | 9 9 9 | 61 31 71 | use Time::HiRes qw( time usleep ); | ||||
15 | 9 9 9 | 65 32 79 | use warnings; | ||||
16 | 9 9 9 | 1182 47 68 | use strict; | ||||
17 | |||||||
18 | our @EXPORT_OK = qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED CLIENT SERVER/; | ||||||
19 | our %EXPORT_TAGS = ( state => [ qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED/ ], | ||||||
20 | endpoint => [ qw/CLIENT SERVER/ ], | ||||||
21 | ); | ||||||
22 | |||||||
23 | my $logger = "OpenSRF::Utils::Logger"; | ||||||
24 | my $_last_locale = 'en-US'; | ||||||
25 | |||||||
26 | our %_CACHE; | ||||||
27 | our @_RESEND_QUEUE; | ||||||
28 | |||||||
29 | 0 | 0 | sub CONNECTING { return 3 }; | ||||
30 | 0 | 0 | sub INIT_CONNECTED { return 4 }; | ||||
31 | 0 | 0 | sub CONNECTED { return 1 }; | ||||
32 | 0 | 0 | sub DISCONNECTED { return 2 }; | ||||
33 | |||||||
34 | 0 | 0 | sub CLIENT { return 2 }; | ||||
35 | 0 | 0 | sub SERVER { return 1 }; | ||||
36 | |||||||
37 | sub find { | ||||||
38 | 0 | 0 | return undef unless (defined $_[1]); | ||||
39 | 0 | return $_CACHE{$_[1]} if (exists($_CACHE{$_[1]})); | |||||
40 | } | ||||||
41 | |||||||
42 | sub transport_connected { | ||||||
43 | 0 | 0 | my $self = shift; | ||||
44 | 0 | if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) { | |||||
45 | 0 | return 0; | |||||
46 | } | ||||||
47 | 0 | return $self->{peer_handle}->tcp_connected(); | |||||
48 | } | ||||||
49 | |||||||
50 | sub connected { | ||||||
51 | 0 | 0 | my $self = shift; | ||||
52 | 0 | return $self->state == CONNECTED; | |||||
53 | } | ||||||
54 | # ---------------------------------------------------------------------------- | ||||||
55 | # Clears the transport buffers | ||||||
56 | # call this if you are not through with the sesssion, but you want | ||||||
57 | # to have a clean slate. You shouldn't have to call this if | ||||||
58 | # you are correctly 'recv'ing all of the data from a request. | ||||||
59 | # however, if you don't want all of the data, this will | ||||||
60 | # slough off any excess | ||||||
61 | # * * Note: This will delete data for all sessions using this transport | ||||||
62 | # handle. For example, all client sessions use the same handle. | ||||||
63 | # ---------------------------------------------------------------------------- | ||||||
64 | sub buffer_reset { | ||||||
65 | |||||||
66 | 0 | 0 | my $self = shift; | ||||
67 | 0 | if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) { | |||||
68 | 0 | return 0; | |||||
69 | } | ||||||
70 | 0 | $self->{peer_handle}->buffer_reset(); | |||||
71 | } | ||||||
72 | |||||||
73 | |||||||
74 | # when any incoming data is received, this method is called. | ||||||
75 | sub server_build { | ||||||
76 | 0 | 0 | my $class = shift; | ||||
77 | 0 | $class = ref($class) || $class; | |||||
78 | |||||||
79 | 0 | my $sess_id = shift; | |||||
80 | 0 | my $remote_id = shift; | |||||
81 | 0 | my $service = shift; | |||||
82 | |||||||
83 | 0 | warn "Missing args to server_build():\n" . | |||||
84 | "sess_id: $sess_id, remote_id: $remote_id, service: $service\n" | ||||||
85 | unless ($sess_id and $remote_id and $service); | ||||||
86 | |||||||
87 | 0 | return undef unless ($sess_id and $remote_id and $service); | |||||
88 | |||||||
89 | 0 | if ( my $thingy = $class->find($sess_id) ) { | |||||
90 | 0 | $thingy->remote_id( $remote_id ); | |||||
91 | 0 | return $thingy; | |||||
92 | } | ||||||
93 | |||||||
94 | 0 | if( $service eq "client" ) { | |||||
95 | #throw OpenSRF::EX::PANIC ("Attempting to build a client session as a server" . | ||||||
96 | # " Session ID [$sess_id], remote_id [$remote_id]"); | ||||||
97 | |||||||
98 | 0 | warn "Attempting to build a client session as ". | |||||
99 | "a server Session ID [$sess_id], remote_id [$remote_id]"; | ||||||
100 | |||||||
101 | 0 | $logger->debug("Attempting to build a client session as ". | |||||
102 | "a server Session ID [$sess_id], remote_id [$remote_id]", ERROR ); | ||||||
103 | |||||||
104 | 0 | return undef; | |||||
105 | } | ||||||
106 | |||||||
107 | 0 | my $config_client = OpenSRF::Utils::SettingsClient->new(); | |||||
108 | 0 | my $stateless = $config_client->config_value("apps", $service, "stateless"); | |||||
109 | |||||||
110 | #my $max_requests = $conf->$service->max_requests; | ||||||
111 | 0 | my $max_requests = $config_client->config_value("apps",$service,"max_requests"); | |||||
112 | 0 | $logger->debug( "Max Requests for $service is $max_requests", INTERNAL ) if (defined $max_requests); | |||||
113 | |||||||
114 | 0 | $logger->transport( "AppSession creating new session: $sess_id", INTERNAL ); | |||||
115 | |||||||
116 | 0 | my $self = bless { recv_queue => [], | |||||
117 | request_queue => [], | ||||||
118 | requests => 0, | ||||||
119 | session_data => {}, | ||||||
120 | callbacks => {}, | ||||||
121 | endpoint => SERVER, | ||||||
122 | state => CONNECTING, | ||||||
123 | session_id => $sess_id, | ||||||
124 | remote_id => $remote_id, | ||||||
125 | peer_handle => OpenSRF::Transport::PeerHandle->retrieve($service), | ||||||
126 | max_requests => $max_requests, | ||||||
127 | session_threadTrace => 0, | ||||||
128 | service => $service, | ||||||
129 | stateless => $stateless, | ||||||
130 | } => $class; | ||||||
131 | |||||||
132 | 0 | return $_CACHE{$sess_id} = $self; | |||||
133 | } | ||||||
134 | |||||||
135 | sub session_data { | ||||||
136 | 0 | 0 | my $self = shift; | ||||
137 | 0 | my ($name, $datum) = @_; | |||||
138 | |||||||
139 | 0 | $self->{session_data}->{$name} = $datum if (defined $datum); | |||||
140 | 0 | return $self->{session_data}->{$name}; | |||||
141 | } | ||||||
142 | |||||||
143 | 0 | 0 | sub service { return shift()->{service}; } | ||||
144 | |||||||
145 | sub continue_request { | ||||||
146 | 0 | 0 | my $self = shift; | ||||
147 | 0 | $self->{'requests'}++; | |||||
148 | 0 | return 1 if (!$self->{'max_requests'}); | |||||
149 | 0 | return $self->{'requests'} <= $self->{'max_requests'} ? 1 : 0; | |||||
150 | } | ||||||
151 | |||||||
152 | sub last_sent_payload { | ||||||
153 | 0 | 0 | my( $self, $payload ) = @_; | ||||
154 | 0 | if( $payload ) { | |||||
155 | 0 | return $self->{'last_sent_payload'} = $payload; | |||||
156 | } | ||||||
157 | 0 | return $self->{'last_sent_payload'}; | |||||
158 | } | ||||||
159 | |||||||
160 | sub session_locale { | ||||||
161 | 0 | 0 | my( $self, $type ) = @_; | ||||
162 | 0 | if( $type ) { | |||||
163 | 0 | $_last_locale = $type if ($self->endpoint == SERVER); | |||||
164 | 0 | return $self->{'session_locale'} = $type; | |||||
165 | } | ||||||
166 | 0 | return $self->{'session_locale'}; | |||||
167 | } | ||||||
168 | |||||||
169 | sub last_sent_type { | ||||||
170 | 0 | 0 | my( $self, $type ) = @_; | ||||
171 | 0 | if( $type ) { | |||||
172 | 0 | return $self->{'last_sent_type'} = $type; | |||||
173 | } | ||||||
174 | 0 | return $self->{'last_sent_type'}; | |||||
175 | } | ||||||
176 | |||||||
177 | sub get_app_targets { | ||||||
178 | 0 | 0 | my $app = shift; | ||||
179 | |||||||
180 | 0 | my $conf = OpenSRF::Utils::Config->current; | |||||
181 | 0 | my $router_name = $conf->bootstrap->router_name || 'router'; | |||||
182 | 0 | my $domain = $conf->bootstrap->domain; | |||||
183 | 0 | $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains; | |||||
184 | |||||||
185 | 0 | unless($router_name and $domain) { | |||||
186 | 0 | throw OpenSRF::EX::Config | |||||
187 | ("Missing router config information 'router_name' and 'domain'"); | ||||||
188 | } | ||||||
189 | |||||||
190 | 0 | return ("$router_name\@$domain/$app"); | |||||
191 | } | ||||||
192 | |||||||
193 | sub stateless { | ||||||
194 | 0 | 0 | my $self = shift; | ||||
195 | 0 | my $state = shift; | |||||
196 | 0 | $self->{stateless} = $state if (defined $state); | |||||
197 | 0 | return $self->{stateless}; | |||||
198 | } | ||||||
199 | |||||||
200 | # When we're a client and we want to connect to a remote service | ||||||
201 | sub create { | ||||||
202 | 0 | 0 | my $class = shift; | ||||
203 | 0 | $class = ref($class) || $class; | |||||
204 | |||||||
205 | 0 | my $app = shift; | |||||
206 | 0 | my $api_level = shift; | |||||
207 | 0 | my $quiet = shift; | |||||
208 | 0 | my $locale = shift || $_last_locale; | |||||
209 | |||||||
210 | 0 | $api_level = 1 if (!defined($api_level)); | |||||
211 | |||||||
212 | 0 | $logger->debug( "AppSession creating new client session for $app", DEBUG ); | |||||
213 | |||||||
214 | 0 | my $stateless = 0; | |||||
215 | 0 | my $c = OpenSRF::Utils::SettingsClient->new(); | |||||
216 | # we can get an infinite loop if we're grabbing the settings and we | ||||||
217 | # need the settings to grab the settings... | ||||||
218 | 0 | if($app ne "opensrf.settings" || $c->has_config()) { | |||||
219 | 0 | $stateless = $c->config_value("apps", $app, "stateless"); | |||||
220 | } | ||||||
221 | |||||||
222 | 0 | my $sess_id = time . rand( $$ ); | |||||
223 | 0 | while ( $class->find($sess_id) ) { | |||||
224 | 0 | $sess_id = time . rand( $$ ); | |||||
225 | } | ||||||
226 | |||||||
227 | |||||||
228 | 0 | my ($r_id) = get_app_targets($app); | |||||
229 | |||||||
230 | 0 | my $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("client"); | |||||
231 | 0 | if( ! $peer_handle ) { | |||||
232 | 0 | $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("system_client"); | |||||
233 | } | ||||||
234 | |||||||
235 | 0 | my $self = bless { app_name => $app, | |||||
236 | request_queue => [], | ||||||
237 | endpoint => CLIENT, | ||||||
238 | state => DISCONNECTED,#since we're init'ing | ||||||
239 | session_id => $sess_id, | ||||||
240 | remote_id => $r_id, | ||||||
241 | raise_error => $quiet ? 0 : 1, | ||||||
242 | session_locale => $locale, | ||||||
243 | api_level => $api_level, | ||||||
244 | orig_remote_id => $r_id, | ||||||
245 | peer_handle => $peer_handle, | ||||||
246 | session_threadTrace => 0, | ||||||
247 | stateless => $stateless, | ||||||
248 | } => $class; | ||||||
249 | |||||||
250 | 0 | $logger->debug( "Created new client session $app : $sess_id" ); | |||||
251 | |||||||
252 | 0 | return $_CACHE{$sess_id} = $self; | |||||
253 | } | ||||||
254 | |||||||
255 | sub raise_remote_errors { | ||||||
256 | 0 | 0 | my $self = shift; | ||||
257 | 0 | my $err = shift; | |||||
258 | 0 | $self->{raise_error} = $err if (defined $err); | |||||
259 | 0 | return $self->{raise_error}; | |||||
260 | } | ||||||
261 | |||||||
262 | sub api_level { | ||||||
263 | 0 | 0 | return shift()->{api_level}; | ||||
264 | } | ||||||
265 | |||||||
266 | sub app { | ||||||
267 | 0 | 0 | return shift()->{app_name}; | ||||
268 | } | ||||||
269 | |||||||
270 | sub reset { | ||||||
271 | 0 | 0 | my $self = shift; | ||||
272 | 0 | $self->remote_id($$self{orig_remote_id}); | |||||
273 | } | ||||||
274 | |||||||
275 | # 'connect' can be used as a constructor if called as a class method, | ||||||
276 | # or used to connect a session that has disconnectd if called against | ||||||
277 | # an existing session that seems to be disconnected, or was just built | ||||||
278 | # using 'create' above. | ||||||
279 | |||||||
280 | # connect( $app, username => $user, secret => $passwd ); | ||||||
281 | # OR | ||||||
282 | # connect( $app, sysname => $user, secret => $shared_secret ); | ||||||
283 | |||||||
284 | # --- Returns undef if the connect attempt times out. | ||||||
285 | # --- Returns the OpenSRF::EX object if one is returned by the server | ||||||
286 | # --- Returns self if connected | ||||||
287 | sub connect { | ||||||
288 | 0 | 0 | my $self = shift; | ||||
289 | 0 | my $class = ref($self) || $self; | |||||
290 | |||||||
291 | |||||||
292 | 0 | if ( ref( $self ) and $self->state && $self->state == CONNECTED ) { | |||||
293 | 0 | $logger->transport("AppSession already connected", DEBUG ); | |||||
294 | } else { | ||||||
295 | 0 | $logger->transport("AppSession not connected, connecting..", DEBUG ); | |||||
296 | } | ||||||
297 | 0 | return $self if ( ref( $self ) and $self->state && $self->state == CONNECTED ); | |||||
298 | |||||||
299 | |||||||
300 | 0 | my $app = shift; | |||||
301 | 0 | my $api_level = shift; | |||||
302 | 0 | $api_level = 1 unless (defined $api_level); | |||||
303 | |||||||
304 | 0 | $self = $class->create($app, @_) if (!ref($self)); | |||||
305 | |||||||
306 | 0 | return undef unless ($self); | |||||
307 | |||||||
308 | 0 | $self->{api_level} = $api_level; | |||||
309 | |||||||
310 | 0 | $self->reset; | |||||
311 | 0 | $self->state(CONNECTING); | |||||
312 | 0 | $self->send('CONNECT', ""); | |||||
313 | |||||||
314 | |||||||
315 | # if we want to connect to settings, we may not have | ||||||
316 | # any data for the settings client to work with... | ||||||
317 | # just using a default for now XXX | ||||||
318 | |||||||
319 | 0 | my $time_remaining = 5; | |||||
320 | |||||||
321 | |||||||
322 | # my $client = OpenSRF::Utils::SettingsClient->new(); | ||||||
323 | # my $trans = $client->config_value("client_connection","transport_host"); | ||||||
324 | # | ||||||
325 | # if(!ref($trans)) { | ||||||
326 | # $time_remaining = $trans->{connect_timeout}; | ||||||
327 | # } else { | ||||||
328 | # # XXX for now, just use the first | ||||||
329 | # $time_remaining = $trans->[0]->{connect_timeout}; | ||||||
330 | # } | ||||||
331 | |||||||
332 | 0 | while ( $self->state != CONNECTED and $time_remaining > 0 ) { | |||||
333 | 0 | my $starttime = time; | |||||
334 | 0 | $self->queue_wait($time_remaining); | |||||
335 | 0 | my $endtime = time; | |||||
336 | 0 | $time_remaining -= ($endtime - $starttime); | |||||
337 | } | ||||||
338 | |||||||
339 | 0 | return undef unless($self->state == CONNECTED); | |||||
340 | |||||||
341 | 0 | $self->stateless(0); | |||||
342 | |||||||
343 | 0 | return $self; | |||||
344 | } | ||||||
345 | |||||||
346 | sub finish { | ||||||
347 | 0 | 0 | my $self = shift; | ||||
348 | 0 | if( ! $self->session_id ) { | |||||
349 | 0 | return 0; | |||||
350 | } | ||||||
351 | } | ||||||
352 | |||||||
353 | sub unregister_callback { | ||||||
354 | 0 | 0 | my $self = shift; | ||||
355 | 0 | my $type = shift; | |||||
356 | 0 | my $cb = shift; | |||||
357 | 0 | if (exists $self->{callbacks}{$type}) { | |||||
358 | 0 | delete $self->{callbacks}{$type}{$cb}; | |||||
359 | 0 | return $cb; | |||||
360 | } | ||||||
361 | 0 | return undef; | |||||
362 | } | ||||||
363 | |||||||
364 | sub register_callback { | ||||||
365 | 0 | 0 | my $self = shift; | ||||
366 | 0 | my $type = shift; | |||||
367 | 0 | my $cb = shift; | |||||
368 | 0 | my $cb_key = "$cb"; | |||||
369 | 0 | $self->{callbacks}{$type}{$cb_key} = $cb; | |||||
370 | 0 | return $cb_key; | |||||
371 | } | ||||||
372 | |||||||
373 | sub kill_me { | ||||||
374 | 0 | 0 | my $self = shift; | ||||
375 | 0 0 | if( ! $self->session_id ) { return 0; } | |||||
376 | |||||||
377 | # run each 'death' callback; | ||||||
378 | 0 | if (exists $self->{callbacks}{death}) { | |||||
379 | 0 0 | for my $sub (values %{$self->{callbacks}{death}}) { | |||||
380 | 0 | $sub->($self); | |||||
381 | } | ||||||
382 | } | ||||||
383 | |||||||
384 | 0 | $self->disconnect; | |||||
385 | 0 | $logger->transport( "AppSession killing self: " . $self->session_id(), DEBUG ); | |||||
386 | 0 | delete $_CACHE{$self->session_id}; | |||||
387 | 0 0 | delete($$self{$_}) for (keys %$self); | |||||
388 | } | ||||||
389 | |||||||
390 | sub disconnect { | ||||||
391 | 0 | 0 | my $self = shift; | ||||
392 | |||||||
393 | # run each 'disconnect' callback; | ||||||
394 | 0 | if (exists $self->{callbacks}{disconnect}) { | |||||
395 | 0 0 | for my $sub (values %{$self->{callbacks}{disconnect}}) { | |||||
396 | 0 | $sub->($self); | |||||
397 | } | ||||||
398 | } | ||||||
399 | |||||||
400 | 0 | if ( !$self->stateless and $self->state != DISCONNECTED ) { | |||||
401 | 0 | $self->send('DISCONNECT', "") if ($self->endpoint == CLIENT); | |||||
402 | 0 | $self->state( DISCONNECTED ); | |||||
403 | } | ||||||
404 | |||||||
405 | 0 | $self->reset; | |||||
406 | } | ||||||
407 | |||||||
408 | sub request { | ||||||
409 | 0 | 0 | my $self = shift; | ||||
410 | 0 | my $meth = shift; | |||||
411 | 0 | return unless $self; | |||||
412 | |||||||
413 | # tell the logger to create a new xid - the logger will decide if it's really necessary | ||||||
414 | 0 | $logger->mk_osrf_xid; | |||||
415 | |||||||
416 | 0 | my $method; | |||||
417 | 0 | if (!ref $meth) { | |||||
418 | 0 | $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth ); | |||||
419 | } else { | ||||||
420 | 0 | $method = $meth; | |||||
421 | } | ||||||
422 | |||||||
423 | 0 | $method->params( @_ ); | |||||
424 | |||||||
425 | 0 | $self->send('REQUEST',$method); | |||||
426 | } | ||||||
427 | |||||||
428 | sub full_request { | ||||||
429 | 0 | 0 | my $self = shift; | ||||
430 | 0 | my $meth = shift; | |||||
431 | |||||||
432 | 0 | my $method; | |||||
433 | 0 | if (!ref $meth) { | |||||
434 | 0 | $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth ); | |||||
435 | } else { | ||||||
436 | 0 | $method = $meth; | |||||
437 | } | ||||||
438 | |||||||
439 | 0 | $method->params( @_ ); | |||||
440 | |||||||
441 | 0 | $self->send(CONNECT => '', REQUEST => $method, DISCONNECT => ''); | |||||
442 | } | ||||||
443 | |||||||
444 | sub send { | ||||||
445 | 0 | 0 | my $self = shift; | ||||
446 | 0 | my @payload_list = @_; # this is a Domain Object | |||||
447 | |||||||
448 | 0 | return unless ($self and $self->{peer_handle}); | |||||
449 | |||||||
450 | 0 | $logger->debug( "In send", INTERNAL ); | |||||
451 | |||||||
452 | 0 | my $tT; | |||||
453 | |||||||
454 | 0 0 | if( @payload_list % 2 ) { $tT = pop @payload_list; } | |||||
455 | |||||||
456 | 0 | if( ! @payload_list ) { | |||||
457 | 0 | $logger->debug( "payload_list param is incomplete in AppSession::send()", ERROR ); | |||||
458 | 0 | return undef; | |||||
459 | } | ||||||
460 | |||||||
461 | 0 | my @doc = (); | |||||
462 | |||||||
463 | 0 | my $disconnect = 0; | |||||
464 | 0 | my $connecting = 0; | |||||
465 | |||||||
466 | 0 | while( @payload_list ) { | |||||
467 | |||||||
468 | 0 | my ($msg_type, $payload) = ( shift(@payload_list), shift(@payload_list) ); | |||||
469 | |||||||
470 | 0 | if ($msg_type eq 'DISCONNECT' ) { | |||||
471 | 0 | $disconnect++; | |||||
472 | 0 | if( $self->state == DISCONNECTED && !$connecting) { | |||||
473 | 0 | next; | |||||
474 | } | ||||||
475 | } | ||||||
476 | |||||||
477 | 0 | if( $msg_type eq "CONNECT" ) { | |||||
478 | 0 | $connecting++; | |||||
479 | } | ||||||
480 | |||||||
481 | 0 | my $msg = OpenSRF::DomainObject::oilsMessage->new(); | |||||
482 | 0 | $msg->type($msg_type); | |||||
483 | |||||||
484 | 9 9 9 | 106 36 64 | no warnings; | ||||
485 | 0 | $msg->threadTrace( $tT || int($self->session_threadTrace) || int($self->last_threadTrace) ); | |||||
486 | 9 9 9 | 62 42 65 | use warnings; | ||||
487 | |||||||
488 | 0 | if ($msg->type eq 'REQUEST') { | |||||
489 | 0 | if ( !defined($tT) || $self->last_threadTrace != $tT ) { | |||||
490 | 0 | $msg->update_threadTrace; | |||||
491 | 0 | $self->session_threadTrace( $msg->threadTrace ); | |||||
492 | 0 | $tT = $self->session_threadTrace; | |||||
493 | 0 | OpenSRF::AppRequest->new($self, $payload); | |||||
494 | } | ||||||
495 | } | ||||||
496 | |||||||
497 | 0 | $msg->api_level($self->api_level); | |||||
498 | 0 | $msg->payload($payload) if $payload; | |||||
499 | |||||||
500 | 0 | my $locale = $self->session_locale; | |||||
501 | 0 | $msg->sender_locale($locale) if ($locale); | |||||
502 | |||||||
503 | 0 | push @doc, $msg; | |||||
504 | |||||||
505 | |||||||
506 | 0 | $logger->debug( "AppSession sending ".$msg->type." to ".$self->remote_id. | |||||
507 | " with threadTrace [".$msg->threadTrace."]"); | ||||||
508 | |||||||
509 | } | ||||||
510 | |||||||
511 | 0 | if ($self->endpoint == CLIENT and ! $disconnect) { | |||||
512 | 0 | $self->queue_wait(0); | |||||
513 | |||||||
514 | |||||||
515 | 0 | if($self->stateless && $self->state != CONNECTED) { | |||||
516 | 0 | $self->reset; | |||||
517 | 0 | $logger->debug("AppSession is stateless in send", INTERNAL ); | |||||
518 | } | ||||||
519 | |||||||
520 | 0 | if( !$self->stateless and $self->state != CONNECTED ) { | |||||
521 | |||||||
522 | 0 | $logger->debug( "Sending connect before request 1", INTERNAL ); | |||||
523 | |||||||
524 | 0 | unless (($self->state == CONNECTING && $connecting )) { | |||||
525 | 0 | $logger->debug( "Sending connect before request 2", INTERNAL ); | |||||
526 | 0 | my $v = $self->connect(); | |||||
527 | 0 | if( ! $v ) { | |||||
528 | 0 | $logger->debug( "Unable to connect to remote service in AppSession::send()", ERROR ); | |||||
529 | 0 | return undef; | |||||
530 | } | ||||||
531 | 0 | if( ref($v) and $v->can("class") and $v->class->isa( "OpenSRF::EX" ) ) { | |||||
532 | 0 | return $v; | |||||
533 | } | ||||||
534 | } | ||||||
535 | } | ||||||
536 | |||||||
537 | } | ||||||
538 | 0 | my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc); | |||||
539 | 0 | $logger->internal("AppSession sending doc: $json"); | |||||
540 | |||||||
541 | 0 | $self->{peer_handle}->send( | |||||
542 | to => $self->remote_id, | ||||||
543 | thread => $self->session_id, | ||||||
544 | body => $json ); | ||||||
545 | |||||||
546 | 0 | if( $disconnect) { | |||||
547 | 0 | $self->state( DISCONNECTED ); | |||||
548 | } | ||||||
549 | |||||||
550 | 0 | my $req = $self->app_request( $tT ); | |||||
551 | 0 | $req->{_start} = time; | |||||
552 | 0 | return $req | |||||
553 | } | ||||||
554 | |||||||
555 | sub app_request { | ||||||
556 | 0 | 0 | my $self = shift; | ||||
557 | 0 | my $tT = shift; | |||||
558 | |||||||
559 | 0 | return undef unless (defined $tT); | |||||
560 | 0 0 0 | my ($req) = grep { $_->threadTrace == $tT } @{ $self->{request_queue} }; | |||||
561 | |||||||
562 | 0 | return $req; | |||||
563 | } | ||||||
564 | |||||||
565 | sub remove_app_request { | ||||||
566 | 0 | 0 | my $self = shift; | ||||
567 | 0 | my $req = shift; | |||||
568 | |||||||
569 | 0 0 0 | my @list = grep { $_->threadTrace != $req->threadTrace } @{ $self->{request_queue} }; | |||||
570 | |||||||
571 | 0 | $self->{request_queue} = \@list; | |||||
572 | } | ||||||
573 | |||||||
574 | sub endpoint { | ||||||
575 | 0 | 0 | return $_[0]->{endpoint}; | ||||
576 | } | ||||||
577 | |||||||
578 | |||||||
579 | sub session_id { | ||||||
580 | 0 | 0 | my $self = shift; | ||||
581 | 0 | return $self->{session_id}; | |||||
582 | } | ||||||
583 | |||||||
584 | sub push_queue { | ||||||
585 | 0 | 0 | my $self = shift; | ||||
586 | 0 | my $resp = shift; | |||||
587 | 0 | my $req = $self->app_request($resp->[1]); | |||||
588 | 0 | return $req->push_queue( $resp->[0] ) if ($req); | |||||
589 | 0 0 | push @{ $self->{recv_queue} }, $resp->[0]; | |||||
590 | } | ||||||
591 | |||||||
592 | sub last_threadTrace { | ||||||
593 | 0 | 0 | my $self = shift; | ||||
594 | 0 | my $new_last_threadTrace = shift; | |||||
595 | |||||||
596 | 0 | my $old_last_threadTrace = $self->{last_threadTrace}; | |||||
597 | 0 | if (defined $new_last_threadTrace) { | |||||
598 | 0 | $self->{last_threadTrace} = $new_last_threadTrace; | |||||
599 | 0 | return $new_last_threadTrace unless ($old_last_threadTrace); | |||||
600 | } | ||||||
601 | |||||||
602 | 0 | return $old_last_threadTrace; | |||||
603 | } | ||||||
604 | |||||||
605 | sub session_threadTrace { | ||||||
606 | 0 | 0 | my $self = shift; | ||||
607 | 0 | my $new_last_threadTrace = shift; | |||||
608 | |||||||
609 | 0 | my $old_last_threadTrace = $self->{session_threadTrace}; | |||||
610 | 0 | if (defined $new_last_threadTrace) { | |||||
611 | 0 | $self->{session_threadTrace} = $new_last_threadTrace; | |||||
612 | 0 | return $new_last_threadTrace unless ($old_last_threadTrace); | |||||
613 | } | ||||||
614 | |||||||
615 | 0 | return $old_last_threadTrace; | |||||
616 | } | ||||||
617 | |||||||
618 | sub last_message_type { | ||||||
619 | 0 | 0 | my $self = shift; | ||||
620 | 0 | my $new_last_message_type = shift; | |||||
621 | |||||||
622 | 0 | my $old_last_message_type = $self->{last_message_type}; | |||||
623 | 0 | if (defined $new_last_message_type) { | |||||
624 | 0 | $self->{last_message_type} = $new_last_message_type; | |||||
625 | 0 | return $new_last_message_type unless ($old_last_message_type); | |||||
626 | } | ||||||
627 | |||||||
628 | 0 | return $old_last_message_type; | |||||
629 | } | ||||||
630 | |||||||
631 | sub last_message_api_level { | ||||||
632 | 0 | 0 | my $self = shift; | ||||
633 | 0 | my $new_last_message_api_level = shift; | |||||
634 | |||||||
635 | 0 | my $old_last_message_api_level = $self->{last_message_api_level}; | |||||
636 | 0 | if (defined $new_last_message_api_level) { | |||||
637 | 0 | $self->{last_message_api_level} = $new_last_message_api_level; | |||||
638 | 0 | return $new_last_message_api_level unless ($old_last_message_api_level); | |||||
639 | } | ||||||
640 | |||||||
641 | 0 | return $old_last_message_api_level; | |||||
642 | } | ||||||
643 | |||||||
644 | sub remote_id { | ||||||
645 | 0 | 0 | my $self = shift; | ||||
646 | 0 | my $new_remote_id = shift; | |||||
647 | |||||||
648 | 0 | my $old_remote_id = $self->{remote_id}; | |||||
649 | 0 | if (defined $new_remote_id) { | |||||
650 | 0 | $self->{remote_id} = $new_remote_id; | |||||
651 | 0 | return $new_remote_id unless ($old_remote_id); | |||||
652 | } | ||||||
653 | |||||||
654 | 0 | return $old_remote_id; | |||||
655 | } | ||||||
656 | |||||||
657 | sub client_auth { | ||||||
658 | 0 | 0 | return undef; | ||||
659 | 0 | my $self = shift; | |||||
660 | 0 | my $new_ua = shift; | |||||
661 | |||||||
662 | 0 | my $old_ua = $self->{client_auth}; | |||||
663 | 0 | if (defined $new_ua) { | |||||
664 | 0 | $self->{client_auth} = $new_ua; | |||||
665 | 0 | return $new_ua unless ($old_ua); | |||||
666 | } | ||||||
667 | |||||||
668 | 0 | return $old_ua->cloneNode(1); | |||||
669 | } | ||||||
670 | |||||||
671 | sub state { | ||||||
672 | 0 | 0 | my $self = shift; | ||||
673 | 0 | my $new_state = shift; | |||||
674 | |||||||
675 | 0 | my $old_state = $self->{state}; | |||||
676 | 0 | if (defined $new_state) { | |||||
677 | 0 | $self->{state} = $new_state; | |||||
678 | 0 | return $new_state unless ($old_state); | |||||
679 | } | ||||||
680 | |||||||
681 | 0 | return $old_state; | |||||
682 | } | ||||||
683 | |||||||
684 | sub DESTROY { | ||||||
685 | 0 | my $self = shift; | |||||
686 | 0 0 | delete $$self{$_} for keys %$self; | |||||
687 | 0 | return undef; | |||||
688 | } | ||||||
689 | |||||||
690 | sub recv { | ||||||
691 | 0 | 0 | my $self = shift; | ||||
692 | 0 | my @proto_args = @_; | |||||
693 | 0 | my %args; | |||||
694 | |||||||
695 | 0 | if ( @proto_args ) { | |||||
696 | 0 | if ( !(@proto_args % 2) ) { | |||||
697 | 0 | %args = @proto_args; | |||||
698 | } elsif (@proto_args == 1) { | ||||||
699 | 0 | %args = ( timeout => @proto_args ); | |||||
700 | } | ||||||
701 | } | ||||||
702 | |||||||
703 | #$logger->debug( ref($self). " recv_queue before wait: " . $self->_print_queue(), INTERNAL ); | ||||||
704 | |||||||
705 | 0 | if( exists( $args{timeout} ) ) { | |||||
706 | 0 | $args{timeout} = int($args{timeout}); | |||||
707 | 0 | $self->{recv_timeout} = $args{timeout}; | |||||
708 | } | ||||||
709 | |||||||
710 | #$args{timeout} = 0 if ($self->complete); | ||||||
711 | |||||||
712 | 0 | if(defined($args{timeout})) { | |||||
713 | 0 | $logger->debug( ref($self) ."->recv with timeout " . $args{timeout}, INTERNAL ); | |||||
714 | } | ||||||
715 | |||||||
716 | 0 0 | my $avail = @{ $self->{recv_queue} }; | |||||
717 | 0 | $self->{remaining_recv_timeout} = $self->{recv_timeout}; | |||||
718 | |||||||
719 | 0 | if (!$args{count}) { | |||||
720 | 0 | if (wantarray) { | |||||
721 | 0 | $args{count} = $avail; | |||||
722 | } else { | ||||||
723 | 0 | $args{count} = 1; | |||||
724 | } | ||||||
725 | } | ||||||
726 | |||||||
727 | 0 | while ( $self->{remaining_recv_timeout} > 0 and $avail < $args{count} ) { | |||||
728 | 0 | last if $self->complete; | |||||
729 | 0 | my $starttime = time; | |||||
730 | 0 | $self->queue_wait($self->{remaining_recv_timeout}); | |||||
731 | 0 | my $endtime = time; | |||||
732 | 0 | if ($self->{timeout_reset}) { | |||||
733 | 0 | $self->{timeout_reset} = 0; | |||||
734 | } else { | ||||||
735 | 0 | $self->{remaining_recv_timeout} -= ($endtime - $starttime) | |||||
736 | } | ||||||
737 | 0 0 | $avail = @{ $self->{recv_queue} }; | |||||
738 | } | ||||||
739 | |||||||
740 | 0 | $self->timed_out(1) if ( $self->{remaining_recv_timeout} <= 0 ); | |||||
741 | |||||||
742 | 0 | my @list; | |||||
743 | 0 0 | while ( my $msg = shift @{ $self->{recv_queue} } ) { | |||||
744 | 0 | push @list, $msg; | |||||
745 | 0 | last if (scalar(@list) >= $args{count}); | |||||
746 | } | ||||||
747 | |||||||
748 | 0 | $logger->debug( "Number of matched responses: " . @list, DEBUG ); | |||||
749 | 0 | $self->queue_wait(0); # check for statuses | |||||
750 | |||||||
751 | 0 | return $list[0] if (!wantarray); | |||||
752 | 0 | return @list; | |||||
753 | } | ||||||
754 | |||||||
755 | sub timed_out { | ||||||
756 | 0 | 0 | my $self = shift; | ||||
757 | 0 | my $out = shift; | |||||
758 | 0 | $self->{timed_out} = $out if (defined $out); | |||||
759 | 0 | return $self->{timed_out}; | |||||
760 | } | ||||||
761 | |||||||
762 | sub push_resend { | ||||||
763 | 0 | 0 | my $self = shift; | ||||
764 | 0 | push @OpenSRF::AppSession::_RESEND_QUEUE, @_; | |||||
765 | } | ||||||
766 | |||||||
767 | sub flush_resend { | ||||||
768 | 0 | 0 | my $self = shift; | ||||
769 | 0 | $logger->debug( "Resending..." . @_RESEND_QUEUE, INTERNAL ); | |||||
770 | 0 | while ( my $req = shift @OpenSRF::AppSession::_RESEND_QUEUE ) { | |||||
771 | 0 | $req->resend unless $req->complete; | |||||
772 | } | ||||||
773 | } | ||||||
774 | |||||||
775 | |||||||
776 | sub queue_wait { | ||||||
777 | 0 | 0 | my $self = shift; | ||||
778 | 0 0 | if( ! $self->{peer_handle} ) { return 0; } | |||||
779 | 0 | my $timeout = shift || 0; | |||||
780 | 0 | $logger->debug( "Calling queue_wait($timeout)" , INTERNAL ); | |||||
781 | 0 | my $o = $self->{peer_handle}->process($timeout); | |||||
782 | 0 | $self->flush_resend; | |||||
783 | 0 | return $o; | |||||
784 | } | ||||||
785 | |||||||
786 | sub _print_queue { | ||||||
787 | 0 | my( $self ) = @_; | |||||
788 | 0 | my $string = ""; | |||||
789 | 0 0 | foreach my $msg ( @{$self->{recv_queue}} ) { | |||||
790 | 0 | $string = $string . $msg->toString(1) . "\n"; | |||||
791 | } | ||||||
792 | 0 | return $string; | |||||
793 | } | ||||||
794 | |||||||
795 | sub status { | ||||||
796 | 0 | 0 | my $self = shift; | ||||
797 | 0 | return unless $self; | |||||
798 | 0 | $self->send( 'STATUS', @_ ); | |||||
799 | } | ||||||
800 | |||||||
801 | sub reset_request_timeout { | ||||||
802 | 0 | 0 | my $self = shift; | ||||
803 | 0 | my $tt = shift; | |||||
804 | 0 | my $req = $self->app_request($tt); | |||||
805 | 0 | $req->{remaining_recv_timeout} = $req->{recv_timeout}; | |||||
806 | 0 | $req->{timout_reset} = 1; | |||||
807 | } | ||||||
808 | |||||||
809 | #------------------------------------------------------------------------------- | ||||||
810 | |||||||
811 | package OpenSRF::AppRequest; | ||||||
812 | 9 9 9 | 79 31 67 | use base qw/OpenSRF::AppSession/; | ||||
813 | 9 9 9 | 64 41 85 | use OpenSRF::Utils::Logger qw/:level/; | ||||
814 | 9 9 9 | 65 31 72 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
815 | 9 9 9 | 60 34 63 | use Time::HiRes qw/time usleep/; | ||||
816 | |||||||
817 | sub new { | ||||||
818 | 0 | my $class = shift; | |||||
819 | 0 | $class = ref($class) || $class; | |||||
820 | |||||||
821 | 0 | my $session = shift; | |||||
822 | 0 | my $threadTrace = $session->session_threadTrace || $session->last_threadTrace; | |||||
823 | 0 | my $payload = shift; | |||||
824 | |||||||
825 | 0 | my $self = { session => $session, | |||||
826 | threadTrace => $threadTrace, | ||||||
827 | payload => $payload, | ||||||
828 | complete => 0, | ||||||
829 | timeout_reset => 0, | ||||||
830 | recv_timeout => 30, | ||||||
831 | remaining_recv_timeout => 30, | ||||||
832 | recv_queue => [], | ||||||
833 | }; | ||||||
834 | |||||||
835 | 0 | bless $self => $class; | |||||
836 | |||||||
837 | 0 0 | push @{ $self->session->{request_queue} }, $self; | |||||
838 | |||||||
839 | 0 | return $self; | |||||
840 | } | ||||||
841 | |||||||
842 | sub recv_timeout { | ||||||
843 | 0 | my $self = shift; | |||||
844 | 0 | my $timeout = shift; | |||||
845 | 0 | if (defined $timeout) { | |||||
846 | 0 | $self->{recv_timeout} = $timeout; | |||||
847 | 0 | $self->{remaining_recv_timeout} = $timeout; | |||||
848 | } | ||||||
849 | 0 | return $self->{recv_timeout}; | |||||
850 | } | ||||||
851 | |||||||
852 | sub queue_size { | ||||||
853 | 0 0 | my $size = @{$_[0]->{recv_queue}}; | |||||
854 | 0 | return $size; | |||||
855 | } | ||||||
856 | |||||||
857 | sub send { | ||||||
858 | 0 | 0 | my $self = shift; | ||||
859 | 0 | return unless ($self and $self->session and !$self->complete); | |||||
860 | 0 | $self->session->send(@_); | |||||
861 | } | ||||||
862 | |||||||
863 | sub finish { | ||||||
864 | 0 | 0 | my $self = shift; | ||||
865 | 0 | return unless $self->session; | |||||
866 | 0 | $self->session->remove_app_request($self); | |||||
867 | 0 0 | delete($$self{$_}) for (keys %$self); | |||||
868 | } | ||||||
869 | |||||||
870 | sub session { | ||||||
871 | 0 | return shift()->{session}; | |||||
872 | } | ||||||
873 | |||||||
874 | sub complete { | ||||||
875 | 0 | my $self = shift; | |||||
876 | 0 | my $complete = shift; | |||||
877 | 0 | return $self->{complete} if ($self->{complete}); | |||||
878 | 0 | if (defined $complete) { | |||||
879 | 0 | $self->{complete} = $complete; | |||||
880 | 0 | $self->{_duration} = time - $self->{_start} if ($self->{complete}); | |||||
881 | } else { | ||||||
882 | 0 | $self->session->queue_wait(0); | |||||
883 | } | ||||||
884 | 0 | return $self->{complete}; | |||||
885 | } | ||||||
886 | |||||||
887 | sub duration { | ||||||
888 | 0 | my $self = shift; | |||||
889 | 0 | $self->wait_complete; | |||||
890 | 0 | return $self->{_duration}; | |||||
891 | } | ||||||
892 | |||||||
893 | sub wait_complete { | ||||||
894 | 0 | my $self = shift; | |||||
895 | 0 | my $timeout = shift || 10; | |||||
896 | 0 | my $time_remaining = $timeout; | |||||
897 | |||||||
898 | 0 | while ( ! $self->complete and $time_remaining > 0 ) { | |||||
899 | 0 | my $starttime = time; | |||||
900 | 0 | $self->queue_wait($time_remaining); | |||||
901 | 0 | my $endtime = time; | |||||
902 | 0 | $time_remaining -= ($endtime - $starttime); | |||||
903 | } | ||||||
904 | |||||||
905 | 0 | return $self->complete; | |||||
906 | } | ||||||
907 | |||||||
908 | sub threadTrace { | ||||||
909 | 0 | return shift()->{threadTrace}; | |||||
910 | } | ||||||
911 | |||||||
912 | sub push_queue { | ||||||
913 | 0 | 0 | my $self = shift; | ||||
914 | 0 | my $resp = shift; | |||||
915 | 0 0 | if( !$resp ) { return 0; } | |||||
916 | 0 | if( UNIVERSAL::isa($resp, "Error")) { | |||||
917 | 0 | $self->{failed} = $resp; | |||||
918 | 0 | $self->complete(1); | |||||
919 | #return; eventually... | ||||||
920 | } | ||||||
921 | 0 0 | push @{ $self->{recv_queue} }, $resp; | |||||
922 | } | ||||||
923 | |||||||
924 | sub failed { | ||||||
925 | 0 | my $self = shift; | |||||
926 | 0 | return $self->{failed}; | |||||
927 | } | ||||||
928 | |||||||
929 | sub queue_wait { | ||||||
930 | 0 | 0 | my $self = shift; | ||||
931 | 0 | return $self->session->queue_wait(@_) | |||||
932 | } | ||||||
933 | |||||||
934 | 0 | sub payload { return shift()->{payload}; } | |||||
935 | |||||||
936 | sub resend { | ||||||
937 | 0 | my $self = shift; | |||||
938 | 0 | return unless ($self and $self->session and !$self->complete); | |||||
939 | 0 | OpenSRF::Utils::Logger->debug( "I'm resending the request for threadTrace ". $self->threadTrace, DEBUG); | |||||
940 | 0 | return $self->session->send('REQUEST', $self->payload, $self->threadTrace ); | |||||
941 | } | ||||||
942 | |||||||
943 | sub status { | ||||||
944 | 0 | 0 | my $self = shift; | ||||
945 | 0 | my $msg = shift; | |||||
946 | 0 | return unless ($self and $self->session and !$self->complete); | |||||
947 | 0 | $self->session->send( 'STATUS',$msg, $self->threadTrace ); | |||||
948 | } | ||||||
949 | |||||||
950 | # TODO stream_push only works when server sessions can accept RESULT | ||||||
951 | # messages, which is no longer supported. Create a new OpenSRF message | ||||||
952 | # type to support client-to-server streams. | ||||||
953 | #sub stream_push { | ||||||
954 | # my $self = shift; | ||||||
955 | # my $msg = shift; | ||||||
956 | # $self->respond( $msg ); | ||||||
957 | #} | ||||||
958 | |||||||
959 | sub respond { | ||||||
960 | 0 | my $self = shift; | |||||
961 | 0 | my $msg = shift; | |||||
962 | 0 | return unless ($self and $self->session and !$self->complete); | |||||
963 | |||||||
964 | 0 | my $response; | |||||
965 | 0 | if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) { | |||||
966 | 0 | $response = $msg; | |||||
967 | } else { | ||||||
968 | 0 | $response = new OpenSRF::DomainObject::oilsResult; | |||||
969 | 0 | $response->content($msg); | |||||
970 | } | ||||||
971 | |||||||
972 | 0 | $self->session->send('RESULT', $response, $self->threadTrace); | |||||
973 | } | ||||||
974 | |||||||
975 | sub respond_complete { | ||||||
976 | 0 | my $self = shift; | |||||
977 | 0 | my $msg = shift; | |||||
978 | 0 | return unless ($self and $self->session and !$self->complete); | |||||
979 | |||||||
980 | 0 | my $response; | |||||
981 | 0 | if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) { | |||||
982 | 0 | $response = $msg; | |||||
983 | } else { | ||||||
984 | 0 | $response = new OpenSRF::DomainObject::oilsResult; | |||||
985 | 0 | $response->content($msg); | |||||
986 | } | ||||||
987 | |||||||
988 | 0 | my $stat = OpenSRF::DomainObject::oilsConnectStatus->new( | |||||
989 | statusCode => STATUS_COMPLETE(), | ||||||
990 | status => 'Request Complete' ); | ||||||
991 | |||||||
992 | |||||||
993 | 0 | $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace); | |||||
994 | 0 | $self->complete(1); | |||||
995 | } | ||||||
996 | |||||||
997 | sub register_death_callback { | ||||||
998 | 0 | my $self = shift; | |||||
999 | 0 | my $cb = shift; | |||||
1000 | 0 | $self->session->register_callback( death => $cb ); | |||||
1001 | } | ||||||
1002 | |||||||
1003 | |||||||
1004 | # Utility method. Checks to see if the request failed. | ||||||
1005 | # If so, throws an OpenSRF::EX::ERROR. If everything is | ||||||
1006 | # ok, it returns the content of the request. | ||||||
1007 | sub gather { | ||||||
1008 | 0 | my $self = shift; | |||||
1009 | 0 | my $finish = shift; | |||||
1010 | 0 | $self->wait_complete; | |||||
1011 | 0 | my $resp = $self->recv( timeout => 60 ); | |||||
1012 | 0 | if( $self->failed() ) { | |||||
1013 | 0 | throw OpenSRF::EX::ERROR | |||||
1014 | ($self->failed()->stringify()); | ||||||
1015 | } | ||||||
1016 | 0 0 | if(!$resp) { return undef; } | |||||
1017 | 0 | my $content = $resp->content; | |||||
1018 | 0 0 | if($finish) { $self->finish();} | |||||
1019 | 0 | return $content; | |||||
1020 | } | ||||||
1021 | |||||||
1022 | |||||||
1023 | package OpenSRF::AppSubrequest; | ||||||
1024 | |||||||
1025 | sub respond { | ||||||
1026 | 0 | my $self = shift; | |||||
1027 | 0 | my $resp = shift; | |||||
1028 | 0 0 | push @{$$self{resp}}, $resp if (defined $resp); | |||||
1029 | } | ||||||
1030 | 0 | sub respond_complete { respond(@_); } | |||||
1031 | |||||||
1032 | sub new { | ||||||
1033 | 0 | my $class = shift; | |||||
1034 | 0 | $class = ref($class) || $class; | |||||
1035 | 0 | return bless({resp => [], @_}, $class); | |||||
1036 | } | ||||||
1037 | |||||||
1038 | 0 0 | sub responses { @{$_[0]->{resp}} } | |||||
1039 | |||||||
1040 | sub session { | ||||||
1041 | 0 | my $x = shift; | |||||
1042 | 0 | my $s = shift; | |||||
1043 | 0 | $x->{session} = $s if ($s); | |||||
1044 | 0 | return $x->{session}; | |||||
1045 | } | ||||||
1046 | |||||||
1047 | 0 | 0 | sub status {} | ||||
1048 | |||||||
1049 | |||||||
1050 | 1; | ||||||
1051 |