Squiz Matrix  4.12.2
 All Data Structures Namespaces Functions Variables Pages
bulk_mailer.inc
1 <?php
17 require_once dirname(__FILE__).'/../../core/include/init.inc';
18 require_once SQ_FUDGE_PATH.'/general/file_system.inc';
19 require_once 'Mail/Queue.php';
20 
21 
33 {
34 
35  var $error_log_file_name = 'bulkmail_errors';
36 
37 
/**
* Constructor
*
* Performs no work of its own.
*/
function __construct()
{
    // nothing to set up here

}//end constructor
46 
47 
// Entry point of the bulk mailer run.
//
// Installs SIGTERM/SIGHUP handlers, redirects PHP error logging to the bulkmail
// error log, switches to the root user, publishes the threshold settings through
// $GLOBALS and then works through the queued jobs: running jobs whose progress has
// timed out are taken over first, after which jobs that are not running are started.
// The process exits with status 1 if the root user cannot be made the current user.
54  function start()
55  {
56  declare(ticks=1);
57  //We will intensively use fork from now on
58  //therefore set oci_commit_on_success mode on to avoid end-of-communication issue for oracle + fork
// NOTE(review): the body of this block is absent from this listing (its numbering
// jumps from 59 to 61) - presumably the statement that switches the mode on; confirm
// against the original file.
59  if (MatrixDAL::getDbType() === 'oci') {
61  }
62  // setup signal handlers
63  pcntl_signal(SIGTERM, Array(&$this,'_sigHandler'));
64  pcntl_signal(SIGHUP, Array(&$this,'_sigHandler'));
65 
66  $am = $GLOBALS['SQ_SYSTEM']->am;
67  $bm = $am->getSystemAsset('bulkmail_manager');
68 
69  // setup the log manager to log bulkmail errors
// the previous ini values are captured here but not used again within this method
70  $old_log_errors = ini_set('log_errors', '1');
71  $old_error_log = ini_set('error_log', SQ_LOG_PATH.'/'.$this->error_log_file_name.SQ_CONF_LOG_EXTENSION);
72  set_error_handler(Array(&$this, '_errorHandler'));
73 
74  // login as root user
75  $root_user = $am->getSystemAsset('root_user');
76  if (!$GLOBALS['SQ_SYSTEM']->setCurrentUser($root_user)) {
77  trigger_localised_error('BML0003', E_USER_WARNING, $root_user->name, $root_user->id);
78  exit(1);
79  }
80 
81  // the allowed range for mail per minute (send rate)
82  $GLOBALS['min_mpm_threshold'] = 1;
83  $GLOBALS['max_mpm_threshold'] = 600;
84  // if the time allocated for each mail is greater than 30 minutes
85  // the script will quit, and a wake up time will be stored in the progress file
86  $GLOBALS['sleep_threshold'] = 1800;
87  // when the last_updated field in the progress file did not get updated for 1 hour
88  // job is considered as timeout-ed and we will take over this job
89  $GLOBALS['timeout_threshold'] = 3600;
90 
91  // get a scheduled job from the database table and process it
92  $results = $bm->getQueuedJobs();
93 
94  // pick up a timeout-ed job (higher priority than a new job)
95  foreach ($results as $job_id => $info) {
96  if ($info['status'] == BML_JOB_STATE_RUNNING) {
97  $progress_info = $bm->getJobProgress($job_id);
98  // check timeout and excluded time, make sure we take over this job if timeout_threshold is reached
99  if ($this->checkThreshold('time_out', Array(), $progress_info)) {
100  if ($this->checkThreshold('excluded_time', $info['details']['threshold'])) {
101  $this->chunkJob($job_id);
102  $this->processJob($job_id);
103  }
104  }
105  }
106  }
107 
108  // pick up a new job or a resume a paused job (normal priority)
// both loops walk the job list fetched once above; it is not re-read in between
109  foreach ($results as $job_id => $info) {
110  if ($info['status'] == BML_JOB_STATE_NOT_RUNNING) {
111  // excluded time threshold rule
112  if ($this->checkThreshold('excluded_time', $info['details']['threshold'])) {
113  $bm->updateJob($job_id, BML_JOB_STATE_RUNNING);
114  $this->chunkJob($job_id);
115  $this->processJob($job_id);
116  }
117  }
118  }
// NOTE(review): as at the top of the method, the body of this Oracle-only block is
// absent from this listing (numbering jumps from 119 to 121) - presumably it undoes
// the earlier setting; confirm against the original file.
119  if (MatrixDAL::getDbType() === 'oci') {
121  }
122  }//end start()
123 
124 
/**
* Forks the current process and makes the parent wait for the child it created
*
* In the parent this method only returns once the freshly forked child has
* terminated, so callers can rely on the child's work being finished.
*
* @return mixed NULL when the fork failed (BML0004 is raised), 0 in the child
*               process, the child's process id in the parent
* @access public
*/
function fork()
{
    $child_pid = pcntl_fork();

    if ($child_pid === -1) {
        // the fork itself failed, nothing was spawned
        trigger_localised_error('BML0004', E_USER_WARNING);
        return NULL;
    }

    if ($child_pid === 0) {
        // we are the child
        return $child_pid;
    }

    // We are the parent. Wait for the child we have just created rather than
    // for "any child" (-1): if some other child of this process happened to
    // exit first, the wait would return early and the parent would carry on
    // while this child is still running.
    $status = NULL;
    pcntl_waitpid($child_pid, $status);
    return $child_pid;

}//end fork()
150 
151 
/**
* Caches the recipients of a job on disk, split into chunk files
*
* The work is done in a forked child which exits when it is finished; the
* parent returns straight away once fork() hands control back to it. If the
* recipient cache directory already exists nothing is regenerated.
*
* @param string $job_id the id of the job whose recipients are to be cached
*
* @return void
* @access public
*/
function chunkJob($job_id)
{
    // a truthy return value from fork() means we are the parent: nothing to do
    if ($this->fork()) {
        return;
    }

    // reconnect because child disconnects DB when it exits
    $this->_reconnectDB();
    $bulkmail_manager = $GLOBALS['SQ_SYSTEM']->am->getSystemAsset('bulkmail_manager');
    $queued = $bulkmail_manager->getQueuedJobs($job_id);
    $job_dir = $bulkmail_manager->getJobDataPath($job_id);

    $recipient_path = $job_dir.'/cache/recipient';
    $progress_path = $job_dir.'/progress_info';
    $recip_per_chunk = $this->calculateChunkSize($queued[$job_id]['details']['threshold'], $progress_path);

    // the progress file, when present, is expected to define $progress_info
    $progress_info = Array();
    if (!file_exists($progress_path)) {
        trigger_localised_error('BML0009', E_USER_WARNING, $progress_path);
    } else {
        include $progress_path;
    }

    // recipients were cached by an earlier run, leave them alone
    if (is_dir($recipient_path)) {
        exit;
    }

    $result = $this->getAllRecipients($queued[$job_id]['details']['content_id'], $queued[$job_id]['details']['recipients']);

    // update total_count and list of problematic recipients in the progress file
    $progress_info['total_count'] = count($result['recipient']);
    $progress_info['problematic'] = $result['problematic'];
    $progress_saved = array_to_file($progress_info, 'progress_info', $progress_path);
    if (!$progress_saved) {
        trigger_localised_error('BML0008', E_USER_WARNING, $progress_path);
    }

    // cache recipients into smaller chunks, one file per chunk
    create_directory($recipient_path);
    foreach (array_chunk($result['recipient'], $recip_per_chunk, TRUE) as $index => $chunk) {
        $chunk_file = $recipient_path.'/recipient_'.$index;
        if (!array_to_file($chunk, 'recipient_chunk', $chunk_file)) {
            trigger_localised_error('BML0008', E_USER_WARNING, $chunk_file);
        }
    }

    exit;

}//end chunkJob()
203 
204 
/**
* Sends a queued job, one cached recipient chunk at a time
*
* For every recipient chunk file a child process is forked. The child queues one
* mail per recipient (forking once more per recipient, since printFrontend cannot
* be called multiple times in one process), sends the queue, advances the chunk
* counter in the progress file and exits. The parent re-reads the job state after
* each chunk. When all recipients are accounted for, the job is removed from the
* queue and, unless it is an ad-hoc job (id containing ':'), an internal message
* summarising the run is sent.
*
* Note that this method terminates the current process (exit) when the job is
* found to be paused, cancelled, removed, or inside its excluded time range.
*
* @param string $job_id the id of the job to process
*
* @return boolean FALSE when the content asset and the user subscription settings
*                 do not fit together (BML0020, BML0021, BML0023), TRUE otherwise
* @access public
*/
function processJob($job_id)
{
    $am = $GLOBALS['SQ_SYSTEM']->am;
    $mm = $GLOBALS['SQ_SYSTEM']->getMetadataManager();
    $this->_reconnectDB();
    $bm = $am->getSystemAsset('bulkmail_manager');
    $info = $bm->getQueuedJobs($job_id);
    $job_path = $bm->getJobDataPath($job_id);
    $progress_path = $job_path.'/progress_info';
    $recipient_path = $job_path.'/cache/recipient';

    // load required information
    $progress_info = $info[$job_id]['progress'];
    $details = $info[$job_id]['details'];
    $header_details = $details['header_details'];
    $header_details['from'] = $details['from'];

    require_once SQ_FUDGE_PATH.'/general/text.inc';
    $keywords = retrieve_keywords_replacements($details['subject']);
    $replacements = Array();
    $content_asset = $am->getAsset($details['content_id']);

    $details['search_page'] = NULL;
    $details['user_subscriptions'] = NULL;

    if ((get_class($content_asset) == 'Search_Page') && !$details['user_subscriptions_status']) {
        trigger_localised_error('BML0020', E_USER_WARNING);
        return FALSE;
    } else if ((get_class($content_asset) != 'Search_Page') && $details['user_subscriptions_status']) {
        trigger_localised_error('BML0021', E_USER_WARNING);
        return FALSE;
    } else if ((get_class($content_asset) == 'Search_Page') && $details['user_subscriptions_status'] && empty($details['subscriptions'])) {
        trigger_localised_error('BML0023', E_USER_WARNING);
        return FALSE;
    } else if ((get_class($content_asset) == 'Search_Page') && $details['user_subscriptions_status']) {
        // the settings seems to be correct
        $details['search_page'] = $content_asset;
        $details['user_subscriptions'] = $details['subscriptions'];
    }

    // If the content asset has a different character set to the default,
    // then use that instead, otherwise use the default set in System Config
    if (!empty($content_asset->charset)) {
        $content_charset = $content_asset->charset;
    } else {
        $content_charset = SQ_CONF_DEFAULT_CHARACTER_SET;
    }

    $mime_param = Array(
                    'head_charset' => $content_charset,
                    'text_charset' => $content_charset,
                    'html_charset' => $content_charset,
                  );

    foreach ($keywords as $word) {
        if (0 === strpos($word, 'recipient_')) {
            // keep recipient keyword until later
            $replacements[$word] = "%$word%";
        } else {
            $replacements[$word] = $content_asset->getKeywordReplacement($word);
        }
    }
    $am->forgetAsset($content_asset);

    replace_keywords($details['subject'], $replacements);
    $header_details['subject'] = $details['subject'];

    // create the /queue and /sent directories for mail queue
    $queue_path = $details['queue_details']['dir'];
    $sent_path = $queue_path.'/sent';
    if (!is_dir($sent_path)) create_directory($sent_path);

    $GLOBALS['chunk_start'] = get_microtime();
    // read chunks from cache and process them

    while (file_exists($recipient_path.'/recipient_'.$progress_info['current_chunk'])) {

        // check if job state is paused/cancelled at the beginning of each chunk
        if (!isset($info[$job_id])) {
            // job removed from queue table
            exit;
        } else {
            $state = $info[$job_id]['status'];
            if ($state == BML_JOB_STATE_PAUSED) {
                // job paused
                exit;
            } else if ($state == BML_JOB_STATE_NOT_RUNNING) {
                // job cancelled
                exit;
            }
        }

        // threshold checking
        if (!$this->checkThreshold('excluded_time', $details['threshold'])) {
            // current time is in excluded time range
            $bm->updateJob($job_id, BML_JOB_STATE_NOT_RUNNING);
            exit;
        }

        // load the current recipient chunk (the file defines $recipient_chunk)
        include($recipient_path.'/recipient_'.$progress_info['current_chunk']);

        $pid = $this->fork();

        if (!$pid) {
            $mime = new Mail_mime("\n");
            $mail_queue = new Mail_Queue($details['queue_details'], $details['server_details']);

            if (strpos($job_id, ':') === FALSE) {
                $job = $am->getAsset($job_id);
                $current_post_office = $job->getPostOffice();
            } else {
                $job = $current_post_office = NULL;
            }

            if ($details['content_text_only']) {
                unset($details['content_design']);
                unset($details['content_layout']);
            }

            foreach ($recipient_chunk as $r_info) {
                $this->_reconnectDB();
                $user_id = $r_info['assetid'];
                $email = $r_info['custom_val'];
                $user = $am->getAsset($user_id, '', TRUE);

                // do not add the email to queue if there is no recipient's email address. This will
                // make sure that the sendMailsInQueue() method of Mail_Queue class does not throw an
                // exception when the first email in the chunk of 100 emails is empty which may leaves
                // 99 valid email addresses not being sent to. Further to that bug fx #5847 Bulkmail
                // job does not correctly check for duplicate mails in its' queue if the email has
                // leading/trailing white spaces or line breaks then that should be termed as invalid
                // and don't allow it to be queued up or else PEAR package will go in infinite loop
                //
                // $email is either a plain address or an array carrying the address under 'email'.
                // Pull the address out first so that trim() and valid_email() are only ever handed
                // a string; handing them the array form rejected (or, on newer PHP versions,
                // crashed on) every array-style recipient.
                if (is_array($email)) {
                    $address = array_get_index($email, 'email', '');
                } else {
                    $address = $email;
                }
                if (!is_string($address) || (trim($address) == '') || ($address !== trim($address)) || !valid_email($address)) {
                    $this->updateProgress($progress_path, $user_id, FALSE, FALSE);
                    continue;
                }

                // make sure user is not already in mail queue/sent directory
                $id = md5($user_id);

                if (!file_exists($queue_path.'/'.$id) && !file_exists($sent_path.'/'.$id)) {
                    // fork since we cannot call printFrontend multiple times
                    $pid = $this->fork();
                    if (!$pid) {
                        $problematic = NULL;
                        $filtered = NULL;

                        if (!is_null($details['search_page']) && !is_null($details['user_subscriptions'])) {
                            $search_asset_list = Array();
                            $user_metadata = NULL;
                            $matches = Array();
                            if (!is_null($user)) {
                                foreach ($details['user_subscriptions'] as $sub_name => $sub_info) {
                                    $matches[$sub_name] = Array();
                                    foreach ($sub_info as $index => $info) {
                                        $match = FALSE;
                                        $category_type = array_keys($info);
                                        $user_metadata = $mm->getMetadataValueByAssetid($user->id, $info[$category_type[0]]['assetid']);
                                        $metadata_field= $GLOBALS['SQ_SYSTEM']->am->getAsset($info[$category_type[0]]['assetid']);
                                        $rule_text = $info[$category_type[0]]['rule_text'];
                                        $old_frontend_asset = $GLOBALS['SQ_SYSTEM']->frontend_asset;

                                        // keywords in the rule text are replaced with the job as frontend asset
                                        $GLOBALS['SQ_SYSTEM']->frontend_asset = $job;
                                        $GLOBALS['SQ_SYSTEM']->replaceKeyword($rule_text);
                                        $GLOBALS['SQ_SYSTEM']->frontend_asset = $old_frontend_asset;

                                        if ($info[$category_type[0]]['rule_cond'] == 'contains') {
                                            if ($metadata_field instanceof Metadata_Field_Select) {
                                                $rule_text_array = explode(';', $rule_text);

                                                foreach ($rule_text_array as $individial_text) {
                                                    $match = (stripos($user_metadata, trim($individial_text)) !== FALSE);
                                                    if ($match) break;
                                                }
                                            } else {
                                                $match = (stripos($user_metadata, $rule_text) !== FALSE);
                                            }
                                        } else {
                                            // needs a exact match
                                            $match = (strtolower($user_metadata) == strtolower($rule_text));
                                        }

                                        if ($match && !in_array($user_metadata, $matches[$sub_name])) {
                                            if ($metadata_field instanceof Metadata_Field_Select && strpos($user_metadata, '\\;') !== FALSE) {
                                                $matches[$sub_name] = explode('\;', $user_metadata);
                                            } else {
                                                $matches[$sub_name][] = $user_metadata;
                                            }
                                        }
                                    }//end foreach
                                }//end foreach
                            }//end if

                            // check to see if the user is a bulkmail user
                            // we can't set current user as a bulkmail user
                            if ($user instanceof Bulkmail_User) {
                                $user = NULL;
                                $user = $am->getAsset($details['content_gen_as']);
                            }

                            $GLOBALS['SQ_SYSTEM']->setCurrentUser($user);
                            $old_get_vars = $_GET;
                            $search_result = '';
                            $_GET = $good_query = Array();
                            foreach ($matches as $get_var => $strings_to_srch) {
                                foreach ($strings_to_srch as $srch_string) {
                                    $_REQUEST['mode'] = 'results';
                                    $_GET[$get_var] = $srch_string;
                                    $search_vars = $content_asset->processSearchVars();

                                    $this->_reconnectDB(TRUE);

                                    $search_res = $content_asset->processSearch($search_vars);

                                    unset($_GET[$get_var]);
                                    if (!empty($search_res)) {
                                        $good_query[$get_var][] = $srch_string;
                                    }

                                    $this->_reconnectDB();
                                }//end foreach
                                $_GET = $good_query;
                            }//end foreach
                            $this->_reconnectDB(TRUE);

                            if (!empty($_GET)) $search_result .= $this->generateContent($details, $job_path, $user_id, $email);

                            $this->_reconnectDB();

                            $_GET = $old_get_vars;
                            $GLOBALS['SQ_SYSTEM']->restoreCurrentUser();

                            $details['content_id'] = NULL;
                            $details['search_content'] = $search_result;
                        }//end if

                        // if no search results were found then update the progress because we have already processed this user
                        // NOTE(review): $details always holds at least the keys set at the top of this
                        // method, so this condition looks like it can never be true - confirm the intent
                        if (empty($details)) $this->updateProgress($progress_path, $user_id, $problematic, $filtered);

                        if (!isset($details['search_content'])) {
                            $content = $this->generateContent($details, $job_path, $user_id, $email);
                        } else {
                            $content = $details['search_content'];
                        }
                        $text_only_content = html_entity_decode(strip_tags($content));

                        // If there is a filter set, only continue if the
                        // filter matches (or is ineffective due to empty
                        // text field).
                        if (array_get_index($details, 'content_body_filter')) {
                            $match = array_get_index($details, 'content_body_filter_match', TRUE);
                            $filter_text = array_get_index($details, 'content_body_filter_text', '');

                            if (!empty($filter_text)) {
                                $target_content = ($details['content_text_only'] ? $text_only_content : $content);
                                $ok = (strpos($target_content, $filter_text) !== FALSE);
                                if ($ok != $match) $filtered = TRUE;
                            }
                        }

                        if (is_array($email)) {
                            $header_details['subject'] = $this->replaceContentKeywords($header_details['subject'], $email);
                        } else {
                            $user = $am->getAsset($user_id);
                            $header_details['subject'] = $this->replaceContentKeywords($header_details['subject'], $user);
                        }//end if

                        // only queue the mail if we can generate content
                        $search_content_empty = FALSE;
                        if (empty($content) && isset($details['search_content'])) {
                            $search_content_empty = TRUE;
                        } else if (empty($content)) {
                            $problematic = TRUE;
                        }

                        if (!$problematic && !$filtered && !$search_content_empty) {
                            // Check that the email is not the email address array
                            if (is_array($email)) $email = $email['email'];

                            // Always set text version, and set html version
                            // if turned on.
                            $mime->setTxtBody($text_only_content);
                            if (!$details['content_text_only']) {
                                $mime->setHTMLBody($content);
                            }

                            $body = @$mime->get($mime_param);
                            // pass in custom_id as extra header detail, for queue file naming
                            $header_details['custom_id'] = $user_id;
                            $header_details['To'] = $email; // FR: 2515
                            // Check for Test Mode
                            if (!is_null($current_post_office) && $current_post_office->attr('bulkmail_mode') == 'test') {
                                // Log the emails
                                $message_type = 'bulkmail.job.testmode';
                                $msg_reps = Array(
                                                'from'    => $header_details['from'],
                                                'to'      => $header_details['To'],
                                                'subject' => $header_details['subject'],
                                            );
                                $ms = $GLOBALS['SQ_SYSTEM']->getMessagingService();
                                $message = $ms->newMessage(Array(), $message_type, $msg_reps);
                                $message->parameters['assetid'] = $job_id;
                                $message->send();

                                $am->forgetAsset($message);
                                $am->forgetAsset($ms);
                            } else {
                                $headers = @$mime->headers($header_details);
                                $mail_id = $mail_queue->put('', $email, $headers, $body);
                            }
                        }
                        $this->updateProgress($progress_path, $user_id, $problematic, $filtered);
                        $this->enforceSendRate($details['threshold'], $progress_path);

                        exit;
                    }//end if
                }//end if

            }//end foreach

            if (!is_null($job)) $am->forgetAsset($job);
            if (!is_null($current_post_office)) $am->forgetAsset($current_post_office);
            try {
                $status = $mail_queue->sendMailsInQueue();
            } catch (Exception $e) {
                trigger_localised_error('BML0001', E_USER_WARNING, $e->getMessage());
                $this->nextChunk($progress_path);
                exit;
            }

            if ($status instanceof PEAR_Error) {
                trigger_localised_error('BML0001', E_USER_WARNING, $status->getMessage());
                $this->nextChunk($progress_path);
                exit;
            }
            // proceed to next chunk
            $this->nextChunk($progress_path);
            exit;

        } else {
            // parent process, get updated info for next loop
            $this->_reconnectDB();
            $progress_info = $bm->getJobProgress($job_id);
            $info = $bm->getQueuedJobs($job_id);
        }

    }//end while

    // job completed
    $this->_reconnectDB();
    $progress_info = $bm->getJobProgress($job_id);
    if ($progress_info['current_count'] >= $progress_info['total_count']) {
        if (strpos($job_id, ':') === FALSE) {
            $job = $am->getAsset($job_id);
            $current_post_office = $job->getPostOffice();
        } else {
            $job = $current_post_office = NULL;
        }

        // remove job entry from the db queue table
        $bm->deleteJob($job_id);

        // if this is not an ad-hoc job, log an internal message
        $job_id_parts = explode(':', $job_id);
        if (!isset($job_id_parts[1])) {

            if (!is_null($current_post_office) && $current_post_office->attr('bulkmail_mode') == 'test') {
                $message_type = 'bulkmail.job.testmode.completed';
                $msg_reps = Array(
                                'num_recip'    => number_format($progress_info['total_count']),
                                'num_problem'  => number_format(count($progress_info['problematic'])),
                                'num_filtered' => isset($progress_info['filtered']) ? number_format(count($progress_info['filtered'])) : 0,
                            );
            } else {
                $subtype = '';
                if (count($progress_info['problematic']) > 0) {
                    trigger_localised_error('BML0010', E_USER_WARNING, implode(',', $progress_info['problematic']));
                    $subtype = '.error';
                }

                if (isset($progress_info['filtered']) && (count($progress_info['filtered']) > 0)) {
                    trigger_localised_error('BML0019', E_USER_WARNING, implode(',', $progress_info['filtered']));
                    if (empty($subtype)) {
                        $subtype = '.filtered';
                    } else {
                        // append, so that a run with both problematic and filtered
                        // recipients is reported as ".error+filtered" instead of
                        // dropping the ".error" part
                        $subtype .= '+filtered';
                    }
                }

                $message_type = 'bulkmail.job.completed'.$subtype;

                // format recipient count with grouped thousands
                $msg_reps = Array(
                                'num_recip'    => number_format($progress_info['total_count']),
                                'num_problem'  => number_format(count($progress_info['problematic'])),
                                'num_filtered' => isset($progress_info['filtered']) ? number_format(count($progress_info['filtered'])) : 0,
                            );
            }//end if

            // Send the message
            $ms = $GLOBALS['SQ_SYSTEM']->getMessagingService();
            $message = $ms->newMessage(Array(), $message_type, $msg_reps);
            $message->parameters['assetid'] = $job_id;
            $message->send();
            $am->forgetAsset($message);
            $am->forgetAsset($ms);
        }
        if (!is_null($job)) $am->forgetAsset($job);
        if (!is_null($current_post_office)) $am->forgetAsset($current_post_office);

    }
    return TRUE;

}//end processJob()
629 
630 
631 //-- PROGRESS --//
632 
633 
/**
* Restores the database connection and then switches to a forced new one
*
* @param boolean $dbsearch when TRUE the 'dbsearch' connection is opened,
*                          otherwise the 'db2' connection
*
* @return void
* @access private
*/
function _reconnectDB($dbsearch=FALSE)
{
    $connection_name = $dbsearch ? 'dbsearch' : 'db2';

    $GLOBALS['SQ_SYSTEM']->restoreDatabaseConnection(TRUE);
    //force a new oci connection to avoid end-of-communication issue of oracle + fork
    $GLOBALS['SQ_SYSTEM']->changeDatabaseConnection($connection_name, TRUE);

}//end _reconnectDB()
653 
654 
/**
* Records the outcome for one recipient in the job's progress file
*
* A problematic or filtered recipient is appended to the matching list and
* taken off 'total_count'; any other recipient increments 'current_count'.
* 'current_recip_id' and 'last_updated' are refreshed in every case.
*
* @param string  $progress_path path of the progress file
* @param string  $user_id       id of the recipient just handled
* @param boolean $problematic   whether the recipient was problematic (takes precedence over $filtered)
* @param boolean $filtered      whether the recipient was filtered out
*
* @return boolean FALSE if the progress file is missing or could not be written, TRUE otherwise
* @access public
*/
function updateProgress($progress_path, $user_id, $problematic, $filtered)
{
    if (!file_exists($progress_path)) {
        trigger_localised_error('BML0009', E_USER_WARNING, $progress_path);
        return FALSE;
    }

    // the progress file is expected to (re)define $progress_info
    $progress_info = Array();
    include $progress_path;

    if ($problematic || $filtered) {
        $list_name = $problematic ? 'problematic' : 'filtered';
        $progress_info['total_count']--;
        $progress_info[$list_name][] = $user_id;
    } else {
        $progress_info['current_count']++;
    }
    $progress_info['current_recip_id'] = $user_id;
    $progress_info['last_updated'] = time();

    $written = array_to_file($progress_info, 'progress_info', $progress_path);
    if (!$written) {
        trigger_localised_error('BML0008', E_USER_WARNING, $progress_path);
        return FALSE;
    }

    return TRUE;

}//end updateProgress()
699 
700 
/**
* Advances the progress file to the next recipient chunk
*
* If no 'time_per_chunk' has been recorded yet, the time elapsed since
* $GLOBALS['chunk_start'] is stored as such.
*
* @param string $progress_path path of the progress file
*
* @return boolean FALSE if the progress file is missing or could not be written, TRUE otherwise
* @access public
*/
function nextChunk($progress_path)
{
    if (!file_exists($progress_path)) {
        trigger_localised_error('BML0009', E_USER_WARNING, $progress_path);
        return FALSE;
    }

    // the progress file is expected to (re)define $progress_info
    $progress_info = Array();
    include $progress_path;

    $progress_info['current_chunk']++;

    // record the time taken to process one recipient chunk
    if (!$progress_info['time_per_chunk']) {
        $elapsed = get_microtime() - $GLOBALS['chunk_start'];
        $progress_info['time_per_chunk'] = $elapsed;
    }

    $written = array_to_file($progress_info, 'progress_info', $progress_path);
    if (!$written) {
        trigger_localised_error('BML0008', E_USER_WARNING, $progress_path);
        return FALSE;
    }

    return TRUE;

}//end nextChunk()
733 
734 
735 //-- RECIPIENT --//
736 
737 
/**
* Expands the configured recipients of a job into a flat list of addressees
*
* Each entry of $recipients is either an array carrying an 'email' key (a plain
* address, used as is) or an asset id. Asset ids that are not users are expanded:
* search folders through their asset map links, anything else through its 'user'
* and 'bulkmail_user' children.
*
* @param string $asset_to_send_id id of the asset that is going to be sent
* @param mixed  $recipients       a single recipient or an array of recipients
*
* @return array 'recipient' holds entries with the keys 'assetid' and 'custom_val'
*               (rows fetched from the database are merged in as returned by the
*               query - presumably in the same shape, verify against the query);
*               'problematic' holds the ids that could not be used
* @access public
*/
function getAllRecipients($asset_to_send_id, $recipients)
{
    $am = $GLOBALS['SQ_SYSTEM']->am;

    if (!is_array($recipients)) {
        $recipients = Array($recipients);
    }

    $asset_to_send = $am->getAsset($asset_to_send_id);
    $recipients_expanded = Array();
    $problematic_list = Array();

    foreach ($recipients as $recipient_key => $recipient_id) {
        // Test for email address
        if (isset($recipient_id['email']) && preg_match('/(.*)\@(.*)/is', $recipient_id['email'])) {
            $recipients_expanded[$recipient_key] = $recipients[$recipient_key];
            continue;
        }

        // Otherwise it should be an asset id
        $recipient = $am->getAsset($recipient_id);
        if (is_null($recipient)) {
            // the asset could not be loaded: report it rather than failing on
            // a method call on NULL further down
            if (is_scalar($recipient_id)) $problematic_list[] = $recipient_id;
            continue;
        }

        // If the asset is not a user, we should check for children
        if (!($recipient instanceOf User) && !($recipient instanceOf Bulkmail_User)) {
            // It is no longer just user group
            // If it is a search folder, do something, otherwise just try to get children
            if ($recipient->type() == 'search_folder') {
                // the list of acceptable type codes is the same for every link, build it once
                $user_types = $am->getTypeDescendants('user', TRUE);
                $bulkmail_user_types = $am->getTypeDescendants('bulkmail_user', TRUE);
                $all_user_types = array_merge($user_types, $bulkmail_user_types);

                $map_links = $recipient->getAssetMapLinks();
                foreach ($map_links as $map_link) {
                    // We only add this asset if it is a user or bulkmail user
                    if (in_array($map_link['type_code'], $all_user_types)) {
                        $id_parts = explode(':', $map_link['assetid']);
                        // We are so sure that it has shadow id, but just incase
                        if (isset($id_parts[1])) {
                            $link_assetid = $id_parts[1];
                            $recipients_expanded[$link_assetid] = $map_link['type_code'];
                        }//end if
                    }//end if
                }//end foreach
            } else {
                $recipients_expanded += $am->getChildren($recipient->id, 'user', FALSE, FALSE);
                $recipients_expanded += $am->getChildren($recipient->id, 'bulkmail_user', FALSE, FALSE);
            }//end else
        } else {
            // Else it is a user, add single recipient
            $recipients_expanded[$recipient->id] = $recipient->type();
        }

        $am->forgetAsset($recipient);

    }//end foreach

    $recipients = Array();
    $recipients_query = Array();
    $permission_denied = $am->getPermission($asset_to_send_id, SQ_PERMISSION_READ, FALSE, FALSE);
    $public_user = $am->getSystemAsset('public_user');
    $public_user_id = $public_user->id;

    // foreach visits every entry; a hand-rolled key()/next() loop compared loosely
    // against NULL stops at the first key that equals 0 or '' (such a key appears
    // when a single, non-array recipient was wrapped into an array above)
    foreach ($recipients_expanded as $recipient_id => $recipient_entry) {
        //if it has right permission, but it is not an email address
        if (!in_array($recipient_id, $permission_denied) && (!is_array($recipient_entry) || !array_key_exists('email', $recipient_entry))) {

            $recipient_id_parts = explode(':', $recipient_id);
            if (isset($recipient_id_parts[1])) {
                $recipient = $am->getAsset($recipient_id);
                // calls to getChildren() on bridge assets do not always return User assets
                if ($recipient instanceof User) {
                    $recipients[] = Array(
                                        'assetid'    => $recipient->id,
                                        'custom_val' => $recipient->attr('email'),
                                    );
                }
                $am->forgetAsset($recipient);
            } else {
                // Only do assets here
                $recipients_query[] = $recipient_id;
            }//end if
        } else if (!in_array($public_user_id, $permission_denied)) {
            // Make sure the public user has access, then process the email addresses
            if (isset($recipient_entry['email']) && preg_match('/(.*)\@(.*)/is', $recipient_entry['email'])) {
                // Do the email addresses here
                $recipients[] = Array(
                                    'assetid'    => $recipient_id,
                                    'custom_val' => Array(
                                                        'first_name' => $recipient_entry['first_name'],
                                                        'last_name'  => $recipient_entry['last_name'],
                                                        'email'      => $recipient_entry['email'],
                                                    ),
                                );
            }
        } else {
            // read access denied for this recipient
            $problematic_list[] = $recipient_id;
        }//end if
    }//end foreach

    $query_results = Array();
    if (!empty($recipients_query)) {
        // Eliminate any duplicate recipients before chunking to obtain optimal bite-sized portions
        $recipients_query = array_unique($recipients_query);

        // Chunkifise our query to get all the email addresses of REAL assets
        // User assets without an email will be filtered out here
        $chunk_size = 1000;
        $recipient_chunks = array_chunk($recipients_query, $chunk_size, TRUE);

        foreach ($recipient_chunks as $recipient_chunk) {
            try {
                $bind_vars['recipient_list'] = $recipient_chunk;
                $bind_vars['contextid'] = $GLOBALS['SQ_SYSTEM']->getContextId();
                $query_result = MatrixDAL::executeAssoc('bulkmail_package', 'getAllRecipients', $bind_vars);
                $query_results = array_merge($query_results, $query_result);
            } catch (Exception $e) {
                throw new Exception('Unable to get recipient list due to database error: '.$e->getMessage());
            }
        }
    }

    $recipients = array_merge($recipients, $query_results);
    $result['recipient'] = $recipients;
    $result['problematic'] = $problematic_list;

    return $result;

}//end getAllRecipients()
878 
879 
880 //-- CONTENT --//
881 
882 
/**
* Produces the mail content for one recipient, or a preview when no recipient id is given
*
* With content generation set to 'one_user', or for a Bulkmail_User recipient, the
* content is generated as the 'content_gen_as' user and kept in a cache file under
* the job path. Otherwise it is generated per recipient, as that recipient unless
* the recipient is a plain address array. Recipient keywords are replaced last.
*
* @param array  $job_details  details of the job
* @param string $job_path     data path of the job
* @param string $recipient_id id of the recipient; empty when previewing
* @param mixed  $email        address details array for non-asset recipients
*
* @return string the content, or '' when the required user could not be made current
* @access public
*/
public function generateContent($job_details, $job_path, $recipient_id=NULL, $email=NULL)
{
    $am = $GLOBALS['SQ_SYSTEM']->am;

    if (!empty($job_details['content_id'])) {
        $content_asset = $am->getAsset($job_details['content_id']);
        $GLOBALS['SQ_SYSTEM']->frontend_asset = &$content_asset;

        $am->forgetAsset($content_asset);
    }

    // an empty recipient id means we are previewing as the current user
    $previewing = empty($recipient_id);
    if ($previewing) {
        $recipient = $GLOBALS['SQ_SYSTEM']->user;
    } else if (is_array($email)) {
        $recipient = Array($recipient_id => $email);
    } else {
        $recipient = $am->getAsset($recipient_id);
    }

    // Create an instance of the job to load the content
    $job = !empty($job_details['job_id']) ? $am->getAsset($job_details['job_id']) : NULL;

    if (($job_details['content_gen'] == 'one_user') || ($recipient instanceof Bulkmail_User)) {

        // Same content for everyone, apart from recipient keywords.
        $cache_prefix = $job_path.'/cache/content_file_';

        // First try the cache, unless we're previewing
        if (file_exists($cache_prefix.$job_details['content_id'].'.html') && !$previewing) {

            $content = file_to_string($cache_prefix.$job_details['content_id'].'.html');

        } else {

            // Log in as generator
            $generator_user = $am->getAsset($job_details['content_gen_as']);
            if (!$GLOBALS['SQ_SYSTEM']->setCurrentUser($generator_user)) {
                trigger_localised_error('BML0017', E_USER_WARNING);
                return '';
            }

            // Do the actual generation
            $content = is_null($job) ? Bulk_Mailer::generateRawContent($job_details) : $job->generateRawContent($job_details, $recipient);

            // Log out generator
            $GLOBALS['SQ_SYSTEM']->restoreCurrentUser();

            // Save to cache, unless previewing or not sending to actual users
            if (!$previewing || !($recipient instanceof Bulkmail_User)) {
                string_to_file($content, $cache_prefix.$job_details['content_id'].'.html');
            }

        }

    } else {

        // We are generating content on a per-recipient basis.
        // The recipient becomes the current user for the generation, except when
        // previewing (the current user already is the recipient) or when the
        // recipient is a plain address array rather than an asset.
        $switch_user = !$previewing && !is_array($recipient);

        if ($switch_user) {
            if (!$GLOBALS['SQ_SYSTEM']->setCurrentUser($recipient)) {
                trigger_localised_error('BML0017', E_USER_WARNING);
                return '';
            }
        }

        // Do the actual generation
        $content = is_null($job) ? Bulk_Mailer::generateRawContent($job_details) : $job->generateRawContent($job_details, $recipient);

        // If we logged in the recipient, log them out again
        if ($switch_user) {
            $GLOBALS['SQ_SYSTEM']->restoreCurrentUser();
        }

    }//end else

    return Bulk_Mailer::replaceContentKeywords($content, $recipient);

}//end generateContent()
1001 
1002 
/**
* Generate the raw (keyword-unreplaced) content for a job
*
* The generation logic itself lives in the asset manager so that it can be
* reused by triggers; this method simply forwards the call.
*
* @param array	$details	the job details, passed through unchanged to the asset manager
*
* @return mixed	whatever the asset manager's generateRawContent() returns
* @access public
*/
public static function generateRawContent($details)
{
    $asset_manager = $GLOBALS['SQ_SYSTEM']->am;

    return $asset_manager->generateRawContent($details);

}//end generateRawContent()
1018 
1019 
/**
* Replace the keywords found in the supplied content
*
* Keywords prefixed with 'recipient_' are resolved against the recipient:
* either an array keyed by login (each value being an array of details), or an
* object exposing getKeywordReplacement(). Recipient keywords that cannot be
* resolved are left in place as '%keyword%'. Every other keyword found is
* blanked, after global keywords have been given the chance to be replaced.
*
* @param string	&$content	the content to process (modified in place)
* @param mixed	&$user		the recipient: array of login => details, or a user object
*
* @return string	the processed content
* @access public
*/
public static function replaceContentKeywords(&$content, &$user)
{
    // user keywords in the 'Asset to Send' content are the normal
    // keywords with a 'recipient_' prefix in front of them
    require_once SQ_FUDGE_PATH.'/general/text.inc';

    $keyword_replacements = Array();

    foreach (retrieve_keywords_replacements($content) as $word) {

        // not a recipient keyword: blank it out
        if (strpos($word, 'recipient_') !== 0) {
            $keyword_replacements[$word] = '';
            continue;
        }

        $user_keyword = substr($word, 10);

        if (!is_array($user)) {
            // ask the user object; if it hands the keyword straight back
            // it is unknown and must not be replaced
            $user_replacement = $user->getKeywordReplacement($user_keyword);
            $keyword_replacements[$word] = ($user_replacement == "%$user_keyword%") ? "%$word%" : $user_replacement;
            continue;
        }

        // non-user recipient: resolve from the login (array key) and the supplied details
        foreach ($user as $user_login => $user_details) {
            if ($user_keyword == 'login') {
                $resolved = $user_login;
            } else if (isset($user_details[$user_keyword])) {
                $resolved = $user_details[$user_keyword];
            } else {
                $resolved = "%$word%";
            }
            $keyword_replacements[$word] = $resolved;
        }//end foreach

    }//end foreach

    require_once SQ_INCLUDE_PATH.'/general.inc';
    replace_global_keywords($content);
    replace_keywords($content, $keyword_replacements);

    return $content;

}//end replaceContentKeywords()
1070 
1071 
1072 //-- THRESHOLD --//
1073 
1074 
/**
* Work out the number of mails to send per minute and store it in the progress file
*
* The rate comes from the 'send rate' threshold rule (value mails per unit
* seconds, scaled to a minute). If no such rule is defined, the rule is
* malformed, or the resulting rate is outside the allowed
* [min_mpm_threshold, max_mpm_threshold] range, the bulkmail manager's
* 'chunk_size' attribute is used instead.
*
* @param array	$thresholds		the threshold rules of the job
* @param string	$progress_path	path to the progress file of the job
*
* @return mixed	the mails-per-minute value that was stored
* @access public
*/
function calculateChunkSize($thresholds, $progress_path)
{
    $mpm = 0;
    $send_rate_type = translate('bulkmail_threshold_rule_send_rate');

    foreach ($thresholds as $threshold) {
        if ($threshold['type'] != $send_rate_type) {
            continue;
        }

        $value = isset($threshold['params']['value']) ? $threshold['params']['value'] : NULL;
        $unit  = isset($threshold['params']['unit']) ? $threshold['params']['unit'] : NULL;

        // a missing/non-numeric value or a non-positive unit cannot produce a rate
        // (and would divide by zero), so treat the rule as not achievable
        if (!is_numeric($value) || !is_numeric($unit) || $unit <= 0) {
            $mpm = 0;
            continue;
        }

        $mpm = ceil(($value/$unit) * 60);
        // mail per minute setting is not within allowed range
        if ($mpm < $GLOBALS['min_mpm_threshold'] || $mpm > $GLOBALS['max_mpm_threshold']) {
            $mpm = 0;
        }
    }

    // use default chunk_size if 'send rate' rule is not defined or not achievable
    if (!$mpm) {
        $bm = $GLOBALS['SQ_SYSTEM']->am->getSystemAsset('bulkmail_manager');
        $mpm = $bm->attr('chunk_size');
    }

    // store mpm value in progress file; the include is expected to (re)define $progress_info
    $progress_info = Array();
    if (file_exists($progress_path)) {
        include $progress_path;
    } else {
        trigger_localised_error('BML0009', E_USER_WARNING, $progress_path);
    }
    $progress_info['mpm'] = $mpm;
    if (!array_to_file($progress_info, 'progress_info', $progress_path)) {
        trigger_localised_error('BML0008', E_USER_WARNING, $progress_path);
    }

    return $mpm;

}//end calculateChunkSize()
1119 
1120 
/**
* Throttle sending so that roughly one chunk of mail goes out per minute
*
* Called after each recipient. Once the first chunk has been timed
* ('time_per_chunk' set in the progress file) the spare time of the minute is
* spread evenly over the recipients of a chunk and stored as 'sleep_time'
* (microseconds, or -1 when no sleep is needed). Every later call just sleeps
* for that stored time.
*
* @param array	$thresholds		the threshold rules of the job
* @param string	$progress_path	path to the progress file of the job
*
* @return boolean	FALSE if the progress file does not exist, TRUE otherwise
* @access public
*/
function enforceSendRate($thresholds, $progress_path)
{
    // the include is expected to (re)define $progress_info
    $progress_info = Array();
    if (file_exists($progress_path)) {
        include $progress_path;
    } else {
        trigger_localised_error('BML0009', E_USER_WARNING, $progress_path);
        return FALSE;
    }
    if (!isset($progress_info['mpm'])) {
        // send rate rule is not defined
        return TRUE;
    }

    // finished processing one recipient chunk, work out the time to sleep between each recipient
    $chunk_timed = isset($progress_info['time_per_chunk']) && ($progress_info['time_per_chunk'] != 0);
    if ($chunk_timed && !isset($progress_info['sleep_time'])) {
        $time_diff = 60 - $progress_info['time_per_chunk'];
        $progress_info['sleep_time'] = -1;
        if ($time_diff > 0) {
            $chunk_size = $this->calculateChunkSize($thresholds, $progress_path);
            // a zero/invalid chunk size cannot be used as a divisor; leave sleep_time at -1
            if (is_numeric($chunk_size) && $chunk_size > 0) {
                // calculateChunkSize() has just written this value to the progress file,
                // keep our copy in sync so the write below does not clobber it
                $progress_info['mpm'] = $chunk_size;
                // store the sleep time between each recipient (in microseconds)
                $progress_info['sleep_time'] = ($time_diff/$chunk_size) * 1000000;
                // sleep for the first chunk (sleep_time * chunk_size); usleep() needs an integer
                usleep((int) round($time_diff * 1000000));
            }
        }

        // write the sleep time between recipient to the progress file
        if (!array_to_file($progress_info, 'progress_info', $progress_path)) {
            trigger_localised_error('BML0008', E_USER_WARNING, $progress_path);
        }
    }

    // sleep between each recipient
    if (isset($progress_info['sleep_time']) && ($progress_info['sleep_time'] != -1)) {
        usleep((int) round($progress_info['sleep_time']));
    }

    return TRUE;

}//end enforceSendRate()
1170 
1171 
/**
* Dispatch a threshold check to the method that implements it
*
* @param string	$type		which check to run: 'excluded_time', 'wakeup_time' or 'time_out'
* @param array	$thresholds	the threshold rules of the job
* @param mixed	$extra		additional data handed to the 'wakeup_time' and 'time_out' checks
*
* @return boolean	the result of the selected check, FALSE for an unknown type
* @access public
*/
function checkThreshold($type, $thresholds, $extra='')
{
    if ($type == 'excluded_time') {
        return $this->isExcludedTime($thresholds);
    }

    if ($type == 'wakeup_time') {
        return $this->checkWakeUpTime($thresholds, $extra);
    }

    if ($type == 'time_out') {
        return $this->checkTimeOut($thresholds, $extra);
    }

    // unknown check type
    return FALSE;

}//end checkThreshold()
1200 
1201 
/**
* Check the current time against the 'excluded time' threshold rules
*
* Each rule lists excluded week days (0 = Sun ... 6 = Sat) and an hour range
* [from, to). When 'to' is smaller than 'from' the range wraps past midnight,
* i.e. it runs from hour 'from' on the excluded day until hour 'to' of the
* following day.
*
* NOTE: despite the name, FALSE means "we are inside an excluded window".
*
* @param array	$thresholds	the threshold rules of the job
*
* @return boolean	FALSE if the current time is excluded by a rule, TRUE otherwise
* @access public
*/
function isExcludedTime($thresholds)
{

    $days_array = Array(
        0 => 'Sun',
        1 => 'Mon',
        2 => 'Tue',
        3 => 'Wed',
        4 => 'Thu',
        5 => 'Fri',
        6 => 'Sat',
    );

    // take a single snapshot of "now" so that day and hour always belong together
    $now           = time();
    $today_index   = (int) date('w', $now); // numeric day of the week, 0 (Sun) to 6 (Sat)
    $now_day       = $days_array[$today_index];
    // derive yesterday from the weekday index rather than "now - 86400",
    // which can land on the wrong day around a daylight saving change
    $yesterday_day = $days_array[($today_index + 6) % 7];
    $now_time      = date('G', $now); // 24-hour format of an hour without leading zeros, eg. 8, 15

    $excluded_time_type = translate('bulkmail_threshold_rule_excluded_time');

    foreach ($thresholds as $threshold) {
        if ($threshold['type'] != $excluded_time_type) {
            continue;
        }

        $excluded_days = Array();
        foreach ($threshold['params']['days'] as $excluded_day) {
            if (isset($days_array[$excluded_day])) {
                $excluded_days[] = $days_array[$excluded_day];
            }
        }

        $from = $threshold['params']['from'];
        $to   = $threshold['params']['to'];
        $wraps_midnight = ($to < $from);

        // today is an excluded day, range: [from, to) or [from, midnight) when wrapping
        if (in_array($now_day, $excluded_days)) {
            if ($now_time >= $from && ($wraps_midnight || ($now_time < $to))) {
                return FALSE;
            }
        }

        // exclude from previous day X hour to today Y hour; this has to be checked
        // even when today is itself an excluded day, otherwise the early morning
        // between two consecutive excluded days is wrongly treated as allowed
        if ($wraps_midnight && in_array($yesterday_day, $excluded_days)) {
            if ($now_time < $to) {
                return FALSE;
            }
        }
    }

    return TRUE;

}//end isExcludedTime()
1250 
1251 
/**
* Check whether a job has gone stale and may be taken over
*
* A job is timed out when its 'last_updated' timestamp is set, non-zero and
* older than $GLOBALS['timeout_threshold'] seconds.
*
* @param array	$thresholds		the threshold rules of the job (not used by this check)
* @param array	$progress_info	the progress information of the job
*
* @return boolean	TRUE if the timeout threshold has been exceeded, FALSE otherwise
* @access public
*/
function checkTimeOut($thresholds, $progress_info)
{
    // no usable timestamp, nothing to measure against
    if (!isset($progress_info['last_updated']) || ($progress_info['last_updated'] == 0)) {
        return FALSE;
    }

    $idle_seconds = time() - $progress_info['last_updated'];

    // timeout threshold reached, take over
    return ($idle_seconds > $GLOBALS['timeout_threshold']);

}//end checkTimeOut()
1273 
1274 
1275 //-- HANDLER --//
1276 
1277 
/**
* Signal handler
*
* SIGTERM ends the script; every other signal (including SIGHUP and SIGUSR1)
* is deliberately ignored.
*
* @param int	$signo	the number of the signal that was received
*
* @return void
* @access private
*/
function _sigHandler($signo)
{
    if ($signo == SIGTERM) {
        exit;
    }

    // SIGHUP, SIGUSR1 and anything else: nothing to do

}//end _sigHandler()
1298 
1299 
/**
* Error handler that writes errors to the bulkmail error log
*
* An error is processed when it is covered by the current error_reporting()
* level or when it is fatal (E_ERROR / E_USER_ERROR). The reported file and
* line are taken from the first backtrace frame that does not belong to the
* locale manager, the message is reduced to plain text, and the result is
* passed to log_error() if the 'log_errors' ini setting is on. Fatal errors
* end the script with exit status 1.
*
* @param int	$err_no		the level of the error raised
* @param string	$err_msg	the error message
* @param string	$err_file	the file the error was raised in
* @param int	$err_line	the line the error was raised on
*
* @return void
* @access private
*/
function _errorHandler($err_no, $err_msg, $err_file, $err_line)
{
    $terminate = ($err_no & (E_USER_ERROR | E_ERROR));

    // handle the error unless it is masked by error_reporting() (eg. the call had
    // an '@' prepended); errors that are about to terminate us are always handled
    $reportable = (error_reporting() & $err_no);
    if ($reportable || $terminate) {

        $bt = debug_backtrace();
        $frame_count = count($bt);
        if ($frame_count > 1) {
            $last_index    = $frame_count - 1;
            $real_bt_index = 0;
            // walk past the frames that belong to the locale manager
            // note the use of '_errorhandler' instead of '_errorHandler' in the function check
            while ($real_bt_index < $last_index) {
                $frame = $bt[$real_bt_index];
                $is_locale_frame = (FALSE !== strpos(array_get_index($frame, 'class'), 'locale_manager'))
                    || (FALSE !== strpos(array_get_index($frame, 'file'), 'locale_manager'));
                if (!$is_locale_frame && ($frame['function'] != '_errorhandler')) {
                    break;
                }
                $real_bt_index++;
            }

            $real_frame = $bt[$real_bt_index];
            if (isset($real_frame['file'])) {
                $err_file = hide_system_root($real_frame['file']);
            }
            if (isset($real_frame['line'])) {
                $err_line = $real_frame['line'];
            }
            $err_msg = hide_system_root($err_msg);
        }

        // turn line breaks and paragraphs into newlines, then drop any remaining markup
        $html_patterns = Array('/<br\\/?>/i', '/<p[^>]*>/i');
        $text_breaks   = Array("\n", "\n\n");
        $text_msg = strip_tags(preg_replace($html_patterns, $text_breaks, $err_msg));

        // send a report to the system error log
        if (ini_get('log_errors')) {
            log_error($text_msg, $err_no, $err_file, $err_line, $this->error_log_file_name);
        }

    }//end if error is to be handled

    if ($terminate) {
        exit(1);
    }

}//end _errorHandler()
1348 
1349 
1350 }//end class
1351 
1352 
1353 ?>