oont-contents/plugins/mailpoet/lib/Newsletter/Sending/ScheduledTaskSubscribersRepository.php
2025-02-08 15:10:23 +01:00

229 lines
9.2 KiB
PHP

<?php declare(strict_types = 1);
namespace MailPoet\Newsletter\Sending;
if (!defined('ABSPATH')) exit;
use MailPoet\Doctrine\Repository;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\InvalidStateException;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Doctrine\DBAL\ArrayParameterType;
use MailPoetVendor\Doctrine\ORM\QueryBuilder;
/**
* @extends Repository<ScheduledTaskSubscriberEntity>
*/
class ScheduledTaskSubscribersRepository extends Repository {
/**
 * Returns the fully-qualified class name of the entity handled by this repository.
 *
 * @return class-string<ScheduledTaskSubscriberEntity>
 */
protected function getEntityClassName() {
return ScheduledTaskSubscriberEntity::class;
}
/**
 * Checks whether a row exists for the given task/subscriber pair whose processed flag equals 1.
 *
 * @return bool True when such a row is found, false otherwise.
 */
public function isSubscriberProcessed(ScheduledTaskEntity $task, SubscriberEntity $subscriber): bool {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('sts');
  $queryBuilder
    ->andWhere('sts.processed = 1')
    ->andWhere('sts.task = :task')
    ->andWhere('sts.subscriber = :subscriber')
    ->setParameter('subscriber', $subscriber)
    ->setParameter('task', $task);

  // The query yields either a single entity or null, so a null check is sufficient.
  $match = $queryBuilder->getQuery()->getOneOrNullResult();
  return $match !== null;
}
/**
 * Creates the task/subscriber row described by $data, or updates the one that already exists.
 *
 * Recognised keys: 'task_id' and 'subscriber_id' (both required), plus optional
 * 'processed' and 'failed'. A missing 'processed' or 'failed' value falls back to
 * STATUS_UNPROCESSED / FAIL_STATUS_OK, also when an existing row is being updated.
 *
 * @param array $data
 * @return ScheduledTaskSubscriberEntity|null Null when 'task_id' or 'subscriber_id' is not set.
 * @throws InvalidStateException When no reference can be obtained for the task or the subscriber.
 */
public function createOrUpdate(array $data): ?ScheduledTaskSubscriberEntity {
  $taskId = $data['task_id'] ?? null;
  $subscriberId = $data['subscriber_id'] ?? null;
  if ($taskId === null || $subscriberId === null) {
    return null;
  }

  $link = $this->findOneBy(['task' => $taskId, 'subscriber' => $subscriberId]);
  if (!$link) {
    $taskReference = $this->entityManager->getReference(ScheduledTaskEntity::class, (int)$taskId);
    $subscriberReference = $this->entityManager->getReference(SubscriberEntity::class, (int)$subscriberId);
    if (!$taskReference || !$subscriberReference) {
      throw new InvalidStateException();
    }
    $link = new ScheduledTaskSubscriberEntity($taskReference, $subscriberReference);
    $this->persist($link);
  }

  $link->setProcessed($data['processed'] ?? ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
  $link->setFailed($data['failed'] ?? ScheduledTaskSubscriberEntity::FAIL_STATUS_OK);
  $this->flush();

  return $link;
}
/**
 * Counts the unprocessed rows of a task whose subscriber ID is greater than
 * $lastProcessedSubscriberId.
 */
public function countSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId): int {
  $countQuery = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId)
    ->select('count(sts.subscriber)')
    ->getQuery();

  return (int)$countQuery->getSingleScalarResult();
}
/**
 * Fetches at most $limit subscriber IDs of a task's unprocessed rows, in ascending order,
 * starting after $lastProcessedSubscriberId.
 *
 * @return array Single-column result holding the subscriber IDs.
 */
public function getSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId, int $limit): array {
  $batchQuery = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId)
    ->select('IDENTITY(sts.subscriber) AS subscriber_id')
    ->orderBy('sts.subscriber', 'asc')
    ->setMaxResults($limit)
    ->getQuery();

  return $batchQuery->getSingleColumnResult();
}
/**
 * Marks the given subscribers of a task as processed, then re-checks whether the task is complete.
 *
 * The update runs as a bulk DQL statement, so matching entities already held by the entity
 * manager are refreshed afterwards. The completion check runs even when the ID list is empty.
 *
 * @param int[] $subscriberIds Subscriber IDs; numeric strings are accepted and normalized to integers.
 */
