summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Pott2014-05-23 13:46:49 (GMT)
committerAlex Pott2014-05-23 13:46:49 (GMT)
commit082bf59e86c1471fc12a1031e7413197eb743e23 (patch)
treed513ab2a552efd8ae04db6f5a6e25f0127c93ac0
parentd8cdb35f5faf53b04fa7439ee8986cd17d63a47c (diff)
Issue #1524550 by superspring, joachim, David Hernández, marthinal, socketwench, subson | jfinkel: Fixed queue cron workers can't signal a broken queue.
-rw-r--r--core/lib/Drupal/Core/Cron.php15
-rw-r--r--core/lib/Drupal/Core/Queue/SuspendQueueException.php20
-rw-r--r--core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php24
-rw-r--r--core/modules/system/system.api.php7
-rw-r--r--core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module21
5 files changed, 84 insertions, 3 deletions
diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php
index 046216e..b639f5c 100644
--- a/core/lib/Drupal/Core/Cron.php
+++ b/core/lib/Drupal/Core/Cron.php
@@ -14,6 +14,7 @@ use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Session\AccountProxyInterface;
use Drupal\Core\Session\AnonymousUserSession;
use Drupal\Core\Session\SessionManagerInterface;
+use Drupal\Core\Queue\SuspendQueueException;
/**
* The Drupal core Cron service.
@@ -167,9 +168,19 @@ class Cron implements CronInterface {
call_user_func_array($callback, array($item->data));
$queue->deleteItem($item);
}
+ catch (SuspendQueueException $e) {
+ // If the worker indicates there is a problem with the whole queue,
+ // release the item and skip to the next queue.
+ $queue->releaseItem($item);
+
+ watchdog_exception('cron', $e);
+
+ // Skip to the next queue.
+ continue 2;
+ }
catch (\Exception $e) {
- // In case of exception log it and leave the item in the queue
- // to be processed again later.
+ // In case of any other kind of exception, log it and leave the item
+ // in the queue to be processed again later.
watchdog_exception('cron', $e);
}
}
diff --git a/core/lib/Drupal/Core/Queue/SuspendQueueException.php b/core/lib/Drupal/Core/Queue/SuspendQueueException.php
new file mode 100644
index 0000000..50c3c13
--- /dev/null
+++ b/core/lib/Drupal/Core/Queue/SuspendQueueException.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * @file
+ * Definition of Drupal\Core\Queue\SuspendQueueException.
+ */
+
+namespace Drupal\Core\Queue;
+
+/**
+ * Exception class to throw to indicate that a cron queue should be skipped.
+ *
+ * An implementation of callback_queue_worker() may throw this class of
+ * exception to indicate that processing of the whole queue should be skipped.
+ * This should be thrown rather than a normal Exception if the problem
+ * encountered by the queue worker is such that it can be deduced that workers
+ * of subsequent items would encounter it too. For example, if a remote site
+ * that the queue worker depends on appears to be inaccessible.
+ */
+class SuspendQueueException extends \RuntimeException {}
diff --git a/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php b/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php
index 2bbd872..389e880 100644
--- a/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php
+++ b/core/modules/system/lib/Drupal/system/Tests/System/CronQueueTest.php
@@ -33,7 +33,7 @@ class CronQueueTest extends WebTestBase {
* Tests that exceptions thrown by workers are handled properly.
*/
public function testExceptions() {
-
+ // Get the queue to test the normal Exception.
$queue = $this->container->get('queue')->get('cron_queue_test_exception');
// Enqueue an item for processing.
@@ -45,5 +45,27 @@ class CronQueueTest extends WebTestBase {
// The item should be left in the queue.
$this->assertEqual($queue->numberOfItems(), 1, 'Failing item still in the queue after throwing an exception.');
+
+ // Get the queue to test the specific SuspendQueueException.
+ $queue = $this->container->get('queue')->get('cron_queue_test_broken_queue');
+
+ // Enqueue several item for processing.
+ $queue->createItem('process');
+ $queue->createItem('crash');
+ $queue->createItem('ignored');
+
+ // Run cron; the worker for this queue should process as far as the crashing
+ // item.
+ $this->cronRun();
+
+ // Only one item should have been processed.
+ $this->assertEqual($queue->numberOfItems(), 2, 'Failing queue stopped processing at the failing item.');
+
+ // Check the items remaining in the queue. The item that throws the
+ // exception gets released by cron, so we can claim it again to check it.
+ $item = $queue->claimItem();
+ $this->assertEqual($item->data, 'crash', 'Failing item remains in the queue.');
+ $item = $queue->claimItem();
+ $this->assertEqual($item->data, 'ignored', 'Item beyond the failing item remains in the queue.');
}
}
diff --git a/core/modules/system/system.api.php b/core/modules/system/system.api.php
index 7574ed9..c7c8a49 100644
--- a/core/modules/system/system.api.php
+++ b/core/modules/system/system.api.php
@@ -168,6 +168,13 @@ function hook_queue_info_alter(&$queues) {
* The worker callback may throw an exception to indicate there was a problem.
* The cron process will log the exception, and leave the item in the queue to
* be processed again later.
+ * @throws \Drupal\Core\Queue\SuspendQueueException
+ * More specifically, a SuspendQueueException should be thrown when the
+ * callback is aware that the problem will affect all subsequent workers of
+ * its queue. For example, a callback that makes HTTP requests may find that
+ * the remote server is not responding. The cron process will behave as with a
+ * normal Exception, and in addition will not attempt to process further items
+ * from the current item's queue during the current cron run.
*
* @see \Drupal\Core\Cron::run()
*/
diff --git a/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module b/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module
index b88892f..c46fdb9 100644
--- a/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module
+++ b/core/modules/system/tests/modules/cron_queue_test/cron_queue_test.module
@@ -9,9 +9,30 @@ function cron_queue_test_queue_info() {
'time' => 60,
),
);
+ $queues['cron_queue_test_broken_queue'] = array(
+ 'title' => t('Broken queue test'),
+ 'worker callback' => 'cron_queue_test_broken_queue',
+ // Only needed if this queue should be processed by cron.
+ 'cron' => array(
+ 'time' => 60,
+ ),
+ );
+
return $queues;
}
function cron_queue_test_exception($item) {
throw new Exception('That is not supposed to happen.');
}
+
+/**
+ * Implements callback_queue_worker().
+ *
+ * This queue is declared broken if the queue item data is 'crash'.
+ */
+function cron_queue_test_broken_queue($queue_item_data) {
+ if ($queue_item_data == 'crash') {
+ throw new \Drupal\Core\Queue\SuspendQueueException('The queue is broken.');
+ }
+ // Do nothing otherwise.
+}