1
0

Qmgr.pm 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898
  1. ################################################################################
  2. # $Id: Qmgr.pm 2880 2007-04-12 18:52:02Z b4rt $
  3. # $Date: 2007-04-12 13:52:02 -0500 (Thu, 12 Apr 2007) $
  4. # $Revision: 2880 $
  5. ################################################################################
  6. # #
  7. # LICENSE #
  8. # #
  9. # This program is free software; you can redistribute it and/or #
  10. # modify it under the terms of the GNU General Public License (GPL) #
  11. # as published by the Free Software Foundation; either version 2 #
  12. # of the License, or (at your option) any later version. #
  13. # #
  14. # This program is distributed in the hope that it will be useful, #
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of #
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
  17. # GNU General Public License for more details. #
  18. # #
  19. # To read the license please visit http://www.gnu.org/copyleft/gpl.html #
  20. # #
  21. # #
  22. ################################################################################
  23. package Qmgr;
  24. use strict;
  25. use warnings;
  26. ################################################################################
  27. ################################################################################
  28. # fields #
  29. ################################################################################
  30. # version in a var
  31. my $VERSION = do {
  32. my @r = (q$Revision: 2880 $ =~ /\d+/g); sprintf "%d"."%02d" x $#r, @r };
  33. # state
  34. my $state = Fluxd::MOD_STATE_NULL;
  35. # message, error etc. keep it in one string for simplicity atm.
  36. my $message = "";
  37. # loglevel
  38. my $loglevel = 0;
  39. # run-interval
  40. my $interval;
  41. # time of last run
  42. my $time_last_run = 0;
  43. # data-dir
  44. my $dataDir;
  45. my $path_dataDir = "qmgr/";
  46. # queue-file
  47. my $fileQueue;
  48. my $path_fileQueue = "qmgr.queue";
  49. # transfers-dir
  50. my $transfersDir;
  51. # time-vars
  52. my ($time, $localtime);
  53. # globals
  54. my %globals;
  55. # jobs
  56. my %jobs;
  57. # queue
  58. my @queue;
  59. my $queueIdx = 0;
  60. # start-tries-hash
  61. my %startTries;
  62. # some defaults
  63. my $default_limitStartTries = 3;
  64. # sf-instance-field to reuse object
  65. my $sf = StatFile->new();
  66. ################################################################################
  67. # constructor + destructor #
  68. ################################################################################
  69. #------------------------------------------------------------------------------#
  70. # Sub: new (Constructor Method) #
  71. # Arguments: Null #
  72. # Returns: Object #
  73. #------------------------------------------------------------------------------#
  74. sub new {
  75. my $class = shift;
  76. my $self = bless ({}, ref ($class) || $class);
  77. return $self;
  78. }
  79. #------------------------------------------------------------------------------#
  80. # Sub: destroy #
  81. # Arguments: Null #
  82. # Returns: Null #
  83. #------------------------------------------------------------------------------#
  84. sub destroy {
  85. # set state
  86. $state = Fluxd::MOD_STATE_NULL;
  87. # log
  88. Fluxd::printMessage("Qmgr", "shutdown\n");
  89. # save queue
  90. queueSave();
  91. # strings
  92. undef $dataDir;
  93. undef $fileQueue;
  94. undef $transfersDir;
  95. # undef
  96. undef %globals;
  97. undef %jobs;
  98. undef @queue;
  99. undef %startTries;
  100. }
  101. ################################################################################
  102. # public methods #
  103. ################################################################################
  104. #------------------------------------------------------------------------------#
  105. # Sub: initialize. this is separated from constructor to call it independent #
  106. # from object-creation. #
  107. # Arguments: null #
  108. # Returns: 0|1 #
  109. #------------------------------------------------------------------------------#
  110. sub initialize {
  111. shift; # class
  112. # loglevel
  113. $loglevel = Fluxd::getLoglevel();
  114. if (!(defined $loglevel)) {
  115. # message
  116. $message = "loglevel not defined";
  117. # set state
  118. $state = Fluxd::MOD_STATE_ERROR;
  119. # return
  120. return 0;
  121. }
  122. # data-dir
  123. my $ddir = Fluxd::getPathDataDir();
  124. if (!(defined $ddir)) {
  125. # message
  126. $message = "data-dir not defined";
  127. # set state
  128. $state = Fluxd::MOD_STATE_ERROR;
  129. # return
  130. return 0;
  131. }
  132. $dataDir = $ddir . $path_dataDir;
  133. # check if our main-dir exists. try to create if it doesnt
  134. if (! -d $dataDir) {
  135. Fluxd::printMessage("Qmgr", "creating data-dir : ".$dataDir."\n");
  136. mkdir($dataDir, 0700);
  137. if (! -d $dataDir) {
  138. # message
  139. $message = "data-dir does not exist and cannot be created";
  140. # set state
  141. $state = Fluxd::MOD_STATE_ERROR;
  142. # return
  143. return 0;
  144. }
  145. }
  146. # queue-file
  147. $fileQueue = $dataDir . $path_fileQueue;
  148. # transfers-dir
  149. $transfersDir = Fluxd::getPathTransferDir();
  150. if (!(defined $transfersDir)) {
  151. # message
  152. $message = "transfers-dir not defined";
  153. # set state
  154. $state = Fluxd::MOD_STATE_ERROR;
  155. # return
  156. return 0;
  157. }
  158. if (! -d $transfersDir) {
  159. # message
  160. $message = "transfers-dir does not exist";
  161. # set state
  162. $state = Fluxd::MOD_STATE_ERROR;
  163. # return
  164. return 0;
  165. }
  166. # interval
  167. $interval = FluxDB->getFluxConfig("fluxd_Qmgr_interval");
  168. if (!(defined $interval)) {
  169. # message
  170. $message = "interval not defined";
  171. # set state
  172. $state = Fluxd::MOD_STATE_ERROR;
  173. # return
  174. return 0;
  175. }
  176. # global-limit
  177. my $limitGlobal = FluxDB->getFluxConfig("fluxd_Qmgr_maxTotalTransfers");
  178. if (!(defined $limitGlobal)) {
  179. # message
  180. $message = "global-limit not defined";
  181. # set state
  182. $state = Fluxd::MOD_STATE_ERROR;
  183. # return
  184. return 0;
  185. }
  186. # user-limit
  187. my $limitUser = FluxDB->getFluxConfig("fluxd_Qmgr_maxUserTransfers");
  188. if (!(defined $limitUser)) {
  189. # message
  190. $message = "user-limit not defined";
  191. # set state
  192. $state = Fluxd::MOD_STATE_ERROR;
  193. # return
  194. return 0;
  195. }
  196. Fluxd::printMessage("Qmgr", "initializing (loglevel: ".$loglevel." ; data-dir: ".$dataDir." ; interval: ".$interval." ; global-limit: ".$limitGlobal." ; user-limit: ".$limitUser.")\n");
  197. # Create some time vars
  198. $time = time();
  199. $localtime = localtime();
  200. # initialize our globals hash
  201. $globals{'main'} = 0;
  202. $globals{'started'} = 0;
  203. $globals{'limitGlobal'} = $limitGlobal;
  204. $globals{'limitUser'} = $limitUser;
  205. $globals{'limitStartTries'} = $default_limitStartTries;
  206. #initialize the queue
  207. if (-f $fileQueue) {
  208. # actually load the queue
  209. queueLoad();
  210. } else {
  211. if ($loglevel > 0) {
  212. Fluxd::printMessage("Qmgr", "creating empty queue\n");
  213. }
  214. @queue = qw();
  215. }
  216. # start-tries hash
  217. %startTries = ();
  218. # update running transfers
  219. runningUpdate();
  220. # reset last run time
  221. $time_last_run = time();
  222. # set state
  223. $state = Fluxd::MOD_STATE_OK;
  224. # return
  225. return 1;
  226. }
  227. #------------------------------------------------------------------------------#
  228. # Sub: getVersion #
  229. # Arguments: null #
  230. # Returns: VERSION #
  231. #------------------------------------------------------------------------------#
  232. sub getVersion {
  233. return $VERSION;
  234. }
  235. #------------------------------------------------------------------------------#
  236. # Sub: getState #
  237. # Arguments: null #
  238. # Returns: state #
  239. #------------------------------------------------------------------------------#
  240. sub getState {
  241. return $state;
  242. }
  243. #------------------------------------------------------------------------------#
  244. # Sub: getMessage #
  245. # Arguments: null #
  246. # Returns: message #
  247. #------------------------------------------------------------------------------#
  248. sub getMessage {
  249. return $message;
  250. }
  251. #------------------------------------------------------------------------------#
  252. # Sub: set #
  253. # Arguments: Variable [value] #
  254. # Returns: #
  255. #------------------------------------------------------------------------------#
  256. sub set {
  257. my $key = shift;
  258. $globals{$key} = shift;
  259. }
  260. #------------------------------------------------------------------------------#
  261. # Sub: main #
  262. # Arguments: Null #
  263. # Returns: #
  264. #------------------------------------------------------------------------------#
  265. sub main {
  266. if ((time() - $time_last_run) >= $interval) {
  267. # log
  268. if ($loglevel > 1) {
  269. Fluxd::printMessage("Qmgr", "process queue...\n");
  270. }
  271. # process queue
  272. queueProcess();
  273. # set last run time
  274. $time_last_run = time();
  275. }
  276. }
  277. #------------------------------------------------------------------------------#
  278. # Sub: command #
  279. # Arguments: command-string #
  280. # Returns: result-string #
  281. #------------------------------------------------------------------------------#
  282. sub command {
  283. shift; # class
  284. $_= shift;
  285. SWITCH: {
  286. /^count-jobs/ && do {
  287. return jobsCount();
  288. };
  289. /^count-queue/ && do {
  290. return queueCount();
  291. };
  292. /^list-queue/ && do {
  293. return queueList();
  294. };
  295. /^enqueue;(.*);(.*)/ && do {
  296. if ($loglevel > 1) {
  297. Fluxd::printMessage("Qmgr", "enqueue-request : ".$1." (".$2.")\n");
  298. }
  299. return queueAdd($1, $2);
  300. };
  301. /^dequeue;(.*);(.*)/ && do {
  302. if ($loglevel > 1) {
  303. Fluxd::printMessage("Qmgr", "dequeue-request : ".$1." (".$2.")\n");
  304. }
  305. return queueRemove($1, $2);
  306. };
  307. /^set;(.*);(.*)/ && do {
  308. if ($loglevel > 1) {
  309. Fluxd::printMessage("Qmgr", "set : \"".$1."\"->\"".$2."\")\n");
  310. }
  311. return set($1, $2);
  312. };
  313. }
  314. return "Unknown command";
  315. }
  316. #------------------------------------------------------------------------------#
  317. # Sub: queueProcess #
  318. # Arguments: Null #
  319. # Returns: Null #
  320. #------------------------------------------------------------------------------#
  321. sub queueProcess {
  322. # update running transfers
  323. runningUpdate();
  324. if (queueCount() > 0) {
  325. # queue-loop
  326. $queueIdx = 0;
  327. QUEUE: while ($queueIdx < queueCount()) {
  328. # next job
  329. my $nextTransfer = $queue[$queueIdx];
  330. my $nextUser = $jobs{"queued"}{$nextTransfer};
  331. # check if this queue-entry exists in running-jobs.
  332. # dont try to start what is already running.
  333. # this may be after a restart or transfer was started outside.
  334. if (exists $jobs{"running"}{$nextTransfer}) { # already running
  335. # remove job from queue
  336. if (queueRemove($nextTransfer, $nextUser) == 0) { $queueIdx++; }
  337. # check if more entries
  338. if (queueCountEntriesLeft() == 0) { last QUEUE; }
  339. } else { # transfer not already running
  340. my @jobAry = (keys %{$jobs{"running"}});
  341. my $jobcountr = scalar(@jobAry);
  342. # lets see if max limit applies
  343. if ($jobcountr < $globals{'limitGlobal'}) { # max limit does not apply
  344. # lets see if per user limit applies
  345. $jobcountr = 0;
  346. foreach my $anJob (@jobAry) {
  347. if ($jobs{"running"}{$anJob} eq $nextUser) {
  348. $jobcountr++;
  349. }
  350. }
  351. if ($jobcountr < $globals{'limitUser'}) { # user limit does not apply
  352. # startup the thing
  353. if ($loglevel > 0) {
  354. Fluxd::printMessage("Qmgr", "starting transfer : ".$nextTransfer." (".$nextUser.")\n");
  355. }
  356. # set start-counter-var
  357. if (exists $startTries{$nextTransfer}) {
  358. $startTries{$nextTransfer} += 1;
  359. } else {
  360. $startTries{$nextTransfer} = 1;
  361. }
  362. if (transferStart($nextTransfer) == 1) { # start transfer succeeded
  363. # reset start-counter-var
  364. delete($startTries{$nextTransfer});
  365. # remove job from queue
  366. if (queueRemove($nextTransfer, $nextUser) == 0) { $queueIdx++; }
  367. # add job to jobs running
  368. if ($loglevel > 0) {
  369. Fluxd::printMessage("Qmgr", "adding job to jobs running : ".$nextTransfer." (".$nextUser.")\n");
  370. }
  371. $jobs{"running"}{$nextTransfer} = $nextUser;
  372. # check if more entries
  373. if (queueCountEntriesLeft() == 0) { last QUEUE; }
  374. } else { # start transfer failed
  375. Fluxd::printError("Qmgr", "start transfer failed : ".$nextTransfer." (".$nextUser.")\n");
  376. # already tried max-times to start this thing ?
  377. if ($startTries{$nextTransfer} > $globals{'limitStartTries'}) {
  378. # reset start-counter-var
  379. delete($startTries{$nextTransfer});
  380. Fluxd::printError("Qmgr", $globals{'limitStartTries'}." errors when starting, cancel job : ".$nextTransfer." (".$nextUser.")\n");
  381. # remove job from queue
  382. if (queueRemove($nextTransfer, $nextUser) == 0) { $queueIdx++; }
  383. # check if more entries
  384. if (queueCountEntriesLeft() == 0) { last QUEUE; }
  385. } else {
  386. Fluxd::printError("Qmgr", $startTries{$nextTransfer}." errors when starting, skip job : ".$nextTransfer." (".$nextUser.")\n");
  387. # next entry
  388. $queueIdx++;
  389. # check if more entries
  390. if (queueCountEntriesLeft() == 0) { last QUEUE; }
  391. }
  392. } # end start transfer failed
  393. } else { # user-limit for this user applies
  394. # check next queue-entry if one exists
  395. if ($queueIdx < (queueCount() - 1)) { # there is a next entry
  396. if ($loglevel > 0) {
  397. Fluxd::printMessage("Qmgr", "user limit reached, skipping job : ".$nextTransfer." (".$nextUser.") (next queue-entry)\n");
  398. }
  399. $queueIdx++;
  400. } else { # no more in queue
  401. if ($loglevel > 0) {
  402. Fluxd::printMessage("Qmgr", "user limit reached, skipping job : ".$nextTransfer." (".$nextUser.") (last queue-entry)\n");
  403. }
  404. last QUEUE;
  405. }
  406. }
  407. } else { # max limit does apply
  408. if ($loglevel > 0) {
  409. Fluxd::printMessage("Qmgr", "max limit reached, skipping job : ".$nextTransfer." (".$nextUser.")\n");
  410. }
  411. last QUEUE;
  412. }
  413. } # end already runnin
  414. } # queue-while-loop
  415. } else {
  416. if ($loglevel > 1) {
  417. Fluxd::printMessage("Qmgr", "queue empty...\n");
  418. }
  419. }
  420. # increment main-count
  421. $globals{"main"} += 1;
  422. }
  423. #------------------------------------------------------------------------------#
  424. # Sub: queueCountEntriesLeft #
  425. # Arguments: Null #
  426. # Returns: 0|num of entries #
  427. #------------------------------------------------------------------------------#
  428. sub queueCountEntriesLeft {
  429. # check if more entries
  430. my $jobcount = queueCount();
  431. if (($jobcount > 0) && ($queueIdx < ($jobcount))) {
  432. # more jobs in queue
  433. if ($loglevel > 1) {
  434. Fluxd::printMessage("Qmgr", "next queue-entry\n");
  435. }
  436. return ($jobcount - $queueIdx);
  437. } else {
  438. # nothing more in queue
  439. if ($loglevel > 1) {
  440. Fluxd::printMessage("Qmgr", "last queue-entry\n");
  441. }
  442. return 0;
  443. }
  444. }
  445. #------------------------------------------------------------------------------#
  446. # Sub: runningUpdate #
  447. # Arguments: Null #
  448. # Returns: Null #
  449. #------------------------------------------------------------------------------#
  450. sub runningUpdate {
  451. # get running transfers
  452. opendir(DIR, $transfersDir);
  453. my @pids = map { $_->[1] } # extract pathnames
  454. map { [ $_, "$_" ] } # no full paths
  455. grep { /.*\.pid$/ } # only .pid-files
  456. readdir(DIR);
  457. closedir(DIR);
  458. # flush running-jobs-hash
  459. $jobs{"running"} = ();
  460. # refill hash
  461. if (scalar(@pids) > 0) {
  462. foreach my $pidFile (@pids) {
  463. my $transfer = (substr ($pidFile, 0, (length($pidFile)) - 4));
  464. $sf->initialize($transfersDir.$transfer.".stat");
  465. my $running = $sf->get("running");
  466. my $user = $sf->get("transferowner");
  467. # user
  468. if ((!(defined $user)) || ($user eq "")) {
  469. if ($loglevel > 1) {
  470. Fluxd::printMessage("Qmgr", "cannot get owner of running transfer, using n/a : ".$transfer."\n");
  471. }
  472. $user = "n/a";
  473. }
  474. # running
  475. if ((!(defined $running)) || ($running ne "1")) {
  476. if ($loglevel > 1) {
  477. Fluxd::printMessage("Qmgr", "transfer not in running state, checking via ps-list if transfer is up : ".$transfer."\n");
  478. }
  479. if (FluxCommon::transferIsRunning($transfer) == 1) {
  480. if ($loglevel > 1) {
  481. Fluxd::printMessage("Qmgr", "transfer is running, adding to running transfers : ".$transfer." (".$user.")\n");
  482. }
  483. } else {
  484. if ($loglevel > 1) {
  485. Fluxd::printMessage("Qmgr", "transfer not running, skipping add to running transfers : ".$transfer." (".$user.")\n");
  486. }
  487. # skip
  488. next;
  489. }
  490. }
  491. # add it
  492. if (! exists $jobs{"running"}{$transfer}) {
  493. if ($loglevel > 1) {
  494. Fluxd::printMessage("Qmgr", "adding to running transfers : ".$transfer." (".$user.")\n");
  495. }
  496. $jobs{"running"}{$transfer} = $user;
  497. } else {
  498. if ($loglevel > 1) {
  499. Fluxd::printMessage("Qmgr", "transfer already exists in running transfers, skipping : ".$transfer." (".$user.")\n");
  500. }
  501. }
  502. }
  503. }
  504. }
  505. #------------------------------------------------------------------------------#
  506. # Sub: queueLoad #
  507. # Arguments: Null #
  508. # Returns: Null #
  509. #------------------------------------------------------------------------------#
  510. sub queueLoad {
  511. if ($loglevel > 0) {
  512. Fluxd::printMessage("Qmgr", "loading queue-file : ".$fileQueue."\n");
  513. }
  514. # read from file into queue-array
  515. my $lineSep = $/;
  516. $/ = "\n";
  517. my @tempo = qw();
  518. open(QUEUEFILE,"< $fileQueue");
  519. while (<QUEUEFILE>) {
  520. chomp;
  521. push(@tempo, $_);
  522. }
  523. close QUEUEFILE;
  524. $/ = $lineSep;
  525. # fill queue
  526. @queue = qw();
  527. foreach my $transfer (@tempo) {
  528. $sf->initialize($transfersDir.$transfer.".stat");
  529. my $running = $sf->get("running");
  530. my $user = $sf->get("transferowner");
  531. if ((!(defined $running)) || ($running ne "3")) {
  532. if ($loglevel > 1) {
  533. Fluxd::printMessage("Qmgr", "transfer not in queued state, skipping : ".$fileQueue."\n");
  534. }
  535. } elsif ((!(defined $user)) || ($user eq "")) {
  536. if ($loglevel > 1) {
  537. Fluxd::printMessage("Qmgr", "cannot get owner, skipping : ".$fileQueue."\n");
  538. }
  539. } else {
  540. if (! exists $jobs{"queued"}{$transfer}) {
  541. if ($loglevel > 1) {
  542. Fluxd::printMessage("Qmgr", "adding job to queue : ".$transfer." (".$user.")\n");
  543. }
  544. $jobs{"queued"}{$transfer} = $user;
  545. push(@queue, $transfer);
  546. }
  547. }
  548. }
  549. # done loading
  550. return 1;
  551. }
  552. #------------------------------------------------------------------------------#
  553. # Sub: queueSave #
  554. # Arguments: Null #
  555. # Returns: Null #
  556. #------------------------------------------------------------------------------#
  557. sub queueSave {
  558. my $jobcount = queueCount();
  559. if ($jobcount > 0) {
  560. if ($loglevel > 0) {
  561. Fluxd::printMessage("Qmgr", $jobcount." job(s) queued, writing queue-file...\n");
  562. }
  563. # open queue-file
  564. open(QUEUEFILE,">$fileQueue");
  565. # queued transfers
  566. foreach my $queueEntry (@queue) {
  567. if ($loglevel > 1) {
  568. Fluxd::printMessage("Qmgr", "saving job : ".$queueEntry."\n");
  569. }
  570. print QUEUEFILE $queueEntry."\n";
  571. }
  572. # close queue-file
  573. close(QUEUEFILE);
  574. } else {
  575. if (-f $fileQueue) {
  576. if ($loglevel > 0) {
  577. Fluxd::printMessage("Qmgr", "no jobs queued, deleting queue-file...\n");
  578. }
  579. return unlink($fileQueue);
  580. }
  581. }
  582. }
  583. #------------------------------------------------------------------------------#
  584. # Sub: jobsDump #
  585. # Parameters: - #
  586. # Return: - #
  587. #------------------------------------------------------------------------------#
  588. sub jobsDump {
  589. if ($loglevel > 0) {
  590. Fluxd::printMessage("Qmgr", "dumping jobs to queue-file : ".$fileQueue."\n");
  591. }
  592. # open queue-file
  593. open(QUEUEFILE,">$fileQueue");
  594. # running transfers
  595. foreach my $jobName (keys %{$jobs{"running"}}) {
  596. if ($loglevel > 1) {
  597. Fluxd::printMessage("Qmgr", "dumping running job : ".$jobName."\n");
  598. }
  599. print QUEUEFILE $jobName."\n";
  600. }
  601. # queued transfers
  602. foreach my $queueEntry (@queue) {
  603. if ($loglevel > 1) {
  604. Fluxd::printMessage("Qmgr", "dumping queued job : ".$queueEntry."\n");
  605. }
  606. print QUEUEFILE $queueEntry."\n";
  607. }
  608. # close queue-file
  609. close(QUEUEFILE);
  610. }
  611. #------------------------------------------------------------------------------#
  612. # Sub: queueList #
  613. # Arguments: Null #
  614. # Returns: List of queued transfers #
  615. #------------------------------------------------------------------------------#
  616. sub queueList {
  617. my $return = "";
  618. foreach my $queueEntry (@queue) {
  619. $return .= $queueEntry."\n";
  620. }
  621. return $return;
  622. }
  623. #------------------------------------------------------------------------------#
  624. # Sub: queueAdd #
  625. # Arguments: transfer, user #
  626. # Returns: 0|1 #
  627. #------------------------------------------------------------------------------#
  628. sub queueAdd {
  629. # Verify that the arguments look good
  630. my $temp = shift;
  631. if (!(defined $temp)) {
  632. Fluxd::printError("Qmgr", "invalid argument for transfer on add\n");
  633. return 0;
  634. }
  635. my $transfer = $temp;
  636. $temp = shift;
  637. if (!(defined $temp)) {
  638. Fluxd::printError("Qmgr", "invalid argument for username on add\n");
  639. return 0;
  640. }
  641. my $username = $temp;
  642. # add it
  643. if ((! exists $jobs{"queued"}{$transfer}) && (! exists $jobs{"running"}{$transfer})) {
  644. if ($loglevel > 0) {
  645. Fluxd::printMessage("Qmgr", "adding job to jobs queued : ".$transfer." (".$username.")\n");
  646. }
  647. $jobs{"queued"}{$transfer} = $username;
  648. # add
  649. push(@queue,$transfer);
  650. # save queue
  651. queueSave();
  652. # return
  653. return 1;
  654. } else {
  655. if ($loglevel > 0) {
  656. Fluxd::printMessage("Qmgr", "job already present in jobs : ".$transfer." (".$username.")\n");
  657. }
  658. # return
  659. return 0;
  660. }
  661. }
  662. #------------------------------------------------------------------------------#
  663. # Sub: queueRemove #
  664. # Arguments: transfer, user #
  665. # Returns: 0|1 #
  666. #------------------------------------------------------------------------------#
  667. sub queueRemove {
  668. # Verify that the arguments look good
  669. my $temp = shift;
  670. if (!(defined $temp)) {
  671. Fluxd::printError("Qmgr", "invalid argument for transfer on remove\n");
  672. return 0;
  673. }
  674. my $transfer = $temp;
  675. $temp = shift;
  676. if (!(defined $temp)) {
  677. Fluxd::printError("Qmgr", "invalid argument for username on remove\n");
  678. return 0;
  679. }
  680. my $username = $temp;
  681. # log
  682. if ($loglevel > 0) {
  683. Fluxd::printMessage("Qmgr", "remove job from jobs queued : ".$transfer." (".$username.")\n");
  684. }
  685. # remove from job-hash
  686. my $retValJobs = 0;
  687. if (exists $jobs{"queued"}{$transfer}) {
  688. delete($jobs{"queued"}{$transfer});
  689. $retValJobs = 1;
  690. }
  691. # remove from queue-stack
  692. my $retValQueue = 0;
  693. my $Idx = 0;
  694. LOOP: foreach my $queueEntry (@queue) {
  695. if ($queueEntry eq $transfer) {
  696. $retValQueue = 1;
  697. last LOOP;
  698. }
  699. $Idx++;
  700. }
  701. if ($retValQueue > 0) {
  702. if ($Idx > 0) { # not first entry, stack-action
  703. my @stack;
  704. for (my $i = 0; $i < $Idx; $i++) {
  705. push(@stack, (shift @queue));
  706. }
  707. shift @queue;
  708. for (my $i = 0; $i < $Idx; $i++) {
  709. push(@queue, (shift @stack));
  710. }
  711. $Idx--;
  712. } else { # first entry, just shift
  713. shift @queue;
  714. }
  715. }
  716. # return / save
  717. if (($retValJobs > 0) && ($retValQueue > 0)) {
  718. # save queue
  719. queueSave();
  720. return 1;
  721. } else {
  722. return 0;
  723. }
  724. }
  725. #------------------------------------------------------------------------------#
  726. # Sub: jobsCount #
  727. # Parameters: - #
  728. # Return: number of Jobs #
  729. #------------------------------------------------------------------------------#
  730. sub jobsCount {
  731. my $jobcount = 0;
  732. $jobcount += queueCount();
  733. $jobcount += runningCount();
  734. return $jobcount;
  735. }
  736. #------------------------------------------------------------------------------#
  737. # Sub: queueCount #
  738. # Parameters: - #
  739. # Return: number of queued jobs #
  740. #------------------------------------------------------------------------------#
  741. sub queueCount {
  742. return scalar(@queue);
  743. }
  744. #------------------------------------------------------------------------------#
  745. # Sub: runningCount #
  746. # Parameters: - #
  747. # Return: number of queued jobs #
  748. #------------------------------------------------------------------------------#
  749. sub runningCount {
  750. return scalar((keys %{$jobs{"running"}}));
  751. }
  752. #------------------------------------------------------------------------------#
  753. # Sub: transferStart #
  754. # Parameters: transfer-name #
  755. # Return: int with return of start-call (0|1) #
  756. #------------------------------------------------------------------------------#
  757. sub transferStart {
  758. my $transfer = shift;
  759. if (!(defined $transfer)) {
  760. return 0;
  761. }
  762. # fluxcli-call
  763. my $result = Fluxd::fluxcli("start", $transfer);
  764. if ($result == 1) {
  765. $globals{"started"} += 1;
  766. return 1;
  767. }
  768. return 0;
  769. }
  770. #------------------------------------------------------------------------------#
  771. # Sub: stack #
  772. # Arguments: integer, array ref #
  773. # Returns: Null #
  774. #------------------------------------------------------------------------------#
  775. sub stack {
  776. my $index = shift;
  777. my $array = shift;
  778. if ($index) {
  779. my @stack;
  780. for (my $i = 0; $i < $index; $i++) {
  781. push(@stack, (shift @$array));
  782. }
  783. shift @$array;
  784. for (my $i = 0; $i < $index; $i++) {
  785. push(@$array, (shift @stack));
  786. }
  787. $index--;
  788. } else {
  789. shift @$array;
  790. }
  791. }
  792. #------------------------------------------------------------------------------#
  793. # Sub: status #
  794. # Arguments: Null #
  795. # Returns: status string #
  796. #------------------------------------------------------------------------------#
  797. sub status {
  798. my $return = "";
  799. $return .= "\n-= Qmgr Revision ".$VERSION." =-\n";
  800. $return .= "interval : ".$interval." s \n";
  801. # get count-vars
  802. my $countQueue = queueCount();
  803. my $countRunning = runningCount();
  804. my $countJobs = $countQueue + $countRunning;
  805. # some vars
  806. $return .= "max transfers global : ".$globals{'limitGlobal'}."\n";
  807. $return .= "max transfers per user : ".$globals{'limitUser'}."\n";
  808. $return .= "max start-tries : ".$globals{'limitStartTries'}."\n";
  809. # jobs total
  810. $return .= "jobs total : ".$countJobs."\n";
  811. # jobs queued
  812. $return .= "jobs queued : ".$countQueue."\n";
  813. foreach my $jobName (sort keys %{$jobs{"queued"}}) {
  814. my $jobUser = $jobs{"queued"}{$jobName};
  815. $return .= " * ".$jobName." (".$jobUser.")\n";
  816. }
  817. # jobs running
  818. $return .= "jobs running : ".$countRunning."\n";
  819. foreach my $jobName (sort keys %{$jobs{"running"}}) {
  820. my $jobUser = $jobs{"running"}{$jobName};
  821. $return .= " * ".$jobName." (".$jobUser.")\n";
  822. }
  823. # misc stats
  824. $return .= "running since : ".$localtime." (";
  825. $return .= FluxCommon::niceTimeString($time).") ";
  826. $return .= "(".$globals{'main'}." cycles) \n";
  827. $return .= "started transfers : ".$globals{'started'}."\n";
  828. # return
  829. return $return;
  830. }
  831. ################################################################################
  832. # make perl happy #
  833. ################################################################################
  834. 1;