File: | blib/lib/OpenSRF/Transport.pm |
Coverage: | 27.3% |
line | stmt | bran | cond | sub | pod | time | code |
---|---|---|---|---|---|---|---|
1 | package OpenSRF::Transport; | ||||||
2 | 9 9 9 9 9 9 | 65 34 61 64 31 76 | use strict; use warnings; | ||||
3 | 9 9 9 | 62 57 53 | use base 'OpenSRF'; | ||||
4 | 9 9 9 | 70 31 60 | use Time::HiRes qw/time/; | ||||
5 | 9 9 9 | 81 32 83 | use OpenSRF::AppSession; | ||||
6 | 9 9 9 | 66 30 59 | use OpenSRF::Utils::JSON; | ||||
7 | 9 9 9 | 55 37 57 | use OpenSRF::Utils::Logger qw(:level); | ||||
8 | 9 9 9 | 62 36 77 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
9 | 9 9 9 | 62 35 57 | use OpenSRF::EX qw/:try/; | ||||
10 | 9 9 9 | 118 39 110 | use OpenSRF::Transport::SlimJabber::MessageWrapper; | ||||
11 | |||||||
12 | #------------------ | ||||||
13 | # --- These must be implemented by all Transport subclasses | ||||||
14 | # ------------------------------------------- | ||||||
15 | |||||||
=head2 get_listener

Returns the package name of the package the system will use to
gather incoming requests.

The implementation here only forwards to C<alert_abstract()> on the
invocant; Transport subclasses are expected to provide the real one.

=cut

sub get_listener {
    my $invocant = shift;

    # alert_abstract() is not defined in this file -- presumably inherited
    # from the OpenSRF base class; confirm.
    return $invocant->alert_abstract();
}
24 | |||||||
=head2 get_peer_client

Returns the name of the package responsible for client communication.

The implementation here only forwards to C<alert_abstract()> on the
invocant; Transport subclasses are expected to provide the real one.

=cut

sub get_peer_client {
    my $invocant = shift;

    # alert_abstract() is not defined in this file -- presumably inherited
    # from the OpenSRF base class; confirm.
    return $invocant->alert_abstract();
}
32 | |||||||
=head2 get_msg_envelope

Returns the name of the package responsible for parsing incoming
messages.

The implementation here only forwards to C<alert_abstract()> on the
invocant; Transport subclasses are expected to provide the real one.

=cut

sub get_msg_envelope {
    my $invocant = shift;

    # alert_abstract() is not defined in this file -- presumably inherited
    # from the OpenSRF base class; confirm.
    return $invocant->alert_abstract();
}
40 | |||||||
41 | # ------------------------------------------- | ||||||
42 | |||||||
# Package-wide envelope class name; read and written by message_envelope().
our $message_envelope;

# The logger is addressed by class name, so every call in this file is a
# class-method call of the form $logger->warn(...).
my $logger = 'OpenSRF::Utils::Logger';
45 | |||||||
46 | |||||||
47 | |||||||
=head2 message_envelope( [$envelope] );

Sets the message envelope class that will allow us to extract
information from the messages we receive from the low level transport.

When a true C<$envelope> is given it is stored first and then loaded by
calling its C<use> method. If C<$@> is set afterwards the failure is
logged; the stored value is left in place. With no (or a false)
C<$envelope> the call is a plain getter.

Returns the currently stored envelope class.

=cut

sub message_envelope {
    my ( $class, $envelope ) = @_;

    # Getter path: nothing to store or load.
    return $message_envelope unless $envelope;

    $message_envelope = $envelope;

    # NOTE(review): ->use is not defined in this file; presumably supplied
    # by something like UNIVERSAL::require loaded elsewhere -- confirm.
    $envelope->use;

    $logger->error(
        "Error loading message_envelope: $envelope -> $@", ERROR)
        if $@;

    return $message_envelope;
}
68 | |||||||
=head2 handler( $service, $data )

Class method that processes one incoming transport message.

C<$data> is the message object; its C<from>, C<thread>, C<body>,
C<type> and C<osrf_xid> methods are read. C<$service> is handed on to
C<< OpenSRF::AppSession->server_build >>.

In order, the method:

=over 4

=item *

throws C<OpenSRF::EX::Session> when the message type is C<error>;

=item *

looks up an existing AppSession for the thread and, for a server-side
session whose sender has changed, either accepts the new sender (service
configured as C<migratable>) or sets a broken-session status and
returns 1;

=item *

retrieves or builds the AppSession via C<server_build>, throwing
C<OpenSRF::EX::Session> if none comes back;

=item *

decodes the JSON body, setting a parse-error status and returning 1 on
failure;

