package POE::Component::Generic; use POE 0.31; use POE::Wheel::Run; use POE::Filter::Line; use POE::Filter::Reference; use POE::Component::Generic::Child; use Carp qw(carp croak); use Devel::Symdump; use vars qw($AUTOLOAD); use Config; use strict; our $VERSION = '0.04'; sub AUTOLOAD { my $self = shift; my $method = $AUTOLOAD; $method =~ s/.*:://; return unless $method =~ /[^A-Z]/; warn "autoload method $method" if ($self->{debug}); my $hash = shift; unless( ref($hash) eq 'HASH' ) { croak "first param to $method must be a hash ref of options" } unless( $self->{method_map}{ $method } ) { croak qq(Can't locate object method "$method" via package ") .ref( $self ). qq("); } $hash->{wantarray} = wantarray() unless (defined($hash->{wantarray})); # use ->call() so that $generic->method() happens in order $poe_kernel->call( $self->session_id() => $method => $hash => @_ ); } ########################################################################## sub spawn { goto &new; } ################################################## sub new { my $package = shift; croak "$package needs an even number of parameters" if @_ & 1; my %params; { my %p = @_; while( my( $k, $v ) = each %p ) { $params{ lc $k } = $v; } } my $options = delete ( $params{'options'} ); unless( $params{package} ) { croak "Please specify a package"; } # map of commands to packages $params{method_map} = {}; # param storage $params{store} = {}; # request IDs $params{id} = "REQ000000"; my $self = bless(\%params, $package); POE::Component::Generic::Child::package_load( $self->{package} ); $self->package_map; $self->callback_map; $self->postback_map; $self->{session_id} = POE::Session->create( object_states => [ $self => { (map { $_ => 'request' } keys %{$self->{method_map}}) }, $self => [ qw(_start shutdown _child __wheel_close __wheel_err __wheel_out __wheel_stderr) ], ], ( ( defined ( $options ) and ref ( $options ) eq 'HASH' ) ? ( options => $options ) : () ), )->ID(); warn "session $self->{session_id} created for $params{package}" if ($self->{debug}); return $self; } ################################################## # Build a map of all methods => package sub package_map { my( $self ) = @_; my @methods = $self->package_methods( $self->{package} ); my %OK; if( $self->{methods} ) { @OK{ @{ $self->{methods} } } = (1) x @{ $self->{methods} }; } foreach my $p ( @methods ) { my ($pk,$sub) = ($p =~ m/^(.+)\:\:([^\:]+)/); next if $self->{method_map}->{ $sub }; next if $self->{methods} and not $OK{ $sub }; next unless ($sub =~ /[^A-Z_0-9]$/); next if ($sub =~ m/^_/ || $sub =~ m/(carp|croak|confess)$/); my $o = $p; if (defined &$o) { $self->{method_map}->{$sub} = $pk; } } } ################################################## # Get a list of all methods from the package sub package_methods { my( $self, $package ) = @_; my $isa = Symbol::qualify_to_ref( "ISA", $package ); my @obj = Devel::Symdump->functions( $package ); foreach my $subpack ( @{ *$isa } ) { next if $subpack eq 'Exporter'; push @obj, $self->package_methods( $subpack ); } # we can't distinguish methods from functions :-/ return @obj } ########################################################################## # POE related object methods sub _start { my ($kernel,$self) = @_[KERNEL,OBJECT]; $self->{session_id} = $_[SESSION]->ID; if ( $self->{alias} ) { $self->{name} = $self->{alias}; $kernel->alias_set( $self->{alias} ); $self->{debug} and warn "alias is $self->{alias}"; } else { $self->{name} = "poe-generic"; $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ ); } $poe_kernel->sig( CHLD => '_child' ); my %prog = (Program => \&process_requests); if ($self->{alt_fork}) { my $perl = $^X; if( $ENV{HARNESS_PERL_SWITCHES} ) { $perl .= " $ENV{HARNESS_PERL_SWITCHES}"; } %prog = (Program => "$perl -M".__PACKAGE__ ." -I".join(' -I',@INC) ." -e '".__PACKAGE__."::process_requests(1)'"); $self->{debug} and warn "Launching $prog{Program}"; } $self->{wheel} = POE::Wheel::Run->new( %prog, CloseOnCall => 0, StdinFilter => POE::Filter::Reference->new(), StdoutFilter => POE::Filter::Reference->new(), StderrFilter => POE::Filter::Line->new(), StdoutEvent => '__wheel_out', StderrEvent => '__wheel_stderr', ErrorEvent => '__wheel_err', CloseEvent => '__wheel_close', ); # Tell the other side to create an object $self->{object_options} ||= []; unless( ref $self->{object_options} ) { $self->{object_options} = [ $self->{object_options} ] } my $new = { req => 'setup', debug => $self->{debug}, args => $self->{object_options}, package => $self->{package}, name => $self->{name}, verbose => $self->{verbose}, }; $new->{size} = $self->{size} if $self->{size}; $self->{debug} and warn "Ask to create object"; $self->{wheel}->put( $new ); undef; } ###################################################### # Send request to child process sub request { my ( $self,$state ) = @_[OBJECT,STATE]; my $sender = $_[SENDER]->ID; warn "$$: processing request $state\n" if ($self->{debug}); # Get the arguments my $args; if (ref($_[ARG0]) eq 'HASH') { $args = { %{ $_[ARG0] } }; } else { warn "first parameter must be a ref hash, trying to adjust. " ."(fix this to get rid of this message)"; $args = { @_[ARG0 .. $#_ ] }; } if ($self->{wheel}) { # If we have an {event}, it means the user wants *something* back if( $args->{event} and not defined $args->{wantarray} ) { $args->{wantarray} = 0; } my $params = { state => $state, event => $args->{event}, wantarray => $args->{wantarray}, session => ($args->{session}||$sender), args => [ @_[ ARG1 .. $#_ ] ], }; my $id = $params->{id} = $self->{id}++; # param storage if ( keys %$args ) { # id to match in param storage $self->{store}->{$id} = $args; } # if we have an event to report to...make sure it stays around if ($args->{event}) { $poe_kernel->refcount_increment( $sender => $self->{name} ); } if( $self->{callback_map}{ $state } ) { $self->callback_marshall( $params ); } if( $self->{postback_map}{ $state } ) { $self->postback_marshall( $params, $sender ); } $self->{debug} and warn "request put"; $self->{wheel}->put($params); } elsif( $self->{debug} ) { warn "No wheel"; } undef; } ################################################## # Prepare the callback definitions sub callback_map { my( $self ) = @_; return unless $self->{callbacks}; my $c = $self->{callbacks}; $c = [$c] unless ref $c; my %callbacks; @callbacks{ @$c } = ( {state=>$c} ) x @$c; $self->{callback_map} = \%callbacks; return; } ################################################## # Marshall any callback definitions sub callback_marshall { my( $self, $params ) = @_; my $cmap = $self->{callback_map}{ $params->{state} }; return unless $cmap; my $args = $params->{args}; my @callbacks; for( my $pos=0; $pos <= $#$args; $pos++ ) { next unless 'CODE' eq ref $args->[$pos]; my $CBid = "---CALLBACK-$params->{id}-$pos---"; $self->{callback_defs}{ $params->{id} }{ $pos } = { coderef => $args->[$pos] }; push @callbacks, { CBid=>$CBid, pos=>$pos }; $args->[$pos] = $CBid; } return unless @callbacks; $params->{callbacks} = \@callbacks; return; } ################################################## # Prepare the postback definitions sub postback_map { my( $self ) = @_; return unless $self->{postbacks}; my $c = $self->{postbacks}; $c = {$c => {pos=>0}} unless ref $c; $c = { map { $_ => 0 } @$c } if 'ARRAY' eq ref $c; my %postbacks; while( my( $state, $pdef ) = each %$c ) { $postbacks{ $state } = { state=>$state, pos=>[] }; unless( ref $pdef ) { $postbacks{ $state }{pos} = [$pdef||0]; } elsif( 'ARRAY' eq ref $pdef ) { $postbacks{ $state }{pos} = [ map { $_||0 } @$pdef ]; } else { carp "postback position must be an arrayref or scalar"; } } $self->{postback_map} = \%postbacks; return; } ################################################## # Marshall any postback definitions sub postback_marshall { my( $self, $params, $sender ) = @_; my $pmap = $self->{postback_map}{ $params->{state} }; return unless $pmap; my $args = $params->{args}; my @postbacks; foreach my $pos ( @{ $pmap->{pos} } ) { my $PBid = "---POSTBACK-$pmap->{state}-$pos---"; $self->{postback_defs}{ $PBid } = $self->__postback_def( $args->[$pos], $sender, $PBid ); push @postbacks, { PBid=>$PBid, pos=>$pos }; $args->[$pos] = $PBid; } return unless @postbacks; $params->{postbacks} = \@postbacks; return; } ################################################## sub __postback_def { my( $self, $arg, $sender, $id ) = @_; unless( ref $arg ) { # simply an event name return { event=>$arg, session=>$sender }; } elsif( 'HASH' eq ref $arg ) { # { event=>'...' } $arg->{session} ||= $sender; return $arg; } die "$arg isn't not a valid postback"; } ###################################################### # Child process sent us a response sub __wheel_out { my ($self,$input) = @_[ OBJECT,ARG0 ]; $self->{debug} and warn "__wheel_out"; my $res = $input->{result} || []; if( $input->{response} ) { if( $input->{response} eq 'new' ) { $self->{child_PID} = $input->{PID}; $self->{debug} and warn "Child PID=$input->{PID}"; } elsif( $input->{response} eq 'callback' ) { my $id = $input->{id}; my $pos = $input->{pos}; my $CB = $self->{callback_defs}{ $id }{ $pos }; unless( $CB ) { warn "Callback to undefined $id\[$input->{pos}]"; return; } eval { $CB->{coderef}->( @$res ) }; warn "Error in callback: $@" if $@; } elsif( $input->{response} eq 'postback' ) { my $PBid = $input->{PBid}; my $PB = $self->{postback_defs}{ $PBid }; unless( $PB ) { warn "Postback to undefined $PBid"; return; } $poe_kernel->post( $PB->{session} => $PB->{event}, @$res ); } else { warn "Unknown child response $input->{response}"; } return; } if (defined $input->{id}) { my $id = delete $input->{id}; # splice in stored data, because we might not trust other side @{ $input }{ keys %{$self->{store}->{$id}} } = values %{$self->{store}->{$id}}; delete $self->{store}->{$id}; delete $self->{callback_defs}->{$id}; } my $session = delete $input->{session}; my $event = delete $input->{event}; if ($event) { $self->{debug} and warn "Reply to $session/$event"; $poe_kernel->post( $session => $event, $input => @$res ); $poe_kernel->refcount_decrement( $session => $self->{name} ); } undef; } sub __wheel_stderr { my ($kernel,$self,$input) = @_[KERNEL,OBJECT,ARG0]; warn "ERR:$self->{name}: $input\n" if $self->{debug} or $self->{verbose}; } sub __wheel_err { my ($self, $operation, $errnum, $errstr, $wheel_id) = @_[OBJECT, ARG0..ARG3]; warn "Wheel:$self->{name}: Wheel $wheel_id generated $operation error $errnum: $errstr\n" if $self->{debug} or ( $self->{verbose} and $errnum != 0 ); } sub __wheel_close { my $self = $_[OBJECT]; warn "Wheel closed\n" if ($self->{debug}); # warn "$self->{package} Wheel closed, ieeeeeeee!\n"; } sub _child { my( $self, $name, $PID, $ret ) = @_[ OBJECT, ARG0..ARG2 ]; unless( $PID == $self->{child_PID} ) { $self->{debug} and warn "Got CHLD for $PID"; return; } $self->{debug} and warn "Child $PID exited with $ret"; $poe_kernel->sig_handled; return; } ############################################################################ # Dual event and object methods sub shutdown { unless (UNIVERSAL::isa($_[KERNEL],'POE::Kernel')) { if ($poe_kernel) { $poe_kernel->call(shift->session_id() => 'shutdown' => @_); } return; } my ($kernel,$self) = @_[KERNEL,OBJECT]; # remove alias or decrease ref count if ($self->{alias}) { $kernel->alias_remove($_) for $kernel->alias_list(); } else { $kernel->refcount_decrement($self->session_id() => __PACKAGE__); } if ($self->{wheel}) { $self->{wheel}->shutdown_stdin; } undef; } # Object methods sub session_id { shift->{session_id}; } sub yield { my $self = shift; $poe_kernel->post($self->session_id() => @_); } sub call { my $self = shift; $poe_kernel->call($self->session_id() => @_); } sub DESTROY { if (UNIVERSAL::isa($_[0],__PACKAGE__)) { $_[0]->shutdown(); } } ########################################################################## # Main Wheel::Run process sub sub process_requests { my $alt_fork = shift || 0; binmode(STDIN); binmode(STDOUT); STDOUT->autoflush(1); my $runner = POE::Component::Generic::Child->new( name=>__PACKAGE__, size=>4096, debug=>0, proc=>$0, alt_fork=>$alt_fork ); $runner->loop; } 1; __END__ =head1 NAME POE::Component::Generic - A POE component that provides non-blocking access to a blocking object. =head1 SYNOPSIS use POE::Component::Generic; my $poco = POE::Component::Generic->new( package => 'Net::Telnet', # optional; You can use $poco->session_id() instead alias => 'telnet', # optional; 1 to turn on debugging debug => 1, # optional; Options passed to the internal session options => { trace => 1 }, # optional; Options passed to Net::Telnet->new() object_options => [ ], ); # Start your POE session, then... $kernel->post('telnet' => open => { event => 'result' },"rainmaker.wunderground.com"); sub result { my ($kernel, $ref, $result) = @_[KERNEL, ARG0, ARG1]; if( $ref->{error} ) { die join(' ', @{ $ref->{error} ) . "\n"; } print "connected: $result\n"; } =head1 DESCRIPTION POE::Component::Generic is a L component that provides a non-blocking wrapper around any object. It works by creating a L and creating the object in the child process. Method requests are then serialised and sent to the child to be handled. Return values are posted back to your session. This means that requests and return values must survive serialisation. Method calls are wrapped in C so that errors may be propagated back to your session. See L. Output to STDERR in the child is shown only if C is set. Output to STDOUT in the child will MESS THINGS UP. So don't do that. =head1 METHODS =head2 new Takes a number of arguments, all but C are optional. =over 4 =item alias Session alias to register with the kernel. Default is none. =item alt_fork Set to C if you want to run another perl instance. That is, the child process will C a new instance of C using C<$^X> to do the work. C<@INC> is preserved. If present, C<$ENV{HARNESS_PERL_SWITCHES}> is used. Using C might help save memory; while the child process will only contain C and your object, it will not be able to share as many memory pages with other processes. Care must be taken that the all necessary modules are loaded in the new perl instance. Make sure that C will C all modules that it might interact with. Default is false. =item callbacks List of methods that have callbacks in their parameter list. When one of the methods in the list is called, any coderefs in the parameters are converted into a message to the child process to propagate the call back to the parent. A callback is a coderef that the object will only use during that method call. After the method returns, the callback will be invalidated. If you need to pass a coderef that must last longer then one method, use L. IMPORTANT: The callback is called from inside L. This means that the current session is NOT your session. If you need to be inside your session, use C. Defaults to empty. =item debug Set to C to see component debug information, such as anything output to STDERR by your code. Default to C. =item methods An array ref containing methods that you want the component to expose. If not specified, all the methods of C and its super-classes are exposed. Note that methods that being with C<_> or don't end with a lower case letter (C) are excluded, as well as methods that end in C, C and C. =item options A hashref of L options that are passed to the component's session creator. =item object_options An optional array ref of options that will be passed to the object constructor. =item package Package name that is used to create the object. Object creation happens in the child process. The package is loaded, if it isn't already, then a constructor is called with C as the parameters. The constructor is a package method named C, C or C, searching in that order. =item postbacks List of methods that have a coderef in there parameters. These coderefs will remain valid after the method returns. C must be a hashref, keys are method names, values are argument offsets list that will be converted into postbacks. These offsets maybe be a number, or an array of numeric offsets. Remember that argument offsets are numbered from 0. C may also be an array of method names. In this case, the argument offset is assumed to be 0. Examples: [ qw( new_cert new_connect ) ] { new_cert=>0, new_connect=>0 } # equivalent to previous { double_set=>[0,3] } When calling a method that has a postback, you specify an event name in the current session, or a hashref containing C and C keys. If C is missing, the current session is used. Yes, this means you may create postbacks that go to other sessions. Examples: "some_back" { event=>"some_back" } { event=>"some_back", session=>"my-session" } If you want a postback a postback to call a coderef, use L. =item verbose Component tells you more about what is happening in the child process. The child's PID is reported to STDERR. All text sent to STDERR in the child process is report. Any abnormal error conditions or exits are also reported. The proceeding is reported via warn. =back =head2 shutdown Shut the component down, doing all the magic so that POE needs to do. The child process will exit, causing C to be called on your object. The child process will of course wait if the object is in a blocking method. Note that this is also a POE event, which means you can not call a method named 'shutdown' on your object. Shuting down if there are response pending (see L below) is undefined. =head2 session_id Takes no arguments, returns the L ID of the component. Useful if you don't want to use aliases. =head1 METHOD CALLS There are 4 ways of calling methods on the object. All methods need a data hashref that will be handed back to the return event. This data hash is discussed in the L section. =head2 post Post events to the object. First argument is the event to post, second is the data hashref, following arguments are sent as arguments to the resultant post. $poe_kernel->post( $alias => 'request', open => { event => 'result' }, "localhost" ); =head2 yield This method provides an alternative object based means of posting events to the component. First argument is the event to post, second is the data hashref, following arguments are sent as arguments to the resultant post. $poco->yield( open => { event => 'result' }, "localhost" ); =head2 call This method provides an alternative object based means of calling events to the component. First argument is the event to call, second is the data hashref, following arguments are following arguments are sent as arguments to the resultant call. $poco->call( open => { event => 'result' }, "localhost" ); =head2 simple method All methods of the object can be called, but the first param must be the data hashref as noted below in the L section below. For example: $poco->open( { event => 'opened' },"localhost" ); =head1 INPUT Each event requires a data hashref as an argument. The data hashref may have the following keys. =over 4 =item event Event in your session that you want the results of the method to go to. C is needed for all requests that you want a response to. No response is sent if C is missing. =item wantarray Should the method be called in array context (1), scalar context (0) or void context (undef)? This will be set to a reasonable value for you if you don't specify it. =back It is possible to pass arbitary data in the request hashref that could be used in the resultant event handler. Simply define additional key/value pairs of your own. It is recommended that one prefixes keys with '__' to avoid future clashes. =head1 OUTPUT For each requested operation an event handler is required. C of this event handler contains the data hashref. C are the returned values. =over 4 =item state The method that was called. =item result This is data returned from the function you called. Usually a scalar, but can be an array ref when using 'option_callback'. =item error In the event of an error occurring this will be defined. It is an scalar which contains the error. =back =head1 STATUS For your comfort and conveinence, the child process update C<$O> to tell you what it is doing. On many systems, C<$O> is available via C. =head1 AUTHOR Philip Gwyn Egwyn-at-cpan.orgE Based on work by David Davis Exantus@cpan.orgE =head1 SEE ALSO L =head1 RATING Please rate this module. L =head1 BUGS Haven't implemented postbacks yet. Probably. Report them here: L =head1 CREDITS BinGOs for L that helped xantus get started. =head1 COPYRIGHT AND LICENSE Copyright 2006 by Philip Gwyn; Copyright 2005 by David Davis and Teknikill Software. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut