File: | blib/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm |
Coverage: | 20.2% |
line | stmt | bran | cond | sub | pod | time | code |
---|---|---|---|---|---|---|---|
1 | package OpenSRF::Transport::SlimJabber::XMPPReader; | ||||||
2 | 9 9 9 9 9 9 | 56 34 48 60 34 52 | use strict; use warnings; | ||||
3 | 9 9 9 | 122 41 139 | use XML::Parser; | ||||
4 | 9 9 9 | 77 34 98 | use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); | ||||
5 | 9 9 9 | 64 35 74 | use Time::HiRes qw/time/; | ||||
6 | 9 9 9 | 130 40 153 | use OpenSRF::Transport::SlimJabber::XMPPMessage; | ||||
7 | 9 9 9 | 74 33 59 | use OpenSRF::Utils::Logger qw/$logger/; | ||||
8 | |||||||
9 | # ----------------------------------------------------------- | ||||||
10 | # Connect, disconnect, and authentication messsage templates | ||||||
11 | # ----------------------------------------------------------- | ||||||
12 | 9 | 53 | use constant JABBER_CONNECT => | ||||
13 | 9 9 | 64 36 | "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>"; | ||||
14 | |||||||
15 | 9 | 49 | use constant JABBER_BASIC_AUTH => | ||||
16 | "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" . | ||||||
17 | 9 9 | 62 34 | "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>"; | ||||
18 | |||||||
19 | 9 9 9 | 60 37 53 | use constant JABBER_DISCONNECT => "</stream:stream>"; | ||||
20 | |||||||
21 | |||||||
22 | # ----------------------------------------------------------- | ||||||
23 | # XMPP Stream states | ||||||
24 | # ----------------------------------------------------------- | ||||||
25 | 9 9 9 | 84 40 50 | use constant DISCONNECTED => 1; | ||||
26 | 9 9 9 | 62 35 50 | use constant CONNECT_RECV => 2; | ||||
27 | 9 9 9 | 63 39 46 | use constant CONNECTED => 3; | ||||
28 | |||||||
29 | |||||||
30 | # ----------------------------------------------------------- | ||||||
31 | # XMPP Message states | ||||||
32 | # ----------------------------------------------------------- | ||||||
33 | 9 9 9 | 74 33 56 | use constant IN_NOTHING => 1; | ||||
34 | 9 9 9 | 59 35 49 | use constant IN_BODY => 2; | ||||
35 | 9 9 9 | 59 44 51 | use constant IN_THREAD => 3; | ||||
36 | 9 9 9 | 62 33 46 | use constant IN_STATUS => 4; | ||||
37 | |||||||
38 | |||||||
39 | # ----------------------------------------------------------- | ||||||
40 | # Constructor, getter/setters | ||||||
41 | # ----------------------------------------------------------- | ||||||
42 | sub new { | ||||||
43 | 0 | 0 | my $class = shift; | ||||
44 | 0 | my $socket = shift; | |||||
45 | |||||||
46 | 0 | my $self = bless({}, $class); | |||||
47 | |||||||
48 | 0 | $self->{queue} = []; | |||||
49 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
50 | 0 | $self->{xml_state} = IN_NOTHING; | |||||
51 | 0 | $self->socket($socket); | |||||
52 | |||||||
53 | 0 | my $p = new XML::Parser(Handlers => { | |||||
54 | Start => \&start_element, | ||||||
55 | End => \&end_element, | ||||||
56 | Char => \&characters, | ||||||
57 | }); | ||||||
58 | |||||||
59 | 0 | $self->parser($p->parse_start); # create a push parser | |||||
60 | 0 | $self->parser->{_parent_} = $self; | |||||
61 | 0 | $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; | |||||
62 | 0 | return $self; | |||||
63 | } | ||||||
64 | |||||||
65 | sub push_msg { | ||||||
66 | 0 | 0 | my($self, $msg) = @_; | ||||
67 | 0 0 | push(@{$self->{queue}}, $msg) if $msg; | |||||
68 | } | ||||||
69 | |||||||
70 | sub next_msg { | ||||||
71 | 0 | 0 | my $self = shift; | ||||
72 | 0 0 | return shift @{$self->{queue}}; | |||||
73 | } | ||||||
74 | |||||||
75 | sub peek_msg { | ||||||
76 | 0 | 0 | my $self = shift; | ||||
77 | 0 0 | return (@{$self->{queue}} > 0); | |||||
78 | } | ||||||
79 | |||||||
80 | sub parser { | ||||||
81 | 0 | 0 | my($self, $parser) = @_; | ||||
82 | 0 | $self->{parser} = $parser if $parser; | |||||
83 | 0 | return $self->{parser}; | |||||
84 | } | ||||||
85 | |||||||
86 | sub socket { | ||||||
87 | 0 | 0 | my($self, $socket) = @_; | ||||
88 | 0 | $self->{socket} = $socket if $socket; | |||||
89 | 0 | return $self->{socket}; | |||||
90 | } | ||||||
91 | |||||||
92 | sub stream_state { | ||||||
93 | 0 | 0 | my($self, $stream_state) = @_; | ||||
94 | 0 | $self->{stream_state} = $stream_state if $stream_state; | |||||
95 | 0 | return $self->{stream_state}; | |||||
96 | } | ||||||
97 | |||||||
98 | sub xml_state { | ||||||
99 | 0 | 0 | my($self, $xml_state) = @_; | ||||
100 | 0 | $self->{xml_state} = $xml_state if $xml_state; | |||||
101 | 0 | return $self->{xml_state}; | |||||
102 | } | ||||||
103 | |||||||
104 | sub message { | ||||||
105 | 0 | 0 | my($self, $message) = @_; | ||||
106 | 0 | $self->{message} = $message if $message; | |||||
107 | 0 | return $self->{message}; | |||||
108 | } | ||||||
109 | |||||||
110 | |||||||
111 | # ----------------------------------------------------------- | ||||||
112 | # Stream and connection handling methods | ||||||
113 | # ----------------------------------------------------------- | ||||||
114 | |||||||
115 | sub connect { | ||||||
116 | 0 | 0 | my($self, $domain, $username, $password, $resource) = @_; | ||||
117 | |||||||
118 | 0 | $self->send(sprintf(JABBER_CONNECT, $domain)); | |||||
119 | 0 | $self->wait(10); | |||||
120 | |||||||
121 | 0 | unless($self->{stream_state} == CONNECT_RECV) { | |||||
122 | 0 | $logger->error("No initial XMPP response from server"); | |||||
123 | 0 | return 0; | |||||
124 | } | ||||||
125 | |||||||
126 | 0 | $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource)); | |||||
127 | 0 | $self->wait(10); | |||||
128 | |||||||
129 | 0 | unless($self->connected) { | |||||
130 | 0 | $logger->error('XMPP connect failed'); | |||||
131 | 0 | return 0; | |||||
132 | } | ||||||
133 | |||||||
134 | 0 | return 1; | |||||
135 | } | ||||||
136 | |||||||
137 | sub disconnect { | ||||||
138 | 0 | 0 | my $self = shift; | ||||
139 | 0 | return unless $self->socket; | |||||
140 | 0 | if($self->tcp_connected) { | |||||
141 | 0 | $self->send(JABBER_DISCONNECT); | |||||
142 | 0 | shutdown($self->socket, 2); | |||||
143 | } | ||||||
144 | 0 | close($self->socket); | |||||
145 | } | ||||||
146 | |||||||
147 | # ----------------------------------------------------------- | ||||||
148 | # returns true if this stream is connected to the server | ||||||
149 | # ----------------------------------------------------------- | ||||||
150 | sub connected { | ||||||
151 | 0 | 0 | my $self = shift; | ||||
152 | 0 | return ($self->tcp_connected and $self->{stream_state} == CONNECTED); | |||||
153 | } | ||||||
154 | |||||||
155 | # ----------------------------------------------------------- | ||||||
156 | # returns true if the socket is connected | ||||||
157 | # ----------------------------------------------------------- | ||||||
158 | sub tcp_connected { | ||||||
159 | 0 | 0 | my $self = shift; | ||||
160 | 0 | return ($self->socket and $self->socket->connected); | |||||
161 | } | ||||||
162 | |||||||
163 | # ----------------------------------------------------------- | ||||||
164 | # sends pre-formated XML | ||||||
165 | # ----------------------------------------------------------- | ||||||
166 | sub send { | ||||||
167 | 0 | 0 | my($self, $xml) = @_; | ||||
168 | 0 | $self->{socket}->print($xml); | |||||
169 | } | ||||||
170 | |||||||
171 | # ----------------------------------------------------------- | ||||||
172 | # Puts a file handle into blocking mode | ||||||
173 | # ----------------------------------------------------------- | ||||||
174 | sub set_block { | ||||||
175 | 0 | 0 | my $fh = shift; | ||||
176 | 0 | my $flags = fcntl($fh, F_GETFL, 0); | |||||
177 | 0 | $flags &= ~O_NONBLOCK; | |||||
178 | 0 | fcntl($fh, F_SETFL, $flags); | |||||
179 | } | ||||||
180 | |||||||
181 | |||||||
182 | # ----------------------------------------------------------- | ||||||
183 | # Puts a file handle into non-blocking mode | ||||||
184 | # ----------------------------------------------------------- | ||||||
185 | sub set_nonblock { | ||||||
186 | 0 | 0 | my $fh = shift; | ||||
187 | 0 | my $flags = fcntl($fh, F_GETFL, 0); | |||||
188 | 0 | fcntl($fh, F_SETFL, $flags | O_NONBLOCK); | |||||
189 | } | ||||||
190 | |||||||
191 | |||||||
192 | sub wait { | ||||||
193 | 0 | 0 | my($self, $timeout) = @_; | ||||
194 | |||||||
195 | 0 | return $self->next_msg if $self->peek_msg; | |||||
196 | |||||||
197 | 0 | $timeout ||= 0; | |||||
198 | 0 | $timeout = undef if $timeout < 0; | |||||
199 | 0 | my $socket = $self->{socket}; | |||||
200 | |||||||
201 | 0 | set_block($socket); | |||||
202 | |||||||
203 | # build the select readset | ||||||
204 | 0 | my $infile = ''; | |||||
205 | 0 | vec($infile, $socket->fileno, 1) = 1; | |||||
206 | 0 | return undef unless select($infile, undef, undef, $timeout); | |||||
207 | |||||||
208 | # now slurp the data off the socket | ||||||
209 | 0 | my $buf; | |||||
210 | 0 | my $read_size = 1024; | |||||
211 | 0 | my $nonblock = 0; | |||||
212 | 0 | while(my $n = sysread($socket, $buf, $read_size)) { | |||||
213 | 0 | $self->{parser}->parse_more($buf) if $buf; | |||||
214 | 0 | if($n < $read_size or $self->peek_msg) { | |||||
215 | 0 | set_block($socket) if $nonblock; | |||||
216 | 0 | last; | |||||
217 | } | ||||||
218 | 0 | set_nonblock($socket) unless $nonblock; | |||||
219 | 0 | $nonblock = 1; | |||||
220 | } | ||||||
221 | |||||||
222 | 0 | return $self->next_msg; | |||||
223 | } | ||||||
224 | |||||||
225 | # ----------------------------------------------------------- | ||||||
226 | # Waits up to timeout seconds for a fully-formed XMPP | ||||||
227 | # message to arrive. If timeout is < 0, waits indefinitely | ||||||
228 | # ----------------------------------------------------------- | ||||||
229 | sub wait_msg { | ||||||
230 | 0 | 0 | my($self, $timeout) = @_; | ||||
231 | 0 | my $xml; | |||||
232 | |||||||
233 | 0 | $timeout = 0 unless defined $timeout; | |||||
234 | |||||||
235 | 0 | if($timeout < 0) { | |||||
236 | 0 | while(1) { | |||||
237 | 0 | return $xml if $xml = $self->wait($timeout); | |||||
238 | } | ||||||
239 | |||||||
240 | } else { | ||||||
241 | 0 | while($timeout >= 0) { | |||||
242 | 0 | my $start = time; | |||||
243 | 0 | return $xml if $xml = $self->wait($timeout); | |||||
244 | 0 | $timeout -= time - $start; | |||||
245 | } | ||||||
246 | } | ||||||
247 | |||||||
248 | 0 | return undef; | |||||
249 | } | ||||||
250 | |||||||
251 | |||||||
252 | # ----------------------------------------------------------- | ||||||
253 | # SAX Handlers | ||||||
254 | # ----------------------------------------------------------- | ||||||
255 | |||||||
256 | |||||||
257 | sub start_element { | ||||||
258 | 0 | 0 | my($parser, $name, %attrs) = @_; | ||||
259 | 0 | my $self = $parser->{_parent_}; | |||||
260 | |||||||
261 | 0 | if($name eq 'message') { | |||||
262 | |||||||
263 | 0 | my $msg = $self->{message}; | |||||
264 | 0 | $msg->{to} = $attrs{'to'}; | |||||
265 | 0 | $msg->{from} = $attrs{router_from} if $attrs{router_from}; | |||||
266 | 0 | $msg->{from} = $attrs{from} unless $msg->{from}; | |||||
267 | 0 | $msg->{osrf_xid} = $attrs{'osrf_xid'}; | |||||
268 | 0 | $msg->{type} = $attrs{type}; | |||||
269 | |||||||
270 | } elsif($name eq 'body') { | ||||||
271 | 0 | $self->{xml_state} = IN_BODY; | |||||
272 | |||||||
273 | } elsif($name eq 'thread') { | ||||||
274 | 0 | $self->{xml_state} = IN_THREAD; | |||||
275 | |||||||
276 | } elsif($name eq 'stream:stream') { | ||||||
277 | 0 | $self->{stream_state} = CONNECT_RECV; | |||||
278 | |||||||
279 | } elsif($name eq 'iq') { | ||||||
280 | 0 | if($attrs{type} and $attrs{type} eq 'result') { | |||||
281 | 0 | $self->{stream_state} = CONNECTED; | |||||
282 | } | ||||||
283 | |||||||
284 | } elsif($name eq 'status') { | ||||||
285 | 0 | $self->{xml_state } = IN_STATUS; | |||||
286 | |||||||
287 | } elsif($name eq 'stream:error') { | ||||||
288 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
289 | |||||||
290 | } elsif($name eq 'error') { | ||||||
291 | 0 | $self->{message}->{err_type} = $attrs{'type'}; | |||||
292 | 0 | $self->{message}->{err_code} = $attrs{'code'}; | |||||
293 | } | ||||||
294 | } | ||||||
295 | |||||||
296 | sub characters { | ||||||
297 | 0 | 0 | my($parser, $chars) = @_; | ||||
298 | 0 | my $self = $parser->{_parent_}; | |||||
299 | 0 | my $state = $self->{xml_state}; | |||||
300 | |||||||
301 | 0 | if($state == IN_BODY) { | |||||
302 | 0 | $self->{message}->{body} .= $chars; | |||||
303 | |||||||
304 | } elsif($state == IN_THREAD) { | ||||||
305 | 0 | $self->{message}->{thread} .= $chars; | |||||
306 | |||||||
307 | } elsif($state == IN_STATUS) { | ||||||
308 | 0 | $self->{message}->{status} .= $chars; | |||||
309 | } | ||||||
310 | } | ||||||
311 | |||||||
312 | sub end_element { | ||||||
313 | 0 | 0 | my($parser, $name) = @_; | ||||
314 | 0 | my $self = $parser->{_parent_}; | |||||
315 | 0 | $self->{xml_state} = IN_NOTHING; | |||||
316 | |||||||
317 | 0 | if($name eq 'message') { | |||||
318 | 0 | $self->push_msg($self->{message}); | |||||
319 | 0 | $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; | |||||
320 | |||||||
321 | } elsif($name eq 'stream:stream') { | ||||||
322 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
323 | } | ||||||
324 | } | ||||||
325 | |||||||
326 | |||||||
327 | # read all the data on the jabber socket through the | ||||||
328 | # parser and drop the resulting message | ||||||
329 | sub flush_socket { | ||||||
330 | 0 | 0 | my $self = shift; | ||||
331 | 0 | return 0 unless $self->connected; | |||||
332 | |||||||
333 | 0 | while ($self->wait(0)) { | |||||
334 | # TODO remove this log line | ||||||
335 | 0 | $logger->info("flushing data from socket..."); | |||||
336 | } | ||||||
337 | |||||||
338 | 0 | return $self->connected; | |||||
339 | } | ||||||
340 | |||||||
341 | |||||||
342 | |||||||
343 | 1; | ||||||
344 |