migration = $migration;
    $this->message = $message ?: new MigrateMessage();
    $this->getIdMap()->setMessage($this->message);
    $this->eventDispatcher = $event_dispatcher;
    // Record the memory limit in bytes
    $limit = trim(ini_get('memory_limit'));
    if ($limit == '-1') {
      $this->memoryLimit = PHP_INT_MAX;
    }
    else {
      $this->memoryLimit = Bytes::toNumber($limit);
    }
  }

  /**
   * Returns the source.
   *
   * The source plugin is fetched from the migration on first use and then
   * kept on this object for subsequent calls.
   *
   * @return \Drupal\migrate\Plugin\MigrateSourceInterface
   *   The source.
   */
  protected function getSource() {
    if (!isset($this->source)) {
      $this->source = $this->migration->getSourcePlugin();
    }
    return $this->source;
  }

  /**
   * Gets the event dispatcher.
   *
   * Falls back to the 'event_dispatcher' service when no dispatcher has been
   * set on this object.
   *
   * @return \Symfony\Contracts\EventDispatcher\EventDispatcherInterface
   *   The event dispatcher.
   */
  protected function getEventDispatcher() {
    if (!$this->eventDispatcher) {
      $this->eventDispatcher = \Drupal::service('event_dispatcher');
    }
    return $this->eventDispatcher;
  }

  /**
   * {@inheritdoc}
   */
  public function import() {
    // Only begin the import operation if the migration is currently idle.
    if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
      $this->message->display($this->t('Migration @id is busy with another operation: @status',
        [
          '@id' => $this->migration->id(),
          '@status' => $this->t($this->migration->getStatusLabel()),
        ]), 'error');
      return MigrationInterface::RESULT_FAILED;
    }
    $this->getEventDispatcher()->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::PRE_IMPORT);

    // Knock off migration if the requirements haven't been met.
    try {
      $this->migration->checkRequirements();
    }
    catch (RequirementsException $e) {
      $this->message->display(
        $this->t(
          'Migration @id did not meet the requirements. @message',
          [
            '@id' => $this->migration->id(),
            '@message' => $e->getMessage(),
          ]
        ),
        'error'
      );

      return MigrationInterface::RESULT_FAILED;
    }

    $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
    $source = $this->getSource();

    try {
      $source->rewind();
    }
    catch (\Exception $e) {
      $this->message->display(
        $this->t('Migration failed with source plugin exception: @e in @file line @line', [
          '@e' => $e->getMessage(),
          '@file' => $e->getFile(),
          '@line' => $e->getLine(),
        ]), 'error');
      // The status was already switched to "importing" above, so put it back
      // before bailing out.
      $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
      return MigrationInterface::RESULT_FAILED;
    }

    // Get the process pipeline. It is only built when the source has at least
    // one row; if building it fails, the failure is recorded against that
    // first row and $pipeline stays FALSE so the row loop below is skipped.
    $pipeline = FALSE;
    if ($source->valid()) {
      try {
        $pipeline = $this->migration->getProcessPlugins();
      }
      catch (MigrateException $e) {
        $row = $source->current();
        $this->sourceIdValues = $row->getSourceIdValues();
        $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
        $this->saveMessage($e->getMessage(), $e->getLevel());
      }
    }

    $return = MigrationInterface::RESULT_COMPLETED;
    if ($pipeline) {
      $id_map = $this->getIdMap();
      $destination = $this->migration->getDestinationPlugin();
      while ($source->valid()) {
        $row = $source->current();
        // saveMessage() files messages under these source IDs.
        $this->sourceIdValues = $row->getSourceIdValues();

        // Run every destination property's pipeline. $save ends up FALSE when
        // processing failed or the row was skipped.
        try {
          foreach ($pipeline as $destination_property_name => $plugins) {
            $this->processPipeline($row, $destination_property_name, $plugins, NULL);
          }
          $save = TRUE;
        }
        catch (MigrateException $e) {
          $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
          $msg = sprintf("%s:%s:%s", $this->migration->getPluginId(), $destination_property_name, $e->getMessage());
          $this->saveMessage($msg, $e->getLevel());
          $save = FALSE;
        }
        catch (MigrateSkipRowException $e) {
          if ($e->getSaveToMap()) {
            $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
          }
          // Only record a message when the exception carries a non-blank one.
          if ($message = trim($e->getMessage())) {
            $msg = sprintf("%s:%s: %s", $this->migration->getPluginId(), $destination_property_name, $message);
            $this->saveMessage($msg, MigrationInterface::MESSAGE_INFORMATIONAL);
          }
          $save = FALSE;
        }

        if ($save) {
          try {
            $this->getEventDispatcher()
              ->dispatch(new MigratePreRowSaveEvent($this->migration, $this->message, $row), MigrateEvents::PRE_ROW_SAVE);
            // Pass along the first set of destination IDs already mapped to
            // this source row, or an empty array when there is none.
            $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues);
            $destination_id_values = $destination_ids ? reset($destination_ids) : [];
            $destination_id_values = $destination->import($row, $destination_id_values);
            $this->getEventDispatcher()
              ->dispatch(new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values), MigrateEvents::POST_ROW_SAVE);
            if ($destination_id_values) {
              // We do not save an idMap entry for config (the destination
              // returned TRUE rather than ID values).
              if ($destination_id_values !== TRUE) {
                $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
              }
            }
            else {
              // An empty result is treated as a failed save.
              $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
              if (!$id_map->messageCount()) {
                $message = $this->t('New object was not saved, no error provided');
                $this->saveMessage($message);
                $this->message->display($message);
              }
            }
          }
          catch (MigrateException $e) {
            $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());
            $this->saveMessage($e->getMessage(), $e->getLevel());
          }
          catch (\Exception $e) {
            $this->getIdMap()
              ->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
            $this->handleException($e);
          }
        }

        // Reset the row status to the default for the next row.
        $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

        // Check for memory exhaustion.
        if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
          break;
        }

        // If anyone has requested we stop, return the requested result.
        if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
          $return = $this->migration->getInterruptionResult();
          $this->migration->clearInterruptionResult();
          break;
        }

        try {
          $source->next();
        }
        catch (\Exception $e) {
          $this->message->display(
            $this->t('Migration failed with source plugin exception: @e in @file line @line', [
              '@e' => $e->getMessage(),
              '@file' => $e->getFile(),
              '@line' => $e->getLine(),
            ]), 'error');
          $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
          return MigrationInterface::RESULT_FAILED;
        }
      }
    }

    $this->getEventDispatcher()->dispatch(new MigrateImportEvent($this->migration, $this->message), MigrateEvents::POST_IMPORT);
    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
    return $return;
  }

  /**
   * {@inheritdoc}
   */
  public function rollback() {
    // Only begin the rollback operation if the migration is currently idle.
    if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
      $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');
      return MigrationInterface::RESULT_FAILED;
    }

    // Announce that rollback is about to happen.
    $this->getEventDispatcher()->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::PRE_ROLLBACK);

    // Optimistically assume things are going to work out; if not, $return will be
    // updated to some other status.
    $return = MigrationInterface::RESULT_COMPLETED;

    $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
    $id_map = $this->getIdMap();
    $destination = $this->migration->getDestinationPlugin();

    // Loop through each row in the map, and try to roll it back.
    $id_map->rewind();
    while ($id_map->valid()) {
      $destination_key = $id_map->currentDestination();
      if ($destination_key) {
        $map_row = $id_map->getRowByDestination($destination_key);
        // Roll back the destination when the map row has no rollback action
        // recorded or the recorded action is "delete".
        if (!isset($map_row['rollback_action']) || $map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
          $this->getEventDispatcher()
            ->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::PRE_ROW_DELETE);
          $destination->rollback($destination_key);
          $this->getEventDispatcher()
            ->dispatch(new MigrateRowDeleteEvent($this->migration, $destination_key), MigrateEvents::POST_ROW_DELETE);
        }
        // We're now done with this row, so remove it from the map.
        $id_map->deleteDestination($destination_key);
      }
      else {
        // If there is no destination key the import probably failed and we can
        // remove the row without further action.
        $source_key = $id_map->currentSource();
        $id_map->delete($source_key);
      }
      $id_map->next();

      // Check for memory exhaustion.
      if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
        break;
      }

      // If anyone has requested we stop, return the requested result.
      if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
        $return = $this->migration->getInterruptionResult();
        $this->migration->clearInterruptionResult();
        break;
      }
    }

    // Notify modules that rollback attempt was complete.
    $this->getEventDispatcher()->dispatch(new MigrateRollbackEvent($this->migration), MigrateEvents::POST_ROLLBACK);
    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);

    return $return;
  }

  /**
   * Get the ID map from the current migration.
   *
   * @return \Drupal\migrate\Plugin\MigrateIdMapInterface
   *   The ID map.
   */
  protected function getIdMap() {
    return $this->migration->getIdMap();
  }

  /**
   * {@inheritdoc}
   */
  public function processRow(Row $row, array $process = NULL, $value = NULL) {
    foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
      $this->processPipeline($row, $destination, $plugins, $value);
    }
  }

  /**
   * Runs a process pipeline.
   *
   * Each plugin transforms the value produced by the previous one. The final
   * value is written to the row's destination property; a NULL result is
   * recorded as an empty destination property.
   *
   * @param \Drupal\migrate\Row $row
   *   The $row to be processed.
   * @param string $destination
   *   The destination property name.
   * @param array $plugins
   *   The process pipeline plugins.
   * @param mixed $value
   *   (optional) Initial value of the pipeline for the destination.
   *
   * @see \Drupal\migrate\MigrateExecutableInterface::processRow
   *
   * @throws \Drupal\migrate\MigrateException
   *   Thrown when a plugin fails, or when a multiple-valued pipeline step
   *   receives something other than an array. Exceptions raised by plugins are
   *   rethrown with the plugin ID prepended to the message, keeping the
   *   original level and status.
   */
  protected function processPipeline(Row $row, string $destination, array $plugins, $value) {
    $multiple = FALSE;
    /** @var \Drupal\migrate\Plugin\MigrateProcessInterface $plugin */
    foreach ($plugins as $plugin) {
      $definition = $plugin->getPluginDefinition();
      // Many plugins expect a scalar value but the current value of the
      // pipeline might be multiple scalars (this is set by the previous plugin)
      // and in this case the current value needs to be iterated and each scalar
      // separately transformed.
      if ($multiple && !$definition['handle_multiples']) {
        $new_value = [];
        if (!is_array($value)) {
          throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
        }
        $break = FALSE;
        foreach ($value as $scalar_value) {
          $plugin->reset();
          try {
            $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
          }
          catch (MigrateSkipProcessException $e) {
            // Keep a NULL placeholder for this item and stop the pipeline
            // once the remaining items have been transformed.
            $new_value[] = NULL;
            $break = TRUE;
          }
          catch (MigrateException $e) {
            // Prepend the process plugin id to the message. The level and
            // status are forwarded because import() reads both from the
            // exception it catches; the original is chained as previous.
            $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
            throw new MigrateException($message, $e->getCode(), $e, $e->getLevel(), $e->getStatus());
          }
          if ($plugin->isPipelineStopped()) {
            $break = TRUE;
          }
        }
        $value = $new_value;
        if ($break) {
          break;
        }
      }
      else {
        $plugin->reset();
        try {
          $value = $plugin->transform($value, $this, $row, $destination);
        }
        catch (MigrateSkipProcessException $e) {
          $value = NULL;
          break;
        }
        catch (MigrateException $e) {
          // Prepend the process plugin id to the message. The level and
          // status are forwarded because import() reads both from the
          // exception it catches; the original is chained as previous.
          $message = sprintf("%s: %s", $plugin->getPluginId(), $e->getMessage());
          throw new MigrateException($message, $e->getCode(), $e, $e->getLevel(), $e->getStatus());
        }
        if ($plugin->isPipelineStopped()) {
          break;
        }
        // Tells the next iteration whether $value has to be treated as a
        // list of separate values.
        $multiple = $plugin->multiple();
      }
    }
    // Ensure all values, including nulls, are migrated.
    if ($plugins) {
      if (isset($value)) {
        $row->setDestinationProperty($destination, $value);
      }
      else {
        $row->setEmptyDestinationProperty($destination);
      }
    }
  }

  /**
   * Fetches the key array for the current source record.
   *
   * @return array
   *   The current source IDs.
   */
  protected function currentSourceIds() {
    return $this->getSource()->getCurrentIds();
  }

  /**
   * {@inheritdoc}
   */
  public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
    $this->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
  }

  /**
   * Takes an Exception object and both saves and displays it.
   *
   * Pulls in additional information on the location triggering the exception.
   *
   * @param \Exception $exception
   *   Object representing the exception.
   * @param bool $save
   *   (optional) Whether to save the message in the migration's mapping table.
   *   Set to FALSE in contexts where this doesn't make sense.
   */
  protected function handleException(\Exception $exception, $save = TRUE) {
    $result = Error::decodeException($exception);
    // Message format: "<message> (<file>:<line>)".
    $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
    if ($save) {
      $this->saveMessage($message);
    }
    $this->message->display($message, 'error');
  }

  /**
   * Checks for exceptional conditions, and display feedback.
   *
   * @return int
   *   MigrationInterface::RESULT_INCOMPLETE when memoryExceeded() reports that
   *   the memory threshold was exceeded, otherwise
   *   MigrationInterface::RESULT_COMPLETED.
   */
  protected function checkStatus() {
    if ($this->memoryExceeded()) {
      return MigrationInterface::RESULT_INCOMPLETE;
    }
    return MigrationInterface::RESULT_COMPLETED;
  }

  /**
   * Tests whether we've exceeded the desired memory threshold.
   *
   * If so, output a message.
   *
   * @return bool
   *   TRUE if the threshold is exceeded, otherwise FALSE.
   */
  protected function memoryExceeded() {
    $threshold = $this->memoryThreshold;
    // Nothing to compare against: either no threshold is set, or there is no
    // positive memory limit, in which case the usage ratio below would be a
    // division by zero.
    if (!$threshold || $this->memoryLimit <= 0) {
      return FALSE;
    }

    $usage = $this->getMemoryUsage();
    $pct_memory = $usage / $this->memoryLimit;
    if ($pct_memory <= $threshold) {
      return FALSE;
    }

    $this->message->display(
      $this->t(
        'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.',
        [
          '@pct' => round($pct_memory * 100),
          '@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
          '@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
        ]
      ),
      'warning'
    );
    $usage = $this->attemptMemoryReclaim();
    $pct_memory = $usage / $this->memoryLimit;
    // Use a lower threshold - we don't want to be in a situation where we keep
    // coming back here and trimming a tiny amount
    if ($pct_memory > (0.90 * $threshold)) {
      $this->message->display(
        $this->t(
          'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch',
          [
            '@pct' => round($pct_memory * 100),
            '@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
            '@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
          ]
        ),
        'warning'
      );
      return TRUE;
    }

    $this->message->display(
      $this->t(
        'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing',
        [
          '@pct' => round($pct_memory * 100),
          '@usage' => ByteSizeMarkup::create($usage, NULL, $this->stringTranslation),
          '@limit' => ByteSizeMarkup::create($this->memoryLimit, NULL, $this->stringTranslation),
        ]
      ),
      'warning');
    return FALSE;
  }

  /**
   * Returns the memory usage so far.
   *
   * @return int
   *   The memory usage.
   */
  protected function getMemoryUsage() {
    return memory_get_usage();
  }

  /**
   * Tries to reclaim memory.
   *
   * @return int
   *   The memory usage after reclaim.
   */
  protected function attemptMemoryReclaim() {
    // First, try resetting Drupal's static storage - this frequently releases
    // plenty of memory to continue.
    drupal_static_reset();

    // Entity storage can blow up with caches, so clear it out.
    \Drupal::service('entity.memory_cache')->deleteAll();

    // @TODO: explore resetting the container.

    // Run garbage collector to further reduce memory.
    gc_collect_cycles();

    // Measure through getMemoryUsage() so the "after" figure comes from the
    // same source as the "before" figure used by memoryExceeded().
    return $this->getMemoryUsage();
  }

}