Skip to content
sql.inc 13.6 KiB
Newer Older
<?php
// $Id$

/**
 * @file
 * Define a MigrateSource for importing from Drupal connections
 */

/**
 * Implementation of MigrateSource, to handle imports from Drupal connections.
 */
class MigrateSourceSQL extends MigrateSource {
  /**
   * The SQL query objects from which to obtain data, and counts of data
   *
   * @var SelectQuery
   */
  protected $originalQuery, $query, $countQuery;

  /**
   * The result object from executing the query - traversed to process the
   * incoming data.
   *
   * @var DatabaseStatementInterface
   */
  protected $result;

  /**
   * Number of eligible rows processed so far (used for itemlimit checking)
   *
   * @var int
   */
  protected $numProcessed = 0;

  /**
   * List of available source fields.
   *
   * @var array
   */
  protected $fields = array();

  /**
   * If the map is a MigrateSQLMap, and the table is compatible with the
   * source query, we can join directly to the map and make things much faster
   * and simpler.
   *
   * @var boolean
   */
  protected $mapJoinable = FALSE;
  // Dynamically set whether the map is joinable - not really for production use,
  // this is primarily to support simpletests
  public function setMapJoinable($map_joinable) {
    $this->mapJoinable = $map_joinable;
  }
  /**
   * Whether this source is configured to use a highwater mark, and there is
   * a highwater mark present to use.
   *
   * @var boolean
   */
  protected $usingHighwater = FALSE;

