author    Alex Pott 2016-12-08 22:03:31 +0000
committer Alex Pott 2016-12-08 22:03:31 +0000
commit    bf02c1b16c60ca76c1d57b37da31d3e11204907e (patch)
tree      84669ad3c5b761edab8451f0e7cd1d3701684121
parent    37c07a737bfd935d0cd51e21acb9061accd55270 (diff)
Issue #2309695 by quietone, alexpott, mikeryan, benjy: Add query batching to SqlBase
-rw-r--r--  core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php                              13
-rw-r--r--  core/modules/migrate/src/Plugin/migrate/source/SqlBase.php                                      163
-rw-r--r--  core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml     7
-rw-r--r--  core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php  45
-rw-r--r--  core/modules/migrate/tests/src/Kernel/QueryBatchTest.php                                        261
5 files changed, 437 insertions, 52 deletions
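The net effect of the patch is that any SqlBase-derived source can opt into batched fetching, either by passing a batch_size key in its source configuration or by setting the batchSize property directly. A minimal, hypothetical plugin sketch (the module namespace, plugin ID, and table name are illustrative and not part of this commit):

<?php

namespace Drupal\my_module\Plugin\migrate\source;

use Drupal\migrate\Plugin\migrate\source\SqlBase;

/**
 * Hypothetical source plugin that fetches its rows 1000 at a time.
 *
 * @MigrateSource(
 *   id = "my_batched_source"
 * )
 */
class MyBatchedSource extends SqlBase {

  /**
   * Fetch 1000 records per query.
   *
   * A 'batch_size' key in the source configuration is only consulted when
   * this property is left at its default of 0.
   *
   * @var int
   */
  protected $batchSize = 1000;

  /**
   * {@inheritdoc}
   */
  public function query() {
    return $this->select('my_source_table', 'm')->fields('m');
  }

  /**
   * {@inheritdoc}
   */
  public function fields() {
    return [
      'id' => $this->t('ID'),
      'data' => $this->t('Data'),
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function getIds() {
    return [
      'id' => [
        'type' => 'integer',
      ],
    ];
  }

}

Equivalently, a migration definition could leave the property at its default and supply batch_size under its source keys, as the kernel test below does through plugin configuration.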
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
index 6eebdea..5ce5fd3 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
@@ -310,13 +310,13 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
while (!isset($this->currentRow) && $this->getIterator()->valid()) {
$row_data = $this->getIterator()->current() + $this->configuration;
- $this->getIterator()->next();
+ $this->fetchNextRow();
$row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());
// Populate the source key for this row.
$this->currentSourceIds = $row->getSourceIdValues();
- // Pick up the existing map row, if any, unless getNextRow() did it.
+ // Pick up the existing map row, if any, unless fetchNextRow() did it.
if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
$row->setIdMap($id_map);
}
@@ -348,7 +348,14 @@ abstract class SourcePluginBase extends PluginBase implements MigrateSourceInter
}
/**
- * Checks if the incoming data is newer than what we've previously imported.
+ * Position the iterator to the following row.
+ */
+ protected function fetchNextRow() {
+ $this->getIterator()->next();
+ }
+
+ /**
+ * Check if the incoming data is newer than what we've previously imported.
*
* @param \Drupal\migrate\Row $row
* The row we're importing.
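The extracted fetchNextRow() hook above is the seam that SqlBase overrides in the next file; any source whose iterator can run dry partway through a run can refill it the same way. A hypothetical override for a paged source follows — hasMorePages() is an assumed, plugin-specific helper, and the sketch presumes the plugin's initializeIterator() advances to the next page each time the iterator is rebuilt:

  /**
   * {@inheritdoc}
   *
   * Hypothetical override: when the current iterator is exhausted, discard it
   * so that getIterator() rebuilds it from the next page of source data
   * instead of letting the migration end early.
   */
  protected function fetchNextRow() {
    $this->getIterator()->next();
    if (!$this->getIterator()->valid() && $this->hasMorePages()) {
      // hasMorePages() is an assumed helper; a real plugin supplies its own
      // paging logic here.
      unset($this->iterator);
      $this->getIterator()->rewind();
    }
  }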
diff --git a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
index 0eac141..ee43451 100644
--- a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
+++ b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php
@@ -5,6 +5,7 @@ namespace Drupal\migrate\Plugin\migrate\source;
use Drupal\Core\Database\Database;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\State\StateInterface;
+use Drupal\migrate\MigrateException;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Plugin\migrate\id_map\Sql;
use Drupal\migrate\Plugin\MigrateIdMapInterface;
@@ -43,6 +44,22 @@ abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPlugi
protected $state;
/**
+ * The count of the number of batches run.
+ *
+ * @var int
+ */
+ protected $batch = 0;
+
+ /**
+ * Number of records to fetch from the database during each batch.
+ *
+ * A value of zero indicates no batching is to be done.
+ *
+ * @var int
+ */
+ protected $batchSize = 0;
+
+ /**
* {@inheritdoc}
*/
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
@@ -160,68 +177,108 @@ abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPlugi
* we will take advantage of the PDO-based API to optimize the query up-front.
*/
protected function initializeIterator() {
- $this->prepareQuery();
+ // Initialize the batch size.
+ if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
+ // Valid batch sizes are integers >= 0.
+ if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
+ $this->batchSize = $this->configuration['batch_size'];
+ }
+ else {
+ throw new MigrateException("batch_size must be greater than or equal to zero");
+ }
+ }
+
+ // If a batch has run the query is already setup.
+ if ($this->batch == 0) {
+ $this->prepareQuery();
- // Get the key values, for potential use in joining to the map table.
- $keys = array();
+ // Get the key values, for potential use in joining to the map table.
+ $keys = array();
- // The rules for determining what conditions to add to the query are as
- // follows (applying first applicable rule):
- // 1. If the map is joinable, join it. We will want to accept all rows
- // which are either not in the map, or marked in the map as NEEDS_UPDATE.
- // Note that if high water fields are in play, we want to accept all rows
- // above the high water mark in addition to those selected by the map
- // conditions, so we need to OR them together (but AND with any existing
- // conditions in the query). So, ultimately the SQL condition will look
- // like (original conditions) AND (map IS NULL OR map needs update
- // OR above high water).
- $conditions = $this->query->orConditionGroup();
- $condition_added = FALSE;
- if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
- // Build the join to the map table. Because the source key could have
- // multiple fields, we need to build things up.
- $count = 1;
- $map_join = '';
- $delimiter = '';
- foreach ($this->getIds() as $field_name => $field_schema) {
- if (isset($field_schema['alias'])) {
- $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
+ // The rules for determining what conditions to add to the query are as
+ // follows (applying first applicable rule):
+ // 1. If the map is joinable, join it. We will want to accept all rows
+ // which are either not in the map, or marked in the map as NEEDS_UPDATE.
+ // Note that if high water fields are in play, we want to accept all rows
+ // above the high water mark in addition to those selected by the map
+ // conditions, so we need to OR them together (but AND with any existing
+ // conditions in the query). So, ultimately the SQL condition will look
+ // like (original conditions) AND (map IS NULL OR map needs update
+ // OR above high water).
+ $conditions = $this->query->orConditionGroup();
+ $condition_added = FALSE;
+ if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
+ // Build the join to the map table. Because the source key could have
+ // multiple fields, we need to build things up.
+ $count = 1;
+ $map_join = '';
+ $delimiter = '';
+ foreach ($this->getIds() as $field_name => $field_schema) {
+ if (isset($field_schema['alias'])) {
+ $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
+ }
+ $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
+ $delimiter = ' AND ';
}
- $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
- $delimiter = ' AND ';
- }
- $alias = $this->query->leftJoin($this->migration->getIdMap()->getQualifiedMapTableName(), 'map', $map_join);
- $conditions->isNull($alias . '.sourceid1');
- $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
- $condition_added = TRUE;
+ $alias = $this->query->leftJoin($this->migration->getIdMap()
+ ->getQualifiedMapTableName(), 'map', $map_join);
+ $conditions->isNull($alias . '.sourceid1');
+ $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
+ $condition_added = TRUE;
- // And as long as we have the map table, add its data to the row.
- $n = count($this->getIds());
- for ($count = 1; $count <= $n; $count++) {
- $map_key = 'sourceid' . $count;
- $this->query->addField($alias, $map_key, "migrate_map_$map_key");
- }
- if ($n = count($this->migration->getDestinationIds())) {
+ // And as long as we have the map table, add its data to the row.
+ $n = count($this->getIds());
for ($count = 1; $count <= $n; $count++) {
- $map_key = 'destid' . $count++;
+ $map_key = 'sourceid' . $count;
$this->query->addField($alias, $map_key, "migrate_map_$map_key");
}
+ if ($n = count($this->migration->getDestinationIds())) {
+ for ($count = 1; $count <= $n; $count++) {
+ $map_key = 'destid' . $count++;
+ $this->query->addField($alias, $map_key, "migrate_map_$map_key");
+ }
+ }
+ $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
+ }
+ // 2. If we are using high water marks, also include rows above the mark.
+ // But, include all rows if the high water mark is not set.
+ if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
+ $high_water_field = $this->getHighWaterField();
+ $conditions->condition($high_water_field, $high_water, '>');
+ $this->query->orderBy($high_water_field);
+ }
+ if ($condition_added) {
+ $this->query->condition($conditions);
}
- $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
}
- // 2. If we are using high water marks, also include rows above the mark.
- // But, include all rows if the high water mark is not set.
- if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') {
- $high_water_field = $this->getHighWaterField();
- $conditions->condition($high_water_field, $high_water, '>');
- $this->query->orderBy($high_water_field);
+
+ // Download data in batches for performance.
+ if (($this->batchSize > 0)) {
+ $this->query->range($this->batch * $this->batchSize, $this->batchSize);
}
- if ($condition_added) {
- $this->query->condition($conditions);
+ return new \IteratorIterator($this->query->execute());
+ }
+
+ /**
+ * Position the iterator to the following row.
+ */
+ protected function fetchNextRow() {
+ $this->getIterator()->next();
+ // We might be out of data entirely, or just out of data in the current
+ // batch. Attempt to fetch the next batch and see.
+ if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
+ $this->fetchNextBatch();
}
+ }
- return new \IteratorIterator($this->query->execute());
+ /**
+ * Prepares query for the next set of data from the source database.
+ */
+ protected function fetchNextBatch() {
+ $this->batch++;
+ unset($this->iterator);
+ $this->getIterator()->rewind();
}
/**
@@ -249,6 +306,14 @@ abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPlugi
if (!$this->getIds()) {
return FALSE;
}
+ // With batching, we want a later batch to return the same rows that would
+ // have been returned at the same point within a monolithic query. If we
+ // join to the map table, the first batch is writing to the map table and
+ // thus affecting the results of subsequent batches. To be safe, we avoid
+ // joining to the map table when batching.
+ if ($this->batchSize > 0) {
+ return FALSE;
+ }
$id_map = $this->migration->getIdMap();
if (!$id_map instanceof Sql) {
return FALSE;
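To make the paging arithmetic concrete: batch N requests the half-open row range [N * batch_size, (N + 1) * batch_size), mirroring the range() call added to initializeIterator(). A standalone sketch of that calculation (not part of the patch):

<?php

/**
 * Returns the (offset, limit) pair a given batch passes to range().
 *
 * Batch 0 fetches rows 0..batch_size-1, batch 1 fetches rows
 * batch_size..2*batch_size-1, and so on until a batch comes back empty.
 */
function batch_range(int $batch, int $batch_size): array {
  return [$batch * $batch_size, $batch_size];
}

// With 200 source rows and batch_size 30, the query for batch 6 asks for
// rows 180-209 and returns the final 20; the query for batch 7 comes back
// empty, which ends iteration.
print_r(batch_range(6, 30)); // Offset 180, limit 30.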
diff --git a/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml b/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml
new file mode 100644
index 0000000..d416a7b
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml
@@ -0,0 +1,7 @@
+type: module
+name: Migrate query batch Source test
+description: 'Provides a database table and records for SQL import with batch testing.'
+package: Testing
+core: 8.x
+dependencies:
+ - migrate
diff --git a/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php b/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php
new file mode 100644
index 0000000..cc00c0d
--- /dev/null
+++ b/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php
@@ -0,0 +1,45 @@
+<?php
+
+namespace Drupal\migrate_query_batch_test\Plugin\migrate\source;
+
+use Drupal\migrate\Plugin\migrate\source\SqlBase;
+
+/**
+ * Source plugin for migration query batch tests.
+ *
+ * @MigrateSource(
+ * id = "query_batch_test"
+ * )
+ */
+class QueryBatchTest extends SqlBase {
+
+ /**
+ * {@inheritdoc}
+ */
+ public function query() {
+ return ($this->select('query_batch_test', 'q')->fields('q'));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function fields() {
+ $fields = [
+ 'id' => $this->t('Id'),
+ 'data' => $this->t('data'),
+ ];
+ return $fields;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getIds() {
+ return [
+ 'id' => [
+ 'type' => 'integer',
+ ],
+ ];
+ }
+
+}
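For reference, a migration exercising this source with batching enabled only needs a batch_size key under its source configuration. Sketched here as the equivalent plugin definition array (hypothetical; the patch itself ships no migration, and the 'null' destination is used purely for illustration):

<?php

// Hypothetical migration definition. batch_size is read by
// SqlBase::initializeIterator() and must be an integer >= 0; 0 (the default)
// disables batching.
$definition = [
  'id' => 'query_batch_example',
  'source' => [
    'plugin' => 'query_batch_test',
    'batch_size' => 50,
  ],
  'process' => [
    'id' => 'id',
    'data' => 'data',
  ],
  'destination' => [
    'plugin' => 'null',
  ],
];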
diff --git a/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php b/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php
new file mode 100644
index 0000000..2b749dc
--- /dev/null
+++ b/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php
@@ -0,0 +1,261 @@
+<?php
+
+namespace Drupal\Tests\migrate\Kernel;
+
+use Drupal\KernelTests\KernelTestBase;
+use Drupal\migrate\MigrateException;
+use Drupal\migrate\Plugin\MigrateIdMapInterface;
+use Drupal\migrate\Plugin\MigrationInterface;
+use Drupal\Core\Database\Driver\sqlite\Connection;
+
+/**
+ * Tests query batching.
+ *
+ * @covers \Drupal\migrate_query_batch_test\Plugin\migrate\source\QueryBatchTest
+ * @group migrate
+ */
+class QueryBatchTest extends KernelTestBase {
+
+ /**
+ * The mocked migration.
+ *
+ * @var MigrationInterface|\Prophecy\Prophecy\ObjectProphecy
+ */
+ protected $migration;
+
+ /**
+ * {@inheritdoc}
+ */
+ public static $modules = [
+ 'migrate',
+ 'migrate_query_batch_test',
+ ];
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function setUp() {
+ parent::setUp();
+
+ // Create a mock migration. This will be injected into the source plugin
+ // under test.
+ $this->migration = $this->prophesize(MigrationInterface::class);
+
+ $this->migration->id()->willReturn(
+ $this->randomMachineName(16)
+ );
+ // Prophesize a useless ID map plugin and an empty set of destination IDs.
+ // Calling code can override these prophecies later and set up different
+ // behaviors.
+ $this->migration->getIdMap()->willReturn(
+ $this->prophesize(MigrateIdMapInterface::class)->reveal()
+ );
+ $this->migration->getDestinationIds()->willReturn([]);
+ }
+
+ /**
+ * Tests a negative batch size throws an exception.
+ */
+ public function testBatchSizeNegative() {
+ $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero');
+ $plugin = $this->getPlugin(['batch_size' => -1]);
+ $plugin->next();
+ }
+
+ /**
+ * Tests a non integer batch size throws an exception.
+ */
+ public function testBatchSizeNonInteger() {
+ $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero');
+ $plugin = $this->getPlugin(['batch_size' => '1']);
+ $plugin->next();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function queryDataProvider() {
+ // Define the parameters for building the data array. The first element is
+ // the number of source data rows, the second is the batch size to set on
+ // the plugin configuration.
+ $test_parameters = [
+ // Test when batch size is 0.
+ [200, 0],
+ // Test when rows mod batch size is 0.
+ [200, 20],
+ // Test when rows mod batch size is > 0.
+ [200, 30],
+ // Test when batch size = row count.
+ [200, 200],
+ // Test when batch size > row count.
+ [200, 300],
+ ];
+
+ // Build the data provider array. The provider array consists of the source
+ // data rows, the expected result data, the expected count, the plugin
+ // configuration, the expected batch size and the expected batch count.
+ $table = 'query_batch_test';
+ $tests = [];
+ $data_set = 0;
+ foreach ($test_parameters as $data) {
+ list($num_rows, $batch_size) = $data;
+ for ($i = 0; $i < $num_rows; $i++) {
+ $tests[$data_set]['source_data'][$table][] = [
+ 'id' => $i,
+ 'data' => $this->randomString(),
+ ];
+ }
+ $tests[$data_set]['expected_data'] = $tests[$data_set]['source_data'][$table];
+ $tests[$data_set][2] = $num_rows;
+ // Plugin configuration array.
+ $tests[$data_set][3] = ['batch_size' => $batch_size];
+ // Expected batch size.
+ $tests[$data_set][4] = $batch_size;
+ // Expected batch count is 0 unless a batch size is set.
+ $expected_batch_count = 0;
+ if ($batch_size > 0) {
+ $expected_batch_count = (int) ($num_rows / $batch_size);
+ if ($num_rows % $batch_size) {
+ // If there is a remainder an extra batch is needed to get the
+ // remaining rows.
+ $expected_batch_count++;
+ }
+ }
+ $tests[$data_set][5] = $expected_batch_count;
+ $data_set++;
+ }
+ return $tests;
+ }
+
+ /**
+ * Tests query batch size.
+ *
+ * @param array $source_data
+ * The source data, keyed by table name. Each table is an array containing
+ * the rows in that table.
+ * @param array $expected_data
+ * The result rows the plugin is expected to return.
+ * @param int $num_rows
+ * How many rows the source plugin is expected to return.
+ * @param array $configuration
+ * Configuration for the source plugin specifying the batch size.
+ * @param int $expected_batch_size
+ * The expected batch size, will be set to zero for invalid batch sizes.
+ * @param int $expected_batch_count
+ * The total number of batches.
+ *
+ * @dataProvider queryDataProvider
+ */
+ public function testQueryBatch($source_data, $expected_data, $num_rows, $configuration, $expected_batch_size, $expected_batch_count) {
+ $plugin = $this->getPlugin($configuration);
+
+ // Since we don't yet inject the database connection, we need to use a
+ // reflection hack to set it in the plugin instance.
+ $reflector = new \ReflectionObject($plugin);
+ $property = $reflector->getProperty('database');
+ $property->setAccessible(TRUE);
+
+ $connection = $this->getDatabase($source_data);
+ $property->setValue($plugin, $connection);
+
+ // Test the results.
+ $i = 0;
+ /** @var \Drupal\migrate\Row $row */
+ foreach ($plugin as $row) {
+
+ $expected = $expected_data[$i++];
+ $actual = $row->getSource();
+
+ foreach ($expected as $key => $value) {
+ $this->assertArrayHasKey($key, $actual);
+ $this->assertSame((string) $value, (string) $actual[$key]);
+ }
+ }
+
+ // Test that all rows were retrieved.
+ self::assertSame($num_rows, $i);
+
+ // Test the batch size.
+ if (is_null($expected_batch_size)) {
+ $expected_batch_size = $configuration['batch_size'];
+ }
+ $property = $reflector->getProperty('batchSize');
+ $property->setAccessible(TRUE);
+ self::assertSame($expected_batch_size, $property->getValue($plugin));
+
+ // Test the batch count.
+ if (is_null($expected_batch_count)) {
+ $expected_batch_count = intdiv($num_rows, $expected_batch_size);
+ if ($num_rows % $configuration['batch_size']) {
+ $expected_batch_count++;
+ }
+ }
+ $property = $reflector->getProperty('batch');
+ $property->setAccessible(TRUE);
+ self::assertSame($expected_batch_count, $property->getValue($plugin));
+ }
+
+ /**
+ * Instantiates the source plugin under test.
+ *
+ * @param array $configuration
+ * The source plugin configuration.
+ *
+ * @return \Drupal\migrate\Plugin\MigrateSourceInterface|object
+ * The fully configured source plugin.
+ */
+ protected function getPlugin($configuration) {
+ /** @var \Drupal\migrate\Plugin\MigratePluginManager $plugin_manager */
+ $plugin_manager = $this->container->get('plugin.manager.migrate.source');
+ $plugin = $plugin_manager->createInstance('query_batch_test', $configuration, $this->migration->reveal());
+
+ $this->migration
+ ->getSourcePlugin()
+ ->willReturn($plugin);
+ return $plugin;
+ }
+
+ /**
+ * Builds an in-memory SQLite database from a set of source data.
+ *
+ * @param array $source_data
+ * The source data, keyed by table name. Each table is an array containing
+ * the rows in that table.
+ *
+ * @return \Drupal\Core\Database\Driver\sqlite\Connection
+ * The SQLite database connection.
+ */
+ protected function getDatabase(array $source_data) {
+ // Create an in-memory SQLite database. Plugins can interact with it like
+ // any other database, and it will cease to exist when the connection is
+ // closed.
+ $connection_options = ['database' => ':memory:'];
+ $pdo = Connection::open($connection_options);
+ $connection = new Connection($pdo, $connection_options);
+
+ // Create the tables and fill them with data.
+ foreach ($source_data as $table => $rows) {
+ // Use the biggest row to build the table schema.
+ $counts = array_map('count', $rows);
+ asort($counts);
+ end($counts);
+ $pilot = $rows[key($counts)];
+
+ $connection->schema()
+ ->createTable($table, [
+ // SQLite uses loose affinity typing, so it's OK for every field to
+ // be a text field.
+ 'fields' => array_map(function () {
+ return ['type' => 'text'];
+ }, $pilot),
+ ]);
+
+ $fields = array_keys($pilot);
+ $insert = $connection->insert($table)->fields($fields);
+ array_walk($rows, [$insert, 'values']);
+ $insert->execute();
+ }
+ return $connection;
+ }
+
+}