name = $name;
  }

  /**
   * Adds an item to the queue by inserting a row into the {queue} table.
   *
   * @param $data
   *   Arbitrary data to associate with the new item. It is stored serialized.
   *
   * @return
   *   TRUE if the insert query returned a truthy result, FALSE otherwise.
   */
  public function createItem($data) {
    // During a Drupal 6.x to 8.x update, drupal_get_schema() does not contain
    // the queue table yet, so we cannot rely on drupal_write_record().
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  /**
   * Counts the rows of the {queue} table that belong to this queue.
   *
   * @return
   *   The number of items, as an integer.
   */
  public function numberOfItems() {
    // fetchField() hands back whatever the database layer produced (which may
    // be a numeric string, or FALSE when there is no row). Cast it so that
    // callers always receive an integer, matching MemoryQueue::numberOfItems().
    return (int) db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  /**
   * Claims the oldest unclaimed item of this queue.
   *
   * @param $lease_time
   *   Number of seconds added to the current time to compute the item's
   *   expire timestamp.
   *
   * @return
   *   On success, an object with the properties item_id and data (already
   *   unserialized). FALSE if no item with expire = 0 exists for this queue.
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if ($item) {
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }

  /**
   * Releases a claimed item by resetting its expire field to 0.
   *
   * @param $item
   *   An object carrying the item_id of the row to release.
   *
   * @return
   *   The value returned by executing the update query.
   */
  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    return $update->execute();
  }

  /**
   * Deletes the row of the {queue} table that matches the item's item_id.
   *
   * @param $item
   *   An object carrying the item_id of the row to delete.
   */
  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * Creates the queue. Intentionally a no-op for this implementation.
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  /**
   * Deletes every row of the {queue} table that belongs to this queue.
   */
  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }
}

/**
 * Static queue implementation.
 *
 * This allows "undelayed" variants of processes relying on the Queue
 * interface. The queue data resides in memory. It should only be used for
 * items that will be queued and dequeued within a given page request.
 */
class MemoryQueue implements DrupalQueueInterface {
  /**
   * The queue data.
   *
   * @var array
   */
  protected $queue;

  /**
   * Counter for item ids.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Starts with an empty queue and an id counter of 0.
   *
   * @param $name
   *   Accepted for signature compatibility; not used by this implementation.
   */
  public function __construct($name) {
    $this->queue = array();
    $this->id_sequence = 0;
  }

  /**
   * Appends a new, unclaimed item to the in-memory queue.
   *
   * @param $data
   *   Arbitrary data to associate with the new item. It is stored as-is.
   *
   * @return
   *   Always TRUE. Previously nothing was returned, so callers checking the
   *   result saw NULL even though the item had been stored.
   */
  public function createItem($data) {
    $item = new stdClass();
    $item->item_id = $this->id_sequence++;
    $item->data = $data;
    $item->created = time();
    $item->expire = 0;
    $this->queue[$item->item_id] = $item;
    return TRUE;
  }

  /**
   * Counts all items currently held in memory, claimed or not.
   *
   * @return
   *   The number of items, as an integer.
   */
  public function numberOfItems() {
    return count($this->queue);
  }

  /**
   * Claims the first item in the queue whose expire value is 0.
   *
   * @param $lease_time
   *   Number of seconds added to the current time to compute the item's
   *   expire timestamp.
   *
   * @return
   *   The claimed item object, or FALSE if every item is already claimed or
   *   the queue is empty.
   */
  public function claimItem($lease_time = 30) {
    foreach ($this->queue as $key => $item) {
      if ($item->expire == 0) {
        $item->expire = time() + $lease_time;
        $this->queue[$key] = $item;
        return $item;
      }
    }
    return FALSE;
  }

  /**
   * Removes the item with the given item_id from the in-memory queue.
   *
   * @param $item
   *   An object carrying the item_id of the entry to remove.
   */
  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  /**
   * Releases a claimed item by resetting its expire value to 0.
   *
   * @param $item
   *   An object carrying the item_id of the entry to release.
   *
   * @return
   *   TRUE if the item exists and was claimed (expire != 0), FALSE otherwise.
   */
  public function releaseItem($item) {
    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
      $this->queue[$item->item_id]->expire = 0;
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Creates the queue. Intentionally a no-op for this implementation.
   */
  public function createQueue() {
    // Nothing needed here.
  }

  /**
   * Discards all items and resets the id counter to 0.
   */
  public function deleteQueue() {
    $this->queue = array();
    $this->id_sequence = 0;
  }
}

/**
 * @} End of "defgroup queue".
 */