<?php
// Access strings for the basic and advanced migration tools.
// NOTE(review): presumably these are the module's permission names - confirm
// against hook_perm().
define('MIGRATE_ACCESS_BASIC', 'basic migration tools');
define('MIGRATE_ACCESS_ADVANCED', 'advanced migration tools');

// Severity levels for messages returned by import hooks and stored in the
// "level" column of a content set's message table.
define('MIGRATE_MESSAGE_ERROR', 1);
define('MIGRATE_MESSAGE_WARNING', 2);
define('MIGRATE_MESSAGE_NOTICE', 3);
define('MIGRATE_MESSAGE_INFORMATIONAL', 4);

// Values of the "status" column in {migrate_content_sets}.
define('MIGRATE_STATUS_IDLE', 0);
define('MIGRATE_STATUS_IMPORTING', 1);
define('MIGRATE_STATUS_CLEARING', 2);

// Return values of the clear/import process functions.
define('MIGRATE_RESULT_COMPLETED', 1);
define('MIGRATE_RESULT_INCOMPLETE', 2);
define('MIGRATE_RESULT_STOPPED', 3);
define('MIGRATE_RESULT_FAILED', 4);
define('MIGRATE_RESULT_IN_PROGRESS', 5);

// Fraction of the PHP memory limit above which a process run stops and
// reports itself as incomplete.
define('MIGRATE_MEMORY_THRESHOLD', .8);
/**
 * @file
 * This module provides tools at "administer >> content >> migrate"
 * for analyzing data from various sources and importing them into Drupal tables.
 */

/**
 * Call a migrate hook. Like module_invoke_all, but gives modules a chance
 * to do one-time initialization before any of their hooks are called, and
 * adds "migrate" to the hook name.
 *
 * @param $hook
 *  Hook to invoke (e.g., 'types', 'fields_node', etc.)
 * @param ...
 *  Any additional arguments are passed on to each hook implementation.
 * @return
 *  Merged array of results.
 */
function migrate_invoke_all($hook) {
  // Let modules do any one-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // Everything after $hook is forwarded to the implementations.
  $args = array_slice(func_get_args(), 1);
  $hookfunc = "migrate" . "_$hook";
  $return = array();
  foreach (module_implements($hookfunc) as $module) {
    $function = $module . '_' . $hookfunc;
    $result = call_user_func_array($function, $args);
    if (isset($result) && is_array($result)) {
      // Array results are merged key by key.
      $return = array_merge_recursive($return, $result);
    }
    elseif (isset($result)) {
      // Non-array results are appended as they are.
      $return[] = $result;
    }
  }
  return $return;
}
/**
 * Invoke a destination hook (e.g., hook_migrate_prepare_node) on every module
 * implementing it. Unlike module_invoke_all, the destination object is handed
 * to each implementation by reference, so implementations can alter it.
 *
 * @param $hook
 *  Hook to invoke, without the "migrate_" prefix.
 * @param $object
 *  Destination object being built - passed by reference, so hooks can modify it
 * @param $tblinfo
 *  Metadata about the content set
 * @param $row
 *  The raw source row.
 * @return
 *  Merged array of whatever the implementations returned.
 */
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
  // One-time initialization hook, fired before the first migrate hook runs.
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // module_invoke_all passes all arguments by value, so the implementations
  // are called directly here.
  $hook_name = 'migrate_' . $hook;
  $messages = array();
  foreach (module_implements($hook_name) as $module) {
    $callback = $module . '_' . $hook_name;
    if (!function_exists($callback)) {
      continue;
    }
    // Each implementation is timed under its own function name.
    timer_start($callback);
    $result = $callback($object, $tblinfo, $row);
    $messages = array_merge($messages, (array) $result);
    timer_stop($callback);
  }
  return $messages;
}

/**
 * Save a new or updated content set
 *
 * Writes the {migrate_content_sets} record, renames the map/message tables if
 * the machine name changed, and creates (or adjusts the sourceid column of)
 * the map and message tables to match the source key's field definition.
 *
 * @param $content_set
 *  An array or object representing the content set. This is passed by reference (so
 *  when adding a new content set the ID can be set)
 * @param $options
 *  Array of additional options for saving the content set. Currently:
 *    base_table: The base table of the view - if provided, we don't need
 *                to load the view.
 *    base_database: The database of the base table - if base_table is present
 *                and base_database omitted, it defaults to 'default'
 * @return
 *  The ID of the content set that was saved, or NULL if nothing was saved
 */