=item *

passes every C<OpenSRF::DomainObject::oilsMessage> in the decoded
document to that message's own C<handler> method.

=back

Returns the AppSession on normal completion, 1 for the early exits
described above, and 0 when a message handler on a non-server session
returns false.

=cut

sub handler {
    my $started_at = time();
    my ( $class, $service, $data ) = @_;

    $logger->transport( "Transport handler() received $data", INTERNAL );

    # Routing details carried by the incoming message object.
    my $sender   = $data->from;
    my $thread   = $data->thread;
    my $payload  = $data->body;
    my $msg_type = $data->type;

    $logger->set_osrf_xid( $data->osrf_xid );

    my $is_error = defined($msg_type) && $msg_type eq 'error';

    OpenSRF::EX::Session->throw("$sender IS NOT CONNECTED TO THE NETWORK!!!")
        if $is_error;

    # See if the app_session already exists. If so, make sure the sender
    # hasn't changed if we're a server.
    # NOTE(review): OpenSRF::Utils::SettingsClient is not loaded by this
    # file; presumably it is loaded elsewhere -- confirm.
    my $session = OpenSRF::AppSession->find($thread);
    if (   $session
        && $session->endpoint == $session->SERVER()
        && $session->remote_id ne $sender )
    {
        my $settings = OpenSRF::Utils::SettingsClient->new();

        unless ( $settings->config_value( "apps", $session->service, "migratable" ) ) {
            $logger->warn("Backend Gone or invalid sender");
            my $broken = OpenSRF::DomainObject::oilsBrokenSession->new();
            $broken->status("Backend Gone or invalid sender, Reconnect");
            $session->status($broken);
            return 1;
        }

        $logger->debug("service is migratable, new client is $sender");
    }

    # Retrieve or build the app_session as appropriate (server_build
    # decides which to do).
    $logger->transport( "AppSession is valid or does not exist yet", INTERNAL );
    $session = OpenSRF::AppSession->server_build( $thread, $sender, $service );

    OpenSRF::EX::Session->throw(
        "Transport::handler(): No AppSession object returned from server_build()")
        unless $session;

    # Create a document from the JSON contained within the message.
    my $doc = eval { OpenSRF::Utils::JSON->JSON2perl($payload) };
    if ($@) {
        $logger->warn("Received bogus JSON: $@");
        $logger->warn("Bogus JSON data: $payload");
        my $parse_error = OpenSRF::DomainObject::oilsXMLParseError->new(
            status => "JSON Parse Error --- $payload\n\n$@" );

        # The session is deliberately left alive here; only its status is set.
        $session->status($parse_error);
        return 1;
    }

    $logger->transport( "Transport::handler() creating \n$payload", INTERNAL );

    # We need to disconnect the session if we got a jabber error on the
    # client side. For server side, we'll just tear down the session and
    # go away.
    # NOTE(review): this branch looks unreachable -- the same condition
    # already throws near the top of this sub (assuming throw never
    # returns). Kept unchanged; confirm before removing.
    if ($is_error) {
        if ( $session->endpoint == $session->SERVER() ) {
            $session->kill_me;
            return 1;
        }

        $session->reset;
        $session->state( $session->DISCONNECTED );

        # Re-queuing the request for resend at this point would loop
        # forever, so the error is only logged.
        $logger->debug(
            "Got Jabber error on client connection $sender, nothing we can do..", ERROR );
        return 1;
    }

    # Cycle through and pass each oilsMessage contained in the message
    # up to the message layer for processing.
    for my $msg (@$doc) {

        next
            unless $msg
            && UNIVERSAL::isa( $msg, 'OpenSRF::DomainObject::oilsMessage' );

        if ( $session->endpoint == $session->SERVER() ) {

            # NOTE(review): if try/catch here are Error.pm's (their blocks
            # are anonymous subs), the returns below leave only the block,
            # not handler() itself -- confirm before relying on a 0 return
            # from the server path.
            try {
                return 0 unless $msg->handler($session);
                $logger->debug( "Successfully handled message", DEBUG );
            }
            catch Error with {
                my $err          = shift;
                my $server_error = OpenSRF::DomainObject::oilsServerError->new();
                $server_error->status( $server_error->status . "\n$err" );
                $session->status($server_error) if $server_error;
                $session->kill_me;
                return 0;
            };
        }
        else {
            return 0 unless $msg->handler($session);
            $logger->debug( "Successfully handled message", DEBUG );
        }
    }

    $logger->debug(
        sprintf( "Message processing duration: %.3fs", ( time() - $started_at ) ),
        DEBUG );

    return $session;
}
197 | |||||||
198 | 1; |