--- globus_2_4_3_adv2003_fix892_fix956_more/gram/jobmanager/source/scripts/JobManager.pm Mon Apr 14 21:06:55 2003 +++ globus_2_4_3_adv2003_fix892_fix956_plus/gram/jobmanager/source/scripts/JobManager.pm Sun Jan 4 20:24:37 2004 @@ -3,9 +3,9 @@ # # CVS Information: # $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/source/scripts/JobManager.pm,v $ -# $Date: 2003/04/14 19:06:55 $ -# $Revision: 1.7.6.1 $ -# $Author: gose $ +# $Date: 2003/11/13 00:51:30 $ +# $Revision: 1.11 $ +# $Author: smartin $ # use Globus::GRAM::Error; use Globus::GRAM::JobState; @@ -13,6 +13,7 @@ use Globus::Core::Paths; use POSIX; +use Errno; use File::Path; use File::Copy; @@ -31,6 +32,7 @@ $manager = new Globus::GRAM::JobManager($job_description); $manager->log("Starting new operation"); + $manager->nfssync($fileobj,$createflag); $manager->respond($hashref); $hashref = $manager->submit(); $hashref = $manager->poll(); @@ -45,6 +47,9 @@ $hashref = $manager->remote_io_file_create(); $hashref = $manager->proxy_relocate(); $hashref = $manager->proxy_update(); + $scalar = $manager->pipe_out_cmd(@arglist); + ($stderr, $rc) = $manager->pipe_err_cmd(@arglist); + $status = $manager->fork_and_exec_cmd(@arglist); $manager->append_path($hash, $variable, $path); =head1 DESCRIPTION @@ -117,6 +122,33 @@ return; } +=item $manager->nfssync($object,$create) + +Send an NFS update by touching the file (or directory) in question. If the +$create is true, a file will be created. If it is false, the $object will +not be created. + +=cut + +sub nfssync +{ + my $self = shift; + my $object = shift; + my $create_p = shift; + + my $now = time(); + unless ( utime( $now, $now, $object ) ) + { + # object did not exist + if ( $create_p ) + { + local(*TEMP); + close(TEMP) if ( open( TEMP, ">$object" ) ); + } + } + $self->log( "Sent NFS sync for $object" ); +} + =item $manager->respond($message) Send a response to the job manager program. The response may either be @@ -299,7 +331,8 @@ return Globus::GRAM::Error::INVALID_SCRATCH; } - while(!$created) + my $Loops = 0; + while( (!$created) && ($Loops++ < 100) ) { # Files with names comprised of Ascii values 48-122 should be # relatively easy to remove from the shell if things go bad. @@ -320,6 +353,7 @@ $created = mkdir($dirname, 0700); if($created) { + $self->nfssync( $dirname, 0 ); $self->log("I think it was made.... verifying"); if (-l $dirname || ! -d $dirname || ! -o $dirname) { @@ -327,7 +361,22 @@ $created = 0; } } + elsif( $!{EEXIST} ) + { + $self->log("Already exist; trying again"); + } + else + { + last; + } } + + # We give up + if (!$created) + { + return Globus::GRAM::Error::INVALID_SCRATCH; + } + $self->log("Using $dirname as the scratch directory for this job."); return {SCRATCH_DIR => $dirname}; @@ -416,7 +465,8 @@ chomp($url = $description->$_()); if($url =~ m|^[a-zA-Z]+://|) { - $filename = pipe_out_cmd($cache_pgm, '-query', '-t', $tag, $url); + my @arg = ($cache_pgm, '-query', '-t', $tag, $url); + $filename = $self->pipe_out_cmd(@arg); if($filename ne '') { $description->add($_, $filename); @@ -449,21 +499,68 @@ my $description = $self->{JobDescription}; my $tag = $description->cache_tag() or $tag = $ENV{'GLOBUS_GRAM_JOB_CONTACT'}; - my ($remote, $local, $local_resolved, $cached, @arg); + my ($remote, $local, $local_resolved, $cached, $stderr, $rc, @arg); $self->log("stage_in(enter)"); if($description->executable() =~ m|^[a-zA-Z]+://|) { @arg = ($cache_pgm, '-add', '-t', $tag, $description->executable()); - return Globus::GRAM::Error::STAGING_EXECUTABLE - if (fork_and_exec_cmd(@arg) != 0); + + ($stderr, $rc) = $self->pipe_err_cmd(@arg); + + if ($rc != 0) { + $self->log("executable staging failed with $stderr"); + + $self->respond( { + 'GT3_FAILURE_TYPE' => 'executable', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $description->executable() + }); + + return Globus::GRAM::Error::STAGING_EXECUTABLE; + } + $local = $self->pipe_out_cmd($cache_pgm, '-q', '-t', $tag, + $description->executable()); + if ($local eq '') { + $self->respond( { + 'GT3_FAILURE_TYPE' => 'executable', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $description->executable() + }); + + return Globus::GRAM::Error::STAGING_EXECUTABLE; + } + $self->nfssync($local, 0); } if($description->stdin() =~ m|^[a-zA-Z]+://|) { @arg = ($cache_pgm, '-add', '-t', $tag, $description->stdin()); + ($stderr, $rc) = $self->pipe_err_cmd(@arg); + + if ($rc != 0) { + $self->log("stdin staging failed with $stderr"); + + $self->respond( { + 'GT3_FAILURE_TYPE' => 'stdin', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $description->stdin() + }); + return Globus::GRAM::Error::STAGING_STDIN - if (fork_and_exec_cmd(@arg) != 0); + } + $local = $self->pipe_out_cmd($cache_pgm, '-q', '-t', $tag, + $description->stdin()); + if ($local eq '') { + $self->respond( { + 'GT3_FAILURE_TYPE' => 'stdin', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $description->stdin() + }); + + return Globus::GRAM::Error::STAGING_STDIN; + } + $self->nfssync($local, 0); } foreach ($description->file_stage_in()) { @@ -481,8 +578,20 @@ } @arg = ($url_copy_pgm, $remote, 'file://' . $local_resolved); + + ($stderr, $rc) = $self->pipe_err_cmd(@arg); + if($rc != 0) { + $self->log("filestagein staging failed with $stderr"); + + $self->respond( { + 'GT3_FAILURE_TYPE' => 'filestagein', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $remote, + 'GT3_FAILURE_DESTINATION' => $local + }); return Globus::GRAM::Error::STAGE_IN_FAILED - if (fork_and_exec_cmd(@arg) != 0); + } + $self->nfssync($local_resolved); $self->respond({'STAGED_IN' => "$remote $local"}); } foreach($description->file_stage_in_shared()) @@ -502,11 +611,21 @@ @arg = ($cache_pgm, '-add', '-t', $tag, $remote); + ($stderr, $rc) = $self->pipe_err_cmd(@arg); + if($rc != 0) { + $self->log("filestagein staging failed with $stderr"); + + $self->respond( { + 'GT3_FAILURE_TYPE' => 'filestagein', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $remote, + 'GT3_FAILURE_DESTINATION' => $local + }); return Globus::GRAM::Error::STAGE_IN_FAILED - if (fork_and_exec_cmd(@arg) != 0); + } @arg = ($cache_pgm, '-query', '-t', $tag, $remote); - $cached = pipe_out_cmd(@arg); + $cached = $self->pipe_out_cmd(@arg); return Globus::GRAM::Error::STAGE_IN_FAILED if($cached eq ''); @@ -514,6 +633,10 @@ symlink($cached, $local_resolved); $self->respond({'STAGED_IN_SHARED' => "$remote $local"}); + + $self->log( "local=$local" ); + $self->log( "local_resolved=$local_resolved" ); + $self->nfssync( $local_resolved, 0 ); } $self->log("stage_in(exit)"); return {}; @@ -541,6 +664,12 @@ my @arg; $self->log("stage_out(enter)"); + + $self->nfssync( $description->stdout(), 0 ) + if defined $description->stdout(); + $self->nfssync( $description->stderr(), 0 ) + if defined $description->stderr(); + foreach ($description->file_stage_out()) { next unless defined $_; @@ -552,7 +681,7 @@ if($local_path =~ m|^x-gass-cache://|) { @arg = ($cache_pgm, '-query', '-t', $tag, $local_path); - $local_path = pipe_out_cmd(@arg); + $local_path = $self->pipe_out_cmd(@arg); return Globus::GRAM::Error::STAGE_OUT_FAILED if($local_path eq ''); @@ -566,9 +695,22 @@ $local_path = $description->directory() . '/' . $local; } + $self->nfssync($local_path, 0); @arg = ($url_copy_pgm, 'file://' . $local_path, $remote); + + ($stderr, $rc) = $self->pipe_err_cmd(@arg); + if($rc != 0) { + $self->log("filestageout staging failed with $stderr"); + + $self->respond( { + 'GT3_FAILURE_TYPE' => 'filestageout', + 'GT3_FAILURE_MESSAGE' => $stderr, + 'GT3_FAILURE_SOURCE' => $local, + 'GT3_FAILURE_DESTINATION' => $remote + }); return Globus::GRAM::Error::STAGE_OUT_FAILED - if (fork_and_exec_cmd(@arg) != 0); + } + $self->respond({'STAGED_OUT' => "$local $remote"}); } $self->log("stage_out(exit)"); @@ -595,7 +737,11 @@ return {}; } - fork_and_exec_cmd($cache_pgm, '-cleanup-tag', '-t', $tag); + ($stderr, $rc) = $self->pipe_err_cmd($cache_pgm, '-cleanup-tag', '-t', $tag); + + if ($rc != 0) { + $self->log("cache cleanup failed with $stderr"); + } $self->log("cache_cleanup(exit)"); return {}; @@ -616,12 +762,14 @@ or $tag = $ENV{'GLOBUS_GRAM_JOB_CONTACT'}; my $filename = "${tag}dev/remote_io_url"; my $result; + my $stderr; + my $rc; $self->log("remote_io_file_create(enter)"); local(*FH); - $result = pipe_out_cmd($cache_pgm, '-query', '-t', $tag, $filename); + $result = $self->pipe_out_cmd($cache_pgm, '-query', '-t', $tag, $filename); if($result eq '') { @@ -632,11 +780,18 @@ print FH $description->remote_io_url(), "\n"; close FH; - fork_and_exec_cmd($cache_pgm, '-add', '-t', $tag, '-n', $filename, - "file:$tmpname"); + ($stderr, $rc) = $self->pipe_err_cmd($cache_pgm, '-add', '-t', $tag, '-n', + $filename, "file:$tmpname"); + if ($rc != 0) { + $self->log("remote I/O file create failed with $stderr"); + $self->respond( { + 'GT3_FAILURE_TYPE' => 'remoteiofile', + 'GT3_FAILURE_MESSAGE' => $stderr, + }); + } unlink($tmpname); - $result = pipe_out_cmd($cache_pgm, '-query', '-t', $tag, $filename); + $result = $self->pipe_out_cmd($cache_pgm, '-query', '-t', $tag, $filename); } else { @@ -674,7 +829,7 @@ return { X509_USER_PROXY => $ENV{'X509_USER_PROXY'} } if exists($ENV{'X509_USER_PROXY'}); - $proxy_filename = pipe_out_cmd($info_pgm, '-path'); + $proxy_filename = $self->pipe_out_cmd($info_pgm, '-path'); return Globus::GRAM::Error::OPENING_USER_PROXY if($proxy_filename eq ''); @@ -717,9 +872,25 @@ } } +=item $manager->pipe_out_cmd(@arg) + +Create a new process to run the first argument application with the +remaining arguments (which may be empty). No shell metacharacter will +be evaluated, avoiding a shell invocation. Stderr is redirected to +/dev/null and stdout is being captured by the parent process, which +is also the result returned. Use this function as more efficient +backticks, if you can do not need shell metacharacter evaluation. + +A child error code with an exit code of 127 indicates that the application +could not be run. The scalar result returned by this function is usually +undef'ed in this case. + +=cut + sub pipe_out_cmd { - my $result; + my $self = shift; + my @result; local(*READ); my $pid = open( READ, "-|" ); @@ -728,7 +899,7 @@ if ( $pid ) { # parent - chomp($result = scalar ); + chomp(@result = ); close(READ); } else { # child @@ -740,11 +911,68 @@ exit(127); } } - $result; + wantarray ? @result : $result[0]; +} + +=item ($stder, $rc) = $manager->pipe_err_cmd(@arg) + +Create a new process to run the first argument application with the +remaining arguments (which may be empty). No shell metacharacter will +be evaluated, avoiding a shell invocation. + +This method returns a list of two items, the standard error of the program, and +the exit code of the program. If the error code is 127, then the application +could not be run. Standard output is discarded. + +=cut + +sub pipe_err_cmd +{ + my $self = shift; + my $result; + local(*READ); + + my $pid = open( READ, "-|" ); + + return ("Error " . $! . " forking sub-process", -1) unless defined($pid); + + if ( $pid ) + { + # parent + chomp($result = scalar ); + close(READ); + } else { + # child + open( STDERR, '>&STDOUT'); + open( STDOUT, '>>/dev/null' ); + select(STDERR); $|=1; + select(STDOUT); $|=1; + if (! exec { $_[0] } @_ ) + { + exit(127); + } + } + ($result, $?); } +=item $manager->fork_and_exec_cmd(@arg) + +Fork off a child to run the first argument in the list. Remaining arguments +will be passed, but shell interpolation is avoided. Signals SIGINT and +SIGQUIT are ignored in the child process. Stdout is appended to /dev/null, +and stderr is dup2 from stdout. The parent waits for the child to finish, +and returns the value for the CHILD_ERROR variable as result. Use this +function as more efficient system() call, if you can do not need shell +metacharacter evaluation. + +Note that the inability to execute the program will result in a status code +of 127. + +=cut + sub fork_and_exec_cmd { + my $self = shift; my $pid = fork(); return undef unless defined $pid;