diff --git a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php index 6eebdeac0bef5e5bd394a8c9af8f18fe079d8e1a..5ce5fd39a5b5540f00680a044c1e88038ef94144 100644 --- a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php +++ b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php @@ -310,13 +310,13 @@ public function next() { while (!isset($this->currentRow) && $this->getIterator()->valid()) { $row_data = $this->getIterator()->current() + $this->configuration; - $this->getIterator()->next(); + $this->fetchNextRow(); $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds()); // Populate the source key for this row. $this->currentSourceIds = $row->getSourceIdValues(); - // Pick up the existing map row, if any, unless getNextRow() did it. + // Pick up the existing map row, if any, unless fetchNextRow() did it. if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) { $row->setIdMap($id_map); } @@ -348,7 +348,14 @@ public function next() { } /** - * Checks if the incoming data is newer than what we've previously imported. + * Position the iterator to the following row. + */ + protected function fetchNextRow() { + $this->getIterator()->next(); + } + + /** + * Check if the incoming data is newer than what we've previously imported. * * @param \Drupal\migrate\Row $row * The row we're importing. diff --git a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php index 0eac141ec171a9e5fb7c5276ac2041a620793ca0..ee434514bb4d1938b25275755eeb0a6cdc6e2d2d 100644 --- a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php +++ b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php @@ -5,6 +5,7 @@ use Drupal\Core\Database\Database; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; use Drupal\Core\State\StateInterface; +use Drupal\migrate\MigrateException; use Drupal\migrate\Plugin\MigrationInterface; use Drupal\migrate\Plugin\migrate\id_map\Sql; use Drupal\migrate\Plugin\MigrateIdMapInterface; @@ -42,6 +43,22 @@ abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPlugi */ protected $state; + /** + * The count of the number of batches run. + * + * @var int + */ + protected $batch = 0; + + /** + * Number of records to fetch from the database during each batch. + * + * A value of zero indicates no batching is to be done. + * + * @var int + */ + protected $batchSize = 0; + /** * {@inheritdoc} */ @@ -160,68 +177,108 @@ protected function prepareQuery() { * we will take advantage of the PDO-based API to optimize the query up-front. */ protected function initializeIterator() { - $this->prepareQuery(); + // Initialize the batch size. + if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) { + // Valid batch sizes are integers >= 0. + if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) { + $this->batchSize = $this->configuration['batch_size']; + } + else { + throw new MigrateException("batch_size must be greater than or equal to zero"); + } + } + + // If a batch has run the query is already setup. + if ($this->batch == 0) { + $this->prepareQuery(); - // Get the key values, for potential use in joining to the map table. - $keys = array(); + // Get the key values, for potential use in joining to the map table. + $keys = array(); - // The rules for determining what conditions to add to the query are as - // follows (applying first applicable rule): - // 1. If the map is joinable, join it. We will want to accept all rows - // which are either not in the map, or marked in the map as NEEDS_UPDATE. - // Note that if high water fields are in play, we want to accept all rows - // above the high water mark in addition to those selected by the map - // conditions, so we need to OR them together (but AND with any existing - // conditions in the query). So, ultimately the SQL condition will look - // like (original conditions) AND (map IS NULL OR map needs update - // OR above high water). - $conditions = $this->query->orConditionGroup(); - $condition_added = FALSE; - if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) { - // Build the join to the map table. Because the source key could have - // multiple fields, we need to build things up. - $count = 1; - $map_join = ''; - $delimiter = ''; - foreach ($this->getIds() as $field_name => $field_schema) { - if (isset($field_schema['alias'])) { - $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name); + // The rules for determining what conditions to add to the query are as + // follows (applying first applicable rule): + // 1. If the map is joinable, join it. We will want to accept all rows + // which are either not in the map, or marked in the map as NEEDS_UPDATE. + // Note that if high water fields are in play, we want to accept all rows + // above the high water mark in addition to those selected by the map + // conditions, so we need to OR them together (but AND with any existing + // conditions in the query). So, ultimately the SQL condition will look + // like (original conditions) AND (map IS NULL OR map needs update + // OR above high water). + $conditions = $this->query->orConditionGroup(); + $condition_added = FALSE; + if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) { + // Build the join to the map table. Because the source key could have + // multiple fields, we need to build things up. + $count = 1; + $map_join = ''; + $delimiter = ''; + foreach ($this->getIds() as $field_name => $field_schema) { + if (isset($field_schema['alias'])) { + $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name); + } + $map_join .= "$delimiter$field_name = map.sourceid" . $count++; + $delimiter = ' AND '; } - $map_join .= "$delimiter$field_name = map.sourceid" . $count++; - $delimiter = ' AND '; - } - $alias = $this->query->leftJoin($this->migration->getIdMap()->getQualifiedMapTableName(), 'map', $map_join); - $conditions->isNull($alias . '.sourceid1'); - $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE); - $condition_added = TRUE; + $alias = $this->query->leftJoin($this->migration->getIdMap() + ->getQualifiedMapTableName(), 'map', $map_join); + $conditions->isNull($alias . '.sourceid1'); + $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE); + $condition_added = TRUE; - // And as long as we have the map table, add its data to the row. - $n = count($this->getIds()); - for ($count = 1; $count <= $n; $count++) { - $map_key = 'sourceid' . $count; - $this->query->addField($alias, $map_key, "migrate_map_$map_key"); - } - if ($n = count($this->migration->getDestinationIds())) { + // And as long as we have the map table, add its data to the row. + $n = count($this->getIds()); for ($count = 1; $count <= $n; $count++) { - $map_key = 'destid' . $count++; + $map_key = 'sourceid' . $count; $this->query->addField($alias, $map_key, "migrate_map_$map_key"); } + if ($n = count($this->migration->getDestinationIds())) { + for ($count = 1; $count <= $n; $count++) { + $map_key = 'destid' . $count++; + $this->query->addField($alias, $map_key, "migrate_map_$map_key"); + } + } + $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status'); + } + // 2. If we are using high water marks, also include rows above the mark. + // But, include all rows if the high water mark is not set. + if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') { + $high_water_field = $this->getHighWaterField(); + $conditions->condition($high_water_field, $high_water, '>'); + $this->query->orderBy($high_water_field); + } + if ($condition_added) { + $this->query->condition($conditions); } - $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status'); } - // 2. If we are using high water marks, also include rows above the mark. - // But, include all rows if the high water mark is not set. - if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater()) !== '') { - $high_water_field = $this->getHighWaterField(); - $conditions->condition($high_water_field, $high_water, '>'); - $this->query->orderBy($high_water_field); + + // Download data in batches for performance. + if (($this->batchSize > 0)) { + $this->query->range($this->batch * $this->batchSize, $this->batchSize); } - if ($condition_added) { - $this->query->condition($conditions); + return new \IteratorIterator($this->query->execute()); + } + + /** + * Position the iterator to the following row. + */ + protected function fetchNextRow() { + $this->getIterator()->next(); + // We might be out of data entirely, or just out of data in the current + // batch. Attempt to fetch the next batch and see. + if ($this->batchSize > 0 && !$this->getIterator()->valid()) { + $this->fetchNextBatch(); } + } - return new \IteratorIterator($this->query->execute()); + /** + * Prepares query for the next set of data from the source database. + */ + protected function fetchNextBatch() { + $this->batch++; + unset($this->iterator); + $this->getIterator()->rewind(); } /** @@ -249,6 +306,14 @@ protected function mapJoinable() { if (!$this->getIds()) { return FALSE; } + // With batching, we want a later batch to return the same rows that would + // have been returned at the same point within a monolithic query. If we + // join to the map table, the first batch is writing to the map table and + // thus affecting the results of subsequent batches. To be safe, we avoid + // joining to the map table when batching. + if ($this->batchSize > 0) { + return FALSE; + } $id_map = $this->migration->getIdMap(); if (!$id_map instanceof Sql) { return FALSE; diff --git a/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml b/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml new file mode 100644 index 0000000000000000000000000000000000000000..d416a7b4c982ce66609bb7b3595219a1a5fae22a --- /dev/null +++ b/core/modules/migrate/tests/modules/migrate_query_batch_test/migrate_query_batch_test.info.yml @@ -0,0 +1,7 @@ +type: module +name: Migrate query batch Source test +description: 'Provides a database table and records for SQL import with batch testing.' +package: Testing +core: 8.x +dependencies: + - migrate diff --git a/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php b/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php new file mode 100644 index 0000000000000000000000000000000000000000..cc00c0dfa37f3226195192dda57552935648bea1 --- /dev/null +++ b/core/modules/migrate/tests/modules/migrate_query_batch_test/src/Plugin/migrate/source/QueryBatchTest.php @@ -0,0 +1,45 @@ +select('query_batch_test', 'q')->fields('q')); + } + + /** + * {@inheritdoc} + */ + public function fields() { + $fields = [ + 'id' => $this->t('Id'), + 'data' => $this->t('data'), + ]; + return $fields; + } + + /** + * {@inheritdoc} + */ + public function getIds() { + return [ + 'id' => [ + 'type' => 'integer', + ], + ]; + } + +} diff --git a/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php b/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php new file mode 100644 index 0000000000000000000000000000000000000000..2b749dc23ffe917da118ea791c6b813cbb930c80 --- /dev/null +++ b/core/modules/migrate/tests/src/Kernel/QueryBatchTest.php @@ -0,0 +1,261 @@ +migration = $this->prophesize(MigrationInterface::class); + + $this->migration->id()->willReturn( + $this->randomMachineName(16) + ); + // Prophesize a useless ID map plugin and an empty set of destination IDs. + // Calling code can override these prophecies later and set up different + // behaviors. + $this->migration->getIdMap()->willReturn( + $this->prophesize(MigrateIdMapInterface::class)->reveal() + ); + $this->migration->getDestinationIds()->willReturn([]); + } + + /** + * Tests a negative batch size throws an exception. + */ + public function testBatchSizeNegative() { + $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero'); + $plugin = $this->getPlugin(['batch_size' => -1]); + $plugin->next(); + } + + /** + * Tests a non integer batch size throws an exception. + */ + public function testBatchSizeNonInteger() { + $this->setExpectedException(MigrateException::class, 'batch_size must be greater than or equal to zero'); + $plugin = $this->getPlugin(['batch_size' => '1']); + $plugin->next(); + } + + /** + * {@inheritdoc} + */ + public function queryDataProvider() { + // Define the parameters for building the data array. The first element is + // the number of source data rows, the second is the batch size to set on + // the plugin configuration. + $test_parameters = [ + // Test when batch size is 0. + [200, 0], + // Test when rows mod batch size is 0. + [200, 20], + // Test when rows mod batch size is > 0. + [200, 30], + // Test when batch size = row count. + [200, 200], + // Test when batch size > row count. + [200, 300], + ]; + + // Build the data provider array. The provider array consists of the source + // data rows, the expected result data, the expected count, the plugin + // configuration, the expected batch size and the expected batch count. + $table = 'query_batch_test'; + $tests = []; + $data_set = 0; + foreach ($test_parameters as $data) { + list($num_rows, $batch_size) = $data; + for ($i = 0; $i < $num_rows; $i++) { + $tests[$data_set]['source_data'][$table][] = [ + 'id' => $i, + 'data' => $this->randomString(), + ]; + } + $tests[$data_set]['expected_data'] = $tests[$data_set]['source_data'][$table]; + $tests[$data_set][2] = $num_rows; + // Plugin configuration array. + $tests[$data_set][3] = ['batch_size' => $batch_size]; + // Expected batch size. + $tests[$data_set][4] = $batch_size; + // Expected batch count is 0 unless a batch size is set. + $expected_batch_count = 0; + if ($batch_size > 0) { + $expected_batch_count = (int) ($num_rows / $batch_size); + if ($num_rows % $batch_size) { + // If there is a remainder an extra batch is needed to get the + // remaining rows. + $expected_batch_count++; + } + } + $tests[$data_set][5] = $expected_batch_count; + $data_set++; + } + return $tests; + } + + /** + * Tests query batch size. + * + * @param array $source_data + * The source data, keyed by table name. Each table is an array containing + * the rows in that table. + * @param array $expected_data + * The result rows the plugin is expected to return. + * @param int $num_rows + * How many rows the source plugin is expected to return. + * @param array $configuration + * Configuration for the source plugin specifying the batch size. + * @param int $expected_batch_size + * The expected batch size, will be set to zero for invalid batch sizes. + * @param int $expected_batch_count + * The total number of batches. + * + * @dataProvider queryDataProvider + */ + public function testQueryBatch($source_data, $expected_data, $num_rows, $configuration, $expected_batch_size, $expected_batch_count) { + $plugin = $this->getPlugin($configuration); + + // Since we don't yet inject the database connection, we need to use a + // reflection hack to set it in the plugin instance. + $reflector = new \ReflectionObject($plugin); + $property = $reflector->getProperty('database'); + $property->setAccessible(TRUE); + + $connection = $this->getDatabase($source_data); + $property->setValue($plugin, $connection); + + // Test the results. + $i = 0; + /** @var \Drupal\migrate\Row $row */ + foreach ($plugin as $row) { + + $expected = $expected_data[$i++]; + $actual = $row->getSource(); + + foreach ($expected as $key => $value) { + $this->assertArrayHasKey($key, $actual); + $this->assertSame((string) $value, (string) $actual[$key]); + } + } + + // Test that all rows were retrieved. + self::assertSame($num_rows, $i); + + // Test the batch size. + if (is_null($expected_batch_size)) { + $expected_batch_size = $configuration['batch_size']; + } + $property = $reflector->getProperty('batchSize'); + $property->setAccessible(TRUE); + self::assertSame($expected_batch_size, $property->getValue($plugin)); + + // Test the batch count. + if (is_null($expected_batch_count)) { + $expected_batch_count = intdiv($num_rows, $expected_batch_size); + if ($num_rows % $configuration['batch_size']) { + $expected_batch_count++; + } + } + $property = $reflector->getProperty('batch'); + $property->setAccessible(TRUE); + self::assertSame($expected_batch_count, $property->getValue($plugin)); + } + + /** + * Instantiates the source plugin under test. + * + * @param array $configuration + * The source plugin configuration. + * + * @return \Drupal\migrate\Plugin\MigrateSourceInterface|object + * The fully configured source plugin. + */ + protected function getPlugin($configuration) { + /** @var \Drupal\migrate\Plugin\MigratePluginManager $plugin_manager */ + $plugin_manager = $this->container->get('plugin.manager.migrate.source'); + $plugin = $plugin_manager->createInstance('query_batch_test', $configuration, $this->migration->reveal()); + + $this->migration + ->getSourcePlugin() + ->willReturn($plugin); + return $plugin; + } + + /** + * Builds an in-memory SQLite database from a set of source data. + * + * @param array $source_data + * The source data, keyed by table name. Each table is an array containing + * the rows in that table. + * + * @return \Drupal\Core\Database\Driver\sqlite\Connection + * The SQLite database connection. + */ + protected function getDatabase(array $source_data) { + // Create an in-memory SQLite database. Plugins can interact with it like + // any other database, and it will cease to exist when the connection is + // closed. + $connection_options = ['database' => ':memory:']; + $pdo = Connection::open($connection_options); + $connection = new Connection($pdo, $connection_options); + + // Create the tables and fill them with data. + foreach ($source_data as $table => $rows) { + // Use the biggest row to build the table schema. + $counts = array_map('count', $rows); + asort($counts); + end($counts); + $pilot = $rows[key($counts)]; + + $connection->schema() + ->createTable($table, [ + // SQLite uses loose affinity typing, so it's OK for every field to + // be a text field. + 'fields' => array_map(function () { + return ['type' => 'text']; + }, $pilot), + ]); + + $fields = array_keys($pilot); + $insert = $connection->insert($table)->fields($fields); + array_walk($rows, [$insert, 'values']); + $insert->execute(); + } + return $connection; + } + +}