File Coverage

File:blib/lib/OpenSRF/UnixServer.pm
Coverage:29.6%

linestmtbrancondsubpodtimecode
1package OpenSRF::UnixServer;
2
9
9
9
9
9
9
49
32
56
61
30
64
use strict; use warnings;
3
9
9
9
61
36
61
use base qw/OpenSRF/;
4
9
9
9
73
28
61
use OpenSRF::EX qw(:try);
5
9
9
9
65
32
57
use OpenSRF::Utils::Logger qw(:level $logger);
6
9
9
9
69
35
70
use OpenSRF::Transport::PeerHandle;
7
9
9
9
124
39
105
use OpenSRF::Application;
8
9
9
9
95
30
66
use OpenSRF::AppSession;
9
9
9
9
62
29
63
use OpenSRF::DomainObject::oilsResponse qw/:status/;
10
9
9
9
69
33
82
use OpenSRF::System;
11
9
9
9
61
33
54
use OpenSRF::Utils::SettingsClient;
12
9
9
9
55
33
63
use Time::HiRes qw(time);
13
9
9
9
64
35
58
use OpenSRF::Utils::JSON;
14
9
9
9
61
38
69
use vars qw/@ISA $app/;
15
9
9
9
63
36
74
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
16
9
9
9
61
34
54
use Carp;
17
9
9
9
68
33
55
use FreezeThaw qw/thaw/;
18
19
9
9
9
61
32
139
use IO::Socket::INET;
20
9
9
9
69
35
74
use IO::Socket::UNIX;
21
22
0
sub DESTROY { confess "Dying $$"; }
23
24 - 34
=head1 What am I

All inbound messages are passed on to the UnixServer for processing.
We take the data, close the Unix socket, and pass the data on to our abstract
'process()' method.  

Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
So when you pass data down the Unix socket to us, we have been preforked and waiting
to disperse new data among us.

=cut
35
36
0
0
sub app { return $app; }
37
38{
39
40        sub new {
41
0
0
                my( $class, $app1 ) = @_;
42
0
                if( ! $app1 ) {
43
0
                        throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
44                }
45
0
                $app = $app1;
46
0
                my $self = bless( {}, $class );
47# my $client = OpenSRF::Utils::SettingsClient->new();
48# if( $client->config_value("server_type") !~ /fork/i ||
49# OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
50# warn "Calling hooks for non-prefork\n";
51# $self->configure_hook();
52# $self->child_init_hook();
53# }
54
0
                return $self;
55        }
56
57}
58
59 - 64
=head2 process_request()

Takes the incoming data, closes the Unix socket and hands the data untouched 
to the abstract process() method.  This method is implemented in our subclasses.

=cut
65
66sub process_request {
67
68
0
1
        my $self = shift;
69
0
0
        my $data; my $d;
70
0
0
        while( $d = <STDIN> ) { $data .= $d; }
71
72
0
        my $orig = $0;
73
0
        $0 = "$0*";
74
75
0
        if( ! $data or ! defined( $data ) or $data eq "" ) {
76
0
                close($self->{server}->{client});
77
0
                $logger->debug("Unix child received empty data from socket", ERROR);
78
0
                $0 = $orig;
79
0
                return;
80        }
81
82
83
0
        if( ! close( $self->{server}->{client} ) ) {
84
0
                $logger->debug( "Error closing Unix socket: $!", ERROR );
85        }
86
87
0
        my $app = $self->app();
88
0
        $logger->transport( "UnixServer for $app received $data", INTERNAL );
89
90        # --------------------------------------------------------------
91        # Drop all data from the socket before coninuting to process
92        # --------------------------------------------------------------
93
0
        my $ph = OpenSRF::Transport::PeerHandle->retrieve;
94
0
        if(!$ph->flush_socket()) {
95
0
                $logger->error("We received a request ".
96                        "and we are no longer connected to the jabber network. ".
97                        "We will go away and drop this request: $data");
98
0
                exit;
99        }
100
101
0
    ($data) = thaw($data);
102
0
        my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
103
104
0
        if(!ref($app_session)) {
105
0
                $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
106
0
                $0 = $orig;
107
0
                return;
108        }
109
110
0
        if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
111
0
                $logger->debug("Exiting keepalive for stateless session / orig = $orig");
112
0
                $app_session->kill_me;
113
0
                $0 = $orig;
114
0
                return;
115        }
116
117
118
0
        my $client = OpenSRF::Utils::SettingsClient->new();
119
0
        my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
120
121
0
        my $req_counter = 0;
