File Coverage

File:blib/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
Coverage:20.2%

linestmtbrancondsubpodtimecode
1package OpenSRF::Transport::SlimJabber::XMPPReader;
2
9
9
9
9
9
9
56
34
48
60
34
52
use strict; use warnings;
3
9
9
9
122
41
139
use XML::Parser;
4
9
9
9
77
34
98
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5
9
9
9
64
35
74
use Time::HiRes qw/time/;
6
9
9
9
130
40
153
use OpenSRF::Transport::SlimJabber::XMPPMessage;
7
9
9
9
74
33
59
use OpenSRF::Utils::Logger qw/$logger/;
8
9# -----------------------------------------------------------
10# Connect, disconnect, and authentication messsage templates
11# -----------------------------------------------------------
12
9
53
use constant JABBER_CONNECT =>
13
9
9
64
36
    "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
14
15
9
49
use constant JABBER_BASIC_AUTH =>
16    "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17
9
9
62
34
    "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
18
19
9
9
9
60
37
53
use constant JABBER_DISCONNECT => "</stream:stream>";
20
21
22# -----------------------------------------------------------
23# XMPP Stream states
24# -----------------------------------------------------------
25
9
9
9
84
40
50
use constant DISCONNECTED => 1;
26
9
9
9
62
35
50
use constant CONNECT_RECV => 2;
27
9
9
9
63
39
46
use constant CONNECTED => 3;
28
29
30# -----------------------------------------------------------
31# XMPP Message states
32# -----------------------------------------------------------
33
9
9
9
74
33
56
use constant IN_NOTHING => 1;
34
9
9
9
59
35
49
use constant IN_BODY => 2;
35
9
9
9
59
44
51
use constant IN_THREAD => 3;
36
9
9
9
62
33
46
use constant IN_STATUS => 4;
37
38
39# -----------------------------------------------------------
40# Constructor, getter/setters
41# -----------------------------------------------------------
42sub new {
43
0
0
    my $class = shift;
44
0
    my $socket = shift;
45
46
0
    my $self = bless({}, $class);
47
48
0
    $self->{queue} = [];
49
0
    $self->{stream_state} = DISCONNECTED;
50
0
    $self->{xml_state} = IN_NOTHING;
51
0
    $self->socket($socket);
52
53
0
    my $p = new XML::Parser(Handlers => {
54        Start => \&start_element,
55        End => \&end_element,
56        Char => \&characters,
57    });
58
59
0
    $self->parser($p->parse_start); # create a push parser
60
0
    $self->parser->{_parent_} = $self;
61
0
    $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
62
0
    return $self;
63}
64
65sub push_msg {
66
0
0
    my($self, $msg) = @_;
67
0
0
    push(@{$self->{queue}}, $msg) if $msg;
68}
69
70sub next_msg {
71
0
0
    my $self = shift;
72
0
0
    return shift @{$self->{queue}};
73}
74
75sub peek_msg {
76
0
0
    my $self = shift;
77
0
0
    return (@{$self->{queue}} > 0);
78}
79
80sub parser {
81
0
0
    my($self, $parser) = @_;
82
0
    $self->{parser} = $parser if $parser;
83
0
    return $self->{parser};
84}
85
86sub socket {
87
0
0
    my($self, $socket) = @_;
88
0
    $self->{socket} = $socket if $socket;
89
0
    return $self->{socket};
90}
91
92sub stream_state {
93
0
0
    my($self, $stream_state) = @_;
94
0
    $self->{stream_state} = $stream_state if $stream_state;
95
0
    return $self->{stream_state};
96}
97
98sub xml_state {
99
0
0
    my($self, $xml_state) = @_;
100
0
    $self->{xml_state} = $xml_state if $xml_state;
101
0
    return $self->{xml_state};
102}
103
104sub message {
105
0
0
    my($self, $message) = @_;
106
0
    $self->{message} = $message if $message;
107
0
    return $self->{message};
108}
109
110
111# -----------------------------------------------------------
112# Stream and connection handling methods
113# -----------------------------------------------------------
114
115sub connect {
116
0
0
    my($self, $domain, $username, $password, $resource) = @_;
117
118
0
    $self->send(sprintf(JABBER_CONNECT, $domain));
119
0
    $self->wait(10);
120
121
0
    unless($self->{stream_state} == CONNECT_RECV) {
122
0
        $logger->error("No initial XMPP response from server");
123
0
        return 0;
124    }
125
126
0
    $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
127
0
    $self->wait(10);
128
129
0
    unless($self->connected) {
130
0
        $logger->error('XMPP connect failed');
131
0
        return 0;
132    }
133
134
0
    return 1;
135}
136
137sub disconnect {
138
0
0
    my $self = shift;
139
0
    return unless $self->socket;
140
0
    if($self->tcp_connected) {
141
0
        $self->send(JABBER_DISCONNECT);
142
0
        shutdown($self->socket, 2);
143    }
144
0
    close($self->socket);
145}
146
147# -----------------------------------------------------------
148# returns true if this stream is connected to the server
149# -----------------------------------------------------------
150sub connected {
151
0
0
    my $self = shift;
152
0
    return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
153}
154
155# -----------------------------------------------------------
156# returns true if the socket is connected
157# -----------------------------------------------------------
158sub tcp_connected {
159
0
0
    my $self = shift;
160
0
    return ($self->socket and $self->socket->connected);
161}
162
163# -----------------------------------------------------------
164# sends pre-formated XML
165# -----------------------------------------------------------
166sub send {
167
0
0
    my($self, $xml) = @_;
168
0
    $self->{socket}->print($xml);
169}
170
171# -----------------------------------------------------------
172# Puts a file handle into blocking mode
173# -----------------------------------------------------------
174sub set_block {
175
0
0
    my $fh = shift;
176
0
    my $flags = fcntl($fh, F_GETFL, 0);
177
0
    $flags &= ~O_NONBLOCK;
178
0
    fcntl($fh, F_SETFL, $flags);
179}
180
181
182# -----------------------------------------------------------
183# Puts a file handle into non-blocking mode
184# -----------------------------------------------------------
185sub set_nonblock {
186
0
0
    my $fh = shift;
187
0
    my $flags = fcntl($fh, F_GETFL, 0);
188
0
    fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
189}
190
191
192sub wait {
193
0
0
    my($self, $timeout) = @_;
194
195
0
    return $self->next_msg if $self->peek_msg;
196
197
0
    $timeout ||= 0;
198
0
    $timeout = undef if $timeout < 0;
199
0
    my $socket = $self->{socket};
200
201
0
    set_block($socket);
202
203    # build the select readset
204
0
    my $infile = '';
205
0
    vec($infile, $socket->fileno, 1) = 1;
206
0
    return undef unless select($infile, undef, undef, $timeout);
207
208    # now slurp the data off the socket
209
0
    my $buf;
210
0
    my $read_size = 1024;
211
0
    my $nonblock = 0;
212
0
    while(my $n = sysread($socket, $buf, $read_size)) {
213
0
        $self->{parser}->parse_more($buf) if $buf;
214
0
        if($n < $read_size or $self->peek_msg) {
215
0
            set_block($socket) if $nonblock;
216
0
            last;
217        }
218
0
        set_nonblock($socket) unless $nonblock;
219
0
        $nonblock = 1;
220    }
221
222
0
    return $self->next_msg;
223}
224
225# -----------------------------------------------------------
226# Waits up to timeout seconds for a fully-formed XMPP
227# message to arrive. If timeout is < 0, waits indefinitely
228# -----------------------------------------------------------
229sub wait_msg {
230
0
0
    my($self, $timeout) = @_;
231
0
    my $xml;
232
233
0
    $timeout = 0 unless defined $timeout;
234
235
0
    if($timeout < 0) {
236
0
        while(1) {
237
0
            return $xml if $xml = $self->wait($timeout);
238        }
239
240    } else {
241
0
        while($timeout >= 0) {
242
0
            my $start = time;
243
0
            return $xml if $xml = $self->wait($timeout);
244
0
            $timeout -= time - $start;
245        }
246    }
247
248
0
    return undef;
249}
250
251
252# -----------------------------------------------------------
253# SAX Handlers
254# -----------------------------------------------------------
255
256
257sub start_element {
258
0
0
    my($parser, $name, %attrs) = @_;
259
0
    my $self = $parser->{_parent_};
260
261
0
    if($name eq 'message') {
262
263
0
        my $msg = $self->{message};
264
0
        $msg->{to} = $attrs{'to'};
265
0
        $msg->{from} = $attrs{router_from} if $attrs{router_from};
266
0
        $msg->{from} = $attrs{from} unless $msg->{from};
267
0
        $msg->{osrf_xid} = $attrs{'osrf_xid'};
268
0
        $msg->{type} = $attrs{type};
269
270    } elsif($name eq 'body') {
271
0
        $self->{xml_state} = IN_BODY;
272
273    } elsif($name eq 'thread') {
274
0
        $self->{xml_state} = IN_THREAD;
275
276    } elsif($name eq 'stream:stream') {
277
0
        $self->{stream_state} = CONNECT_RECV;
278
279    } elsif($name eq 'iq') {
280
0
        if($attrs{type} and $attrs{type} eq 'result') {
281
0
            $self->{stream_state} = CONNECTED;
282        }
283
284    } elsif($name eq 'status') {
285
0
        $self->{xml_state } = IN_STATUS;
286
287    } elsif($name eq 'stream:error') {
288
0
        $self->{stream_state} = DISCONNECTED;
289
290    } elsif($name eq 'error') {
291
0
        $self->{message}->{err_type} = $attrs{'type'};
292
0
        $self->{message}->{err_code} = $attrs{'code'};
293    }
294}
295
296sub characters {
297
0
0
    my($parser, $chars) = @_;
298
0
    my $self = $parser->{_parent_};
299
0
    my $state = $self->{xml_state};
300
301
0
    if($state == IN_BODY) {
302
0
        $self->{message}->{body} .= $chars;
303
304    } elsif($state == IN_THREAD) {
305
0
        $self->{message}->{thread} .= $chars;
306
307    } elsif($state == IN_STATUS) {
308
0
        $self->{message}->{status} .= $chars;
309    }
310}
311
312sub end_element {
313
0
0
    my($parser, $name) = @_;
314
0
    my $self = $parser->{_parent_};
315
0
    $self->{xml_state} = IN_NOTHING;
316
317
0
    if($name eq 'message') {
318
0
        $self->push_msg($self->{message});
319
0
        $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
320
321    } elsif($name eq 'stream:stream') {
322
0
        $self->{stream_state} = DISCONNECTED;
323    }
324}
325
326
327# read all the data on the jabber socket through the
328# parser and drop the resulting message
329sub flush_socket {
330
0
0
        my $self = shift;
331
0
    return 0 unless $self->connected;
332
333
0
    while ($self->wait(0)) {
334        # TODO remove this log line
335
0
        $logger->info("flushing data from socket...");
336    }
337
338
0
    return $self->connected;
339}
340
341
342
3431;
344