// $Id$

 * @file
 * Defines the base class for import/rollback processes.

 * The base class for all import objects. This is where most of the smarts
 * of the migrate module reside. Migrations are created by deriving from this
 * class, and in the constructor (after calling parent::__construct()) initializing
 * at a minimum the name, description, source, and destination properties. The constructor
 * will also usually make several calls to addFieldMapping().
abstract class Migration extends MigrationBase {
  // NOTE(review): this copy of the class has lost its docblock openers (/**),
  // closers (*/) and closing braces; the bare " * ..." lines below are the
  // remnants of those docblocks and will not parse until they are restored.
   * Source object for the migration, derived from MigrateSource.
   * @var MigrateSource
  protected $source;
  // Accessor: returns $this->source.
  public function getSource() {
    return $this->source;
   * Destination object for the migration, derived from MigrateDestination.
   * @var MigrateDestination
  protected $destination;
  // Accessor: returns $this->destination.
  public function getDestination() {
    return $this->destination;
   * Map object tracking relationships between source and destination data
   * @var MigrateMap
  protected $map;
  // Accessor: returns $this->map.
  public function getMap() {
    return $this->map;

   * Indicate whether the primary system of record for this migration is the
   * source, or the destination (Drupal). In the source case, migration of
   * an existing object will completely replace the Drupal object with data from
   * the source side. In the destination case, the existing Drupal object will
   * be loaded, then changes from the source applied; also, rollback will not be
   * supported.
   * @var int
  // Allowed values for $systemOfRecord.
  const SOURCE = 1;
  const DESTINATION = 2;
  // rollback() only invokes the destination's (bulk)Rollback when this is
  // Migration::SOURCE; applyMappings() writes NULL for mapped-but-empty source
  // values when it is Migration::DESTINATION.
  protected $systemOfRecord = Migration::SOURCE;
  // Accessor: returns the configured system of record (SOURCE or DESTINATION).
  public function getSystemOfRecord() {
    return $this->systemOfRecord;
   * Specify value of needs_update for current map row. Usually set by
   * MigrateFieldHandler implementations.
  // Passed as the third argument to saveIDMapping() in import(), then reset to
  // FALSE after each row.
  protected $needsUpdate = FALSE;
   * Simple mappings between destination fields (keys) and source fields (values).
   * @var array
  // NOTE(review): addFieldMapping() actually stores MigrateFieldMapping objects
  // here (keyed by destination field name, or appended under a numeric key when
  // the destination is NULL), not plain source-field strings as the docblock
  // above suggests.
  protected $fieldMappings = array();
  // Accessor: returns the array of MigrateFieldMapping objects.
  public function getFieldMappings() {
    return $this->fieldMappings;
   * An array of counts. Initially used for cache hit/miss tracking.
   * @var array
  // progressMessage() reads per-name 'hit'/'miss_hit'/'miss_miss' tallies from
  // $this->counts['lookup_cache'] and clears that entry after reporting.
  protected $counts = array();
   * When performing a bulkRollback(), the maximum number of items to pass in
   * a single call. Can be overridden in derived class constructor.
   * @var int
  // rollback() flushes a batch once the running batch count reaches this value.
  protected $rollbackBatchSize = 500;
   * If present, an array with keys name and alias (optional). Name refers to
   * the source columns used for tracking highwater marks. alias is an
   * optional table alias.
   * @var array
  // A non-empty value enables highwater handling in import() and the
  // saveHighwater('', TRUE) reset at the end of rollback().
  protected $highwaterField = array();
  // Accessor: returns the highwater field definition (empty array if unused).
  public function getHighwaterField() {
    return $this->highwaterField;
   * The object currently being constructed
   * @var stdClass
  // Assigned from applyMappings() for each source row during import().
  protected $values;

   * General initialization of a Migration object.
  // Subclasses are expected to call parent::__construct() before setting
  // source, destination and field mappings (per the class docblock).
  // NOTE(review): the body and closing brace of this constructor are missing
  // from this copy.
  public function __construct() {

   * Deregister a migration - remove all traces of it from the database (without
   * touching any content which was created by this migration).
   * We'd like to do this at uninstall time, but the implementing module is
   * already disabled, so we can't instantiate it to get at the map. This can
   * be done in hook_disable(), however.
   * @param string $machine_name
  static public function deregisterMigration($machine_name) {
    // Remove map and message tables
    $migration = self::getInstance($machine_name);
    // NOTE(review): the statements that use $migration to drop the map and
    // message tables, and (presumably) the parent::deregisterMigration() call
    // referred to below, are missing from this copy.

    // TODO: Clear log entries? Or keep for historical purposes?

    // Call the parent deregistration (which clears migrate_status) last, the
    // above will reference it.

  // Processing

   * Add a mapping for a destination field, specifying a source field and/or
   * a default value.
   * @param string $destinationField
   *  Name of the destination field.
   * @param string $sourceField
   *  Name of the source field (optional).
  // NOTE(review): the docblock names the parameters $destinationField and
  // $sourceField, but the signature uses $destination_field and $source_field.
  // Behavior: a NULL destination appends the mapping under a numeric key;
  // otherwise it is stored under its destination field name, replacing (with a
  // warning) any earlier mapping for that destination.
  protected function addFieldMapping($destination_field, $source_field = NULL) {
    // Warn of duplicate mappings
    if (!is_null($destination_field) && isset($this->fieldMappings[$destination_field])) {
      // NOTE(review): the call that receives this translated warning (its
      // opening line) is missing from this copy.
        t('!name addFieldMapping: !dest was previously mapped, overridden',
          array('!name' => $this->machineName, '!dest' => $destination_field)),
    $mapping = new MigrateFieldMapping($destination_field, $source_field);
    if (is_null($destination_field)) {
      $this->fieldMappings[] = $mapping;
    else {
      $this->fieldMappings[$destination_field] = $mapping;
   * Remove any existing mappings for a given destination or source field.
   * @param string $destination_field
   *  Name of the destination field.
   * @param string $source_field
   *  Name of the source field.
  // NOTE(review): the branch bodies (presumably removal of the matching
  // $this->fieldMappings entries) are missing from this copy; only the
  // conditions survive.
  protected function removeFieldMapping($destination_field, $source_field = NULL) {
    if (isset($destination_field)) {
    if (isset($source_field)) {
      // Scan every mapping for one whose source field equals $source_field
      // (loose == comparison).
      foreach ($this->fieldMappings as $key => $mapping) {
        if ($mapping->getSourceField() == $source_field) {

   * Reports whether this migration process is complete (i.e., all available
   * source rows have been processed).
  public function isComplete() {
    // Pass TRUE so the source count is refreshed rather than taken from cache.
    $total = $this->source->count(TRUE);
    $imported = $this->importedCount();
    $errors = $this->errorCount();
    // Complete once every source row is accounted for in the map as either
    // imported or errored.
    return $total <= ($imported + $errors);
   * Override MigrationBase::beginProcess, to make sure the map/message tables
   * are present.
   * @param int $newStatus
  protected function beginProcess($newStatus) {
    // NOTE(review): the parent::beginProcess() call and the map/message table
    // setup described in the docblock are not present in this copy.

    // Do some standard setup
    // Feedback throttling is only configured when both 'frequency' and
    // 'frequency_unit' options are supplied; progressMessage() checks
    // isset($this->frequency).
    if (isset($this->options['feedback']) && isset($this->options['feedback']['frequency']) &&
        isset($this->options['feedback']['frequency_unit'])) {
      $this->frequency = $this->options['feedback']['frequency'];
      $this->frequency_unit = $this->options['feedback']['frequency_unit'];
    $this->lastfeedback = $this->starttime;

    // Reset the running counters used by progressMessage().
    $this->total_processed = $this->total_successes =
      $this->processed_since_feedback = $this->successes_since_feedback = 0;

    // Call pre-process methods
    // Destinations may optionally implement preImport()/preRollback().
    // NOTE(review): the bodies of these checks and the else branch separating
    // import from rollback are missing from this copy.
    if ($this->status == Migration::STATUS_IMPORTING) {
      if (method_exists($this->destination, 'preImport')) {
      if (method_exists($this->destination, 'preRollback')) {
   * Override MigrationBase::endProcess, to call post hooks. Note that it must
   * be public to be callable as the shutdown function.
  public function endProcess() {
    // Call post-process methods
    // Destinations may optionally implement postImport()/postRollback().
    // NOTE(review): the bodies of these checks, the else branch separating
    // import from rollback, and any parent::endProcess() call are missing from
    // this copy.
    if ($this->status == Migration::STATUS_IMPORTING) {
      if (method_exists($this->destination, 'postImport')) {
      if (method_exists($this->destination, 'postRollback')) {
   * Perform a rollback operation - remove migrated items from the destination.
  // NOTE(review): this copy has lost the method signature line, the closing
  // braces, and several statements: the loop exits after checkStatus() and the
  // item limit, the $destids append and $batch_count increment, the try
  // opener, the calls between each instrument start/stop pair, and the final
  // return.
    $return = MigrationBase::RESULT_COMPLETED;
    // Optional cap on the number of items processed in this run.
    $itemlimit = $this->getOption('itemlimit');
    if (method_exists($this->destination, 'bulkRollback')) {
      // Too many at once can lead to memory issues, so batch 'em up
      $destids = array();
      $sourceids = array();
      $batch_count = 0;
      foreach ($this->map as $destination_key) {
        if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
        // Items already queued in the current batch count toward the limit.
        if ($itemlimit && ($this->total_processed + $batch_count >= $itemlimit)) {
        $this->currentSourceKey = $this->map->getCurrentKey();

        // Note that bulk rollback is only supported for single-column keys
        $sourceids[] = $this->currentSourceKey;
        // Flush the batch once it reaches $rollbackBatchSize items.
        if ($batch_count >= $this->rollbackBatchSize) {
            // Destination objects are only removed when the source is the system
            // of record; the map/message cleanup below runs either way.
            if ($this->systemOfRecord == Migration::SOURCE) {
              migrate_instrument_start('destination bulkRollback');
              migrate_instrument_stop('destination bulkRollback');
            // Keep track in case of interruption
            migrate_instrument_start('rollback map/message update');
            migrate_instrument_stop('rollback map/message update');
            $this->total_successes += $batch_count;
            $this->successes_since_feedback += $batch_count;
          catch (Exception $e) {
            // NOTE(review): in this copy the exception is otherwise swallowed;
            // only the instrumentation timer is stopped.
            migrate_instrument_stop('rollback map/message update');
          $destids = array();
          $sourceids = array();
          // NOTE(review): BUG - $batch_count is reset to 0 here, before it is
          // added to total_processed and processed_since_feedback below, so
          // those increments always add 0 and the item limit and feedback
          // counts drift. Move this reset after the two increments.
          $batch_count = 0;
          // Will increment even if there was an exception... But we don't
          // really have a way to know how many really were successfully rolled back
          $this->total_processed += $batch_count;
          $this->processed_since_feedback += $batch_count;
      // Roll back whatever remains in the final, partial batch.
      if ($batch_count > 0) {
        if ($this->systemOfRecord == Migration::SOURCE) {
          migrate_instrument_start('destination bulkRollback');
          migrate_instrument_stop('destination bulkRollback');
          // NOTE(review): by indentation these counter updates sit inside the
          // SOURCE-only branch, unlike the full-batch path above. Confirm whether
          // DESTINATION-of-record rollbacks should be counted too.
          $this->total_processed += $batch_count;
          $this->total_successes += $batch_count;
          $this->processed_since_feedback += $batch_count;
          $this->successes_since_feedback += $batch_count;
        migrate_instrument_start('rollback map/message update');
        migrate_instrument_stop('rollback map/message update');
    else {
      // One-at-a-time rollback for destinations without bulkRollback().
      foreach ($this->map as $destination_key) {
        if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
        if ($itemlimit && ($this->total_processed >= $itemlimit)) {
        $this->currentSourceKey = $this->map->getCurrentKey();
        // Rollback one record
        try {
          if ($this->systemOfRecord == Migration::SOURCE) {
            migrate_instrument_start('destination rollback');
            migrate_instrument_stop('destination rollback');
          migrate_instrument_start('rollback map/message update');
          migrate_instrument_stop('rollback map/message update');
        catch (Exception $e) {
          // TODO: At least count failures
    // If we're using highwater marks, reset at completion of a full rollback
    // TODO: What about partial rollbacks? Probably little we can do to make
    // that work cleanly...
    // NOTE(review): as visible here, this reset is not conditioned on $return,
    // so it also runs after an interrupted (INCOMPLETE/STOPPED) rollback.
    if ($this->highwaterField) {
      $this->saveHighwater('', TRUE);

   * Perform an import operation - migrate items from source to destination.
  // NOTE(review): this copy has lost the method signature, the source
  // iteration loop header (which presumably assigns $data_row), the try opener,
  // the loop exit after checkStatus(), the catch(Exception) body, the
  // highwater and entityType branch bodies, and the final return.
    $return = MigrationBase::RESULT_COMPLETED;
      if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
      $this->currentSourceKey = $this->source->getCurrentKey();
      // Build the destination object for this row from the field mappings.
      $this->values = $this->applyMappings($data_row);
        // Wipe old messages
        $this->map->delete($this->currentSourceKey, TRUE);

        migrate_instrument_start('destination import', TRUE);
        $ids = $this->destination->import($this->values, $data_row);
        migrate_instrument_stop('destination import');
        // Record the source->destination mapping only when the destination
        // returned the ID(s) of the saved object.
        if ($ids) {
          $this->map->saveIDMapping($data_row, $ids, $this->needsUpdate);
        else {
          // NOTE(review): $message is not used anywhere in this copy; presumably
          // it should be passed to saveMessage().
          $message = t('New object was not saved, no error provided');
      catch (MigrateException $e) {
        // MigrateException carries its own severity level.
        $this->saveMessage($e->getMessage(), $e->getLevel());
      catch (Exception $e) {
      if ($this->highwaterField) {
      // Reset row properties.
      $this->needsUpdate = FALSE;
      // NOTE(review): the TODO below has lost the reference to what must be
      // committed before it can be removed.
      // TODO: Temporary. Remove when is committed.
      // TODO: Should be done in MigrateDestinationEntity
      if (!empty($this->destination->entityType)) {

  // Utility methods

   * Convenience function to return count of total source records
   * @param boolean $refresh
   *  Pass TRUE to refresh the cached count.
  // Thin wrapper that delegates to $this->source->count($refresh).
  public function sourceCount($refresh = FALSE) {
    return $this->source->count($refresh);

   * Get the number of records successfully imported.
   * @return int
   *  Number of imported records.
  // Delegates to the map object's importedCount().
  public function importedCount() {
    return $this->map->importedCount();

   * Get the number of source records which failed to import.
   * TODO: Doesn't yet account for informationals, or multiple errors for
   * a source record.
   * @return int
   *  Number of records errored out.
  // Delegates to the map object's errorCount().
  public function errorCount() {
    return $this->map->errorCount();
   * Get the number of messages associated with this migration
   * @return int
   *  Number of messages.
  // NOTE(review): the method signature (presumably messageCount()) is missing
  // from this copy; only its body survives. It delegates to the map object.
    return $this->map->messageCount();
   * Prepares this migration to run as an update - that is, in addition to
   * unmigrated content (source records not in the map table) being imported,
   * previously-migrated content will also be updated in place.
  // NOTE(review): the body of this method is missing from this copy.
  public function prepareUpdate() {

   * Outputs a progress message, reflecting the current status of a migration process.
   * @param int $result
   *  Status of the process, represented by one of the Migration::RESULT_* constants.
  protected function progressMessage($result) {
    // In the INCOMPLETE (feedback) case, only proceed under the proper conditions
    // Throttling uses $frequency with $frequency_unit 'seconds' or 'items', as
    // copied from the feedback options in beginProcess().
    // NOTE(review): the early return(s) for the throttled/unconfigured cases
    // are missing from this copy.
    if ($result == Migration::RESULT_INCOMPLETE) {
      if (isset($this->frequency)) {
        if (($this->frequency_unit == 'seconds' && time()-$this->lastfeedback >= $this->frequency) ||
            ($this->frequency_unit == 'items' && $this->processed_since_feedback >= $this->frequency)) {
          // Fall through
        else {
      else {

    // Elapsed seconds since the last feedback, and the resulting items/minute.
    $time = microtime(TRUE) - $this->lastfeedback;
    if ($time > 0) {
      $perminute = round(60*$this->processed_since_feedback/$time);
      $time = round($time, 1);
    else {
      $perminute = '?';

    // Choose the message template and severity for this operation and result.
    // NOTE(review): no break statements appear in these cases in this copy. If
    // that is accurate, every case falls through to RESULT_STOPPED. There is
    // also no default case, so an unexpected $result would leave $basetext and
    // $type undefined at the t() call below.
    if ($this->status == Migration::STATUS_IMPORTING) {
      switch ($result) {
        case Migration::RESULT_COMPLETED:
          $basetext = "Imported !successes (!failed failed) in !time sec (!perminute/min) - done with '!name'";
          $type = 'completed';
        case Migration::RESULT_FAILED:
          $basetext = "Imported !successes (!failed failed) in !time sec (!perminute/min) - failure with '!name'";
          $type = 'failed';
        case Migration::RESULT_INCOMPLETE:
          $basetext = "Imported !successes (!failed failed) in !time sec (!perminute/min) - continuing with '!name'";
          $type = 'ok';
        case Migration::RESULT_STOPPED:
          $basetext = "Imported !successes (!failed failed) in !time sec (!perminute/min) - stopped '!name'";
          $type = 'warning';
    else {
      switch ($result) {
        case Migration::RESULT_COMPLETED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - done with '!name'";
          $type = 'completed';
        case Migration::RESULT_FAILED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - failure with '!name'";
          $type = 'failed';
        case Migration::RESULT_INCOMPLETE:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - continuing with '!name'";
          $type = 'ok';
        case Migration::RESULT_STOPPED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - stopped '!name'";
          $type = 'warning';
    // Failures are derived as processed minus successes since the last feedback.
    $message = t($basetext,
        array('!numitems' => $this->processed_since_feedback,
              '!successes' => $this->successes_since_feedback,
              '!failed' => $this->processed_since_feedback - $this->successes_since_feedback,
              '!time' => $time,
              '!perminute' => $perminute,
              '!name' => $this->machineName));
    $this->showMessage($message, $type);

    // Report on lookup_cache hit rate. Only visible at 'debug' level.
    if ($result != Migration::RESULT_INCOMPLETE && !empty($this->counts['lookup_cache'])) {
      foreach ($this->counts['lookup_cache'] as $name => $tallies) {
        $tallies += array('hit' => 0, 'miss_hit' => 0, 'miss_miss' => 0); // Set defaults to avoid NOTICE.
        $sum = $tallies['hit']+$tallies['miss_hit']+$tallies['miss_miss'];
        // NOTE(review): $sum is 0 when all three tallies are 0, which would
        // divide by zero in the percentages below. The opening of the call that
        // receives this message is missing from this copy.
          t('Lookup cache: !mn SM=!name !hit hit, !miss_hit miss_hit, !miss_miss miss_miss (!total total).', array(
                '!mn' => $this->machineName,
                '!name' => $name,
                '!hit' => round((100*$tallies['hit'])/$sum) . '%',
                '!miss_hit' => round((100*$tallies['miss_hit'])/$sum) . '%',
                '!miss_miss' => round((100*$tallies['miss_miss'])/$sum) . '%',
                '!total' => $sum
        )), 'debug');
      $this->counts['lookup_cache'] = array();
    // After periodic feedback, restart the "since feedback" window.
    if ($result == Migration::RESULT_INCOMPLETE) {
      // NOTE(review): lastfeedback is reset with the integer time() here, but
      // elapsed time above is measured with microtime(TRUE). The next interval
      // therefore loses sub-second precision; microtime(TRUE) would be consistent.
      $this->lastfeedback = time();
      $this->processed_since_feedback = $this->successes_since_feedback = 0;
   * Standard top-of-loop stuff, common between rollback and import - check
   * for exceptional conditions, and display feedback.
  // NOTE(review): the docblock mentions displaying feedback, but no
  // progressMessage() call is visible in this copy of the method.
  // Callers treat any result other than RESULT_COMPLETED as a reason to stop
  // the current loop.
  protected function checkStatus() {
    // Resource limits reached: report INCOMPLETE.
    if ($this->memoryExceeded()) {
      return MigrationBase::RESULT_INCOMPLETE;
    if ($this->timeExceeded()) {
      return MigrationBase::RESULT_INCOMPLETE;
    // A stop has been requested for this migration.
    if ($this->getStatus() == Migration::STATUS_STOPPING) {
      return MigrationBase::RESULT_STOPPED;
    return MigrationBase::RESULT_COMPLETED;
   * Apply field mappings to a data row received from the source, returning
   * a populated destination object.
   * @param stdClass $data_row
  // NOTE(review): the final "return $values;" (and closing braces) are missing
  // from this copy.
  protected function applyMappings(stdClass $data_row) {
    // Apply mappings.
    $values = new stdClass;
    foreach ($this->fieldMappings as $mapping) {
      $destination = $mapping->getDestinationField();
      // Skip mappings with no destination (source fields marked DNM)
      if ($destination) {
        $source = $mapping->getSourceField();
        $default = $mapping->getDefaultValue();
        // If there's a source mapping, and a source value in the data row, copy
        // to the destination
        if ($source && isset($data_row->$source)) {
          $values->$destination = $data_row->$source;
        // Otherwise, apply the default value (if any)
        elseif (!is_null($default)) {
          $values->$destination = $default;

        // If there's a separator specified for this destination, then it
        // will be populated as an array exploded from the source value
        $separator = $mapping->getSeparator();
        if ($separator && isset($values->$destination)) {
          $values->$destination = explode($separator, $values->$destination);

        // If a source migration is supplied, use the current value for this field
        // to look up a destination ID from the provided migration
        $source_migration = $mapping->getSourceMigration();
        if ($source_migration && isset($values->$destination)) {
          $values->$destination = $this->handleSourceMigration($source_migration, $values->$destination, $default);
        // If specified, assure a unique value for this property.
        $dedupe = $mapping->getDedupe();
        if ($dedupe && isset($values->$destination)) {
          $values->$destination = $this->handleDedupe($dedupe, $values->$destination, $mapping);

        // Assign any arguments
        if (isset($values->$destination)) {
          $arguments = $mapping->getArguments();
          if ($arguments) {
            if (!is_array($values->$destination)) {
              $values->$destination = array($values->$destination);
            // TODO: Stuffing arguments into the destination field is gross - can
            // we come up with a better way to communicate them to the field
            // handlers?
            $values->{$destination}['arguments'] = array();
            // Each argument is taken from a named source field, else its
            // 'default_value', else the literal argument value itself.
            // NOTE(review): PHP 7+ uniform variable syntax parses
            // $data_row->$destarg['source_field'] as
            // ($data_row->$destarg)['source_field'], not the PHP 5 meaning
            // $data_row->{$destarg['source_field']}. Use explicit braces.
            foreach ($arguments as $argname => $destarg) {
              if (is_array($destarg) && isset($destarg['source_field']) && isset($data_row->$destarg['source_field'])) {
                $values->{$destination}['arguments'][$argname] = $data_row->$destarg['source_field'];
              elseif (is_array($destarg) && isset($destarg['default_value'])) {
                $values->{$destination}['arguments'][$argname] = $destarg['default_value'];
              else {
                $values->{$destination}['arguments'][$argname] = $destarg;

        // When we're updating existing nodes, if there is a source mapping but there
        // was no value for this row, add a null destination value so it gets removed
        // from the node
        if ($this->systemOfRecord == Migration::DESTINATION && $source && !isset($values->$destination)) {
          $values->$destination = NULL;
   * Look up a value migrated in another migration.
  protected function handleSourceMigration($source_migration, $source_values, $default = NULL) {
    // NOTE(review): the next two lines are web-page residue (not PHP) that was
    // pasted into this copy and must be removed.
Mike Ryan's avatar
Mike Ryan committed
    // Resolve the migration machine name to its instance.
    $source_migration = Migration::getInstance($source_migration);
    // Might already be an array, after separator processing
    if (is_array($source_values)) {
      $source_keys = $source_values;
    else {
      $source_keys = array($source_values);
    $results = array();
    foreach ($source_keys as $source_key) {
      $destids = $source_migration->getMap()->lookupDestinationID(array($source_key), $this);
      if (!$destids) {
        // No match found, give the implementing migration a chance to create
        // a stub for us to reference (presumably the real object will be
        // migrated later in the process)
        $destids = $source_migration->createStubWrapper(array($source_key));
      if ($destids) {
        // Assume that if the destination key is a single value, it
        // should be passed as such
        if (count($destids) == 1) {
          $results[] = reset($destids);
        else {
          $results[] = $destids;
      // If no match found, apply the default value (if any)
      elseif (!is_null($default)) {
        $results[] = $default;
    // Return a list when the input was a list or several values resolved;
    // otherwise the single resolved value.
    // NOTE(review): when a scalar input resolves to nothing and $default is
    // NULL, $results is empty and $results[0] is an undefined offset.
    if (is_array($source_values) || count($results) > 1) {
      return $results;
    else {
      return $results[0];
   * Assign a non-existing value for current mapping.
  // $dedupe supplies the 'table' and 'column' to check for existing values.
  protected function handleDedupe($dedupe, $original, $mapping) {
    // Existence probe using db_query_range() placeholders (presumably the
    // Drupal 6 %s-style API).
    $sql = "SELECT 1 FROM {%s} WHERE %s='%s'";
    $i = 1;
    $candidate = $original;
    // Try "<original>_<i>" until no row with that value exists.
    while ($count = db_result(db_query_range($sql, $dedupe['table'], $dedupe['column'], $candidate, 0, 1))) {
      // We already have the candidate value. Find a non-existing value.
      // @TODO: support custom replacement pattern instead of just append.
      $candidate = $original . '_' . $i;
      // NOTE(review): no $i++ is visible in this copy. Without it the loop
      // retries the same "_1" candidate forever once that value also exists,
      // and the $i > 1 check below can never be true.
    // Log an informational message when the value had to be changed.
    if ($i > 1) {
      $message = t('Replacing !column !original with !candidate',
                 array('!column' => $dedupe['column'],
                       '!original' => $original,
                       '!candidate' => $candidate));
      $migration = Migration::currentMigration();
      $migration->saveMessage($message, Migration::MESSAGE_INFORMATIONAL);
    return $candidate;
   * If stub creation is enabled, try to create a stub and save the mapping.
  // Returns the destination IDs from createStub(), or NULL when this migration
  // does not define createStub().
  protected function createStubWrapper(array $source_key) {
    // Stub creation is opt-in: only migrations that define createStub() take part.
    if (method_exists($this, 'createStub')) {
      // NOTE(review): createStub() is called without $source_key, so it cannot
      // see which source row it is stubbing.
      $destids = $this->createStub();
      if ($destids) {
        // Fake a data row with the source key in it
        // Key values are assigned positionally, in the order of the map's
        // source key definition.
        $map_source_key = $this->map->getSourceKey();
        $data_row = new stdClass;
        $i = 0;
        foreach ($map_source_key as $key => $definition) {
          $data_row->$key = $source_key[$i++];
        // needs_update = TRUE, presumably so the real row replaces the stub
        // when it is imported later.
        $this->map->saveIDMapping($data_row, $destids, TRUE);
    else {
      $destids = NULL;
    return $destids;

   * Pass messages through to the map class
   * @param string $message
   *  The message to record.
   * @param int $level
   *  Optional message severity (defaults to MESSAGE_ERROR).
  public function saveMessage($message, $level = MigrationBase::MESSAGE_ERROR) {
    // Attach the message to the source row currently being processed.
    // NOTE(review): $output is assigned but never used in this copy.
    $output = $this->map->saveMessage($this->currentSourceKey, $message, $level);
   * Set the specified row to be updated, if it exists.
  // NOTE(review): "array $source_key = NULL" is an implicitly nullable typed
  // parameter, which is deprecated as of PHP 8.4; "?array" makes it explicit.
  public function setUpdate(array $source_key = NULL) {
    // Default to the source row currently being processed.
    if (!$source_key) {
      $source_key = $this->currentSourceKey;
    // NOTE(review): the statement that flags $source_key for update (presumably
    // via $this->map) is missing from this copy.

 * Convenience class - deriving from this rather than directly from Migration
 * ensures that a class will not be registered as a migration itself - it is
 * the implementor's responsibility to register each instance of a dynamic
 * migration class.
abstract class DynamicMigration extends Migration {
   * Overrides default of FALSE
  // Always reports TRUE for this class and its subclasses.
  static public function isDynamic() {
    return TRUE;