function migrate_save_content_set(&$content_set, $options = array()) {
  // Deal with objects internally (but remember if we need to put the parameter
  // back to an array)
  if (is_array($content_set)) {
    $content_set = (object) $content_set;
    $was_array = TRUE;
  }
  else {
    $was_array = FALSE;
  }
  $schema_change = FALSE;

  // Update or insert the content set record as appropriate
  if (isset($content_set->mcsid)) {
    // If machine_name changes, need to rename the map/message tables.
    // db_result() is required here: db_query() alone returns a result
    // handle, which never compares equal to the machine name.
    $old_machine_name = db_result(db_query("SELECT machine_name FROM {migrate_content_sets}
                                  WHERE mcsid=%d", $content_set->mcsid));
    if ($old_machine_name != $content_set->machine_name) {
      // Capture the current table names before the record is rewritten.
      $old_maptablename = migrate_map_table_name($content_set->mcsid);
      $old_msgtablename = migrate_message_table_name($content_set->mcsid);
    }
    drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
    if (isset($old_maptablename) && db_table_exists($old_maptablename)) {
      $ret = array();
      $new_maptablename = migrate_map_table_name($content_set->mcsid);
      db_rename_table($ret, $old_maptablename, $new_maptablename);
      $schema_change = TRUE;
    }
    if (isset($old_msgtablename) && db_table_exists($old_msgtablename)) {
      $ret = array();
      $new_msgtablename = migrate_message_table_name($content_set->mcsid);
      db_rename_table($ret, $old_msgtablename, $new_msgtablename);
      $schema_change = TRUE;
    }
  }
  else {
    drupal_write_record('migrate_content_sets', $content_set);
  }

  // Create or modify map and message tables
  $maptablename = migrate_map_table_name($content_set->mcsid);
  $msgtablename = migrate_message_table_name($content_set->mcsid);

  // TODO: For now, PK must be in base_table

  // If the caller tells us the base table of the view, we don't need
  // to load the view (which would not work when called from hook_install())
  if (isset($options['base_table'])) {
    $tablename = $options['base_table'];
    if (isset($options['base_database'])) {
      $tabledb = $options['base_database'];
    }
    else {
      $tabledb = 'default';
    }
  }
  else {
    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array('!view' => $content_set->view_name)));
      return NULL;
    }
    // Must do this to load the database
    $views_version = (string)views_api_version();
    if ($views_version >= '3') {
      $view->init_display('default');
    }
    // NOTE(review): init_query() is assumed to be what populates
    // $view->base_database - confirm against the Views version in use.
    $view->init_query();
    if (isset($view->base_database)) {
      $tabledb = $view->base_database;
    }
    else {
      $tabledb = 'default';
    }
    $tablename = $view->base_table;
  }

  $sourceschema = _migrate_inspect_schema($tablename, $tabledb);

  // If the PK of the content set is defined, make sure we have a mapping table
  if (isset($content_set->sourcekey) && $content_set->sourcekey) {
    $sourcefields = isset($sourceschema['fields']) ? $sourceschema['fields'] : array();
    $sourcefield = isset($sourcefields[$content_set->sourcekey]) ?
                   $sourcefields[$content_set->sourcekey] : NULL;
    // The field name might be <table>_<column>...
    if (!$sourcefield) {
      $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
      $sourcefield = isset($sourcefields[$sourcekey]) ? $sourcefields[$sourcekey] : NULL;
    }
    // But - we don't want serial fields to behave serially, so change to int
    if (isset($sourcefield['type']) && $sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }

    if (!db_table_exists($maptablename)) {
      $ret = array();
      $schema = _migrate_map_table_schema($sourcefield);
      db_create_table($ret, $maptablename, $schema);
      // Expose map table to views
      if (module_exists('tw')) {
        tw_add_tables(array($maptablename));
        tw_add_fk($maptablename, 'destid');
      }

      $schema = _migrate_message_table_schema($sourcefield);
      db_create_table($ret, $msgtablename, $schema);
      // Expose messages table to views
      if (module_exists('tw')) {
        tw_add_tables(array($msgtablename));
        tw_add_fk($msgtablename, 'sourceid');
      }
      $schema_change = TRUE;
    }
    else {
      // The tables already exist: bring their sourceid column in line with
      // the (possibly changed) source key definition.
      // TODO: Deal with varchar->int case where there is existing non-int data
      $desired_schema = _migrate_map_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($maptablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_primary_key($ret, $maptablename);
        db_change_field($ret, $maptablename, 'sourceid', 'sourceid',
          $sourcefield, array('primary key' => array('sourceid')));
        if (module_exists('tw')) {
          tw_perform_analysis($maptablename);
        }
        $schema_change = TRUE;
      }
      $desired_schema = _migrate_message_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($msgtablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_index($ret, $msgtablename, 'sourceid');
        db_change_field($ret, $msgtablename, 'sourceid', 'sourceid',
          $sourcefield, array('indexes' => array('sourceid' => array('sourceid'))));
        if (module_exists('tw')) {
          // Re-analyze the table that was just altered (the message table).
          tw_perform_analysis($msgtablename);
        }
        $schema_change = TRUE;
      }
    }
    // Make sure the schema gets updated to reflect changes
    if ($schema_change) {
      cache_clear_all('schema', 'cache');
    }
  }

  // Hand the parameter back in the form the caller supplied it.
  if ($was_array) {
    $content_set = (array)$content_set;
    return $content_set['mcsid'];
  }
  else {
    return $content_set->mcsid;
  }
}
/**
 * Save a new or updated content mapping
 *
 * A mapping that already carries an mcmid is updated in place; otherwise a
 * new record is inserted.
 *
 * @param $mapping
 *  An object representing the mapping. This is passed by reference (so
 *  when adding a new mapping the ID can be set)
 * @return
 *  The ID of the mapping that was saved, or NULL if nothing was saved
 */