122
0
        while( $app_session and
123                        $app_session->state and
124                        $app_session->state != $app_session->DISCONNECTED() and
125                        $app_session->find( $app_session->session_id ) ) {
126
127
128
0
                my $before = time;
129
0
                $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
130
0
                $app_session->queue_wait( $keepalive );
131
0
                $logger->debug( "after queue wait $keepalive", INTERNAL );
132
0
                my $after = time;
133
134
0
                if( ($after - $before) >= $keepalive ) {
135
136
0
                        my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
137                                                                        status => "Disconnected on timeout",
138                                                                        statusCode => STATUS_TIMEOUT);
139
0
                        $app_session->status($res);
140
0
                        $app_session->state( $app_session->DISCONNECTED() );
141
0
                        last;
142                }
143
144        }
145
146
0
        my $x = 0;
147
0
        while( $app_session && $app_session->queue_wait(0) ) {
148
0
                $logger->debug( "Looping on zombies " . $x++ , DEBUG);
149        }
150
151
0
        $logger->debug( "Timed out, disconnected, or authentication failed" );
152
0
        $app_session->kill_me if ($app_session);
153
154
0
        $0 = $orig;
155}
156
157
158sub serve {
159
0
0
        my( $self ) = @_;
160
161
0
        my $app = $self->app();
162
0
        $logger->set_service($app);
163
164
0
        $0 = "OpenSRF master [$app]";
165
166
0
        my $client = OpenSRF::Utils::SettingsClient->new();
167
0
    my @base = ('apps', $app, 'unix_config' );
168
169
0
        my $min_servers = $client->config_value(@base, 'min_children');
170
0
        my $max_servers = $client->config_value(@base, "max_children" );
171
0
        my $min_spare = $client->config_value(@base, "min_spare_children" );
172
0
        my $max_spare = $client->config_value(@base, "max_spare_children" );
173
0
        my $max_requests = $client->config_value(@base, "max_requests" );
174    # fwiw, these file paths are (obviously) not portable
175
0
        my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" ));
176
0
        my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" ));
177
0
        my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" ));
178
179
0
    $min_spare ||= $min_servers;
180
0
    $max_spare ||= $max_servers;
181
0
    $max_requests ||= 1000;
182
183
0
    $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ".
184        "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file");
185
186
0
    $self->run(
187        min_servers => $min_servers,
188        max_servers => $max_servers,
189        min_spare_servers => $min_spare,
190        max_spare_servers => $max_spare,
191        max_requests => $max_requests,
192        log_file => $log_file,
193        port => $port,
194        proto => 'unix',
195        pid_file => $pid_file,
196    );
197
198}
199
200
201sub configure_hook {
202
0
0
        my $self = shift;
203
0
        my $app = $self->app;
204
205        # boot a client
206
0
        OpenSRF::System->bootstrap_client( client_name => "system_client" );
207
208
0
        $logger->debug( "Setting application implementation for $app", DEBUG );
209
0
        my $client = OpenSRF::Utils::SettingsClient->new();
210
0
        my $imp = $client->config_value("apps", $app, "implementation");
211
0
        OpenSRF::Application::server_class($app);
212
0
        OpenSRF::Application->application_implementation( $imp );
213
0
        OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
214
0
        OpenSRF::Application->application_implementation->initialize()
215                if (OpenSRF::Application->application_implementation->can('initialize'));
216
217
0
        if( $client->config_value("server_type") !~ /fork/i ) {
218
0
                $self->child_init_hook();
219        }
220
221
0
        my $con = OpenSRF::Transport::PeerHandle->retrieve;
222
0
        if($con) {
223
0
                $con->disconnect;
224        }
225
226
0
        return OpenSRF::Application->application_implementation;
227}
228
229sub child_init_hook {
230
231
0
0
        $0 =~ s/master/drone/g;
232
233
0
        if ($ENV{OPENSRF_PROFILE}) {
234
0
                my $file = $0;
235
0
                $file =~ s/\W/_/go;
236
0
                eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
237
0
                if ($@) {
238
0
                        $logger->debug("Could not load Devel::Profiler: $@",ERROR);
239                } else {
240
0
                        $0 .= ' [PROFILING]';
241
0
                        $logger->debug("Running under Devel::Profiler", INFO);
242                }
243        }
244
245
0
        my $self = shift;
246
247# $logger->transport(
248# "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
249
0
        OpenSRF::Transport::PeerHandle->construct( $self->app() );
250
0
        $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
251
252
0
        OpenSRF::Application->application_implementation->child_init
253                if (OpenSRF::Application->application_implementation->can('child_init'));
254
255
0
        return OpenSRF::Transport::PeerHandle->retrieve;
256}
257
258sub child_finish_hook {
259
0
0
    $logger->debug("attempting to call child exit handler...");
260
0
        OpenSRF::Application->application_implementation->child_exit
261                if (OpenSRF::Application->application_implementation->can('child_exit'));
262}
263
264
2651;
266