public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
  // IDs taken from database result sets can be numeric strings. The SQL parameter is bound as
  // INTEGER either way, but the strict in_array() below would never match a string against the
  // integer entity ID, leaving stale entities in the entity manager.
  $subscriberIds = array_map('intval', $subscriberIds);

  if ($subscriberIds) {
    $this->entityManager->createQueryBuilder()
      ->update(ScheduledTaskSubscriberEntity::class, 'sts')
      ->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED)
      ->where('sts.subscriber IN (:subscriberIds)')
      ->andWhere('sts.task = :task')
      ->setParameter('subscriberIds', $subscriberIds, ArrayParameterType::INTEGER)
      ->setParameter('task', $task)
      ->getQuery()
      ->execute();

    // update was done via DQL, make sure the entities are also refreshed in the entity manager
    $this->refreshAll(function (ScheduledTaskSubscriberEntity $entity) use ($task, $subscriberIds) {
      return $entity->getTask() === $task && in_array($entity->getSubscriberId(), $subscriberIds, true);
    });
  }

  $this->checkCompleted($task);
}
/**
 * Bulk-inserts one unprocessed task-subscriber row for the given task per matching subscriber.
 *
 * A subscriber matches when its `deleted_at` is NULL and its status is either
 * STATUS_SUBSCRIBED or STATUS_UNCONFIRMED. The insert is a single raw SQL statement
 * executed on the DBAL connection, so no entities are created in the entity manager.
 */
public function createSubscribersForBounceWorker(ScheduledTaskEntity $scheduledTaskEntity): void {
// Table names come from Doctrine class metadata, not from caller input.
$scheduledTaskSubscribersTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
// NOTE(review): INSERT IGNORE presumably skips rows that already exist for this task — confirm against the table's keys.
$stmt = $this->entityManager->getConnection()->prepare("
INSERT IGNORE INTO " . $scheduledTaskSubscribersTable . "
(task_id, subscriber_id, processed)
SELECT :taskId AS task_id, s.`id` AS subscriber_id, :unprocessed AS processed
FROM " . $subscribersTable . " s
WHERE s.`deleted_at` IS NULL
AND s.`status` IN (:subscribed, :unconfirmed)
");
// All variable values are passed as bound parameters.
$stmt->bindValue('taskId', $scheduledTaskEntity->getId());
$stmt->bindValue('unprocessed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
$stmt->bindValue('subscribed', SubscriberEntity::STATUS_SUBSCRIBED);
$stmt->bindValue('unconfirmed', SubscriberEntity::STATUS_UNCONFIRMED);
$stmt->executeQuery();
}
/**
 * Deletes every task-subscriber row that belongs to one of the given scheduled tasks.
 *
 * The delete runs as a bulk DQL statement, so matching entities already held by the entity
 * manager are detached afterwards.
 *
 * @param int[] $ids Scheduled task IDs; numeric strings are normalized to integers.
 *                   An empty list is a no-op.
 */
public function deleteByTaskIds(array $ids): void {
  // Normalize so the strict in_array() below matches the integer task ID even when callers
  // pass numeric strings (e.g. IDs taken from a database result set).
  $ids = array_map('intval', $ids);
  if (!$ids) {
    // Nothing can match an empty list, so skip the query and the entity scan.
    return;
  }

  $this->entityManager->createQueryBuilder()
    ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
    ->where('sts.task IN (:taskIds)')
    // Bind explicitly as an integer list, like the other IN (...) queries in this repository.
    ->setParameter('taskIds', $ids, ArrayParameterType::INTEGER)
    ->getQuery()
    ->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($ids) {
    $task = $entity->getTask();
    return $task && in_array($task->getId(), $ids, true);
  });
}
/**
 * Removes all task-subscriber rows of the given scheduled task and detaches the
 * corresponding entities from the entity manager.
 */
public function deleteByScheduledTask(ScheduledTaskEntity $scheduledTask): void {
  $deleteQuery = $this->entityManager->createQueryBuilder()
    ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
    ->where('sts.task = :task')
    ->setParameter('task', $scheduledTask)
    ->getQuery();
  $deleteQuery->execute();

  // The rows were removed with a bulk DQL statement, so drop the in-memory entities as well.
  $belongsToTask = function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask) {
    return $entity->getTask() === $scheduledTask;
  };
  $this->detachAll($belongsToTask);
}
/**
 * Deletes the rows of the given subscribers from a scheduled task, then re-checks whether
 * the task is complete.
 *
 * The delete runs as a bulk DQL statement, so matching entities already held by the entity
 * manager are detached afterwards. The completion check runs even when the ID list is empty.
 *
 * @param int[] $subscriberIds Subscriber IDs; numeric strings are normalized to integers.
 */