function migrate_save_content_mapping(&$mapping) {
  if (!empty($mapping->mcmid)) {
    drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
  }
  else {
    drupal_write_record('migrate_content_mappings', $mapping);
  }
  // If the insert did not populate an ID there is nothing to report.
  return isset($mapping->mcmid) ? $mapping->mcmid : NULL;
}
/**
 * Delete the specified content set, including map and message tables.
 *
 * @param $mcsid
 *  Unique identifier of the content set to delete.
 */
function migrate_delete_content_set($mcsid) {
  // Resolve the table names first, while the content set record still exists.
  $maptable = migrate_map_table_name($mcsid);
  $msgtable = migrate_message_table_name($mcsid);

  // Only touch tables that are actually there.
  $existing = array();
  foreach (array($maptable, $msgtable) as $table) {
    if (db_table_exists($table)) {
      $existing[] = $table;
    }
  }

  if ($existing) {
    // Table Wizard is optional; the rest of this module only calls it when
    // the tw module is enabled.
    if (module_exists('tw')) {
      tw_remove_tables($existing);
    }
    $ret = array();
    foreach ($existing as $table) {
      db_drop_table($ret, $table);
    }
  }

  // Then, delete the content set data
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcsid=%d", $mcsid);
  db_query("DELETE FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
}

/**
 * Remove a single content mapping record.
 *
 * @param $mcmid
 *  Unique identifier of the mapping to delete.
 */
function migrate_delete_content_mapping($mcmid) {
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcmid=%d", $mcmid);
}

/**
 * Convenience function for generating a message array
 *
 * @param $message
 *  Text describing the error condition
 * @param $type
 *  One of the MIGRATE_MESSAGE constants, identifying the level of error
 * @return
 *  Structured array suitable for return from an import hook, with the keys
 *  'level' and 'message'
 */
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
  return array(
    'level' => $type,
    'message' => $message,
  );
}

/**
 * Add a mapping from source ID to destination ID for the specified content set
 *
 * If the source ID is already mapped and flagged as needing an update, the
 * flag is cleared. If it is already mapped and up to date, nothing is
 * written. Otherwise a new map row is inserted.
 *
 * @param $mcsid
 *  ID of the content set being processed
 * @param $sourceid
 *  Primary key value from the source
 * @param $destid
 *  Primary key value from the destination
 */
