686 lines
25 KiB
PHP
686 lines
25 KiB
PHP
<?php // phpcs:ignore SlevomatCodingStandard.TypeHints.DeclareStrictTypes.DeclareStrictTypesMissing
|
|
|
|
namespace MailPoet\Cron\Workers\SendingQueue;
|
|
|
|
if (!defined('ABSPATH')) exit;
|
|
|
|
|
|
use MailPoet\Cron\CronHelper;
|
|
use MailPoet\Cron\Workers\Bounce;
|
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Links;
|
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Mailer as MailerTask;
|
|
use MailPoet\Cron\Workers\SendingQueue\Tasks\Newsletter as NewsletterTask;
|
|
use MailPoet\Cron\Workers\StatsNotifications\Scheduler as StatsNotificationsScheduler;
|
|
use MailPoet\Entities\NewsletterEntity;
|
|
use MailPoet\Entities\ScheduledTaskEntity;
|
|
use MailPoet\Entities\SubscriberEntity;
|
|
use MailPoet\InvalidStateException;
|
|
use MailPoet\Logging\LoggerFactory;
|
|
use MailPoet\Mailer\MailerLog;
|
|
use MailPoet\Mailer\MetaInfo;
|
|
use MailPoet\Newsletter\Sending\ScheduledTasksRepository;
|
|
use MailPoet\Newsletter\Sending\ScheduledTaskSubscribersRepository;
|
|
use MailPoet\Newsletter\Sending\SendingQueuesRepository;
|
|
use MailPoet\Segments\SegmentsRepository;
|
|
use MailPoet\Segments\SubscribersFinder;
|
|
use MailPoet\Services\AuthorizedEmailsController;
|
|
use MailPoet\Statistics\StatisticsNewslettersRepository;
|
|
use MailPoet\Subscribers\SubscribersRepository;
|
|
use MailPoet\Tasks\Subscribers\BatchIterator;
|
|
use MailPoet\WP\Functions as WPFunctions;
|
|
use MailPoetVendor\Carbon\Carbon;
|
|
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
|
use Throwable;
|
|
|
|
class SendingQueue {
|
|
/** @var MailerTask */
|
|
public $mailerTask;
|
|
|
|
/** @var NewsletterTask */
|
|
public $newsletterTask;
|
|
|
|
const TASK_TYPE = 'sending';
|
|
const TASK_BATCH_SIZE = 5;
|
|
const EMAIL_WITH_INVALID_SEGMENT_OPTION = 'mailpoet_email_with_invalid_segment';
|
|
|
|
/** @var StatsNotificationsScheduler */
|
|
public $statsNotificationsScheduler;
|
|
|
|
/** @var SendingErrorHandler */
|
|
private $errorHandler;
|
|
|
|
/** @var SendingThrottlingHandler */
|
|
private $throttlingHandler;
|
|
|
|
/** @var MetaInfo */
|
|
private $mailerMetaInfo;
|
|
|
|
/** @var LoggerFactory */
|
|
private $loggerFactory;
|
|
|
|
/** @var CronHelper */
|
|
private $cronHelper;
|
|
|
|
/** @var SubscribersFinder */
|
|
private $subscribersFinder;
|
|
|
|
/** @var SegmentsRepository */
|
|
private $segmentsRepository;
|
|
|
|
/** @var WPFunctions */
|
|
private $wp;
|
|
|
|
/** @var Links */
|
|
private $links;
|
|
|
|
/** @var ScheduledTasksRepository */
|
|
private $scheduledTasksRepository;
|
|
|
|
/** @var ScheduledTaskSubscribersRepository */
|
|
private $scheduledTaskSubscribersRepository;
|
|
|
|
/** @var SubscribersRepository */
|
|
private $subscribersRepository;
|
|
|
|
/*** @var SendingQueuesRepository */
|
|
private $sendingQueuesRepository;
|
|
|
|
/** @var EntityManager */
|
|
private $entityManager;
|
|
|
|
/** @var StatisticsNewslettersRepository */
|
|
private $statisticsNewslettersRepository;
|
|
|
|
/** @var AuthorizedEmailsController */
|
|
private $authorizedEmailsController;
|
|
|
|
public function __construct(
|
|
SendingErrorHandler $errorHandler,
|
|
SendingThrottlingHandler $throttlingHandler,
|
|
StatsNotificationsScheduler $statsNotificationsScheduler,
|
|
LoggerFactory $loggerFactory,
|
|
CronHelper $cronHelper,
|
|
SubscribersFinder $subscriberFinder,
|
|
SegmentsRepository $segmentsRepository,
|
|
WPFunctions $wp,
|
|
Links $links,
|
|
ScheduledTasksRepository $scheduledTasksRepository,
|
|
ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository,
|
|
MailerTask $mailerTask,
|
|
SubscribersRepository $subscribersRepository,
|
|
SendingQueuesRepository $sendingQueuesRepository,
|
|
EntityManager $entityManager,
|
|
StatisticsNewslettersRepository $statisticsNewslettersRepository,
|
|
AuthorizedEmailsController $authorizedEmailsController,
|
|
$newsletterTask = false
|
|
) {
|
|
$this->errorHandler = $errorHandler;
|
|
$this->throttlingHandler = $throttlingHandler;
|
|
$this->statsNotificationsScheduler = $statsNotificationsScheduler;
|
|
$this->subscribersFinder = $subscriberFinder;
|
|
$this->mailerTask = $mailerTask;
|
|
$this->newsletterTask = ($newsletterTask) ? $newsletterTask : new NewsletterTask();
|
|
$this->segmentsRepository = $segmentsRepository;
|
|
$this->mailerMetaInfo = new MetaInfo;
|
|
$this->wp = $wp;
|
|
$this->loggerFactory = $loggerFactory;
|
|
$this->cronHelper = $cronHelper;
|
|
$this->links = $links;
|
|
$this->scheduledTasksRepository = $scheduledTasksRepository;
|
|
$this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository;
|
|
$this->subscribersRepository = $subscribersRepository;
|
|
$this->sendingQueuesRepository = $sendingQueuesRepository;
|
|
$this->entityManager = $entityManager;
|
|
$this->statisticsNewslettersRepository = $statisticsNewslettersRepository;
|
|
$this->authorizedEmailsController = $authorizedEmailsController;
|
|
}
|
|
|
|
public function process($timer = false) {
|
|
$timer = $timer ?: microtime(true);
|
|
$this->enforceSendingAndExecutionLimits($timer);
|
|
foreach ($this->scheduledTasksRepository->findRunningSendingTasks(self::TASK_BATCH_SIZE) as $task) {
|
|
$queue = $task->getSendingQueue();
|
|
if (!$queue) {
|
|
continue;
|
|
}
|
|
|
|
if ($task->getInProgress()) {
|
|
if ($this->isTimeout($task)) {
|
|
$this->stopProgress($task);
|
|
} else {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
$this->startProgress($task);
|
|
|
|
try {
|
|
$this->scheduledTasksRepository->touchAllByIds([$task->getId()]);
|
|
$this->processSending($task, (int)$timer);
|
|
} catch (\Exception $e) {
|
|
$this->stopProgress($task);
|
|
throw $e;
|
|
}
|
|
|
|
$this->stopProgress($task);
|
|
}
|
|
}
|
|
|
|
private function processSending(ScheduledTaskEntity $task, int $timer): void {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'sending queue processing',
|
|
['task_id' => $task->getId()]
|
|
);
|
|
|
|
$this->deleteTaskIfNewsletterDoesNotExist($task);
|
|
|
|
$queue = $task->getSendingQueue();
|
|
$newsletter = $this->newsletterTask->getNewsletterFromQueue($task);
|
|
if (!$queue || !$newsletter) {
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
|
$this->scheduledTasksRepository->flush();
|
|
return;
|
|
}
|
|
|
|
// pre-process newsletter (render, replace shortcodes/links, etc.)
|
|
$newsletter = $this->newsletterTask->preProcessNewsletter($newsletter, $task);
|
|
|
|
// During pre-processing we may find that the newsletter can't be sent and we delete it including all associated entities
|
|
// E.g. post notification history newsletter when there are no posts to send
|
|
if (!$newsletter) {
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
|
$this->scheduledTasksRepository->flush();
|
|
return;
|
|
}
|
|
|
|
// configure mailer
|
|
$this->mailerTask->configureMailer($newsletter);
|
|
// get newsletter segments
|
|
$newsletterSegmentsIds = $newsletter->getSegmentIds();
|
|
$segmentIdsToCheck = $newsletterSegmentsIds;
|
|
$filterSegmentId = $newsletter->getFilterSegmentId();
|
|
|
|
if (is_int($filterSegmentId)) {
|
|
$segmentIdsToCheck[] = $filterSegmentId;
|
|
}
|
|
|
|
// Pause task in case some of related segments was deleted or trashed
|
|
if ($newsletterSegmentsIds && !$this->checkDeletedSegments($segmentIdsToCheck)) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'pause task in sending queue due deleted or trashed segment',
|
|
['task_id' => $task->getId()]
|
|
);
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
|
$this->scheduledTasksRepository->flush();
|
|
$this->wp->setTransient(self::EMAIL_WITH_INVALID_SEGMENT_OPTION, $newsletter->getSubject());
|
|
return;
|
|
}
|
|
|
|
// Pause task if sender domain requirements are not met
|
|
if (!$this->authorizedEmailsController->isSenderAddressValid($newsletter, 'sending')) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'pause task in sending queue due to sender domain requirements',
|
|
['task_id' => $task->getId()]
|
|
);
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
|
$this->scheduledTasksRepository->flush();
|
|
return;
|
|
}
|
|
|
|
// get subscribers
|
|
$subscriberBatches = new BatchIterator($task->getId(), $this->getBatchSize());
|
|
|
|
// Set invalid state for sending task for non-campaign (no-bulk) newsletters with no subscribers (e.g. welcome emails, automatic emails).
|
|
// This cover cases when a welcome or automatic email was scheduled but before processing it the subscriber was deleted.
|
|
// The non-campaign emails are sent only to a single recipient, and we count stats based on sending tasks statues, so we can't mark them as completed.
|
|
// At the same time we want to keep a record abut processing them
|
|
if ($subscriberBatches->count() === 0 && !in_array($newsletter->getType(), NewsletterEntity::CAMPAIGN_TYPES, true)) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'no subscribers to process',
|
|
['task_id' => $task->getId()]
|
|
);
|
|
$this->scheduledTasksRepository->invalidateTask($task);
|
|
return;
|
|
}
|
|
/** @var int[] $subscribersToProcessIds - it's required for PHPStan */
|
|
foreach ($subscriberBatches as $subscribersToProcessIds) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'subscriber batch processing',
|
|
['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'subscriber_batch_count' => count($subscribersToProcessIds)]
|
|
);
|
|
if (!empty($newsletterSegmentsIds[0])) {
|
|
// Check that subscribers are in segments
|
|
try {
|
|
$foundSubscribersIds = $this->subscribersFinder->findSubscribersInSegments($subscribersToProcessIds, $newsletterSegmentsIds, $filterSegmentId);
|
|
} catch (InvalidStateException $exception) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'paused task in sending queue due to problem finding subscribers: ' . $exception->getMessage(),
|
|
['task_id' => $task->getId()]
|
|
);
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
|
$this->scheduledTasksRepository->flush();
|
|
return;
|
|
}
|
|
$foundSubscribers = empty($foundSubscribersIds) ? [] : $this->subscribersRepository->findBy(['id' => $foundSubscribersIds, 'deletedAt' => null]);
|
|
} else {
|
|
// No segments = Welcome emails or some Automatic emails.
|
|
// Welcome emails or some Automatic emails use segments only for scheduling and store them as a newsletter option
|
|
$queryBuilder = $this->entityManager->createQueryBuilder();
|
|
|
|
$queryBuilder->select('s')
|
|
->from(SubscriberEntity::class, 's')
|
|
->where('s.id IN (:subscriberIds)')
|
|
->setParameter('subscriberIds', $subscribersToProcessIds)
|
|
->andWhere('s.deletedAt IS NULL');
|
|
|
|
if ($newsletter->isTransactional()) {
|
|
$queryBuilder->andWhere('s.status != :bouncedStatus')
|
|
->setParameter('bouncedStatus', SubscriberEntity::STATUS_BOUNCED);
|
|
} else {
|
|
$queryBuilder->andWhere('s.status = :subscribedStatus')
|
|
->setParameter('subscribedStatus', SubscriberEntity::STATUS_SUBSCRIBED);
|
|
}
|
|
|
|
$foundSubscribers = $queryBuilder->getQuery()->getResult();
|
|
$foundSubscribersIds = array_map(function(SubscriberEntity $subscriber) {
|
|
return $subscriber->getId();
|
|
}, $foundSubscribers);
|
|
}
|
|
|
|
// if some subscribers weren't found, remove them from the processing list
|
|
if (count($foundSubscribersIds) !== count($subscribersToProcessIds)) {
|
|
$subscribersToRemove = array_diff(
|
|
$subscribersToProcessIds,
|
|
$foundSubscribersIds
|
|
);
|
|
|
|
$this->scheduledTaskSubscribersRepository->deleteByScheduledTaskAndSubscriberIds($task, $subscribersToRemove);
|
|
$this->sendingQueuesRepository->updateCounts($queue);
|
|
|
|
// if there aren't any subscribers to process in the batch (e.g. all unsubscribed or were deleted), continue with the next batch
|
|
if (count($foundSubscribersIds) === 0) {
|
|
continue;
|
|
}
|
|
}
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'before queue chunk processing',
|
|
['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId(), 'found_subscribers_count' => count($foundSubscribers)]
|
|
);
|
|
|
|
// reschedule bounce task to run sooner, if needed
|
|
$this->reScheduleBounceTask();
|
|
|
|
// Check task has not been paused before continue processing
|
|
// This is needed because the task can be paused in the middle of the batch processing,
|
|
// for example on API error ERROR_MESSAGE_BULK_EMAIL_FORBIDDEN
|
|
if ($task->getStatus() === ScheduledTaskEntity::STATUS_PAUSED) {
|
|
return;
|
|
}
|
|
|
|
if ($newsletter->getStatus() !== NewsletterEntity::STATUS_CORRUPT) {
|
|
$this->processQueue(
|
|
$task,
|
|
$newsletter,
|
|
$foundSubscribers,
|
|
$timer
|
|
);
|
|
if (!$newsletter->isTransactional()) {
|
|
$this->entityManager->wrapInTransaction(function() use ($foundSubscribersIds) {
|
|
$now = Carbon::now()->millisecond(0);
|
|
$this->subscribersRepository->bulkUpdateLastSendingAt($foundSubscribersIds, $now);
|
|
// We're nullifying this value so these subscribers' engagement score will be recalculated the next time the cron runs
|
|
$this->subscribersRepository->bulkUpdateEngagementScoreUpdatedAt($foundSubscribersIds, null);
|
|
});
|
|
}
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'after queue chunk processing',
|
|
['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
|
|
);
|
|
// In case we finished end sending properly before enforcing sending and execution limits
|
|
// The limit enforcing throws and exception and the sending end wouldn't be processed properly (stats notification, newsletter marked as sent etc.)
|
|
if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) {
|
|
$this->endSending($task, $newsletter);
|
|
return;
|
|
}
|
|
$this->enforceSendingAndExecutionLimits($timer);
|
|
} else {
|
|
$this->sendingQueuesRepository->pause($queue);
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error(
|
|
'Can\'t send corrupt newsletter',
|
|
['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
|
|
);
|
|
return;
|
|
}
|
|
}
|
|
// At this point all batches were processed or there are no batches to process
|
|
// Also none of the checks above paused or invalidated the task
|
|
$this->endSending($task, $newsletter);
|
|
}
|
|
|
|
public function getBatchSize(): int {
|
|
return $this->throttlingHandler->getBatchSize();
|
|
}
|
|
|
|
/**
|
|
* @param SubscriberEntity[] $subscribers
|
|
*/
|
|
public function processQueue(ScheduledTaskEntity $task, NewsletterEntity $newsletter, array $subscribers, $timer) {
|
|
// determine if processing is done in bulk or individually
|
|
$processingMethod = $this->mailerTask->getProcessingMethod();
|
|
$preparedNewsletters = [];
|
|
$preparedSubscribers = [];
|
|
$preparedSubscribersIds = [];
|
|
$unsubscribeUrls = [];
|
|
$statistics = [];
|
|
$metas = [];
|
|
$oneClickUnsubscribeUrls = [];
|
|
$sendingQueueEntity = $task->getSendingQueue();
|
|
if (!$sendingQueueEntity) {
|
|
return;
|
|
}
|
|
|
|
$sendingQueueMeta = $sendingQueueEntity->getMeta() ?? [];
|
|
$campaignId = $sendingQueueMeta['campaignId'] ?? null;
|
|
|
|
foreach ($subscribers as $subscriber) {
|
|
// render shortcodes and replace subscriber data in tracked links
|
|
$preparedNewsletters[] =
|
|
$this->newsletterTask->prepareNewsletterForSending(
|
|
$newsletter,
|
|
$subscriber,
|
|
$sendingQueueEntity
|
|
);
|
|
// format subscriber name/address according to mailer settings
|
|
$preparedSubscribers[] = $this->mailerTask->prepareSubscriberForSending(
|
|
$subscriber
|
|
);
|
|
$preparedSubscribersIds[] = $subscriber->getId();
|
|
// create personalized instant unsubsribe link
|
|
$unsubscribeUrls[] = $this->links->getUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber);
|
|
$oneClickUnsubscribeUrls[] = $this->links->getOneClickUnsubscribeUrl($sendingQueueEntity->getId(), $subscriber);
|
|
|
|
$metasForSubscriber = $this->mailerMetaInfo->getNewsletterMetaInfo($newsletter, $subscriber);
|
|
if ($campaignId) {
|
|
$metasForSubscriber['campaign_id'] = $campaignId;
|
|
}
|
|
$metas[] = $metasForSubscriber;
|
|
|
|
// keep track of values for statistics purposes
|
|
$statistics[] = [
|
|
'newsletter_id' => $newsletter->getId(),
|
|
'subscriber_id' => $subscriber->getId(),
|
|
'queue_id' => $sendingQueueEntity->getId(),
|
|
];
|
|
if ($processingMethod === 'individual') {
|
|
$this->sendNewsletter(
|
|
$task,
|
|
$preparedSubscribersIds[0],
|
|
$preparedNewsletters[0],
|
|
$preparedSubscribers[0],
|
|
$statistics[0],
|
|
$timer,
|
|
[
|
|
'unsubscribe_url' => $unsubscribeUrls[0],
|
|
'meta' => $metas[0],
|
|
'one_click_unsubscribe' => $oneClickUnsubscribeUrls[0],
|
|
]
|
|
);
|
|
$preparedNewsletters = [];
|
|
$preparedSubscribers = [];
|
|
$preparedSubscribersIds = [];
|
|
$unsubscribeUrls = [];
|
|
$oneClickUnsubscribeUrls = [];
|
|
$statistics = [];
|
|
$metas = [];
|
|
}
|
|
}
|
|
if ($processingMethod === 'bulk') {
|
|
$this->sendNewsletters(
|
|
$task,
|
|
$preparedSubscribersIds,
|
|
$preparedNewsletters,
|
|
$preparedSubscribers,
|
|
$statistics,
|
|
$timer,
|
|
[
|
|
'unsubscribe_url' => $unsubscribeUrls,
|
|
'meta' => $metas,
|
|
'one_click_unsubscribe' => $oneClickUnsubscribeUrls,
|
|
]
|
|
);
|
|
}
|
|
}
|
|
|
|
public function sendNewsletter(
|
|
ScheduledTaskEntity $task, $preparedSubscriberId, $preparedNewsletter,
|
|
$preparedSubscriber, $statistics, $timer, $extraParams = []
|
|
) {
|
|
// send newsletter
|
|
$sendResult = $this->mailerTask->send(
|
|
$preparedNewsletter,
|
|
$preparedSubscriber,
|
|
$extraParams
|
|
);
|
|
$this->processSendResult(
|
|
$task,
|
|
$sendResult,
|
|
[$preparedSubscriber],
|
|
[$preparedSubscriberId],
|
|
[$statistics],
|
|
$timer
|
|
);
|
|
}
|
|
|
|
public function sendNewsletters(
|
|
ScheduledTaskEntity $task, $preparedSubscribersIds, $preparedNewsletters,
|
|
$preparedSubscribers, $statistics, $timer, $extraParams = []
|
|
) {
|
|
// send newsletters
|
|
$sendResult = $this->mailerTask->sendBulk(
|
|
$preparedNewsletters,
|
|
$preparedSubscribers,
|
|
$extraParams
|
|
);
|
|
$this->processSendResult(
|
|
$task,
|
|
$sendResult,
|
|
$preparedSubscribers,
|
|
$preparedSubscribersIds,
|
|
$statistics,
|
|
$timer
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Checks whether some of segments was deleted or trashed
|
|
* @param int[] $segmentIds
|
|
*/
|
|
private function checkDeletedSegments(array $segmentIds): bool {
|
|
if (count($segmentIds) === 0) {
|
|
return true;
|
|
}
|
|
$segmentIds = array_unique($segmentIds);
|
|
$segments = $this->segmentsRepository->findBy(['id' => $segmentIds]);
|
|
// Some segment was deleted from DB
|
|
if (count($segmentIds) > count($segments)) {
|
|
return false;
|
|
}
|
|
foreach ($segments as $segment) {
|
|
if ($segment->getDeletedAt() !== null) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
private function processSendResult(
|
|
ScheduledTaskEntity $task,
|
|
$sendResult,
|
|
array $preparedSubscribers,
|
|
array $preparedSubscribersIds,
|
|
array $statistics,
|
|
$timer
|
|
) {
|
|
// log error message and schedule retry/pause sending
|
|
if ($sendResult['response'] === false) {
|
|
$error = $sendResult['error'];
|
|
$this->errorHandler->processError($error, $task, $preparedSubscribersIds, $preparedSubscribers);
|
|
} else {
|
|
$queue = $task->getSendingQueue();
|
|
if (!$queue) {
|
|
return;
|
|
}
|
|
try {
|
|
$this->scheduledTaskSubscribersRepository->updateProcessedSubscribers($task, $preparedSubscribersIds);
|
|
$this->sendingQueuesRepository->updateCounts($queue);
|
|
} catch (Throwable $e) {
|
|
MailerLog::processError(
|
|
'processed_list_update',
|
|
sprintf('QUEUE-%d-PROCESSED-LIST-UPDATE', $queue->getId()),
|
|
null,
|
|
true
|
|
);
|
|
}
|
|
}
|
|
|
|
// log statistics
|
|
$this->statisticsNewslettersRepository->createMultiple($statistics);
|
|
|
|
// update the sent count
|
|
$this->mailerTask->updateSentCount();
|
|
|
|
// enforce execution limits if queue is still being processed
|
|
if ($task->getStatus() !== ScheduledTaskEntity::STATUS_COMPLETED) {
|
|
$this->enforceSendingAndExecutionLimits($timer);
|
|
}
|
|
|
|
// trigger automation email sent hook for automation emails
|
|
if (
|
|
$task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED
|
|
&& isset($task->getMeta()['automation'])
|
|
) {
|
|
try {
|
|
$this->wp->doAction('mailpoet_automation_email_sent', $task->getMeta()['automation']);
|
|
} catch (Throwable $e) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error(
|
|
'Error while executing "mailpoet_automation_email_sent action" hook',
|
|
['task_id' => $task->getId(), 'error' => $e->getMessage()]
|
|
);
|
|
}
|
|
}
|
|
|
|
$this->throttlingHandler->processSuccess();
|
|
}
|
|
|
|
public function enforceSendingAndExecutionLimits($timer) {
|
|
// abort if execution limit is reached
|
|
$this->cronHelper->enforceExecutionLimit($timer);
|
|
// abort if sending limit has been reached
|
|
MailerLog::enforceExecutionRequirements();
|
|
}
|
|
|
|
private function reScheduleBounceTask() {
|
|
$bounceTasks = $this->scheduledTasksRepository->findFutureScheduledByType(Bounce::TASK_TYPE);
|
|
if (count($bounceTasks)) {
|
|
$bounceTask = reset($bounceTasks);
|
|
if (Carbon::now()->millisecond(0)->addHours(42)->lessThan($bounceTask->getScheduledAt())) {
|
|
$randomOffset = rand(-6 * 60 * 60, 6 * 60 * 60);
|
|
$bounceTask->setScheduledAt(Carbon::now()->millisecond(0)->addSeconds((36 * 60 * 60) + $randomOffset));
|
|
$this->scheduledTasksRepository->persist($bounceTask);
|
|
$this->scheduledTasksRepository->flush();
|
|
}
|
|
}
|
|
}
|
|
|
|
private function startProgress(ScheduledTaskEntity $task): void {
|
|
$task->setInProgress(true);
|
|
$this->scheduledTasksRepository->flush();
|
|
}
|
|
|
|
private function stopProgress(ScheduledTaskEntity $task): void {
|
|
// if task is not managed by entity manager, it's already deleted and detached
|
|
// it can be deleted in self::processSending method
|
|
if (!$this->entityManager->contains($task)) {
|
|
return;
|
|
}
|
|
$task->setInProgress(false);
|
|
$this->scheduledTasksRepository->flush();
|
|
}
|
|
|
|
private function isTimeout(ScheduledTaskEntity $task): bool {
|
|
$currentTime = Carbon::now()->millisecond(0);
|
|
$updatedAt = new Carbon($task->getUpdatedAt());
|
|
if ($updatedAt->diffInSeconds($currentTime, false) > $this->getExecutionLimit()) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
private function getExecutionLimit(): int {
|
|
return $this->cronHelper->getDaemonExecutionLimit() * 3;
|
|
}
|
|
|
|
private function deleteTaskIfNewsletterDoesNotExist(ScheduledTaskEntity $task) {
|
|
$queue = $task->getSendingQueue();
|
|
$newsletter = $queue ? $queue->getNewsletter() : null;
|
|
if ($newsletter !== null) {
|
|
return;
|
|
}
|
|
$this->deleteTask($task);
|
|
}
|
|
|
|
private function deleteTask(ScheduledTaskEntity $task) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'delete task in sending queue',
|
|
['task_id' => $task->getId()]
|
|
);
|
|
|
|
$queue = $task->getSendingQueue();
|
|
if ($queue) {
|
|
$this->sendingQueuesRepository->remove($queue);
|
|
}
|
|
$this->scheduledTaskSubscribersRepository->deleteByScheduledTask($task);
|
|
$this->scheduledTasksRepository->remove($task);
|
|
$this->scheduledTasksRepository->flush();
|
|
}
|
|
|
|
private function endSending(ScheduledTaskEntity $task, NewsletterEntity $newsletter): void {
|
|
// We should handle all transitions into these states in the processSending method and end processing there or we throw an exception
|
|
// This might theoretically happen when multiple cron workers are running in parallel which we don't support and try to prevent
|
|
$unexpectedStates = [
|
|
ScheduledTaskEntity::STATUS_PAUSED,
|
|
ScheduledTaskEntity::STATUS_INVALID,
|
|
ScheduledTaskEntity::STATUS_SCHEDULED,
|
|
];
|
|
if (in_array($task->getStatus(), $unexpectedStates)) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->error(
|
|
'Sending task reached end of processing in sending queue worker in an unexpected state.',
|
|
['task_id' => $task->getId(), 'status' => $task->getStatus()]
|
|
);
|
|
return;
|
|
}
|
|
// The task is running but there is no one to send to.
|
|
// This may happen when we send to all but the execution is interrupted (e.g. by PHP time limit) and we don't update the task status
|
|
// or if we trigger sending to a newsletter without any subscriber (e.g. scheduled for long time but all were deleted)
|
|
// Lets set status to completed and update the queue counts
|
|
if ($task->getStatus() === null && $this->scheduledTaskSubscribersRepository->countUnprocessed($task) === 0) {
|
|
$task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
|
|
$queue = $task->getSendingQueue();
|
|
if ($queue) {
|
|
$this->sendingQueuesRepository->updateCounts($queue);
|
|
}
|
|
$this->scheduledTasksRepository->flush();
|
|
}
|
|
// Task is completed let's do all the stuff for the completed task
|
|
if ($task->getStatus() === ScheduledTaskEntity::STATUS_COMPLETED) {
|
|
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_NEWSLETTERS)->info(
|
|
'completed newsletter sending',
|
|
['newsletter_id' => $newsletter->getId(), 'task_id' => $task->getId()]
|
|
);
|
|
$this->newsletterTask->markNewsletterAsSent($newsletter);
|
|
$this->statsNotificationsScheduler->schedule($newsletter);
|
|
}
|
|
}
|
|
}
|