  /**
   * Simple initialization.
   *
   * @param SelectQuery $query
   *  The query we are iterating over.
   * @param array $fields
   *  Optional - keys are field names, values are descriptions. Use to override
   *  the default descriptions, or to add additional source fields which the
   *  migration will add via other means (e.g., prepareRow()).
   * @param SelectQuery $count_query
   *  Optional - an explicit count query, primarily used when counting the
   *  primary query is slow.
   * @param boolean $map_joinable
   *  Indicates whether the map table can be joined directly to the source query.
  public function __construct(SelectQuery $query,
      array $fields = array(), SelectQuery $count_query = NULL, $map_joinable = NULL) {
    $this->originalQuery = $query;
    $this->query = clone $query;
    if (is_null($count_query)) {
      $this->countQuery = clone $query->countQuery();
    }
    else {
      $this->countQuery = $count_query;
    }

    if (!is_null($map_joinable)) {
      $this->mapJoinable = $map_joinable;
    }
    else {
      // TODO: We want to automatically determine if the map table can be joined
      // directly to the query, but this won't work unless/until
      // http://drupal.org/node/802514 is committed, assume joinable for now
      $this->mapJoinable = TRUE;
/*      // To be able to join the map directly, it must be a PDO map on the same
      // connection, or a compatible connection
      if (is_a($map, 'MigrateSQLMap')) {
        $map_options = $map->getConnection()->getConnectionOptions();
        $query_options = $this->query->connection()->getConnectionOptions();

        // Identical options means it will work
        if ($map_options == $query_options) {
          $this->mapJoinable = TRUE;
        }
        else {
          // Otherwise, the one scenario we know will work is if it's MySQL and
          // the credentials match (SQLite too?)
          if ($map_options['driver'] == 'mysql' && $query_options['driver'] == 'mysql') {
            if ($map_options['host'] == $query_options['host'] &&
                $map_options['port'] == $query_options['port'] &&
                $map_options['username'] == $query_options['username'] &&
                $map_options['password'] == $query_options['password']) {
              $this->mapJoinable = TRUE;
            }
          }
        }
      }*/
    }
  /**
   * Return a string representing the source query.
   *
   * @return string
   */
  public function __toString() {
    return (string) $this->query;
  }

  /**
   * Returns a list of fields available to be mapped from the source query.
   *
   * @return array
   *  Keys: machine names of the fields (to be passed to addFieldMapping)
   *  Values: Human-friendly descriptions of the fields.
   */
  public function fields() {
    $fields = array();
    $queryFields = $this->query->getFields();
    if ($queryFields) {
      // Not much we can do in terms of describing the fields without manual intervention
      foreach ($queryFields as $field_name => $field_info) {
        // Lower case, because Drupal forces lowercase on fetch
        $fields[drupal_strtolower($field_name)] = drupal_strtolower(
          $field_info['table'] . '.' . $field_info['field']);
      }

/*
 * Handle queries without explicit field lists
 * TODO: Waiting on http://drupal.org/node/814312
    $info = Database::getConnectionInfo($query->getConnection());
    $database = $info['default']['database'];
    foreach ($this->query->getTables() as $table) {
      if (isset($table['all_fields']) && $table['all_fields']) {

        $database = 'plants';
        $table = $table['table'];
        $sql = 'SELECT column_name
                FROM information_schema.columns
                WHERE table_schema=:database AND table_name = :table
                ORDER BY ordinal_position';
        $result = dbtng_query($sql, array(':database' => $database, ':table' => $table));
        foreach ($result as $row) {
          $fields[drupal_strtolower($row->column_name)] = drupal_strtolower(
            $table . '.' . $row->column_name);
        }
      }
    }*/
    $expressionFields = $this->query->getExpressions();
    foreach ($expressionFields as $field_name => $field_info) {
      // Lower case, because Drupal forces lowercase on fetch
      $fields[drupal_strtolower($field_name)] = drupal_strtolower($field_info['alias']);
    }

    // Any caller-specified fields with the same names as extracted fields will
    // override them; any others will be added
    if ($this->fields) {
      $fields = $this->fields + $fields;
    return $fields;
  }

  /**
   * Return a count of all available source records.
   *
   * @param boolean $refresh
   *  If TRUE, or if there is no cached count, perform a SQL COUNT query to
   *  retrieve and cache the number of rows in the query. Otherwise, return
   *  the last cached value.
   *
   *  TODO: Implement caching
   */
  public function count($refresh = FALSE) {
    $count = $this->countQuery->execute()->fetchField();
    return $count;
  }

  /**
   * Implementation of Iterator::rewind() - called before beginning a foreach loop.
  public function rewind() {
    $migration = Migration::currentMigration();
    $this->result = NULL;
    $this->currentRow = NULL;
    $this->numProcessed = 0;
    $this->query = clone $this->originalQuery;
    // Get the keys, for potential use in joining map/message table, or enforcing
    // idlist
    $keys = array();
    foreach ($map->getSourceKey() as $field_name => $field_schema) {
      if (isset($field_schema['alias'])) {
        $field_name = $field_schema['alias'] . '.' . $field_name;
      }
      $keys[] = $field_name;
    }
    // The rules for determining what conditions to add to the query are as
    // follows (applying first applicable rule)
    // 1. If idlist is provided, then only process items in that list (AND key IN (idlist))
    // 2. Otherwise, If there's no highwater mark, process items which are not in the map table, or
    //    are marked needs_update=1 in the map table
    //    a. mapJoinable: AND (key not in map OR map.needs_update=1)
    //    b. !mapJoinable: Apply map logic in next()
    // 3. If there is a highwater mark (and no idlist), process items above the highwater mark, plus any
    //      items with needs_update=1
    //    For now, we set mapJoinable=FALSE in this case, until we work out exactly
    //    how to manipulate the query.
    if ($migration->getOption('idlist')) {
      // Only supp for single-field key
      $this->query->condition($keys[0], explode(',', $migration->getOption('idlist')), 'IN');
      $this->usingHighwater = FALSE;
    }
    else {
      $highwaterField = $migration->getHighwaterField();
      $this->usingHighwater = isset($highwaterField['name']);
      if ($this->usingHighwater) {
        $this->mapJoinable = FALSE;
      }
      if ($this->mapJoinable) {
        // Build the joins to the map and message tables. Because the source key
        // could have multiple fields, we need to build things up.
        // The logic is that we want to include all source rows which have no
        // existing map or message table entries, or which have map table entry
        // marked with needs_update=1. Except with highwater marks, where we don't care
        // about the existence of map/message table entries, only needs_update.
        $first = TRUE;
        $map_join = $msg_join = $map_condition = $msg_condition = '';
        $count = 1;
        foreach ($map->getSourceKey() as $field_name => $field_schema) {
          if (isset($field_schema['alias'])) {
            $field_name = $field_schema['alias'] . '.' . $field_name;
          }
          $map_key = 'sourceid' . $count++;
          if ($first) {
            $first = FALSE;
          }
          else {
            $map_join .= ' AND ';
            if (!$this->usingHighwater) {
              $msg_join .= ' AND ';
              $map_condition .= ' OR ';
              $msg_condition .= ' OR ';
            }
          }
          $map_join .= "$field_name = map.$map_key";
          if (!$this->usingHighwater) {
            $msg_join .= "$field_name = msg.$map_key";
            $map_condition .= "map.$map_key IS NULL";
            $msg_condition .= "msg.$map_key IS NULL";
          }
        $this->query->leftJoin($map->getMapTable(), 'map', $map_join);
        $map_condition .= ' OR map.needs_update = 1';
        $this->query->leftJoin($map->getMessageTable(), 'msg', $msg_join);
        $this->query->where($map_condition);
        $this->query->where($msg_condition);

        // And as long as we have the map table, get the destination ID, the
        // import hook will need it to identify the existing destination object.
        // Alias to reduce possible collisions.
        $count = 1;
        foreach ($map->getDestinationKey() as $field_name => $field_schema) {
          $map_key = 'destid' . $count++;
          $this->query->addField('map', $map_key, "migrate_map_$map_key");
        $this->query->addField('map', 'needs_update');
        if ($migration->getOption('itemlimit')) {
          $this->query->range(0, $migration->getOption('itemlimit'));
Mike Ryan's avatar
Mike Ryan committed

    migrate_instrument_start('MigrateSourceSQL execute');
    $migration->showMessage(dpq($this->query), 'debug');
    $this->result = $this->query->execute();
    migrate_instrument_stop('MigrateSourceSQL execute');

    // Load up the first row
    $this->next();
  }

  /**
   * Implementation of Iterator::next() - called at the bottom of the loop implicitly,
   * as well as explicitly from rewind().
   */
  public function next() {
    $migration = Migration::currentMigration();
    $this->currentRow = NULL;
    $this->currentKey = NULL;
    // If we couldn't add the itemlimit to the query directly, enforce it here
    if (!$this->mapJoinable) {
      $itemlimit = $migration->getOption('itemlimit');
      if ($itemlimit && $this->numProcessed >= $itemlimit) {
      }
    }

    // get next row
    migrate_instrument_start('MigrateSourceSQL next');
    while ($this->currentRow = $this->result->fetchObject()) {
      foreach ($map->getSourceKey() as $field_name => $field_schema) {
        $this->currentKey[$field_name] = $this->currentRow->$field_name;
      }
      if (!$this->mapJoinable) {
        $map_row = $migration->getMap()->getRowBySource($this->currentKey);
        if (!$map_row) {
          // Unmigrated row, take it
        }
        elseif ($map_row && $map_row['needs_update'] == 1) {
          // We always want to take this row if needs_update = 1
        }
        else {
          if ($this->usingHighwater) {
            // With highwater, we want to take this row if it's above the highwater
            // mark
            $highwaterField = $migration->getHighwaterField();
            $highwaterField = $highwaterField['name'];
            if ($this->currentRow->$highwaterField <= $migration->getHighwater()) {
              continue;
            }
          }
          else {
            // With no highwater, we want to take this row if it's not in the map table
            if ($map_row) {
              continue;
            }
          }
        }
        // Add map info to the row, if present
        if ($map_row) {
          foreach ($map_row as $field => $value) {
            $field = 'migrate_map_' . $field;
            $this->currentRow->$field = $value;
Mike Ryan's avatar
Mike Ryan committed

      // Add some debugging, just for the first row. Requires --debug
      if (empty($this->numProcessed)) {
Mike Ryan's avatar
Mike Ryan committed
        $migration->showMessage(print_r($this->currentRow, TRUE));
Mike Ryan's avatar
Mike Ryan committed
      // Allow the Migration to prepare this row. prepareRow() can return boolean
      // FALSE to stop processing this row. To add/modify fields on the
      // result, modify $row by reference.
      $return = TRUE;
      if (method_exists($migration, 'prepareRow')) {
        $return = $migration->prepareRow($this->currentRow);
Mike Ryan's avatar
Mike Ryan committed

    }
    if (!is_object($this->currentRow)) {
      $this->currentRow = NULL;
    }
    migrate_instrument_stop('MigrateSourceSQL next');
  }