File: | blib/lib/OpenSRF/Transport/SlimJabber/Inbound.pm |
Coverage: | 23.7% |
line | stmt | bran | cond | sub | pod | time | code |
---|---|---|---|---|---|---|---|
1 | package OpenSRF::Transport::SlimJabber::Inbound; | ||||||
2 | 9 9 9 9 9 9 | 53 35 60 67 33 56 | use strict;use warnings; | ||||
3 | 9 9 9 | 61 35 62 | use base qw/OpenSRF::Transport::SlimJabber::Client/; | ||||
4 | 9 9 9 | 69 672 84 | use OpenSRF::EX qw(:try); | ||||
5 | 9 9 9 | 67 33 61 | use OpenSRF::Utils::Logger qw(:level); | ||||
6 | 9 9 9 | 72 36 108 | use OpenSRF::Utils::SettingsClient; | ||||
7 | 9 9 9 | 60 31 65 | use OpenSRF::Utils::Config; | ||||
8 | 9 9 9 | 67 31 61 | use Time::HiRes qw/usleep/; | ||||
9 | 9 9 9 | 65 34 68 | use FreezeThaw qw/freeze/; | ||||
10 | |||||||
11 | my $logger = "OpenSRF::Utils::Logger"; | ||||||
12 | |||||||
13 - 22 | =head1 Description This is the jabber connection where all incoming client requests will be accepted. This connection takes the data, passes it off to the system, then returns to take more data. Connection params are all taken from the config file and the values retrieved are based on the $app name passed into new(). This service should be loaded at system startup. =cut | ||||||
23 | |||||||
24 | { | ||||||
25 | my $unix_sock; | ||||||
26 | 0 | 0 | sub unix_sock { return $unix_sock; } | ||||
27 | my $instance; | ||||||
28 | |||||||
29 | sub new { | ||||||
30 | 0 | 1 | my( $class, $app ) = @_; | ||||
31 | 0 | $class = ref( $class ) || $class; | |||||
32 | 0 | if( ! $instance ) { | |||||
33 | |||||||
34 | 0 | my $conf = OpenSRF::Utils::Config->current; | |||||
35 | 0 | my $domain = $conf->bootstrap->domain; | |||||
36 | 0 | $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains; | |||||
37 | |||||||
38 | 0 | my $username = $conf->bootstrap->username; | |||||
39 | 0 | my $password = $conf->bootstrap->passwd; | |||||
40 | 0 | my $port = $conf->bootstrap->port; | |||||
41 | 0 | my $host = $domain; | |||||
42 | 0 | my $resource = $app . '_listener_at_' . $conf->env->hostname; | |||||
43 | |||||||
44 | 0 | my $no_router = 0; # make this a config entry if we want to use it | |||||
45 | 0 | if($no_router) { | |||||
46 | # no router, only one listener running.. | ||||||
47 | 0 | $username = "router"; | |||||
48 | 0 | $resource = $app; | |||||
49 | } | ||||||
50 | |||||||
51 | 0 | OpenSRF::Utils::Logger->transport("Inbound as $username, $password, $resource, $host, $port\n", INTERNAL ); | |||||
52 | |||||||
53 | 0 | my $self = __PACKAGE__->SUPER::new( | |||||
54 | username => $username, | ||||||
55 | resource => $resource, | ||||||
56 | password => $password, | ||||||
57 | host => $host, | ||||||
58 | port => $port, | ||||||
59 | ); | ||||||
60 | |||||||
61 | 0 | $self->{app} = $app; | |||||
62 | |||||||
63 | 0 | my $client = OpenSRF::Utils::SettingsClient->new(); | |||||
64 | 0 | my $f = $client->config_value("dirs", "sock"); | |||||
65 | 0 | $unix_sock = join( "/", $f, | |||||
66 | $client->config_value("apps", $app, "unix_config", "unix_sock" )); | ||||||
67 | 0 | bless( $self, $class ); | |||||
68 | 0 | $instance = $self; | |||||
69 | } | ||||||
70 | 0 | return $instance; | |||||
71 | } | ||||||
72 | |||||||
73 | } | ||||||
74 | |||||||
75 | sub DESTROY { | ||||||
76 | 0 | my $self = shift; | |||||
77 | 0 0 | for my $router (@{$self->{routers}}) { | |||||
78 | 0 | if($self->tcp_connected()) { | |||||
79 | 0 | $logger->info("disconnecting from router $router"); | |||||
80 | 0 | $self->send( to => $router, body => "registering", | |||||
81 | router_command => "unregister" , router_class => $self->{app} ); | ||||||
82 | } | ||||||
83 | } | ||||||
84 | } | ||||||
85 | |||||||
86 | sub listen { | ||||||
87 | 0 | 0 | my $self = shift; | ||||
88 | |||||||
89 | 0 | $self->{routers} = []; | |||||
90 | |||||||
91 | try { | ||||||
92 | |||||||
93 | 0 | my $conf = OpenSRF::Utils::Config->current; | |||||
94 | 0 | my $router_name = $conf->bootstrap->router_name; | |||||
95 | 0 | my $routers = $conf->bootstrap->routers; | |||||
96 | 0 | $logger->info("loading router info $routers"); | |||||
97 | |||||||
98 | 0 | for my $router (@$routers) { | |||||
99 | 0 | if(ref $router) { | |||||
100 | 0 0 | if( !$router->{services} || | |||||
101 | !$router->{services}->{service} || | ||||||
102 | ( | ||||||
103 | ref($router->{services}->{service}) eq 'ARRAY' and | ||||||
104 | 0 | grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) || | |||||
105 | $router->{services}->{service} eq $self->{app}) { | ||||||
106 | |||||||
107 | 0 | my $name = $router->{name}; | |||||
108 | 0 | my $domain = $router->{domain}; | |||||
109 | 0 | my $target = "$name\@$domain/router"; | |||||
110 | 0 0 | push(@{$self->{routers}}, $target); | |||||
111 | 0 | $logger->info( $self->{app} . " connecting to router $target"); | |||||
112 | 0 | $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} ); | |||||
113 | } | ||||||
114 | } else { | ||||||
115 | 0 | my $target = "$router_name\@$router/router"; | |||||
116 | 0 0 | push(@{$self->{routers}}, $target); | |||||
117 | 0 | $logger->info( $self->{app} . " connecting to router $target"); | |||||
118 | 0 | $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} ); | |||||
119 | } | ||||||
120 | } | ||||||
121 | |||||||
122 | } catch Error with { | ||||||
123 | 0 | my $err = shift; | |||||
124 | 0 | $logger->error($self->{app} . ": No routers defined: $err"); | |||||
125 | # no routers defined | ||||||
126 | 0 | }; | |||||
127 | |||||||
128 | |||||||
129 | |||||||
130 | |||||||
131 | 0 | $logger->transport( $self->{app} . " going into listen loop", INFO ); | |||||
132 | |||||||
133 | 0 | while(1) { | |||||
134 | |||||||
135 | 0 | my $sock = $self->unix_sock(); | |||||
136 | 0 | my $o; | |||||
137 | |||||||
138 | 0 | $logger->debug("Inbound listener calling process()"); | |||||
139 | |||||||
140 | try { | ||||||
141 | 0 | $o = $self->process(-1); | |||||
142 | |||||||
143 | 0 | if(!$o){ | |||||
144 | 0 | $logger->error( | |||||
145 | "Inbound received no data from the Jabber socket in process()"); | ||||||
146 | 0 | usleep(100000); # otherwise we loop and pound syslog logger with errors | |||||
147 | } | ||||||
148 | |||||||
149 | } catch OpenSRF::EX::JabberDisconnected with { | ||||||
150 | |||||||
151 | 0 | $logger->error("Inbound process lost its ". | |||||
152 | "jabber connection. Attempting to reconnect..."); | ||||||
153 | 0 | $self->initialize; | |||||
154 | 0 | $o = undef; | |||||
155 | 0 | }; | |||||
156 | |||||||
157 | |||||||
158 | 0 | if($o) { | |||||
159 | 0 | my $socket = IO::Socket::UNIX->new( Peer => $sock ); | |||||
160 | 0 | throw OpenSRF::EX::Socket( | |||||
161 | "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " ) | ||||||
162 | unless ($socket->connected); | ||||||
163 | 0 | print $socket freeze($o); | |||||
164 | 0 | $socket->close; | |||||
165 | } | ||||||
166 | } | ||||||
167 | |||||||
168 | 0 | throw OpenSRF::EX::Socket( "How did we get here?!?!" ); | |||||
169 | } | ||||||
170 | |||||||
171 | 1; | ||||||
172 |