perl 多线程,实时监控线程数,支持max thread

#!/usr/bin/perl -w
#Description:rerun eod job group by system
#Auther:Suzm
#Date  :2015-06-23

use DBI;
use Thread;
use strict;

push( @INC, $ENV{TPMS_EOD_PERL_LIB} );
require public_pg;

my %dbc_info;
my $maxnum = 3;
my %threads;
my $tx_date;
my $path_log = $ENV{TPMS_EOD_LOGPATH};

my $system = get_config('TPMS_SYSTEM_ID');
unless ( defined( $system ) ) {
	$system = "error";
}

#my $system = undef;

my $log_name =
  uc( CITIC::getscript_name($0) . "_" .$system);
my $log_file     = CITIC::create_logfile( $log_name, $path_log );
my $TPMS_EOD_SID = $ENV{TPMS_EOD_SID};
my $my_sysenv    = uc( $ENV{MY_SYSENV} );

select $log_file;
$| = 1;

#get eod job info ,return hash
sub getEodJob {
	my ( $sysid, $dbh ) = @_;
	my %jobinfo;

	#my $str = "select * from tpms_eod.T_TPMS_EODTABLE where system='${sysid}'";
	my $str = <<EOF;
select a.* from tpms_eod.t_tpms_eodtable a
inner join tpms_eod.Tpms_Rollback_Bizdate b
on a.TX_DATE != b.CURR_BIZ_DATE
or (a.TX_DATE = b.CURR_BIZ_DATE and a.FLAG=1)
where a.SYSTEM='${sysid}'
EOF
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();
	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return %jobinfo;
	}
	while ( my @row = $sth->fetchrow_array() ) {
		$jobinfo{ $row[1] }{"SYSTEM"}     = $row[0];
		$jobinfo{ $row[1] }{"JOB_SCRIPT"} = $row[2];
		$jobinfo{ $row[1] }{"ARGV"}       = $row[3];
		$jobinfo{ $row[1] }{"START_TIME"} = $row[4];
		$jobinfo{ $row[1] }{"END_TIME"}   = $row[5];
		$jobinfo{ $row[1] }{"TX_DATE"}    = $row[6];
		$jobinfo{ $row[1] }{"FLAG"}       = $row[7];

	}
	return %jobinfo;
}

#获取配置参数
sub get_config{
	my($name) = @_;
	my $str;
	open(CONFIG,"$ENV{TPMS_EOD_ETC}/etc.profile");
	while(<CONFIG>){
		my($key,$value)=split('=',$_);
		if(uc($key) eq uc($name)){
			$str = CITIC::strimstr($value);
			chomp $str;
		}else{
			next;
		}
	}
	close(CONFIG);
	return uc($str);
}

#cmd
sub cmd {
	my $ret;
	my ( $script, $argv, $tablename, $dbh ) = @_;
	my $cmd = " perl ${script} $argv";
	my $rc = open( INFO, "$cmd 2>&1|" );
	unless ($rc) {
		CITIC::showtime();
		print "Can't invoke the Command!\n";
		return 1;
	}
	while (<INFO>) {
		print $_;

		#print STDOUT $_;
		if (eof) {
			unless (/\b(complete|Succeeds)/) {
				CITIC::showtime();
				print "*" x 6 . "$tablename作业执行失败" . "*" x 6 . "\n";
				$ret = 1;
			}
			else {
				CITIC::showtime();
				print "*" x 6 . "$tablename作业执行成功" . "*" x 6 . "\n";
				$ret = 0;
			}
		}
	}
	close(INFO);
	return $ret;
}

#update job info
sub upJobInfo {
	my ( $dbh, $tablename, $flag, $final ) = @_;
	my $upstr;
	my $timestamp = showtime();

	if ( $flag == 1 ) {
		$upstr =
"START_TIME=to_timestamp('$timestamp','YYYY-MM-DD hh24:mi:ss:ff'),TX_DATE=to_date('$tx_date','YYYY-MM-DD')";
	}
	else {
		$upstr =
"END_TIME=to_timestamp('$timestamp','YYYY-MM-DD hh24:mi:ss:ff'),FLAG=$final";
	}

	my $str =
"update tpms_eod.T_TPMS_EODTABLE set $upstr where TABLE_NAME='$tablename' and SYSTEM='$system'";
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();

	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return 1;
	}
	return 0;
}

#get tx_date
sub getTxdate {
	my ($dbh) = @_;
	my $txdate;
	my $str =
	  "select to_char(CURR_BIZ_DATE,'YYYY-MM-DD') from tpms_eod.Tpms_Rollback_Bizdate";
	my $sth;
	eval {
		$sth = $dbh->prepare($str);
		$sth->execute();
	};
	if ($@) {
		CITIC::showtime();
		print "An error occurred ($@)\n";
		CITIC::showtime();
		print $dbh->errstr . "\n";
		$dbh->disconnect();
		return 1;
	}
	my $table = $sth->fetchall_arrayref();
	$txdate = $table->[0][0];

	#$dbh->disconnect();
	return $txdate;

}

