summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcatch2012-03-12 03:07:39 (GMT)
committercatch2012-03-12 03:07:39 (GMT)
commit9e43704ae6f7b4c0bbc13000c4061af2098b2553 (patch)
tree33b1b463dabab0ed00cbb18ee845e9c4fbb76d5b
parentd4442020ddd03a29dc78316a65e10d711acd11e3 (diff)
Issue #1468244 by amateescu, aspilicious: Convert DrupalQueue system to PSR-0.
-rw-r--r--core/includes/batch.queue.inc84
-rw-r--r--core/includes/common.inc98
-rw-r--r--core/includes/form.inc11
-rw-r--r--core/lib/Drupal/Core/Queue/Batch.php56
-rw-r--r--core/lib/Drupal/Core/Queue/BatchMemory.php52
-rw-r--r--core/lib/Drupal/Core/Queue/Memory.php107
-rw-r--r--core/lib/Drupal/Core/Queue/QueueInterface.php114
-rw-r--r--core/lib/Drupal/Core/Queue/ReliableQueueInterface.php17
-rw-r--r--core/lib/Drupal/Core/Queue/System.php126
-rw-r--r--core/modules/aggregator/aggregator.module2
-rw-r--r--core/modules/simpletest/simpletest.info1
-rw-r--r--core/modules/simpletest/tests/queue.test120
-rw-r--r--core/modules/system/system.api.php6
-rw-r--r--core/modules/system/system.info1
-rw-r--r--core/modules/system/system.queue.inc371
-rw-r--r--core/modules/system/system.test92
-rw-r--r--core/modules/update/update.fetch.inc6
-rw-r--r--core/modules/update/update.install4
18 files changed, 702 insertions, 566 deletions
diff --git a/core/includes/batch.queue.inc b/core/includes/batch.queue.inc
deleted file mode 100644
index ed290ee..0000000
--- a/core/includes/batch.queue.inc
+++ /dev/null
@@ -1,84 +0,0 @@
-<?php
-
-/**
- * @file
- * Queue handlers used by the Batch API.
- *
- * These implementations:
- * - Ensure FIFO ordering.
- * - Allow an item to be repeatedly claimed until it is actually deleted (no
- * notion of lease time or 'expire' date), to allow multipass operations.
- */
-
-/**
- * Defines a batch queue.
- *
- * Stale items from failed batches are cleaned from the {queue} table on cron
- * using the 'created' date.
- */
-class BatchQueue extends SystemQueue {
-
- /**
- * Overrides SystemQueue::claimItem().
- *
- * Unlike SystemQueue::claimItem(), this method provides a default lease
- * time of 0 (no expiration) instead of 30. This allows the item to be
- * claimed repeatedly until it is deleted.
- */
- public function claimItem($lease_time = 0) {
- $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
- if ($item) {
- $item->data = unserialize($item->data);
- return $item;
- }
- return FALSE;
- }
-
- /**
- * Retrieves all remaining items in the queue.
- *
- * This is specific to Batch API and is not part of the DrupalQueueInterface.
- */
- public function getAllItems() {
- $result = array();
- $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll();
- foreach ($items as $item) {
- $result[] = unserialize($item->data);
- }
- return $result;
- }
-}
-
-/**
- * Defines a batch queue for non-progressive batches.
- */
-class BatchMemoryQueue extends MemoryQueue {
-
- /**
- * Overrides MemoryQueue::claimItem().
- *
- * Unlike MemoryQueue::claimItem(), this method provides a default lease
- * time of 0 (no expiration) instead of 30. This allows the item to be
- * claimed repeatedly until it is deleted.
- */
- public function claimItem($lease_time = 0) {
- if (!empty($this->queue)) {
- reset($this->queue);
- return current($this->queue);
- }
- return FALSE;
- }
-
- /**
- * Retrieves all remaining items in the queue.
- *
- * This is specific to Batch API and is not part of the DrupalQueueInterface.
- */
- public function getAllItems() {
- $result = array();
- foreach ($this->queue as $item) {
- $result[] = $item->data;
- }
- return $result;
- }
-}
diff --git a/core/includes/common.inc b/core/includes/common.inc
index 327ffbd..52a3a43 100644
--- a/core/includes/common.inc
+++ b/core/includes/common.inc
@@ -5264,7 +5264,7 @@ function drupal_cron_run() {
// Make sure every queue exists. There is no harm in trying to recreate an
// existing queue.
foreach ($queues as $queue_name => $info) {
- DrupalQueue::get($queue_name)->createQueue();
+ queue($queue_name)->createQueue();
}
// Register shutdown callback.
drupal_register_shutdown_function('drupal_cron_cleanup');
@@ -5294,7 +5294,7 @@ function drupal_cron_run() {
foreach ($queues as $queue_name => $info) {
$function = $info['worker callback'];
$end = time() + (isset($info['time']) ? $info['time'] : 15);
- $queue = DrupalQueue::get($queue_name);
+ $queue = queue($queue_name);
while (time() < $end && ($item = $queue->claimItem())) {
$function($item->data);
$queue->deleteItem($item);
@@ -7863,3 +7863,97 @@ function drupal_get_filetransfer_info() {
}
return $info;
}
+
+/**
+ * @defgroup queue Queue operations
+ * @{
+ * Queue items to allow later processing.
+ *
+ * The queue system allows placing items in a queue and processing them later.
+ * The system tries to ensure that only one consumer can process an item.
+ *
+ * Before a queue can be used it needs to be created by
+ * Drupal\Core\Queue\QueueInterface::createQueue().
+ *
+ * Items can be added to the queue by passing an arbitrary data object to
+ * Drupal\Core\Queue\QueueInterface::createItem().
+ *
+ * To process an item, call Drupal\Core\Queue\QueueInterface::claimItem() and
+ * specify how long you want to have a lease for working on that item.
+ * When finished processing, the item needs to be deleted by calling
+ * Drupal\Core\Queue\QueueInterface::deleteItem(). If the consumer dies, the
+ * item will be made available again by the Drupal\Core\Queue\QueueInterface
+ * implementation once the lease expires. Another consumer will then be able to
+ * receive it when calling Drupal\Core\Queue\QueueInterface::claimItem().
+ * Due to this, the processing code should be aware that an item might be handed
+ * over for processing more than once.
+ *
+ * The $item object used by the Drupal\Core\Queue\QueueInterface can contain
+ * arbitrary metadata depending on the implementation. Systems using the
+ * interface should only rely on the data property which will contain the
+ * information passed to Drupal\Core\Queue\QueueInterface::createItem().
+ * The full queue item returned by Drupal\Core\Queue\QueueInterface::claimItem()
+ * needs to be passed to Drupal\Core\Queue\QueueInterface::deleteItem() once
+ * processing is completed.
+ *
+ * There are two kinds of queue backends available: reliable, which preserves
+ * the order of messages and guarantees that every item will be executed at
+ * least once. The non-reliable kind only does a best effort to preserve order
+ * in messages and to execute them at least once but there is a small chance
+ * that some items get lost. For example, some distributed back-ends like
+ * Amazon SQS will be managing jobs for a large set of producers and consumers
+ * where a strict FIFO ordering will likely not be preserved. Another example
+ * would be an in-memory queue backend which might lose items if it crashes.
+ * However, such a backend would be able to deal with significantly more writes
+ * than a reliable queue and for many tasks this is more important. See
+ * aggregator_cron() for an example of how to effectively utilize a
+ * non-reliable queue. Another example is doing Twitter statistics -- the small
+ * possibility of losing a few items is insignificant next to power of the
+ * queue being able to keep up with writes. As described in the processing
+ * section, regardless of the queue being reliable or not, the processing code
+ * should be aware that an item might be handed over for processing more than
+ * once (because the processing code might time out before it finishes).
+ */
+
+/**
+ * Instantiates and statically caches the correct class for a queue.
+ *
+ * The following variables can be set by variable_set or $conf overrides:
+ * - queue_class_$name: the class to be used for the queue $name.
+ * - queue_default_class: the class to use when queue_class_$name is not
+ * defined. Defaults to Drupal\Core\Queue\System, a reliable backend using
+ * SQL.
+ * - queue_default_reliable_class: the class to use when queue_class_$name is
+ * not defined and the queue_default_class is not reliable. Defaults to
+ * Drupal\Core\Queue\System.
+ *
+ * @param string $name
+ * The name of the queue to work with.
+ * @param bool $reliable
+ * TRUE if the ordering of items and guaranteeing every item executes at
+ * least once is important, FALSE if scalability is the main concern. Defaults
+ * to FALSE.
+ *
+ * @return Drupal\Core\Queue\QueueInterface
+ * The queue object for a given name.
+ *
+ * @see Drupal\Core\Queue\QueueInterface
+ */
+function queue($name, $reliable = FALSE) {
+ static $queues;
+ if (!isset($queues[$name])) {
+ $class = variable_get('queue_class_' . $name, NULL);
+ if ($class && $reliable && in_array('Drupal\Core\Queue\ReliableQueueInterface', class_implements($class))) {
+ $class = variable_get('queue_default_reliable_class', 'Drupal\Core\Queue\System');
+ }
+ elseif (!$class) {
+ $class = variable_get('queue_default_class', 'Drupal\Core\Queue\System');
+ }
+ $queues[$name] = new $class($name);
+ }
+ return $queues[$name];
+}
+
+/**
+ * @} End of "defgroup queue".
+ */
diff --git a/core/includes/form.inc b/core/includes/form.inc
index 329ab4d..6ebbac4 100644
--- a/core/includes/form.inc
+++ b/core/includes/form.inc
@@ -4641,8 +4641,9 @@ function &batch_get() {
/**
* Populates a job queue with the operations of a batch set.
*
- * Depending on whether the batch is progressive or not, the BatchQueue or
- * BatchMemoryQueue handler classes will be used.
+ * Depending on whether the batch is progressive or not, the
+ * Drupal\Core\Queue\Batch or Drupal\Core\Queue\BatchMemory handler classes will
+ * be used.
*
* @param $batch
* The batch array.
@@ -4659,7 +4660,7 @@ function _batch_populate_queue(&$batch, $set_id) {
$batch_set += array(
'queue' => array(
'name' => 'drupal_batch:' . $batch['id'] . ':' . $set_id,
- 'class' => $batch['progressive'] ? 'BatchQueue' : 'BatchMemoryQueue',
+ 'class' => $batch['progressive'] ? 'Drupal\Core\Queue\Batch' : 'Drupal\Core\Queue\BatchMemory',
),
);
@@ -4685,12 +4686,8 @@ function _batch_populate_queue(&$batch, $set_id) {
function _batch_queue($batch_set) {
static $queues;
- // The class autoloader is not available when running update.php, so make
- // sure the files are manually included.
if (!isset($queues)) {
$queues = array();
- require_once DRUPAL_ROOT . '/core/modules/system/system.queue.inc';
- require_once DRUPAL_ROOT . '/core/includes/batch.queue.inc';
}
if (isset($batch_set['queue'])) {
diff --git a/core/lib/Drupal/Core/Queue/Batch.php b/core/lib/Drupal/Core/Queue/Batch.php
new file mode 100644
index 0000000..a1de48a
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/Batch.php
@@ -0,0 +1,56 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\Batch.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Defines a batch queue handler used by the Batch API.
+ *
+ * This implementation:
+ * - Ensures FIFO ordering.
+ * - Allows an item to be repeatedly claimed until it is actually deleted (no
+ * notion of lease time or 'expire' date), to allow multipass operations.
+ *
+ * Stale items from failed batches are cleaned from the {queue} table on cron
+ * using the 'created' date.
+ */
+class Batch extends System {
+
+ /**
+ * Overrides Drupal\Core\Queue\System::claimItem().
+ *
+ * Unlike Drupal\Core\Queue\System::claimItem(), this method provides a
+ * default lease time of 0 (no expiration) instead of 30. This allows the item
+ * to be claimed repeatedly until it is deleted.
+ */
+ public function claimItem($lease_time = 0) {
+ $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
+ if ($item) {
+ $item->data = unserialize($item->data);
+ return $item;
+ }
+ return FALSE;
+ }
+
+ /**
+ * Retrieves all remaining items in the queue.
+ *
+ * This is specific to Batch API and is not part of the
+ * Drupal\Core\Queue\QueueInterface.
+ *
+ * @return array
+ * An array of queue items.
+ */
+ public function getAllItems() {
+ $result = array();
+ $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll();
+ foreach ($items as $item) {
+ $result[] = unserialize($item->data);
+ }
+ return $result;
+ }
+}
diff --git a/core/lib/Drupal/Core/Queue/BatchMemory.php b/core/lib/Drupal/Core/Queue/BatchMemory.php
new file mode 100644
index 0000000..fcc49f5
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/BatchMemory.php
@@ -0,0 +1,52 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\BatchMemory.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Defines a batch queue handler used by the Batch API for non-progressive
+ * batches.
+ *
+ * This implementation:
+ * - Ensures FIFO ordering.
+ * - Allows an item to be repeatedly claimed until it is actually deleted (no
+ * notion of lease time or 'expire' date), to allow multipass operations.
+ */
+class BatchMemory extends Memory {
+
+ /**
+ * Overrides Drupal\Core\Queue\Memory::claimItem().
+ *
+ * Unlike Drupal\Core\Queue\Memory::claimItem(), this method provides a
+ * default lease time of 0 (no expiration) instead of 30. This allows the item
+ * to be claimed repeatedly until it is deleted.
+ */
+ public function claimItem($lease_time = 0) {
+ if (!empty($this->queue)) {
+ reset($this->queue);
+ return current($this->queue);
+ }
+ return FALSE;
+ }
+
+ /**
+ * Retrieves all remaining items in the queue.
+ *
+ * This is specific to Batch API and is not part of the
+ * Drupal\Core\Queue\QueueInterface.
+ *
+ * @return array
+ * An array of queue items.
+ */
+ public function getAllItems() {
+ $result = array();
+ foreach ($this->queue as $item) {
+ $result[] = $item->data;
+ }
+ return $result;
+ }
+}
diff --git a/core/lib/Drupal/Core/Queue/Memory.php b/core/lib/Drupal/Core/Queue/Memory.php
new file mode 100644
index 0000000..d741bda
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/Memory.php
@@ -0,0 +1,107 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\Memory.
+ */
+
+namespace Drupal\Core\Queue;
+
+use stdClass;
+
+/**
+ * Static queue implementation.
+ *
+ * This allows "undelayed" variants of processes relying on the Queue
+ * interface. The queue data resides in memory. It should only be used for
+ * items that will be queued and dequeued within a given page request.
+ */
+class Memory implements QueueInterface {
+ /**
+ * The queue data.
+ *
+ * @var array
+ */
+ protected $queue;
+
+ /**
+ * Counter for item ids.
+ *
+ * @var int
+ */
+ protected $idSequence;
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::__construct().
+ */
+ public function __construct($name) {
+ $this->queue = array();
+ $this->idSequence = 0;
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::createItem().
+ */
+ public function createItem($data) {
+ $item = new stdClass();
+ $item->item_id = $this->idSequence++;
+ $item->data = $data;
+ $item->created = time();
+ $item->expire = 0;
+ $this->queue[$item->item_id] = $item;
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::numberOfItems().
+ */
+ public function numberOfItems() {
+ return count($this->queue);
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::claimItem().
+ */
+ public function claimItem($lease_time = 30) {
+ foreach ($this->queue as $key => $item) {
+ if ($item->expire == 0) {
+ $item->expire = time() + $lease_time;
+ $this->queue[$key] = $item;
+ return $item;
+ }
+ }
+ return FALSE;
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::deleteItem().
+ */
+ public function deleteItem($item) {
+ unset($this->queue[$item->item_id]);
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::releaseItem().
+ */
+ public function releaseItem($item) {
+ if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
+ $this->queue[$item->item_id]->expire = 0;
+ return TRUE;
+ }
+ return FALSE;
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::createQueue().
+ */
+ public function createQueue() {
+ // Nothing needed here.
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::deleteQueue().
+ */
+ public function deleteQueue() {
+ $this->queue = array();
+ $this->idSequence = 0;
+ }
+}
diff --git a/core/lib/Drupal/Core/Queue/QueueInterface.php b/core/lib/Drupal/Core/Queue/QueueInterface.php
new file mode 100644
index 0000000..90d4b00
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/QueueInterface.php
@@ -0,0 +1,114 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\QueueInterface.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Interface for a queue.
+ *
+ * Classes implementing this interface will do a best effort to preserve order
+ * in messages and to execute them at least once.
+ */
+interface QueueInterface {
+ /**
+ * Start working with a queue.
+ *
+ * @param $name
+ * Arbitrary string. The name of the queue to work with.
+ */
+ public function __construct($name);
+
+ /**
+ * Adds a queue item and store it directly to the queue.
+ *
+ * @param $data
+ * Arbitrary data to be associated with the new task in the queue.
+ *
+ * @return
+ * TRUE if the item was successfully created and was (best effort) added
+ * to the queue, otherwise FALSE. We don't guarantee the item was
+ * committed to disk etc, but as far as we know, the item is now in the
+ * queue.
+ */
+ public function createItem($data);
+
+ /**
+ * Retrieves the number of items in the queue.
+ *
+ * This is intended to provide a "best guess" count of the number of items in
+ * the queue. Depending on the implementation and the setup, the accuracy of
+ * the results of this function may vary.
+ *
+ * e.g. On a busy system with a large number of consumers and items, the
+ * result might only be valid for a fraction of a second and not provide an
+ * accurate representation.
+ *
+ * @return
+ * An integer estimate of the number of items in the queue.
+ */
+ public function numberOfItems();
+
+ /**
+ * Claims an item in the queue for processing.
+ *
+ * @param $lease_time
+ * How long the processing is expected to take in seconds, defaults to an
+ * hour. After this lease expires, the item will be reset and another
+ * consumer can claim the item. For idempotent tasks (which can be run
+ * multiple times without side effects), shorter lease times would result
+ * in lower latency in case a consumer fails. For tasks that should not be
+ * run more than once (non-idempotent), a larger lease time will make it
+ * more rare for a given task to run multiple times in cases of failure,
+ * at the cost of higher latency.
+ *
+ * @return
+ * On success we return an item object. If the queue is unable to claim an
+ * item it returns false. This implies a best effort to retrieve an item
+ * and either the queue is empty or there is some other non-recoverable
+ * problem.
+ */
+ public function claimItem($lease_time = 3600);
+
+ /**
+ * Deletes a finished item from the queue.
+ *
+ * @param $item
+ * The item returned by Drupal\Core\Queue\QueueInterface::claimItem().
+ */
+ public function deleteItem($item);
+
+ /**
+ * Releases an item that the worker could not process.
+ *
+ * Another worker can come in and process it before the timeout expires.
+ *
+ * @param $item
+ * The item returned by Drupal\Core\Queue\QueueInterface::claimItem().
+ *
+ * @return boolean
+ * TRUE if the item has been released, FALSE otherwise.
+ */
+ public function releaseItem($item);
+
+ /**
+ * Creates a queue.
+ *
+ * Called during installation and should be used to perform any necessary
+ * initialization operations. This should not be confused with the
+ * constructor for these objects, which is called every time an object is
+ * instantiated to operate on a queue. This operation is only needed the
+ * first time a given queue is going to be initialized (for example, to make
+ * a new database table or directory to hold tasks for the queue -- it
+ * depends on the queue implementation if this is necessary at all).
+ */
+ public function createQueue();
+
+ /**
+ * Deletes a queue and every item in the queue.
+ */
+ public function deleteQueue();
+}
diff --git a/core/lib/Drupal/Core/Queue/ReliableQueueInterface.php b/core/lib/Drupal/Core/Queue/ReliableQueueInterface.php
new file mode 100644
index 0000000..7f7f8b2
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/ReliableQueueInterface.php
@@ -0,0 +1,17 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\ReliableQueueInterface.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Reliable queue interface.
+ *
+ * Classes implementing this interface preserve the order of messages and
+ * guarantee that every item will be executed at least once.
+ */
+interface ReliableQueueInterface extends QueueInterface {
+}
diff --git a/core/lib/Drupal/Core/Queue/System.php b/core/lib/Drupal/Core/Queue/System.php
new file mode 100644
index 0000000..6deab47
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/System.php
@@ -0,0 +1,126 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\System.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Default queue implementation.
+ */
+class System implements ReliableQueueInterface {
+ /**
+ * The name of the queue this instance is working with.
+ *
+ * @var string
+ */
+ protected $name;
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::__construct().
+ */
+ public function __construct($name) {
+ $this->name = $name;
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::createItem().
+ */
+ public function createItem($data) {
+ // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain
+ // the queue table yet, so we cannot rely on drupal_write_record().
+ $query = db_insert('queue')
+ ->fields(array(
+ 'name' => $this->name,
+ 'data' => serialize($data),
+ // We cannot rely on REQUEST_TIME because many items might be created
+ // by a single request which takes longer than 1 second.
+ 'created' => time(),
+ ));
+ return (bool) $query->execute();
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::numberOfItems().
+ */
+ public function numberOfItems() {
+ return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::claimItem().
+ */
+ public function claimItem($lease_time = 30) {
+ // Claim an item by updating its expire fields. If claim is not successful
+ // another thread may have claimed the item in the meantime. Therefore loop
+ // until an item is successfully claimed or we are reasonably sure there
+ // are no unclaimed items left.
+ while (TRUE) {
+ $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
+ if ($item) {
+ // Try to update the item. Only one thread can succeed in UPDATEing the
+ // same row. We cannot rely on REQUEST_TIME because items might be
+ // claimed by a single consumer which runs longer than 1 second. If we
+ // continue to use REQUEST_TIME instead of the current time(), we steal
+ // time from the lease, and will tend to reset items before the lease
+ // should really expire.
+ $update = db_update('queue')
+ ->fields(array(
+ 'expire' => time() + $lease_time,
+ ))
+ ->condition('item_id', $item->item_id)
+ ->condition('expire', 0);
+ // If there are affected rows, this update succeeded.
+ if ($update->execute()) {
+ $item->data = unserialize($item->data);
+ return $item;
+ }
+ }
+ else {
+ // No items currently available to claim.
+ return FALSE;
+ }
+ }
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::releaseItem().
+ */
+ public function releaseItem($item) {
+ $update = db_update('queue')
+ ->fields(array(
+ 'expire' => 0,
+ ))
+ ->condition('item_id', $item->item_id);
+ return $update->execute();
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::deleteItem().
+ */
+ public function deleteItem($item) {
+ db_delete('queue')
+ ->condition('item_id', $item->item_id)
+ ->execute();
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::createQueue().
+ */
+ public function createQueue() {
+ // All tasks are stored in a single database table (which is created when
+ // Drupal is first installed) so there is nothing we need to do to create
+ // a new queue.
+ }
+
+ /**
+ * Implements Drupal\Core\Queue\QueueInterface::deleteQueue().
+ */
+ public function deleteQueue() {
+ db_delete('queue')
+ ->condition('name', $this->name)
+ ->execute();
+ }
+}
diff --git a/core/modules/aggregator/aggregator.module b/core/modules/aggregator/aggregator.module
index 7b65dfa..b4ffa8c 100644
--- a/core/modules/aggregator/aggregator.module
+++ b/core/modules/aggregator/aggregator.module
@@ -313,7 +313,7 @@ function aggregator_cron() {
':time' => REQUEST_TIME,
':never' => AGGREGATOR_CLEAR_NEVER
));
- $queue = DrupalQueue::get('aggregator_feeds');
+ $queue = queue('aggregator_feeds');
foreach ($result as $feed) {
if ($queue->createItem($feed)) {
// Add timestamp to avoid queueing item more than once.
diff --git a/core/modules/simpletest/simpletest.info b/core/modules/simpletest/simpletest.info
index d18fe6e..cf55121 100644
--- a/core/modules/simpletest/simpletest.info
+++ b/core/modules/simpletest/simpletest.info
@@ -29,6 +29,7 @@ files[] = tests/module.test
files[] = tests/pager.test
files[] = tests/password.test
files[] = tests/path.test
+files[] = tests/queue.test
files[] = tests/registry.test
files[] = tests/schema.test
files[] = tests/session.test
diff --git a/core/modules/simpletest/tests/queue.test b/core/modules/simpletest/tests/queue.test
new file mode 100644
index 0000000..e8f4c2c
--- /dev/null
+++ b/core/modules/simpletest/tests/queue.test
@@ -0,0 +1,120 @@
+<?php
+
+use Drupal\Core\Queue\Memory;
+use Drupal\Core\Queue\System;
+
+/**
+ * Tests the basic queue functionality.
+ */
+class QueueTestCase extends DrupalWebTestCase {
+ public static function getInfo() {
+ return array(
+ 'name' => 'Queue functionality',
+ 'description' => 'Queues and dequeues a set of items to check the basic queue functionality.',
+ 'group' => 'Queue',
+ );
+ }
+
+ /**
+ * Tests the System queue.
+ */
+ public function testSystemQueue() {
+ // Create two queues.
+ $queue1 = new System($this->randomName());
+ $queue1->createQueue();
+ $queue2 = new System($this->randomName());
+ $queue2->createQueue();
+
+ $this->queueTest($queue1, $queue2);
+ }
+
+ /**
+ * Tests the Memory queue.
+ */
+ public function testMemoryQueue() {
+ // Create two queues.
+ $queue1 = new Memory($this->randomName());
+ $queue1->createQueue();
+ $queue2 = new Memory($this->randomName());
+ $queue2->createQueue();
+
+ $this->queueTest($queue1, $queue2);
+ }
+
+ /**
+ * Queues and dequeues a set of items to check the basic queue functionality.
+ *
+ * @param Drupal\Core\Queue\QueueInterface $queue1
+ * An instantiated queue object.
+ * @param Drupal\Core\Queue\QueueInterface $queue2
+ * An instantiated queue object.
+ */
+ protected function queueTest($queue1, $queue2) {
+ // Create four items.
+ $data = array();
+ for ($i = 0; $i < 4; $i++) {
+ $data[] = array($this->randomName() => $this->randomName());
+ }
+
+ // Queue items 1 and 2 in the queue1.
+ $queue1->createItem($data[0]);
+ $queue1->createItem($data[1]);
+
+ // Retrieve two items from queue1.
+ $items = array();
+ $new_items = array();
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ // First two dequeued items should match the first two items we queued.
+ $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched'));
+
+ // Add two more items.
+ $queue1->createItem($data[2]);
+ $queue1->createItem($data[3]);
+
+ $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.'));
+ $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items'));
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ // All dequeued items should match the items we queued exactly once,
+ // therefore the score must be exactly 4.
+ $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched'));
+
+ // There should be no duplicate items.
+ $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
+
+ // Delete all items from queue1.
+ foreach ($items as $item) {
+ $queue1->deleteItem($item);
+ }
+
+ // Check that both queues are empty.
+ $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty'));
+ $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty'));
+ }
+
+ /**
+ * Returns the number of equal items in two arrays.
+ */
+ protected function queueScore($items, $new_items) {
+ $score = 0;
+ foreach ($items as $item) {
+ foreach ($new_items as $new_item) {
+ if ($item === $new_item) {
+ $score++;
+ }
+ }
+ }
+ return $score;
+ }
+}
diff --git a/core/modules/system/system.api.php b/core/modules/system/system.api.php
index e1d5298..785b098 100644
--- a/core/modules/system/system.api.php
+++ b/core/modules/system/system.api.php
@@ -138,7 +138,7 @@ function hook_cron() {
':time' => REQUEST_TIME,
':never' => AGGREGATOR_CLEAR_NEVER,
));
- $queue = DrupalQueue::get('aggregator_feeds');
+ $queue = queue('aggregator_feeds');
foreach ($result as $feed) {
$queue->createItem($feed);
}
@@ -158,8 +158,8 @@ function hook_cron() {
* An associative array where the key is the queue name and the value is
* again an associative array. Possible keys are:
* - 'worker callback': The name of the function to call. It will be called
- * with one argument, the item created via DrupalQueue::createItem() in
- * hook_cron().
+ * with one argument, the item created via
+ * Drupal\Core\Queue\QueueInterface::createItem() in hook_cron().
* - 'time': (optional) How much time Drupal should spend on calling this
* worker in seconds. Defaults to 15.
*
diff --git a/core/modules/system/system.info b/core/modules/system/system.info
index d95d561..0e8e3e1 100644
--- a/core/modules/system/system.info
+++ b/core/modules/system/system.info
@@ -5,7 +5,6 @@ version = VERSION
core = 8.x
files[] = system.archiver.inc
files[] = system.mail.inc
-files[] = system.queue.inc
files[] = system.tar.inc
files[] = system.test
required = TRUE
diff --git a/core/modules/system/system.queue.inc b/core/modules/system/system.queue.inc
deleted file mode 100644
index c2a6b13..0000000
--- a/core/modules/system/system.queue.inc
+++ /dev/null
@@ -1,371 +0,0 @@
-<?php
-
-/**
- * @file
- * Queue functionality.
- */
-
-/**
- * @defgroup queue Queue operations
- * @{
- * Queue items to allow later processing.
- *
- * The queue system allows placing items in a queue and processing them later.
- * The system tries to ensure that only one consumer can process an item.
- *
- * Before a queue can be used it needs to be created by
- * DrupalQueueInterface::createQueue().
- *
- * Items can be added to the queue by passing an arbitrary data object to
- * DrupalQueueInterface::createItem().
- *
- * To process an item, call DrupalQueueInterface::claimItem() and specify how
- * long you want to have a lease for working on that item. When finished
- * processing, the item needs to be deleted by calling
- * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
- * made available again by the DrupalQueueInterface implementation once the
- * lease expires. Another consumer will then be able to receive it when calling
- * DrupalQueueInterface::claimItem(). Due to this, the processing code should
- * be aware that an item might be handed over for processing more than once.
- *
- * The $item object used by the DrupalQueueInterface can contain arbitrary
- * metadata depending on the implementation. Systems using the interface should
- * only rely on the data property which will contain the information passed to
- * DrupalQueueInterface::createItem(). The full queue item returned by
- * DrupalQueueInterface::claimItem() needs to be passed to
- * DrupalQueueInterface::deleteItem() once processing is completed.
- *
- * There are two kinds of queue backends available: reliable, which preserves
- * the order of messages and guarantees that every item will be executed at
- * least once. The non-reliable kind only does a best effort to preserve order
- * in messages and to execute them at least once but there is a small chance
- * that some items get lost. For example, some distributed back-ends like
- * Amazon SQS will be managing jobs for a large set of producers and consumers
- * where a strict FIFO ordering will likely not be preserved. Another example
- * would be an in-memory queue backend which might lose items if it crashes.
- * However, such a backend would be able to deal with significantly more writes
- * than a reliable queue and for many tasks this is more important. See
- * aggregator_cron() for an example of how to effectively utilize a
- * non-reliable queue. Another example is doing Twitter statistics -- the small
- * possibility of losing a few items is insignificant next to power of the
- * queue being able to keep up with writes. As described in the processing
- * section, regardless of the queue being reliable or not, the processing code
- * should be aware that an item might be handed over for processing more than
- * once (because the processing code might time out before it finishes).
- */
-
-/**
- * Factory class for interacting with queues.
- */
-class DrupalQueue {
- /**
- * Returns the queue object for a given name.
- *
- * The following variables can be set by variable_set or $conf overrides:
- * - queue_class_$name: the class to be used for the queue $name.
- * - queue_default_class: the class to use when queue_class_$name is not
- * defined. Defaults to SystemQueue, a reliable backend using SQL.
- * - queue_default_reliable_class: the class to use when queue_class_$name is
- * not defined and the queue_default_class is not reliable. Defaults to
- * SystemQueue.
- *
- * @param $name
- * Arbitrary string. The name of the queue to work with.
- * @param $reliable
- * TRUE if the ordering of items and guaranteeing every item executes at
- * least once is important, FALSE if scalability is the main concern.
- *
- * @return
- * The queue object for a given name.
- */
- public static function get($name, $reliable = FALSE) {
- static $queues;
- if (!isset($queues[$name])) {
- $class = variable_get('queue_class_' . $name, NULL);
- if (!$class) {
- $class = variable_get('queue_default_class', 'SystemQueue');
- }
- $object = new $class($name);
- if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
- $class = variable_get('queue_default_reliable_class', 'SystemQueue');
- $object = new $class($name);
- }
- $queues[$name] = $object;
- }
- return $queues[$name];
- }
-}
-
-interface DrupalQueueInterface {
- /**
- * Start working with a queue.
- *
- * @param $name
- * Arbitrary string. The name of the queue to work with.
- */
- public function __construct($name);
-
- /**
- * Add a queue item and store it directly to the queue.
- *
- * @param $data
- * Arbitrary data to be associated with the new task in the queue.
- * @return
- * TRUE if the item was successfully created and was (best effort) added
- * to the queue, otherwise FALSE. We don't guarantee the item was
- * committed to disk etc, but as far as we know, the item is now in the
- * queue.
- */
- public function createItem($data);
-
- /**
- * Retrieve the number of items in the queue.
- *
- * This is intended to provide a "best guess" count of the number of items in
- * the queue. Depending on the implementation and the setup, the accuracy of
- * the results of this function may vary.
- *
- * e.g. On a busy system with a large number of consumers and items, the
- * result might only be valid for a fraction of a second and not provide an
- * accurate representation.
- *
- * @return
- * An integer estimate of the number of items in the queue.
- */
- public function numberOfItems();
-
- /**
- * Claim an item in the queue for processing.
- *
- * @param $lease_time
- * How long the processing is expected to take in seconds, defaults to an
- * hour. After this lease expires, the item will be reset and another
- * consumer can claim the item. For idempotent tasks (which can be run
- * multiple times without side effects), shorter lease times would result
- * in lower latency in case a consumer fails. For tasks that should not be
- * run more than once (non-idempotent), a larger lease time will make it
- * more rare for a given task to run multiple times in cases of failure,
- * at the cost of higher latency.
- * @return
- * On success we return an item object. If the queue is unable to claim an
- * item it returns false. This implies a best effort to retrieve an item
- * and either the queue is empty or there is some other non-recoverable
- * problem.
- */
- public function claimItem($lease_time = 3600);
-
- /**
- * Delete a finished item from the queue.
- *
- * @param $item
- * The item returned by DrupalQueueInterface::claimItem().
- */
- public function deleteItem($item);
-
- /**
- * Release an item that the worker could not process, so another
- * worker can come in and process it before the timeout expires.
- *
- * @param $item
- * @return boolean
- */
- public function releaseItem($item);
-
- /**
- * Create a queue.
- *
- * Called during installation and should be used to perform any necessary
- * initialization operations. This should not be confused with the
- * constructor for these objects, which is called every time an object is
- * instantiated to operate on a queue. This operation is only needed the
- * first time a given queue is going to be initialized (for example, to make
- * a new database table or directory to hold tasks for the queue -- it
- * depends on the queue implementation if this is necessary at all).
- */
- public function createQueue();
-
- /**
- * Delete a queue and every item in the queue.
- */
- public function deleteQueue();
-}
-
-/**
- * Reliable queue interface.
- *
- * Classes implementing this interface preserve the order of messages and
- * guarantee that every item will be executed at least once.
- */
-interface DrupalReliableQueueInterface extends DrupalQueueInterface {
-}
-
-/**
- * Default queue implementation.
- */
-class SystemQueue implements DrupalReliableQueueInterface {
- /**
- * The name of the queue this instance is working with.
- *
- * @var string
- */
- protected $name;
-
- public function __construct($name) {
- $this->name = $name;
- }
-
- public function createItem($data) {
- // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain
- // the queue table yet, so we cannot rely on drupal_write_record().
- $query = db_insert('queue')
- ->fields(array(
- 'name' => $this->name,
- 'data' => serialize($data),
- // We cannot rely on REQUEST_TIME because many items might be created
- // by a single request which takes longer than 1 second.
- 'created' => time(),
- ));
- return (bool) $query->execute();
- }
-
- public function numberOfItems() {
- return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
- }
-
- public function claimItem($lease_time = 30) {
- // Claim an item by updating its expire fields. If claim is not successful
- // another thread may have claimed the item in the meantime. Therefore loop
- // until an item is successfully claimed or we are reasonably sure there
- // are no unclaimed items left.
- while (TRUE) {
- $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
- if ($item) {
- // Try to update the item. Only one thread can succeed in UPDATEing the
- // same row. We cannot rely on REQUEST_TIME because items might be
- // claimed by a single consumer which runs longer than 1 second. If we
- // continue to use REQUEST_TIME instead of the current time(), we steal
- // time from the lease, and will tend to reset items before the lease
- // should really expire.
- $update = db_update('queue')
- ->fields(array(
- 'expire' => time() + $lease_time,
- ))
- ->condition('item_id', $item->item_id)
- ->condition('expire', 0);
- // If there are affected rows, this update succeeded.
- if ($update->execute()) {
- $item->data = unserialize($item->data);
- return $item;
- }
- }
- else {
- // No items currently available to claim.
- return FALSE;
- }
- }
- }
-
- public function releaseItem($item) {
- $update = db_update('queue')
- ->fields(array(
- 'expire' => 0,
- ))
- ->condition('item_id', $item->item_id);
- return $update->execute();
- }
-
- public function deleteItem($item) {
- db_delete('queue')
- ->condition('item_id', $item->item_id)
- ->execute();
- }
-
- public function createQueue() {
- // All tasks are stored in a single database table (which is created when
- // Drupal is first installed) so there is nothing we need to do to create
- // a new queue.
- }
-
- public function deleteQueue() {
- db_delete('queue')
- ->condition('name', $this->name)
- ->execute();
- }
-}
-
-/**
- * Static queue implementation.
- *
- * This allows "undelayed" variants of processes relying on the Queue
- * interface. The queue data resides in memory. It should only be used for
- * items that will be queued and dequeued within a given page request.
- */
-class MemoryQueue implements DrupalQueueInterface {
- /**
- * The queue data.
- *
- * @var array
- */
- protected $queue;
-
- /**
- * Counter for item ids.
- *
- * @var int
- */
- protected $id_sequence;
-
- public function __construct($name) {
- $this->queue = array();
- $this->id_sequence = 0;
- }
-
- public function createItem($data) {
- $item = new stdClass();
- $item->item_id = $this->id_sequence++;
- $item->data = $data;
- $item->created = time();
- $item->expire = 0;
- $this->queue[$item->item_id] = $item;
- }
-
- public function numberOfItems() {
- return count($this->queue);
- }
-
- public function claimItem($lease_time = 30) {
- foreach ($this->queue as $key => $item) {
- if ($item->expire == 0) {
- $item->expire = time() + $lease_time;
- $this->queue[$key] = $item;
- return $item;
- }
- }
- return FALSE;
- }
-
- public function deleteItem($item) {
- unset($this->queue[$item->item_id]);
- }
-
- public function releaseItem($item) {
- if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
- $this->queue[$item->item_id]->expire = 0;
- return TRUE;
- }
- return FALSE;
- }
-
- public function createQueue() {
- // Nothing needed here.
- }
-
- public function deleteQueue() {
- $this->queue = array();
- $this->id_sequence = 0;
- }
-}
-
-/**
- * @} End of "defgroup queue".
- */
diff --git a/core/modules/system/system.test b/core/modules/system/system.test
index ecde8de..3b958dd 100644
--- a/core/modules/system/system.test
+++ b/core/modules/system/system.test
@@ -1811,98 +1811,6 @@ class SystemThemeFunctionalTest extends DrupalWebTestCase {
}
}
-
-/**
- * Test the basic queue functionality.
- */
-class QueueTestCase extends DrupalWebTestCase {
- public static function getInfo() {
- return array(
- 'name' => 'Queue functionality',
- 'description' => 'Queues and dequeues a set of items to check the basic queue functionality.',
- 'group' => 'System',
- );
- }
-
- /**
- * Queues and dequeues a set of items to check the basic queue functionality.
- */
- function testQueue() {
- // Create two queues.
- $queue1 = DrupalQueue::get($this->randomName());
- $queue1->createQueue();
- $queue2 = DrupalQueue::get($this->randomName());
- $queue2->createQueue();
-
- // Create four items.
- $data = array();
- for ($i = 0; $i < 4; $i++) {
- $data[] = array($this->randomName() => $this->randomName());
- }
-
- // Queue items 1 and 2 in the queue1.
- $queue1->createItem($data[0]);
- $queue1->createItem($data[1]);
-
- // Retrieve two items from queue1.
- $items = array();
- $new_items = array();
-
- $items[] = $item = $queue1->claimItem();
- $new_items[] = $item->data;
-
- $items[] = $item = $queue1->claimItem();
- $new_items[] = $item->data;
-
- // First two dequeued items should match the first two items we queued.
- $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched'));
-
- // Add two more items.
- $queue1->createItem($data[2]);
- $queue1->createItem($data[3]);
-
- $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.'));
- $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items'));
-
- $items[] = $item = $queue1->claimItem();
- $new_items[] = $item->data;
-
- $items[] = $item = $queue1->claimItem();
- $new_items[] = $item->data;
-
- // All dequeued items should match the items we queued exactly once,
- // therefore the score must be exactly 4.
- $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched'));
-
- // There should be no duplicate items.
- $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
-
- // Delete all items from queue1.
- foreach ($items as $item) {
- $queue1->deleteItem($item);
- }
-
- // Check that both queues are empty.
- $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty'));
- $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty'));
- }
-
- /**
- * This function returns the number of equal items in two arrays.
- */
- function queueScore($items, $new_items) {
- $score = 0;
- foreach ($items as $item) {
- foreach ($new_items as $new_item) {
- if ($item === $new_item) {
- $score++;
- }
- }
- }
- return $score;
- }
-}
-
/**
* Test token replacement in strings.
*/
diff --git a/core/modules/update/update.fetch.inc b/core/modules/update/update.fetch.inc
index 7ac0dbe..ec3bfbc 100644
--- a/core/modules/update/update.fetch.inc
+++ b/core/modules/update/update.fetch.inc
@@ -28,7 +28,7 @@ function update_manual_status() {
* Process a step in the batch for fetching available update data.
*/
function update_fetch_data_batch(&$context) {
- $queue = DrupalQueue::get('update_fetch_tasks');
+ $queue = queue('update_fetch_tasks');
if (empty($context['sandbox']['max'])) {
$context['finished'] = 0;
$context['sandbox']['max'] = $queue->numberOfItems();
@@ -99,7 +99,7 @@ function update_fetch_data_finished($success, $results) {
* Attempt to drain the queue of tasks for release history data to fetch.
*/
function _update_fetch_data() {
- $queue = DrupalQueue::get('update_fetch_tasks');
+ $queue = queue('update_fetch_tasks');
$end = time() + variable_get('update_max_fetch_time', UPDATE_MAX_FETCH_TIME);
while (time() < $end && ($item = $queue->claimItem())) {
_update_process_fetch_task($item->data);
@@ -235,7 +235,7 @@ function _update_create_fetch_task($project) {
}
$cid = 'fetch_task::' . $project['name'];
if (empty($fetch_tasks[$cid])) {
- $queue = DrupalQueue::get('update_fetch_tasks');
+ $queue = queue('update_fetch_tasks');
$queue->createItem($project);
db_insert('cache_update')
->fields(array(
diff --git a/core/modules/update/update.install b/core/modules/update/update.install
index f0d5499..9ff39d1 100644
--- a/core/modules/update/update.install
+++ b/core/modules/update/update.install
@@ -70,7 +70,7 @@ function update_schema() {
* Implements hook_install().
*/
function update_install() {
- $queue = DrupalQueue::get('update_fetch_tasks', TRUE);
+ $queue = queue('update_fetch_tasks', TRUE);
$queue->createQueue();
}
@@ -91,7 +91,7 @@ function update_uninstall() {
foreach ($variables as $variable) {
variable_del($variable);
}
- $queue = DrupalQueue::get('update_fetch_tasks');
+ $queue = queue('update_fetch_tasks');
$queue->deleteQueue();
}