public function deleteByScheduledTaskAndSubscriberIds(ScheduledTaskEntity $scheduledTask, array $subscriberIds): void {
  // Normalize so the strict in_array() below matches the integer entity ID even when callers
  // pass numeric strings (e.g. IDs taken from a database result set).
  $subscriberIds = array_map('intval', $subscriberIds);

  // An empty list cannot match any row; skip the query, as updateProcessedSubscribers() does.
  if ($subscriberIds) {
    $this->entityManager->createQueryBuilder()
      ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
      ->where('sts.task = :task')
      ->andWhere('sts.subscriber IN (:subscriberIds)')
      ->setParameter('task', $scheduledTask)
      ->setParameter('subscriberIds', $subscriberIds, ArrayParameterType::INTEGER)
      ->getQuery()
      ->execute();

    // delete was done via DQL, make sure the entities are also detached from the entity manager
    $this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask, $subscriberIds) {
      return $entity->getTask() === $scheduledTask && in_array($entity->getSubscriberId(), $subscriberIds, true);
    });
  }

  $this->checkCompleted($scheduledTask);
}
/**
 * Replaces the subscribers of a task: existing rows are deleted first, then one row is
 * created (or updated) per given subscriber ID.
 *
 * @param array $subscriberIds Subscriber IDs to attach to the task.
 */
public function setSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
  $this->deleteByScheduledTask($task);

  $taskId = $task->getId();
  foreach ($subscriberIds as $id) {
    $row = ['task_id' => $taskId, 'subscriber_id' => $id];
    $this->createOrUpdate($row);
  }
}
/**
 * Records a sending error for one subscriber of a task: the row is flagged as failed and
 * processed, the message is stored, and the task's completion state is re-checked.
 *
 * Does nothing when no row exists for the task/subscriber pair.
 */
public function saveError(ScheduledTaskEntity $scheduledTask, int $subscriberId, string $errorMessage): void {
  $row = $this->findOneBy(['task' => $scheduledTask, 'subscriber' => $subscriberId]);
  if (!($row instanceof ScheduledTaskSubscriberEntity)) {
    return;
  }

  $row->setFailed(ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
  $row->setProcessed(ScheduledTaskSubscriberEntity::STATUS_PROCESSED);
  $row->setError($errorMessage);

  $this->persist($row);
  $this->flush();
  $this->checkCompleted($scheduledTask);
}
/**
 * Counts the rows of the given task whose processed flag is STATUS_PROCESSED.
 */
public function countProcessed(ScheduledTaskEntity $scheduledTaskEntity): int {
  $criteria = [
    'task' => $scheduledTaskEntity,
    'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED,
  ];
  return $this->countBy($criteria);
}
/**
 * Counts the rows of the given task whose processed flag is STATUS_UNPROCESSED.
 */
public function countUnprocessed(ScheduledTaskEntity $scheduledTaskEntity): int {
  $criteria = [
    'task' => $scheduledTaskEntity,
    'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED,
  ];
  return $this->countBy($criteria);
}
/**
 * Marks the task as completed once it has no unprocessed rows left.
 *
 * When the unprocessed count is zero, the status becomes STATUS_COMPLETED, the processed-at
 * timestamp is set to the current time with milliseconds zeroed, and the change is flushed.
 */
private function checkCompleted(ScheduledTaskEntity $task): void {
  if ($this->countUnprocessed($task) !== 0) {
    return;
  }

  $completedAt = Carbon::now()->millisecond(0);
  $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
  $task->setProcessedAt($completedAt);
  $this->entityManager->flush();
}
/**
 * Builds the shared query for batch lookups: rows of the given task that are still
 * STATUS_UNPROCESSED and whose subscriber ID is greater than $lastProcessedSubscriberId.
 *
 * No SELECT clause is set here; callers add their own.
 */
private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder->from(ScheduledTaskSubscriberEntity::class, 'sts');

  $queryBuilder
    ->andWhere('sts.task = :taskId')
    ->andWhere('sts.subscriber > :lastProcessedSubscriberId')
    ->andWhere('sts.processed = :status');

  $queryBuilder
    ->setParameter('taskId', $taskId)
    ->setParameter('lastProcessedSubscriberId', $lastProcessedSubscriberId)
    ->setParameter('status', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);

  return $queryBuilder;
}
}