diff --git a/core/includes/batch.queue.inc b/core/includes/batch.queue.inc deleted file mode 100644 index ed290ee70f00dce8a17523984cc07d3d1d9785c3..0000000000000000000000000000000000000000 --- a/core/includes/batch.queue.inc +++ /dev/null @@ -1,84 +0,0 @@ - $this->name))->fetchObject(); - if ($item) { - $item->data = unserialize($item->data); - return $item; - } - return FALSE; - } - - /** - * Retrieves all remaining items in the queue. - * - * This is specific to Batch API and is not part of the DrupalQueueInterface. - */ - public function getAllItems() { - $result = array(); - $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll(); - foreach ($items as $item) { - $result[] = unserialize($item->data); - } - return $result; - } -} - -/** - * Defines a batch queue for non-progressive batches. - */ -class BatchMemoryQueue extends MemoryQueue { - - /** - * Overrides MemoryQueue::claimItem(). - * - * Unlike MemoryQueue::claimItem(), this method provides a default lease - * time of 0 (no expiration) instead of 30. This allows the item to be - * claimed repeatedly until it is deleted. - */ - public function claimItem($lease_time = 0) { - if (!empty($this->queue)) { - reset($this->queue); - return current($this->queue); - } - return FALSE; - } - - /** - * Retrieves all remaining items in the queue. - * - * This is specific to Batch API and is not part of the DrupalQueueInterface. - */ - public function getAllItems() { - $result = array(); - foreach ($this->queue as $item) { - $result[] = $item->data; - } - return $result; - } -} diff --git a/core/includes/common.inc b/core/includes/common.inc index 327ffbd27a53c4dff3cfffd775beb50923a57c6e..52a3a43d51b8f7adf0483fe695e2b23984396d8e 100644 --- a/core/includes/common.inc +++ b/core/includes/common.inc @@ -5264,7 +5264,7 @@ function drupal_cron_run() { // Make sure every queue exists. There is no harm in trying to recreate an // existing queue. foreach ($queues as $queue_name => $info) { - DrupalQueue::get($queue_name)->createQueue(); + queue($queue_name)->createQueue(); } // Register shutdown callback. drupal_register_shutdown_function('drupal_cron_cleanup'); @@ -5294,7 +5294,7 @@ function drupal_cron_run() { foreach ($queues as $queue_name => $info) { $function = $info['worker callback']; $end = time() + (isset($info['time']) ? $info['time'] : 15); - $queue = DrupalQueue::get($queue_name); + $queue = queue($queue_name); while (time() < $end && ($item = $queue->claimItem())) { $function($item->data); $queue->deleteItem($item); @@ -7863,3 +7863,97 @@ function drupal_get_filetransfer_info() { } return $info; } + +/** + * @defgroup queue Queue operations + * @{ + * Queue items to allow later processing. + * + * The queue system allows placing items in a queue and processing them later. + * The system tries to ensure that only one consumer can process an item. + * + * Before a queue can be used it needs to be created by + * Drupal\Core\Queue\QueueInterface::createQueue(). + * + * Items can be added to the queue by passing an arbitrary data object to + * Drupal\Core\Queue\QueueInterface::createItem(). + * + * To process an item, call Drupal\Core\Queue\QueueInterface::claimItem() and + * specify how long you want to have a lease for working on that item. + * When finished processing, the item needs to be deleted by calling + * Drupal\Core\Queue\QueueInterface::deleteItem(). If the consumer dies, the + * item will be made available again by the Drupal\Core\Queue\QueueInterface + * implementation once the lease expires. Another consumer will then be able to + * receive it when calling Drupal\Core\Queue\QueueInterface::claimItem(). + * Due to this, the processing code should be aware that an item might be handed + * over for processing more than once. + * + * The $item object used by the Drupal\Core\Queue\QueueInterface can contain + * arbitrary metadata depending on the implementation. Systems using the + * interface should only rely on the data property which will contain the + * information passed to Drupal\Core\Queue\QueueInterface::createItem(). + * The full queue item returned by Drupal\Core\Queue\QueueInterface::claimItem() + * needs to be passed to Drupal\Core\Queue\QueueInterface::deleteItem() once + * processing is completed. + * + * There are two kinds of queue backends available: reliable, which preserves + * the order of messages and guarantees that every item will be executed at + * least once. The non-reliable kind only does a best effort to preserve order + * in messages and to execute them at least once but there is a small chance + * that some items get lost. For example, some distributed back-ends like + * Amazon SQS will be managing jobs for a large set of producers and consumers + * where a strict FIFO ordering will likely not be preserved. Another example + * would be an in-memory queue backend which might lose items if it crashes. + * However, such a backend would be able to deal with significantly more writes + * than a reliable queue and for many tasks this is more important. See + * aggregator_cron() for an example of how to effectively utilize a + * non-reliable queue. Another example is doing Twitter statistics -- the small + * possibility of losing a few items is insignificant next to power of the + * queue being able to keep up with writes. As described in the processing + * section, regardless of the queue being reliable or not, the processing code + * should be aware that an item might be handed over for processing more than + * once (because the processing code might time out before it finishes). + */ + +/** + * Instantiates and statically caches the correct class for a queue. + * + * The following variables can be set by variable_set or $conf overrides: + * - queue_class_$name: the class to be used for the queue $name. + * - queue_default_class: the class to use when queue_class_$name is not + * defined. Defaults to Drupal\Core\Queue\System, a reliable backend using + * SQL. + * - queue_default_reliable_class: the class to use when queue_class_$name is + * not defined and the queue_default_class is not reliable. Defaults to + * Drupal\Core\Queue\System. + * + * @param string $name + * The name of the queue to work with. + * @param bool $reliable + * TRUE if the ordering of items and guaranteeing every item executes at + * least once is important, FALSE if scalability is the main concern. Defaults + * to FALSE. + * + * @return Drupal\Core\Queue\QueueInterface + * The queue object for a given name. + * + * @see Drupal\Core\Queue\QueueInterface + */ +function queue($name, $reliable = FALSE) { + static $queues; + if (!isset($queues[$name])) { + $class = variable_get('queue_class_' . $name, NULL); + if ($class && $reliable && in_array('Drupal\Core\Queue\ReliableQueueInterface', class_implements($class))) { + $class = variable_get('queue_default_reliable_class', 'Drupal\Core\Queue\System'); + } + elseif (!$class) { + $class = variable_get('queue_default_class', 'Drupal\Core\Queue\System'); + } + $queues[$name] = new $class($name); + } + return $queues[$name]; +} + +/** + * @} End of "defgroup queue". + */ diff --git a/core/includes/form.inc b/core/includes/form.inc index 329ab4d85e15d36a69d46b6568ff8e08dbe874d6..6ebbac4e4912a3be96b7c5833990a02daf1716ba 100644 --- a/core/includes/form.inc +++ b/core/includes/form.inc @@ -4641,8 +4641,9 @@ function &batch_get() { /** * Populates a job queue with the operations of a batch set. * - * Depending on whether the batch is progressive or not, the BatchQueue or - * BatchMemoryQueue handler classes will be used. + * Depending on whether the batch is progressive or not, the + * Drupal\Core\Queue\Batch or Drupal\Core\Queue\BatchMemory handler classes will + * be used. * * @param $batch * The batch array. @@ -4659,7 +4660,7 @@ function _batch_populate_queue(&$batch, $set_id) { $batch_set += array( 'queue' => array( 'name' => 'drupal_batch:' . $batch['id'] . ':' . $set_id, - 'class' => $batch['progressive'] ? 'BatchQueue' : 'BatchMemoryQueue', + 'class' => $batch['progressive'] ? 'Drupal\Core\Queue\Batch' : 'Drupal\Core\Queue\BatchMemory', ), ); @@ -4685,12 +4686,8 @@ function _batch_populate_queue(&$batch, $set_id) { function _batch_queue($batch_set) { static $queues; - // The class autoloader is not available when running update.php, so make - // sure the files are manually included. if (!isset($queues)) { $queues = array(); - require_once DRUPAL_ROOT . '/core/modules/system/system.queue.inc'; - require_once DRUPAL_ROOT . '/core/includes/batch.queue.inc'; } if (isset($batch_set['queue'])) { diff --git a/core/lib/Drupal/Core/Queue/Batch.php b/core/lib/Drupal/Core/Queue/Batch.php new file mode 100644 index 0000000000000000000000000000000000000000..a1de48aa68133b373dbd03db5fdbcbcc5c91ea69 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/Batch.php @@ -0,0 +1,56 @@ + $this->name))->fetchObject(); + if ($item) { + $item->data = unserialize($item->data); + return $item; + } + return FALSE; + } + + /** + * Retrieves all remaining items in the queue. + * + * This is specific to Batch API and is not part of the + * Drupal\Core\Queue\QueueInterface. + * + * @return array + * An array of queue items. + */ + public function getAllItems() { + $result = array(); + $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll(); + foreach ($items as $item) { + $result[] = unserialize($item->data); + } + return $result; + } +} diff --git a/core/lib/Drupal/Core/Queue/BatchMemory.php b/core/lib/Drupal/Core/Queue/BatchMemory.php new file mode 100644 index 0000000000000000000000000000000000000000..fcc49f5c7a9580eea75fb57b4b0394fe2af09c0a --- /dev/null +++ b/core/lib/Drupal/Core/Queue/BatchMemory.php @@ -0,0 +1,52 @@ +queue)) { + reset($this->queue); + return current($this->queue); + } + return FALSE; + } + + /** + * Retrieves all remaining items in the queue. + * + * This is specific to Batch API and is not part of the + * Drupal\Core\Queue\QueueInterface. + * + * @return array + * An array of queue items. + */ + public function getAllItems() { + $result = array(); + foreach ($this->queue as $item) { + $result[] = $item->data; + } + return $result; + } +} diff --git a/core/lib/Drupal/Core/Queue/Memory.php b/core/lib/Drupal/Core/Queue/Memory.php new file mode 100644 index 0000000000000000000000000000000000000000..d741bda570d5e13c4a76c3c035d8b6572bfaa94d --- /dev/null +++ b/core/lib/Drupal/Core/Queue/Memory.php @@ -0,0 +1,107 @@ +queue = array(); + $this->idSequence = 0; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createItem(). + */ + public function createItem($data) { + $item = new stdClass(); + $item->item_id = $this->idSequence++; + $item->data = $data; + $item->created = time(); + $item->expire = 0; + $this->queue[$item->item_id] = $item; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::numberOfItems(). + */ + public function numberOfItems() { + return count($this->queue); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::claimItem(). + */ + public function claimItem($lease_time = 30) { + foreach ($this->queue as $key => $item) { + if ($item->expire == 0) { + $item->expire = time() + $lease_time; + $this->queue[$key] = $item; + return $item; + } + } + return FALSE; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteItem(). + */ + public function deleteItem($item) { + unset($this->queue[$item->item_id]); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::releaseItem(). + */ + public function releaseItem($item) { + if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) { + $this->queue[$item->item_id]->expire = 0; + return TRUE; + } + return FALSE; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createQueue(). + */ + public function createQueue() { + // Nothing needed here. + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteQueue(). + */ + public function deleteQueue() { + $this->queue = array(); + $this->idSequence = 0; + } +} diff --git a/core/lib/Drupal/Core/Queue/QueueInterface.php b/core/lib/Drupal/Core/Queue/QueueInterface.php new file mode 100644 index 0000000000000000000000000000000000000000..90d4b00a243864dad0c03c4675068520fb79b829 --- /dev/null +++ b/core/lib/Drupal/Core/Queue/QueueInterface.php @@ -0,0 +1,114 @@ +name = $name; + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createItem(). + */ + public function createItem($data) { + // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain + // the queue table yet, so we cannot rely on drupal_write_record(). + $query = db_insert('queue') + ->fields(array( + 'name' => $this->name, + 'data' => serialize($data), + // We cannot rely on REQUEST_TIME because many items might be created + // by a single request which takes longer than 1 second. + 'created' => time(), + )); + return (bool) $query->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::numberOfItems(). + */ + public function numberOfItems() { + return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::claimItem(). + */ + public function claimItem($lease_time = 30) { + // Claim an item by updating its expire fields. If claim is not successful + // another thread may have claimed the item in the meantime. Therefore loop + // until an item is successfully claimed or we are reasonably sure there + // are no unclaimed items left. + while (TRUE) { + $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + if ($item) { + // Try to update the item. Only one thread can succeed in UPDATEing the + // same row. We cannot rely on REQUEST_TIME because items might be + // claimed by a single consumer which runs longer than 1 second. If we + // continue to use REQUEST_TIME instead of the current time(), we steal + // time from the lease, and will tend to reset items before the lease + // should really expire. + $update = db_update('queue') + ->fields(array( + 'expire' => time() + $lease_time, + )) + ->condition('item_id', $item->item_id) + ->condition('expire', 0); + // If there are affected rows, this update succeeded. + if ($update->execute()) { + $item->data = unserialize($item->data); + return $item; + } + } + else { + // No items currently available to claim. + return FALSE; + } + } + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::releaseItem(). + */ + public function releaseItem($item) { + $update = db_update('queue') + ->fields(array( + 'expire' => 0, + )) + ->condition('item_id', $item->item_id); + return $update->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteItem(). + */ + public function deleteItem($item) { + db_delete('queue') + ->condition('item_id', $item->item_id) + ->execute(); + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::createQueue(). + */ + public function createQueue() { + // All tasks are stored in a single database table (which is created when + // Drupal is first installed) so there is nothing we need to do to create + // a new queue. + } + + /** + * Implements Drupal\Core\Queue\QueueInterface::deleteQueue(). + */ + public function deleteQueue() { + db_delete('queue') + ->condition('name', $this->name) + ->execute(); + } +} diff --git a/core/modules/aggregator/aggregator.module b/core/modules/aggregator/aggregator.module index 7b65dfa83a1299ae7a359052f27c89b1b7adb4ca..b4ffa8c736306bcdfd34e146bcdaae28429dedeb 100644 --- a/core/modules/aggregator/aggregator.module +++ b/core/modules/aggregator/aggregator.module @@ -313,7 +313,7 @@ function aggregator_cron() { ':time' => REQUEST_TIME, ':never' => AGGREGATOR_CLEAR_NEVER )); - $queue = DrupalQueue::get('aggregator_feeds'); + $queue = queue('aggregator_feeds'); foreach ($result as $feed) { if ($queue->createItem($feed)) { // Add timestamp to avoid queueing item more than once. diff --git a/core/modules/simpletest/simpletest.info b/core/modules/simpletest/simpletest.info index d18fe6e5374603e9caa55a9ca9f9e51048bb7ed4..cf55121f3f33aa78e92fa060e487552243bbabd9 100644 --- a/core/modules/simpletest/simpletest.info +++ b/core/modules/simpletest/simpletest.info @@ -29,6 +29,7 @@ files[] = tests/module.test files[] = tests/pager.test files[] = tests/password.test files[] = tests/path.test +files[] = tests/queue.test files[] = tests/registry.test files[] = tests/schema.test files[] = tests/session.test diff --git a/core/modules/simpletest/tests/queue.test b/core/modules/simpletest/tests/queue.test new file mode 100644 index 0000000000000000000000000000000000000000..e8f4c2c42b1d508c7680d7fbe96d902691c3aec2 --- /dev/null +++ b/core/modules/simpletest/tests/queue.test @@ -0,0 +1,120 @@ + 'Queue functionality', + 'description' => 'Queues and dequeues a set of items to check the basic queue functionality.', + 'group' => 'Queue', + ); + } + + /** + * Tests the System queue. + */ + public function testSystemQueue() { + // Create two queues. + $queue1 = new System($this->randomName()); + $queue1->createQueue(); + $queue2 = new System($this->randomName()); + $queue2->createQueue(); + + $this->queueTest($queue1, $queue2); + } + + /** + * Tests the Memory queue. + */ + public function testMemoryQueue() { + // Create two queues. + $queue1 = new Memory($this->randomName()); + $queue1->createQueue(); + $queue2 = new Memory($this->randomName()); + $queue2->createQueue(); + + $this->queueTest($queue1, $queue2); + } + + /** + * Queues and dequeues a set of items to check the basic queue functionality. + * + * @param Drupal\Core\Queue\QueueInterface $queue1 + * An instantiated queue object. + * @param Drupal\Core\Queue\QueueInterface $queue2 + * An instantiated queue object. + */ + protected function queueTest($queue1, $queue2) { + // Create four items. + $data = array(); + for ($i = 0; $i < 4; $i++) { + $data[] = array($this->randomName() => $this->randomName()); + } + + // Queue items 1 and 2 in the queue1. + $queue1->createItem($data[0]); + $queue1->createItem($data[1]); + + // Retrieve two items from queue1. + $items = array(); + $new_items = array(); + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + // First two dequeued items should match the first two items we queued. + $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched')); + + // Add two more items. + $queue1->createItem($data[2]); + $queue1->createItem($data[3]); + + $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.')); + $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items')); + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + $items[] = $item = $queue1->claimItem(); + $new_items[] = $item->data; + + // All dequeued items should match the items we queued exactly once, + // therefore the score must be exactly 4. + $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched')); + + // There should be no duplicate items. + $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); + + // Delete all items from queue1. + foreach ($items as $item) { + $queue1->deleteItem($item); + } + + // Check that both queues are empty. + $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty')); + $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty')); + } + + /** + * Returns the number of equal items in two arrays. + */ + protected function queueScore($items, $new_items) { + $score = 0; + foreach ($items as $item) { + foreach ($new_items as $new_item) { + if ($item === $new_item) { + $score++; + } + } + } + return $score; + } +} diff --git a/core/modules/system/system.api.php b/core/modules/system/system.api.php index e1d5298eab42387bc31cb4d948721713af599fb5..785b0982acf3c4f5cab03c6688a8e390f1ccdd1a 100644 --- a/core/modules/system/system.api.php +++ b/core/modules/system/system.api.php @@ -138,7 +138,7 @@ function hook_cron() { ':time' => REQUEST_TIME, ':never' => AGGREGATOR_CLEAR_NEVER, )); - $queue = DrupalQueue::get('aggregator_feeds'); + $queue = queue('aggregator_feeds'); foreach ($result as $feed) { $queue->createItem($feed); } @@ -158,8 +158,8 @@ function hook_cron() { * An associative array where the key is the queue name and the value is * again an associative array. Possible keys are: * - 'worker callback': The name of the function to call. It will be called - * with one argument, the item created via DrupalQueue::createItem() in - * hook_cron(). + * with one argument, the item created via + * Drupal\Core\Queue\QueueInterface::createItem() in hook_cron(). * - 'time': (optional) How much time Drupal should spend on calling this * worker in seconds. Defaults to 15. * diff --git a/core/modules/system/system.info b/core/modules/system/system.info index d95d5611f10f4114361f9d1bd5e634ffdff88f73..0e8e3e10378f9af1493274643bc841744d29a090 100644 --- a/core/modules/system/system.info +++ b/core/modules/system/system.info @@ -5,7 +5,6 @@ version = VERSION core = 8.x files[] = system.archiver.inc files[] = system.mail.inc -files[] = system.queue.inc files[] = system.tar.inc files[] = system.test required = TRUE diff --git a/core/modules/system/system.queue.inc b/core/modules/system/system.queue.inc deleted file mode 100644 index c2a6b134214da39333ac93e9f6db28b71895e9ba..0000000000000000000000000000000000000000 --- a/core/modules/system/system.queue.inc +++ /dev/null @@ -1,371 +0,0 @@ -name = $name; - } - - public function createItem($data) { - // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain - // the queue table yet, so we cannot rely on drupal_write_record(). - $query = db_insert('queue') - ->fields(array( - 'name' => $this->name, - 'data' => serialize($data), - // We cannot rely on REQUEST_TIME because many items might be created - // by a single request which takes longer than 1 second. - 'created' => time(), - )); - return (bool) $query->execute(); - } - - public function numberOfItems() { - return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); - } - - public function claimItem($lease_time = 30) { - // Claim an item by updating its expire fields. If claim is not successful - // another thread may have claimed the item in the meantime. Therefore loop - // until an item is successfully claimed or we are reasonably sure there - // are no unclaimed items left. - while (TRUE) { - $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); - if ($item) { - // Try to update the item. Only one thread can succeed in UPDATEing the - // same row. We cannot rely on REQUEST_TIME because items might be - // claimed by a single consumer which runs longer than 1 second. If we - // continue to use REQUEST_TIME instead of the current time(), we steal - // time from the lease, and will tend to reset items before the lease - // should really expire. - $update = db_update('queue') - ->fields(array( - 'expire' => time() + $lease_time, - )) - ->condition('item_id', $item->item_id) - ->condition('expire', 0); - // If there are affected rows, this update succeeded. - if ($update->execute()) { - $item->data = unserialize($item->data); - return $item; - } - } - else { - // No items currently available to claim. - return FALSE; - } - } - } - - public function releaseItem($item) { - $update = db_update('queue') - ->fields(array( - 'expire' => 0, - )) - ->condition('item_id', $item->item_id); - return $update->execute(); - } - - public function deleteItem($item) { - db_delete('queue') - ->condition('item_id', $item->item_id) - ->execute(); - } - - public function createQueue() { - // All tasks are stored in a single database table (which is created when - // Drupal is first installed) so there is nothing we need to do to create - // a new queue. - } - - public function deleteQueue() { - db_delete('queue') - ->condition('name', $this->name) - ->execute(); - } -} - -/** - * Static queue implementation. - * - * This allows "undelayed" variants of processes relying on the Queue - * interface. The queue data resides in memory. It should only be used for - * items that will be queued and dequeued within a given page request. - */ -class MemoryQueue implements DrupalQueueInterface { - /** - * The queue data. - * - * @var array - */ - protected $queue; - - /** - * Counter for item ids. - * - * @var int - */ - protected $id_sequence; - - public function __construct($name) { - $this->queue = array(); - $this->id_sequence = 0; - } - - public function createItem($data) { - $item = new stdClass(); - $item->item_id = $this->id_sequence++; - $item->data = $data; - $item->created = time(); - $item->expire = 0; - $this->queue[$item->item_id] = $item; - } - - public function numberOfItems() { - return count($this->queue); - } - - public function claimItem($lease_time = 30) { - foreach ($this->queue as $key => $item) { - if ($item->expire == 0) { - $item->expire = time() + $lease_time; - $this->queue[$key] = $item; - return $item; - } - } - return FALSE; - } - - public function deleteItem($item) { - unset($this->queue[$item->item_id]); - } - - public function releaseItem($item) { - if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) { - $this->queue[$item->item_id]->expire = 0; - return TRUE; - } - return FALSE; - } - - public function createQueue() { - // Nothing needed here. - } - - public function deleteQueue() { - $this->queue = array(); - $this->id_sequence = 0; - } -} - -/** - * @} End of "defgroup queue". - */ diff --git a/core/modules/system/system.test b/core/modules/system/system.test index ecde8de7b0886df34a3b69f075813e7a9ffdef1a..3b958dd35505f963624b04720f4841bf71524a10 100644 --- a/core/modules/system/system.test +++ b/core/modules/system/system.test @@ -1811,98 +1811,6 @@ class SystemThemeFunctionalTest extends DrupalWebTestCase { } } - -/** - * Test the basic queue functionality. - */ -class QueueTestCase extends DrupalWebTestCase { - public static function getInfo() { - return array( - 'name' => 'Queue functionality', - 'description' => 'Queues and dequeues a set of items to check the basic queue functionality.', - 'group' => 'System', - ); - } - - /** - * Queues and dequeues a set of items to check the basic queue functionality. - */ - function testQueue() { - // Create two queues. - $queue1 = DrupalQueue::get($this->randomName()); - $queue1->createQueue(); - $queue2 = DrupalQueue::get($this->randomName()); - $queue2->createQueue(); - - // Create four items. - $data = array(); - for ($i = 0; $i < 4; $i++) { - $data[] = array($this->randomName() => $this->randomName()); - } - - // Queue items 1 and 2 in the queue1. - $queue1->createItem($data[0]); - $queue1->createItem($data[1]); - - // Retrieve two items from queue1. - $items = array(); - $new_items = array(); - - $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; - - $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; - - // First two dequeued items should match the first two items we queued. - $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched')); - - // Add two more items. - $queue1->createItem($data[2]); - $queue1->createItem($data[3]); - - $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.')); - $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items')); - - $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; - - $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; - - // All dequeued items should match the items we queued exactly once, - // therefore the score must be exactly 4. - $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched')); - - // There should be no duplicate items. - $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched')); - - // Delete all items from queue1. - foreach ($items as $item) { - $queue1->deleteItem($item); - } - - // Check that both queues are empty. - $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty')); - $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty')); - } - - /** - * This function returns the number of equal items in two arrays. - */ - function queueScore($items, $new_items) { - $score = 0; - foreach ($items as $item) { - foreach ($new_items as $new_item) { - if ($item === $new_item) { - $score++; - } - } - } - return $score; - } -} - /** * Test token replacement in strings. */ diff --git a/core/modules/update/update.fetch.inc b/core/modules/update/update.fetch.inc index 7ac0dbefbc2bb1555f87736823653fa9ec79b97d..ec3bfbc25a9f7fb7c55ce6a1ccde8f0eef9b9286 100644 --- a/core/modules/update/update.fetch.inc +++ b/core/modules/update/update.fetch.inc @@ -28,7 +28,7 @@ function update_manual_status() { * Process a step in the batch for fetching available update data. */ function update_fetch_data_batch(&$context) { - $queue = DrupalQueue::get('update_fetch_tasks'); + $queue = queue('update_fetch_tasks'); if (empty($context['sandbox']['max'])) { $context['finished'] = 0; $context['sandbox']['max'] = $queue->numberOfItems(); @@ -99,7 +99,7 @@ function update_fetch_data_finished($success, $results) { * Attempt to drain the queue of tasks for release history data to fetch. */ function _update_fetch_data() { - $queue = DrupalQueue::get('update_fetch_tasks'); + $queue = queue('update_fetch_tasks'); $end = time() + variable_get('update_max_fetch_time', UPDATE_MAX_FETCH_TIME); while (time() < $end && ($item = $queue->claimItem())) { _update_process_fetch_task($item->data); @@ -235,7 +235,7 @@ function _update_create_fetch_task($project) { } $cid = 'fetch_task::' . $project['name']; if (empty($fetch_tasks[$cid])) { - $queue = DrupalQueue::get('update_fetch_tasks'); + $queue = queue('update_fetch_tasks'); $queue->createItem($project); db_insert('cache_update') ->fields(array( diff --git a/core/modules/update/update.install b/core/modules/update/update.install index f0d549922a0d66d7995d535f9c6e0b3ab40eadcf..9ff39d126b05b33591c1d3aab6cfb750d40d1cfd 100644 --- a/core/modules/update/update.install +++ b/core/modules/update/update.install @@ -70,7 +70,7 @@ function update_schema() { * Implements hook_install(). */ function update_install() { - $queue = DrupalQueue::get('update_fetch_tasks', TRUE); + $queue = queue('update_fetch_tasks', TRUE); $queue->createQueue(); } @@ -91,7 +91,7 @@ function update_uninstall() { foreach ($variables as $variable) { variable_del($variable); } - $queue = DrupalQueue::get('update_fetch_tasks'); + $queue = queue('update_fetch_tasks'); $queue->deleteQueue(); }