#get timestamp
sub showtime {
	my ( $sec, $min, $hour, $day, $mon, $year ) = localtime( time() );
	my $current = "";
	$sec  = sprintf( "%02d", $sec );
	$min  = sprintf( "%02d", $min );
	$hour = sprintf( "%02d", $hour );
	$day  = sprintf( "%02d", $day );
	$mon  = sprintf( "%02d", $mon + 1 );
	$year += 1900;
	$current = " $year-$mon-$day" . " $hour" . ":$min" . ":$sec";
	return ${current};
}

#execute eod job
sub executeJob {
	my ( $dbh, %hash ) = @_;
	my $i;
	my $num = scalar( keys %hash );
	for my $jobname ( keys %hash ) {
		CITIC::showtime();
		print "*" x 6 . "正在执行$jobname作业" . "*" x 6 . "\n";
		upJobInfo( $dbh, $jobname, 1 );
		$i++;
		$num--;
		eval {
			$threads{$jobname} = Thread->new(
				\&cmd,
				$hash{$jobname}{'JOB_SCRIPT'},
				$hash{$jobname}{'ARGV'}, $jobname
			);

			#->join();
		};
		if ($@) {
			CITIC::showtime();
			print "An error occurred ($@)\n";
			return 1;
		}
		if ( $i > $maxnum - 1 ) {
			print "进程达到最大数" . Thread->list() . "!!!\n";
			while (1) {

				for my $th ( Thread->list() ) {
					my $tid;
					if ( $th->done ) {
						eval { $tid = $th->join() };
						foreach my $tb ( keys %threads ) {
							if ( $threads{$tb}->tid() == $th->tid() ) {
								if ($tid) {
									upJobInfo( $dbh, $tb, 2, 1 );
								}
								else {
									upJobInfo( $dbh, $tb, 2, 0 );
								}
							}
						}
						$i--;
					}
					else {
						next;
					}
				}
				if ( Thread->list() < $maxnum ) {
					print "进程小于3,开始新的进程\n";
					last;
				}
				else {
					print "sleep 10s,waiting for the job finish\n";
					sleep 10;
				}
			}
		}
		if ( $num == 0 ) {
			print "#" x 20
			  . "所有作业调度完成,等待进程结束!!!"
			  . "#" x 20 . "\n";
			for my $th1 ( Thread->list() ) {
				my $tid1;
				eval { $tid1 = $th1->join() };
				foreach my $tb1 ( keys %threads ) {
					if ( $threads{$tb1}->tid() == $th1->tid() ) {
						if ($tid1) {
							upJobInfo( $dbh, $tb1, 2, 1 );
						}
						else {
							upJobInfo( $dbh, $tb1, 2, 0 );
						}
					}
				}
			}
		}
	}
}

#程序入口
sub main {

	#my ($system)=@_;
	my $ret = 0;
	my $dbh;
	%dbc_info = CITIC::get_dbc_info($TPMS_EOD_SID);

	unless (%dbc_info) {
		CITIC::showtime();
		print "Failed to get database information!\n";
	}
	else {
		$dbh = CITIC::connect_db(
			$dbc_info{"ip"},   $dbc_info{"port"}, $dbc_info{"sid"},
			$dbc_info{"user"}, $dbc_info{"pwd"}
		);
	}

	unless ($dbh) {
		$ret = 1;
	}
	else {
		my %jobinfo = getEodJob( $system, $dbh );
		if (%jobinfo) {
			if ( getTxdate($dbh) eq 1 ) {
				$ret = 1;
			}
			else {
				$tx_date = getTxdate($dbh);
				executeJob( $dbh, %jobinfo );
			}

		}
		else {
			CITIC::showtime();
			print "没有要执行的JOB,请确认后再执行!\n";
			$ret = 1;
		}

		#	$ret = excute_sql( $dbh, @sql_queue );

	}

	return $ret;
}

open( STDERR, ">&STDOUT" );
#if ( $#ARGV < 0 ) {
#	print "Please input parameters,for example:\n1.system id :CTS\n";
#	exit(1);
#}

#$system = uc( $ARGV[0] );
my $ret = main();
if ( $ret == 0 ) {
	print STDOUT "complete\n";
}
else {
	print STDOUT "fail\n";

#	CITIC::send_mail(
#		"[$my_sysenv] $log_name JOB FAILED",
#"Hi all,\n      Job $log_name failed. Please see attached log file for details.\nthanks!"
#	);
}
exit($ret);

END {
	CITIC::showtime();
	print "return code is $ret\n";
	CITIC::close_logfile($log_file);
}

编程技巧