oont-contents/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-rest-sender.php
2025-02-08 15:10:23 +01:00

144 lines
3.8 KiB
PHP

<?php
/**
* Sync package.
*
* @package automattic/jetpack-sync
*/
namespace Automattic\Jetpack\Sync;
use WP_Error;
/**
* This class will handle checkout of Sync queues for REST Endpoints.
*
* @since 1.23.1
*/
class REST_Sender {

	/**
	 * Items captured by jetpack_sync_send_data_listener(), keyed by the keys
	 * of the payload handed to the `jetpack_sync_send_data` filter.
	 *
	 * @var array
	 */
	public $items = array();

	/**
	 * Checkout objects from the queue.
	 *
	 * Recognised `$args` keys (all optional):
	 *  - `pop`:    when non-empty, items are popped off the queue instead of checked out.
	 *  - `force`:  when non-empty (and `pop` is not used), the queue is unlocked before checkout.
	 *  - `encode`: passed through to Sender::get_items_to_send(); defaults to true.
	 *
	 * @param string $queue_name      Name of Queue.
	 * @param int    $number_of_items Number of Items.
	 * @param array  $args            Arguments, see above.
	 *
	 * @return array|WP_Error Array with `buffer_id`, `items`, `skipped_items`, `codec`,
	 *                        `sent_timestamp` and `queue_size` keys, or a WP_Error when the
	 *                        queue is empty or no usable buffer could be obtained.
	 */
	public function queue_pull( $queue_name, $number_of_items, $args ) {
		$queue = new Queue( $queue_name );

		if ( 0 === $queue->size() ) {
			return new WP_Error( 'queue_size', 'The queue is empty and there is nothing to send', 400 );
		}

		$sender = Sender::get_instance();

		// Try to give ourselves as much time as possible.
		set_time_limit( 0 );

		if ( ! empty( $args['pop'] ) ) {
			$buffer = new Queue_Buffer( 'pop', $queue->pop( $number_of_items ) );
		} else {
			// Let's delete the checkin state. `force` is optional, so guard the lookup
			// to avoid an "Undefined array key" warning when the caller omits it.
			if ( ! empty( $args['force'] ) ) {
				$queue->unlock();
			}
			$buffer = $this->get_buffer( $queue, $number_of_items );
		}

		// Check that the $buffer is not checked out already.
		// Note: every WP_Error coming back from get_buffer() (including its own
		// `queue_size` error) is reported to the caller as `buffer_open`.
		if ( is_wp_error( $buffer ) ) {
			return new WP_Error( 'buffer_open', "We couldn't get the buffer it is currently checked out", 400 );
		}

		if ( ! is_object( $buffer ) ) {
			return new WP_Error( 'buffer_non-object', 'Buffer is not an object', 400 );
		}

		$encode = isset( $args['encode'] ) ? $args['encode'] : true;

		// Always clear the "is syncing" flag, even if building the payload throws.
		Settings::set_is_syncing( true );
		try {
			list( $items_to_send, $skipped_items_ids ) = $sender->get_items_to_send( $buffer, $encode );
		} finally {
			Settings::set_is_syncing( false );
		}

		return array(
			'buffer_id'      => $buffer->id,
			'items'          => $items_to_send,
			'skipped_items'  => $skipped_items_ids,
			'codec'          => $encode ? $sender->get_codec()->name() : null,
			'sent_timestamp' => time(),
			'queue_size'     => $queue->size(),
		);
	}

	/**
	 * Adds Sync items to local property.
	 *
	 * Used as a temporary `jetpack_sync_send_data` filter callback. Only the first
	 * argument is read; its entries are copied into $this->items, overwriting any
	 * existing entries with the same key. Nothing is returned.
	 */
	public function jetpack_sync_send_data_listener() {
		$args = func_get_args();

		// Nothing to collect when called without a payload or with a non-iterable one.
		if ( ! isset( $args[0] ) || ! ( is_array( $args[0] ) || is_object( $args[0] ) ) ) {
			return;
		}

		foreach ( $args[0] as $key => $item ) {
			$this->items[ $key ] = $item;
		}
	}

	/**
	 * Check out a buffer of full sync actions.
	 *
	 * Temporarily swaps the `jetpack_sync_send_data` callback from Actions::send_data
	 * to jetpack_sync_send_data_listener(), runs a full sync so that the data is
	 * captured in $this->items, then restores the original callback.
	 *
	 * Note: $this->items is not cleared here, so items captured by earlier calls on
	 * the same instance are included in the result again.
	 *
	 * @return array Sync Actions to be returned to requestor
	 */
	public function immediate_full_sync_pull() {
		// Try to give ourselves as much time as possible.
		set_time_limit( 0 );

		$original_send_data_cb = array( 'Automattic\Jetpack\Sync\Actions', 'send_data' );
		$temp_send_data_cb     = array( $this, 'jetpack_sync_send_data_listener' );

		Sender::get_instance()->set_enqueue_wait_time( 0 );
		remove_filter( 'jetpack_sync_send_data', $original_send_data_cb );
		add_filter( 'jetpack_sync_send_data', $temp_send_data_cb, 10, 6 );

		try {
			Sender::get_instance()->do_full_sync();
		} finally {
			// Restore the regular send callback even if the full sync throws, so the
			// rest of the request does not keep routing data into this listener.
			remove_filter( 'jetpack_sync_send_data', $temp_send_data_cb );
			add_filter( 'jetpack_sync_send_data', $original_send_data_cb, 10, 6 );
		}

		return array(
			'items'          => $this->items,
			'codec'          => Sender::get_instance()->get_codec()->name(),
			'sent_timestamp' => time(),
			'status'         => Actions::get_sync_status(),
		);
	}

	/**
	 * Checkout items out of the sync queue.
	 *
	 * Retries the checkout while it yields a WP_Error, sleeping two seconds between
	 * attempts, until about $max_duration seconds have elapsed.
	 *
	 * @param Queue $queue           Sync Queue.
	 * @param int   $number_of_items Number of items to checkout.
	 *
	 * @return mixed|WP_Error Whatever $queue->checkout() returned on the last attempt
	 *                        (presumably a Queue_Buffer on success - confirm against
	 *                        Queue::checkout()), or a `queue_size` WP_Error when the
	 *                        checkout returned false.
	 */
	protected function get_buffer( $queue, $number_of_items ) {
		$start        = time();
		$max_duration = 5; // Keep retrying a failed checkout for roughly this many seconds.

		$buffer   = $queue->checkout( $number_of_items );
		$duration = time() - $start;

		while ( is_wp_error( $buffer ) && $duration < $max_duration ) {
			sleep( 2 );
			$duration = time() - $start;
			$buffer   = $queue->checkout( $number_of_items );
		}

		if ( false === $buffer ) {
			return new WP_Error( 'queue_size', 'The queue is empty and there is nothing to send', 400 );
		}

		return $buffer;
	}
}