function migrate_add_mapping($mcsid, $sourceid, $destid) {
  // Map table names are looked up once per content set per request.
  static $maptables = array();
  if (!isset($maptables[$mcsid])) {
    $maptables[$mcsid] = migrate_map_table_name($mcsid);
  }
  $needs_update = db_result(db_query('SELECT needs_update
                                      FROM {' . $maptables[$mcsid] . "}
                                      WHERE sourceid='%s'",
                                     $sourceid));
  if ($needs_update === FALSE) {
    // No map row yet. db_result() yields FALSE for "no row" and a string
    // otherwise, so a strict test against integer 0 cannot tell an existing
    // up-to-date row ('0') from a missing one.
    db_query('INSERT INTO {' . $maptables[$mcsid] . "}
              (sourceid, destid, needs_update)
              VALUES('%s', %d, 0)",
             $sourceid, $destid);
  }
  elseif ($needs_update == 1) {
    db_query('UPDATE {' . $maptables[$mcsid] . "}
              SET needs_update=0
              WHERE sourceid='%s'",
             $sourceid);
  }
}
/**
 * Clear migrated objects from the specified content set
 *
 * @param $mcsid
 *  ID of the content set to clear
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process: one of the MIGRATE_RESULT constants.
 */
function migrate_content_process_clear($mcsid, &$options = array()) {
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  // Left NULL when no feedback is requested, so isset($feedback) is FALSE.
  $feedback = NULL;
  $frequency = NULL;
  $frequency_unit = NULL;
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
                 $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
                      $options['feedback']['frequency_unit'] : NULL;
  }

  $result = db_query("SELECT *
                      FROM {migrate_content_sets}
                      WHERE mcsid=%d", $mcsid);
  $tblinfo = db_fetch_object($result);
  if (!$tblinfo) {
    // No such content set - nothing can be cleared.
    return MIGRATE_RESULT_FAILED;
  }

  // Only one process may work on a content set at a time.
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  else {
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
             MIGRATE_STATUS_CLEARING, $mcsid);
  }

  $description = $tblinfo->description;
  $contenttype = $tblinfo->contenttype;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  // The map table name travels with the content set info into delete hooks.
  $tblinfo->maptable = $maptable;
  $memory_limit = _migrate_memory_limit();

  // If this content set is set up to update existing content, we don't
  // want to delete the content on clear, just the map/message tables
  $sql = "SELECT srcfield FROM {migrate_content_mappings}
          WHERE mcsid=%d AND primary_key=1";
  $srcfield = db_result(db_query($sql, $mcsid));
  $full_clear = $srcfield ? FALSE : TRUE;

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  if ($idlist) {
    $args = array_map('trim', explode(',', $idlist));
    // The first ID decides whether the list is treated as numeric.
    if (is_numeric($args[0])) {
      $placeholders = db_placeholders($args, 'int');
    }
    else {
      $placeholders = db_placeholders($args, 'varchar');
    }
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($placeholders)";
  }
  else {
    $args = array();
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
  }

  timer_start('delete query');
  if ($itemlimit) {
    $deletelist = db_query_range($sql, $args, 0, $itemlimit);
  }
  else {
    $deletelist = db_query($sql, $args);
  }
  timer_stop('delete query');

  $lastfeedback = time();
  $deleted = 0;
  while ($row = db_fetch_object($deletelist)) {
    // Recheck status - permits dynamic interruption of jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_CLEARING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    // A non-positive limit is treated as "no limit" rather than dividing by it.
    $pct_memory = ($memory_limit > 0) ? $usage/$memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
                        array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report.
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $deleted >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $deleted = 0;
      }
    }

    // @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
    if ($full_clear) {
      timer_start('delete hooks');
      migrate_invoke_all("delete_$contenttype", $tblinfo, $row->destid);
      timer_stop('delete hooks');
    }

    timer_start('clear map/msg');
    db_query("DELETE FROM {" . $maptable . "} WHERE sourceid='%s'", $row->sourceid);
    db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid='%s' AND level=%d",
      $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
    timer_stop('clear map/msg');
    $deleted++;
  }

  // Mark that we're done
  $sql = "UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d";
  db_query($sql, MIGRATE_STATUS_IDLE, $mcsid);

  // If we've completed a total clear, make sure all messages are gone
  if ($return == MIGRATE_RESULT_COMPLETED && !$idlist && !$itemlimit) {
    db_query('TRUNCATE TABLE {' . $msgtablename . '}');
  }
  // In other cases (except when we're still in the middle of a process), keep
  // informationals, which should still be attached to uncleared items
  else if ($return != MIGRATE_RESULT_INCOMPLETE) {
    // Remove old messages before beginning new import process
    db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
  }

  $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $return);
  if (isset($feedback)) {
    $feedback($message);
  }
  return $return;
}
/**
 * Import objects from the specified content set
 *
 * Runs the content set's view, restricted to source rows that are either not
 * yet in the map table or flagged as needing an update and that have no
 * entry in the message table, and hands each row to the import hooks for the
 * content set's content type.
 *
 * @param $mcsid
 *  ID of the content set to import
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process: one of the MIGRATE_RESULT constants.
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  $tblinfo = db_fetch_object(db_query("SELECT *
                                   FROM {migrate_content_sets}
                                   WHERE mcsid=%d",
                                  $mcsid));
  if (!$tblinfo) {
    // No such content set - nothing can be imported.
    return MIGRATE_RESULT_FAILED;
  }
  // Only one process may work on a content set at a time.
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  else {
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
             MIGRATE_STATUS_IMPORTING, $mcsid);
  }

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  // Left NULL when no feedback is requested, so isset($feedback) is FALSE.
  $feedback = NULL;
  $frequency = NULL;
  $frequency_unit = NULL;
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
                 $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
                      $options['feedback']['frequency_unit'] : NULL;
  }

  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  // NOTE(review): view arguments are assumed to live in a view_args column
  // of {migrate_content_sets} - confirm against the schema.
  $view_args = isset($tblinfo->view_args) ? $tblinfo->view_args : '';
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  // Collect the field mappings, keyed by destination field, for the hooks.
  $collist = db_query("SELECT srcfield, destfield, default_value
                       FROM {migrate_content_mappings}
                       WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')
                       ORDER BY mcmid",
                      $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if ($feedback) {
      $feedback(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array('!view' => $view_name)));
    }
    // Release the content set, otherwise it would stay marked as importing
    // and every later run would report "in progress".
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
             MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;

  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  // NOTE(review): the query object and build_info used below only exist once
  // the view is built; the exact position of this call should be confirmed.
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }

  if (isset($view->base_database)) {
    $viewdb = $view->base_database;
  }
  else {
    $viewdb = 'default';
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join;

  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $joinkey = $sourcekey;
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = isset($inspect[$view->base_table]) ? $inspect[$view->base_table] : array();
    $tablefields = isset($tableschema['fields']) ? $tableschema['fields'] : array();
    if (empty($tablefields[$sourcekey])) {
      // Not a real column under its full name - try it without the prefix.
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      if (empty($tablefields[$joinkey])) {
        if ($feedback) {
          $feedback(t("In view !view, can't find key !key for table !table",
            array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table)));
        }
        // Release the content set before bailing out (see above).
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
                 MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
  }

  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);
  // We want both unimported and unupdated content
  $where = "$maptable.sourceid IS NULL OR $maptable.needs_update = 1";
  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join;
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));
    // The first ID decides whether the list is treated as numeric.
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query->add_where($view->options['group'], $view->base_table . ".$joinkey IN ($placeholders)",
      $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();

  // NOTE(review): the fourth argument mirrors what Views passes in its own
  // execute() - confirm.
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field,
    array('view' => &$view));
  if ($idlist) {
    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb <> 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}',
      $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}',
      $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  //drupal_set_message($query);
  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');

  if ($viewdb != 'default') {
    db_set_active('default');
  }

  $imported = 0;
  $lastfeedback = time();
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    // A non-positive limit is treated as "no limit" rather than dividing by it.
    $pct_memory = ($memory_limit > 0) ? $usage/$memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
                        array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report.
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $imported >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }

    // The fetch timer is only stopped once we are committed to processing the
    // row, so an early break above leaves it running for the stop after the loop.
    timer_stop('db_fetch_object');
    timer_start('import hooks');
    $errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
    timer_stop('import hooks');

    // Ok, we're done. Preview the node or save it (if no errors).
    if (count($errors)) {
      // A row still counts as imported if every message is informational.
      $success = TRUE;
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "}
                  (sourceid, level, message)
                  VALUES('%s', %d, '%s')",
                  $row->$sourcekey, $error['level'], $error['message']);
      }
      if ($success) {
        $imported++;
      }
    }
    else {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');

  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if (isset($feedback)) {
    $feedback($message);
  }
  watchdog('migrate', $message);

  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');

  return $return;
}
/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
  migrate_content_process_all(time());
}
*/
/**
 * Process all enabled migration processes in a browser, using the Batch API
 * to break it into manageable chunks.
 *
 * Each call works on a single content set: pending clears are handled first
 * (highest weight first), then pending imports (lowest weight first).
 *
 * @param $clearing
 *  Array of content set ids (keyed by content set id) to clear
 * @param $importing
 *  Array of content set ids (keyed by content set id) to import
 * @param $limit
 *  Maximum number of items to process
 * @param $idlist
 *  Comma-separated list of source IDs to process, instead of proceeding through
 *  all unmigrated rows
 * @param $context
 *  Batch API context structure
 */
function migrate_content_process_batch($clearing, $importing, $limit, $idlist, &$context) {
  $starttime = time();
  // A zero max_execution_time means no limit - but let's set a reasonable
  // limit anyway
  $maxexectime = ini_get('max_execution_time');
  if (!$maxexectime) {
    $maxexectime = 240;
  }

  // Initialize the Batch API context
  $context['finished'] = 0;

  // The Batch API progress bar will reflect the number of operations being
  // done (clearing/importing)
  if (!isset($context['sandbox']['numops'])) {
    $context['sandbox']['numops'] = count($clearing) + count($importing);
    $context['sandbox']['numopsdone'] = 0;
    $context['sandbox']['clearing'] = $clearing;
    $context['sandbox']['importing'] = $importing;
    $context['sandbox']['message'] = '';
    $context['sandbox']['times'] = array();
  }

  // For the timelimit, subtract more than enough time to clean up
  // NOTE(review): forwarding $idlist is inferred from the documented idlist
  // option of the clear/import functions - confirm.
  $options = array(
    'itemlimit' => $limit,
    'timelimit' => $starttime + (($maxexectime < 5) ? $maxexectime : ($maxexectime - 5)),
    'idlist' => $idlist,
    'feedback' => array('function' => '_migrate_process_message'),
  );

  global $_migrate_messages;
  if (!isset($_migrate_messages)) {
    $_migrate_messages = array();
  }

  // Stays NULL when there is nothing left to process.
  $status = NULL;

  // Work on the last clearing op (if any)
  if (count($context['sandbox']['clearing'])) {
    $ids = array_values($context['sandbox']['clearing']);
    $row = db_fetch_object(db_query_range(
      "SELECT mcsid,description FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($ids, 'int') . ")
       ORDER BY weight DESC",
      $ids, 0, 1));
    $status = migrate_content_process_clear($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['clearing'][$row->mcsid]);
    }
  }
  // If not, work on the first importing op
  elseif (count($context['sandbox']['importing'])) {
    $ids = array_values($context['sandbox']['importing']);
    $row = db_fetch_object(db_query_range(
      "SELECT mcsid,description FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($ids, 'int') . ")
       ORDER BY weight ASC",
      $ids, 0, 1));
    // A pending "update" request is applied to the content set about to be
    // imported, then the request flag is reset.
    if (variable_get('migrate_update', 0)) {
      migrate_content_set_update($row->mcsid);
      variable_set('migrate_update', 0);
    }
    $status = migrate_content_process_import($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['importing'][$row->mcsid]);
    }
  }
  // If not, nothing to do
  else {
    $context['finished'] = 1;
  }

  // Make sure the entire process stops if requested
  if ($status == MIGRATE_RESULT_STOPPED) {
    $context['finished'] = 1;
  }

  if ($context['finished'] != 1) {
    // An incomplete operation is continued on the next call and is only
    // counted as done once it finishes.
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      $context['sandbox']['numopsdone']++;
    }
    $context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
  }

  // Messages captured through _migrate_process_message() accumulate in the
  // sandbox so the progress page shows the whole history.
  foreach ($_migrate_messages as $message) {
    if (!isset($context['sandbox']['message'])) {
      $context['sandbox']['message'] = $message . '<br />';
    }
    else {
      $context['sandbox']['message'] .= $message . '<br />';
    }
    $context['message'] = $context['sandbox']['message'];
    $context['results'][] = $message;
  }
  $context['message'] = $context['sandbox']['message'];

  // If requested save timers for eventual display
  if (variable_get('migrate_display_timers', 0)) {
    global $timers;
    foreach ($timers as $name => $timerec) {
      if (isset($timerec['time'])) {
        // Timer values are divided by 1000 before being accumulated.
        if (isset($context['sandbox']['times'][$name])) {
          $context['sandbox']['times'][$name] += $timerec['time']/1000;
        }
        else {
          $context['sandbox']['times'][$name] = $timerec['time']/1000;
        }
      }
    }

    // When all done, display the timers
    if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
      arsort($context['sandbox']['times']);
      foreach ($context['sandbox']['times'] as $name => $total) {
        drupal_set_message("$name: " . round($total, 2));
      }
    }
  }
}
/**
 * Feedback callback for import and clear runs: appends each message it is
 * given to the global $_migrate_messages list.
 */
function _migrate_process_message($message) {
  $GLOBALS['_migrate_messages'][] = $message;
}

/**
 * Prepare a content set for updating of existing items
 * @param $mcsid
 *  ID of the content set to update
 */