<?php
// Version of the Migrate API implemented by this module.
define('MIGRATE_API_VERSION', 1);

// Access strings for the two tiers of migration tools.
// NOTE(review): presumably used as permission names - confirm against hook_perm().
define('MIGRATE_ACCESS_BASIC', 'basic migration tools');
define('MIGRATE_ACCESS_ADVANCED', 'advanced migration tools');

// Message levels, stored in the "level" column of a content set's message
// table and used as the 'level' key of arrays built by migrate_message().
define('MIGRATE_MESSAGE_ERROR', 1);
define('MIGRATE_MESSAGE_WARNING', 2);
define('MIGRATE_MESSAGE_NOTICE', 3);
define('MIGRATE_MESSAGE_INFORMATIONAL', 4);

// Values of {migrate_content_sets}.status: what a content set is doing now.
define('MIGRATE_STATUS_IDLE', 0);
define('MIGRATE_STATUS_IMPORTING', 1);
define('MIGRATE_STATUS_CLEARING', 2);

// Results returned by the clear/import process functions.
define('MIGRATE_RESULT_COMPLETED', 1);
define('MIGRATE_RESULT_INCOMPLETE', 2);
define('MIGRATE_RESULT_STOPPED', 3);
define('MIGRATE_RESULT_FAILED', 4);
define('MIGRATE_RESULT_IN_PROGRESS', 5);

// Fraction of the PHP memory limit above which a clear/import run stops
// processing rows so that a fresh batch can take over.
define('MIGRATE_MEMORY_THRESHOLD', .8);
/**
* @file
* This module provides tools at "administer >> content >> migrate"
* for analyzing data from various sources and importing them into Drupal tables.
*/
/**
 * Call a migrate hook. Like module_invoke_all, but gives modules a chance
 * to do one-time initialization before any of their hooks are called, and
 * adds "migrate" to the hook name.
 *
 * @param $hook
 *  Hook to invoke (e.g., 'types', 'fields_node', etc.)
 * @param ...
 *  Any further arguments are forwarded, in order, to every implementation.
 * @return
 *  Merged array of results. Array results are merged recursively; any other
 *  non-NULL result is appended as a single element.
 */
function migrate_invoke_all($hook) {
  // Let modules do any one-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    // Remember that initialization ran so it happens once per request.
    $_migrate_inited = TRUE;
  }

  // Everything after $hook belongs to the hook implementations.
  $args = func_get_args();
  array_shift($args);

  $hookfunc = 'migrate_' . $hook;
  $return = array();
  foreach (module_implements($hookfunc) as $module) {
    $function = $module . '_' . $hookfunc;
    $result = call_user_func_array($function, $args);
    if (isset($result) && is_array($result)) {
      $return = array_merge_recursive($return, $result);
    }
    elseif (isset($result)) {
      $return[] = $result;
    }
  }
  return $return;
}
/**
 * Call a destination hook (e.g., hook_migrate_prepare_node). Use this version
 * for hooks with the precise signature below, so that the object can be passed by
 * reference.
 *
 * @param $hook
 *  Hook to invoke, without the "migrate_" prefix (which is added here)
 * @param $object
 *  Destination object being built - passed by reference, so hooks can modify it
 * @param $tblinfo
 *  Metadata about the content set
 * @param $row
 *  Row data handed unchanged to every implementation as its third argument
 * @return
 *  Everything the implementations returned, cast to arrays and merged in
 *  module order (an implementation returning nothing contributes nothing).
 */
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
  // Let modules do any one-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // We could have used module_invoke_all, but unfortunately
  // module_invoke_all passes all arguments by value.
  $errors = array();
  $hook = 'migrate_' . $hook;
  foreach (module_implements($hook) as $module_name) {
    $function = $module_name . '_' . $hook;
    if (function_exists($function)) {
      // Time each implementation under its own function name.
      timer_start($function);
      $errors = array_merge($errors, (array) $function($object, $tblinfo, $row));
      timer_stop($function);
    }
  }
  return $errors;
}
/**
 * Save a new or updated content set
 *
 * Writes the {migrate_content_sets} record, renames the map/message tables
 * when the machine name changed, and creates or adjusts those tables so that
 * their sourceid column matches the source key's field definition.
 *
 * @param $content_set
 *  An array or object representing the content set. This is passed by reference (so
 *  when adding a new content set the ID can be set)
 * @param $options
 *  Array of additional options for saving the content set. Currently:
 *    base_table: The base table of the view - if provided, we don't need
 *                to load the view.
 *    base_database: The database of the base table - if base_table is present
 *                and base_database omitted, it defaults to 'default'
 * @return
 *  The ID of the content set that was saved, or NULL if nothing was saved
 */
function migrate_save_content_set(&$content_set, $options = array()) {
  // Deal with objects internally (but remember if we need to put the parameter
  // back to an array)
  $was_array = is_array($content_set);
  if ($was_array) {
    $content_set = (object) $content_set;
  }

  $schema_change = FALSE;
  $ret = array();

  // Update or insert the content set record as appropriate
  if (!empty($content_set->mcsid)) {
    // If machine_name changes, need to rename the map/message tables.
    // db_result() extracts the stored name; comparing the raw db_query()
    // result against a string would always report a change.
    $old_machine_name = db_result(db_query("SELECT machine_name FROM {migrate_content_sets}
                                            WHERE mcsid=%d", $content_set->mcsid));
    if ($old_machine_name != $content_set->machine_name) {
      // Capture the current table names before the record is rewritten.
      $old_maptablename = migrate_map_table_name($content_set->mcsid);
      $old_msgtablename = migrate_message_table_name($content_set->mcsid);
    }
    drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
    if (isset($old_maptablename) && db_table_exists($old_maptablename)) {
      $new_maptablename = migrate_map_table_name($content_set->mcsid);
      if ($new_maptablename != $old_maptablename) {
        db_rename_table($ret, $old_maptablename, $new_maptablename);
        $schema_change = TRUE;
      }
    }
    if (isset($old_msgtablename) && db_table_exists($old_msgtablename)) {
      $new_msgtablename = migrate_message_table_name($content_set->mcsid);
      if ($new_msgtablename != $old_msgtablename) {
        db_rename_table($ret, $old_msgtablename, $new_msgtablename);
        $schema_change = TRUE;
      }
    }
  }
  else {
    drupal_write_record('migrate_content_sets', $content_set);
  }

  // Create or modify map and message tables
  $maptablename = migrate_map_table_name($content_set->mcsid);
  $msgtablename = migrate_message_table_name($content_set->mcsid);

  // If the caller tells us the base table of the view, we don't need
  // to load the view (which would not work when called from hook_install())
  if (isset($options['base_table'])) {
    $tablename = $options['base_table'];
    $tabledb = isset($options['base_database']) ? $options['base_database'] : 'default';
  }
  else {
    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array('!view' => $content_set->view_name)));
      // Hand the parameter back in the form the caller supplied it.
      if ($was_array) {
        $content_set = (array) $content_set;
      }
      return NULL;
    }
    // Must do this to load the database
    if (views_api_version() >= '3') {
      $view->init_display('default');
    }
    $view->init_query();
    $tabledb = isset($view->base_database) ? $view->base_database : 'default';
    $tablename = $view->base_table;
  }

  $sourceschema = _migrate_inspect_schema($tablename, $tabledb);

  // If the PK of the content set is defined, make sure we have a mapping table
  if (isset($content_set->sourcekey) && $content_set->sourcekey) {
    $fields = isset($sourceschema['fields']) ? $sourceschema['fields'] : array();
    $sourcefield = isset($fields[$content_set->sourcekey]) ? $fields[$content_set->sourcekey] : NULL;
    // The field name might be <table>_<column>...
    if (!$sourcefield) {
      $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
      $sourcefield = isset($fields[$sourcekey]) ? $fields[$sourcekey] : NULL;
    }
    // But - we don't want serial fields to behave serially, so change to int
    if ($sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }

    if (!db_table_exists($maptablename)) {
      $schema = _migrate_map_table_schema($sourcefield);
      db_create_table($ret, $maptablename, $schema);
      // Expose map table to views
      if (module_exists('tw')) {
        tw_add_tables(array($maptablename));
        tw_add_fk($maptablename, 'destid');
      }

      $schema = _migrate_message_table_schema($sourcefield);
      db_create_table($ret, $msgtablename, $schema);
      // Expose messages table to views
      if (module_exists('tw')) {
        tw_add_tables(array($msgtablename));
        tw_add_fk($msgtablename, 'sourceid');
      }
      $schema_change = TRUE;
    }
    else {
      // TODO: Deal with varchar->int case where there is existing non-int data
      $desired_schema = _migrate_map_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($maptablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_primary_key($ret, $maptablename);
        db_change_field($ret, $maptablename, 'sourceid', 'sourceid',
          $sourcefield, array('primary key' => array('sourceid')));
        if (module_exists('tw')) {
          tw_perform_analysis($maptablename);
        }
        $schema_change = TRUE;
      }

      $desired_schema = _migrate_message_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($msgtablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_index($ret, $msgtablename, 'sourceid');
        db_change_field($ret, $msgtablename, 'sourceid', 'sourceid',
          $sourcefield, array('indexes' => array('sourceid' => array('sourceid'))));
        if (module_exists('tw')) {
          // Re-analyse the table that was just altered (the message table).
          tw_perform_analysis($msgtablename);
        }
        $schema_change = TRUE;
      }
    }
  }

  // Make sure the schema gets updated to reflect changes
  if ($schema_change) {
    cache_clear_all('schema', 'cache');
  }

  $mcsid = $content_set->mcsid;
  if ($was_array) {
    $content_set = (array) $content_set;
  }
  return $mcsid;
}
/**
 * Save a new or updated content mapping
 *
 * @param $mapping
 *  An object representing the mapping. This is passed by reference (so
 *  when adding a new mapping the ID can be set)
 * @return
 *  The ID of the mapping that was saved, or NULL if nothing was saved
 */
function migrate_save_content_mapping(&$mapping) {
  if (!empty($mapping->mcmid)) {
    // Existing mapping: update the row identified by mcmid.
    drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
  }
  else {
    // New mapping: insert, letting drupal_write_record() fill in mcmid.
    drupal_write_record('migrate_content_mappings', $mapping);
  }
  return isset($mapping->mcmid) ? $mapping->mcmid : NULL;
}
/**
 * Delete the specified content set, including map and message tables.
 *
 * @param $mcsid
 *  Unique identifier of the content set to delete.
 */
function migrate_delete_content_set($mcsid) {
  // First, remove the map and message tables from the Table Wizard, and drop them.
  // The table names are looked up before the content set record is deleted.
  $ret = array();
  $tables = array();
  foreach (array(migrate_map_table_name($mcsid), migrate_message_table_name($mcsid)) as $table) {
    if (db_table_exists($table)) {
      $tables[] = $table;
    }
  }
  if ($tables) {
    // Table Wizard is optional elsewhere in this file, so only call it when enabled.
    if (module_exists('tw')) {
      tw_remove_tables($tables);
    }
    foreach ($tables as $table) {
      db_drop_table($ret, $table);
    }
  }

  // Then, delete the content set data
  $sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d";
  db_query($sql, $mcsid);
  $sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d";
  db_query($sql, $mcsid);
}
/**
 * Remove a single content mapping row.
 *
 * @param $mcmid
 *  ID of the row in {migrate_content_mappings} to remove.
 */
function migrate_delete_content_mapping($mcmid) {
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcmid=%d", $mcmid);
}
/**
 * Convenience function for generating a message array
 *
 * @param $message
 *  Text describing the error condition
 * @param $type
 *  One of the MIGRATE_MESSAGE constants, identifying the level of error
 * @return
 *  Structured array suitable for return from an import hook, with keys
 *  'level' and 'message'
 */
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
  $error = array(
    'level' => $type,
    'message' => $message,
  );
  return $error;
}
/**
 * Add a mapping from source ID to destination ID for the specified content set
 *
 * If the source ID is already mapped and flagged for update, the flag is
 * cleared; if it is not mapped at all, a new row is inserted; if it is mapped
 * and up to date, nothing is written.
 *
 * @param $mcsid
 *  ID of the content set being processed
 * @param $sourceid
 *  Primary key value from the source
 * @param $destid
 *  Primary key value from the destination
 */
function migrate_add_mapping($mcsid, $sourceid, $destid) {
  // Cache the map table name per content set for the rest of the request.
  static $maptables = array();
  if (!isset($maptables[$mcsid])) {
    $maptables[$mcsid] = migrate_map_table_name($mcsid);
  }

  $needs_update = db_result(db_query('SELECT needs_update
                                      FROM {' . $maptables[$mcsid] . "}
                                      WHERE sourceid='%s'",
                                     $sourceid));
  // NOTE(review): the SET and VALUES clauses below were missing from the
  // reviewed copy and are reconstructed from the bound arguments - confirm.
  if ($needs_update == 1) {
    db_query('UPDATE {' . $maptables[$mcsid] . "}
              SET needs_update=0
              WHERE sourceid='%s'",
             $sourceid);
  }
  // db_result() yields FALSE when no row matched and a string otherwise, so
  // test for FALSE: a "!== 0" test is true even for an existing '0' row and
  // would attempt a duplicate insert.
  elseif ($needs_update === FALSE) {
    db_query('INSERT INTO {' . $maptables[$mcsid] . "}
              (sourceid, destid, needs_update)
              VALUES('%s', %d, 0)",
             $sourceid, $destid);
  }
}
/**
 * Clear migrated objects from the specified content set
 *
 * @param $mcsid
 *  ID of the content set to clear
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process:
 *    MIGRATE_RESULT_COMPLETED - every selected row was processed
 *    MIGRATE_RESULT_INCOMPLETE - stopped on the time limit or memory threshold
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed mid-run
 *    MIGRATE_RESULT_IN_PROGRESS - the content set was not idle; nothing done
 *    MIGRATE_RESULT_FAILED - the content set does not exist
 */
function migrate_content_process_clear($mcsid, &$options = array()) {
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
                 $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
                      $options['feedback']['frequency_unit'] : NULL;
  }

  $result = db_query("SELECT *
                      FROM {migrate_content_sets}
                      WHERE mcsid=%d", $mcsid);
  $tblinfo = db_fetch_object($result);
  if (!$tblinfo) {
    // No such content set - there is nothing to clear.
    return MIGRATE_RESULT_FAILED;
  }
  $description = $tblinfo->description;
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
           MIGRATE_STATUS_CLEARING, $mcsid);

  $contenttype = $tblinfo->contenttype;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // If this content set is set up to update existing content, we don't
  // want to delete the content on clear, just the map/message tables
  $sql = "SELECT srcfield FROM {migrate_content_mappings}
          WHERE mcsid=%d AND primary_key=1";
  $srcfield = db_result(db_query($sql, $mcsid));
  $full_clear = $srcfield ? FALSE : TRUE;

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $deleted = 0;
  $args = array();
  if ($idlist) {
    $args = array_map('trim', explode(',', $idlist));
    // The first ID decides whether the list is bound as integers or strings.
    if (is_numeric($args[0])) {
      $placeholders = db_placeholders($args, 'int');
    }
    else {
      $placeholders = db_placeholders($args, 'varchar');
    }
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($placeholders)";
  }
  else {
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
  }

  timer_start('delete query');
  if ($itemlimit) {
    $deletelist = db_query_range($sql, $args, 0, $itemlimit);
  }
  else {
    $deletelist = db_query($sql, $args);
  }
  timer_stop('delete query');

  while ($row = db_fetch_object($deletelist)) {
    // Recheck status - permits dynamic interruption of jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_CLEARING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }
    // Check for closeness to memory limit (skipped when no positive limit is known)
    $usage = memory_get_usage();
    $pct_memory = ($memory_limit > 0) ? $usage/$memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
          array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      // Report the run as unfinished so the caller really does start a new
      // batch; otherwise the remaining rows would be treated as done.
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $deleted >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $deleted = 0;
      }
    }

    // @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
    if ($full_clear) {
      timer_start('delete hooks');
      migrate_invoke_all("delete_$contenttype", $tblinfo, $row->destid);
      timer_stop('delete hooks');
    }

    timer_start('clear map/msg');
    db_query("DELETE FROM {" . $maptable . "} WHERE sourceid='%s'", $row->sourceid);
    db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid='%s' AND level=%d",
      $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
    timer_stop('clear map/msg');
    $deleted++;
  }

  // Mark that we're done
  $sql = "UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d";
  db_query($sql, MIGRATE_STATUS_IDLE, $mcsid);

  // If we've completed a total clear, make sure all messages are gone
  if ($return == MIGRATE_RESULT_COMPLETED && !$idlist && !$itemlimit) {
    db_query('TRUNCATE TABLE {' . $msgtablename . '}');
  }
  // In other cases (except when we're still in the middle of a process), keep
  // informationals, which should still be attached to uncleared items
  else if ($return != MIGRATE_RESULT_INCOMPLETE) {
    // Remove old messages before beginning new import process
    db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
  }

  $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $return);
  if (isset($feedback)) {
    $feedback($message);
  }
  return $return;
}
/**
 * Import objects from the specified content set
 *
 * @param $mcsid
 *  ID of the content set to import
 * @param $options
 *  Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *  Status of the migration process:
 *    MIGRATE_RESULT_COMPLETED - every selected row was processed
 *    MIGRATE_RESULT_INCOMPLETE - stopped on the time limit or memory threshold
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed mid-run
 *    MIGRATE_RESULT_IN_PROGRESS - the content set was not idle; nothing done
 *    MIGRATE_RESULT_FAILED - the content set, its view or its key is missing
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  $tblinfo = db_fetch_object(db_query("SELECT *
                                       FROM {migrate_content_sets}
                                       WHERE mcsid=%d",
                                      $mcsid));
  if (!$tblinfo) {
    // No such content set - there is nothing to import.
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
           MIGRATE_STATUS_IMPORTING, $mcsid);

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
                 $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
                      $options['feedback']['frequency_unit'] : NULL;
  }

  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  $collist = db_query("SELECT srcfield, destfield, default_value
                       FROM {migrate_content_mappings}
                       WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')
                       ORDER BY mcmid",
                      $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if (isset($feedback)) {
      $feedback(t('View !view does not exist - either (re)create this view, or
        remove the content set using it.', array('!view' => $view_name)));
    }
    // Release the content set; leaving it marked as importing would make
    // every later run report MIGRATE_RESULT_IN_PROGRESS.
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
             MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }

  if (isset($view->base_database)) {
    $viewdb = $view->base_database;
  }
  else {
    $viewdb = 'default';
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join;

  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = $inspect[$view->base_table];
    $sourcefield = isset($tableschema['fields'][$sourcekey]) ? $tableschema['fields'][$sourcekey] : NULL;
    if (!$sourcefield) {
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      $sourcefield = isset($tableschema['fields'][$joinkey]) ? $tableschema['fields'][$joinkey] : NULL;
      if (!$sourcefield) {
        if (isset($feedback)) {
          $feedback(t("In view !view, can't find key !key for table !table",
            array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table)));
        }
        // Release the content set before bailing out (see above).
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
                 MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
    else {
      $joinkey = $sourcekey;
    }
  }
  else {
    $joinkey = $sourcekey;
  }
  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);

  // We want both unimported and unupdated content
  $where = "$maptable.sourceid IS NULL OR $maptable.needs_update = 1";
  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join;
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));
    // The first ID decides whether the list is bound as integers or strings.
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query->add_where($view->options['group'], $view->base_table . ".$joinkey IN ($placeholders)",
      $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field,
                          array('view' => &$view));
  if ($idlist) {
    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb <> 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}',
      $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}',
      $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');

  if ($viewdb != 'default') {
    db_set_active('default');
  }

  $imported = 0;
  // The 'db_fetch_object' timer brackets each fetch: it is started before the
  // loop condition runs and stopped as soon as a row (or the end) is reached.
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');
    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }
    // Check for closeness to memory limit (skipped when no positive limit is known)
    $usage = memory_get_usage();
    $pct_memory = ($memory_limit > 0) ? $usage/$memory_limit : 0;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
          array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      // Report the run as unfinished so the caller really does start a new
      // batch; otherwise the remaining rows would be treated as done.
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $imported >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }

    timer_start('import hooks');
    $errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
    timer_stop('import hooks');

    // Ok, we're done. Preview the node or save it (if no errors).
    if (count($errors)) {
      // A row still counts as imported when every message is informational.
      $success = TRUE;
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "}
                  (sourceid, level, message)
                  VALUES('%s', %d, '%s')",
                  $row->$sourcekey, $error['level'], $error['message']);
      }
      if ($success) {
        $imported++;
      }
    }
    else {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');

  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if (isset($feedback)) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}
/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
  migrate_content_process_all(time());
}
*/
/**
 * Process all enabled migration processes in a browser, using the Batch API
 * to break it into manageable chunks.
 *
 * Each call works on one content set: the heaviest remaining clearing
 * operation if there is one, otherwise the lightest remaining import.
 *
 * @param $clearing
 *  Array of content set ids (keyed by content set id) to clear
 * @param $importing
 *  Array of content set ids (keyed by content set id) to import
 * @param $limit
 *  Maximum number of items to process
 * @param $idlist
 *  Comma-separated list of source IDs to process, instead of proceeding through
 *  all unmigrated rows
 * @param $context
 *  Batch API context structure
 */
function migrate_content_process_batch($clearing, $importing, $limit, $idlist, &$context) {
  // A zero max_execution_time means no limit - but let's set a reasonable
  // limit anyway
  $starttime = time();
  $maxexectime = ini_get('max_execution_time');
  if (!$maxexectime) {
    $maxexectime = 240;
  }

  // Initialize the Batch API context
  $context['finished'] = 0;

  // The Batch API progress bar will reflect the number of operations being
  // done (clearing plus importing)
  if (!isset($context['sandbox']['numops'])) {
    $context['sandbox']['numops'] = count($clearing) + count($importing);
    $context['sandbox']['numopsdone'] = 0;
    $context['sandbox']['clearing'] = $clearing;
    $context['sandbox']['importing'] = $importing;
    $context['sandbox']['message'] = '';
    $context['sandbox']['times'] = array();
  }

  // For the timelimit, subtract more than enough time to clean up
  $options = array(
    'itemlimit' => $limit,
    'timelimit' => $starttime + (($maxexectime < 5) ? $maxexectime : ($maxexectime - 5)),
    'idlist' => $idlist,
    'feedback' => array('function' => '_migrate_process_message'),
  );

  global $_migrate_messages;
  if (!isset($_migrate_messages)) {
    $_migrate_messages = array();
  }

  // Result of the operation run in this call; stays NULL if none was run.
  $status = NULL;

  // Work on the last clearing op (if any)
  if (count($context['sandbox']['clearing'])) {
    $ids = array_values($context['sandbox']['clearing']);
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($ids, 'int') . ")
       ORDER BY weight DESC
       LIMIT 1",
      $ids));
    if ($row) {
      $status = migrate_content_process_clear($row->mcsid, $options);
      if ($status != MIGRATE_RESULT_INCOMPLETE) {
        unset($context['sandbox']['clearing'][$row->mcsid]);
      }
    }
    else {
      // None of the requested content sets exist any more; drop them so the
      // batch cannot spin on the same list forever.
      $context['sandbox']['clearing'] = array();
    }
  }
  // If not, work on the first importing op
  elseif (count($context['sandbox']['importing'])) {
    $ids = array_values($context['sandbox']['importing']);
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($ids, 'int') . ")
       ORDER BY weight ASC
       LIMIT 1",
      $ids));
    if ($row) {
      if (variable_get('migrate_update', 0)) {
        migrate_content_set_update($row->mcsid);
        variable_set('migrate_update', 0);
      }
      $status = migrate_content_process_import($row->mcsid, $options);
      if ($status != MIGRATE_RESULT_INCOMPLETE) {
        unset($context['sandbox']['importing'][$row->mcsid]);
      }
    }
    else {
      // See the clearing branch: stale ids must not stall the batch.
      $context['sandbox']['importing'] = array();
    }
  }
  // If not, nothing to do
  else {
    $context['finished'] = 1;
  }

  // Make sure the entire process stops if requested
  if ($status == MIGRATE_RESULT_STOPPED) {
    $context['finished'] = 1;
  }

  if ($context['finished'] != 1) {
    // An operation only counts as done once it stops reporting INCOMPLETE.
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      $context['sandbox']['numopsdone']++;
    }
    $context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
  }

  foreach ($_migrate_messages as $message) {
    if (!isset($context['sandbox']['message'])) {
      $context['sandbox']['message'] = $message . '<br />';
    }
    else {
      $context['sandbox']['message'] .= $message . '<br />';
    }
    $context['results'][] = $message;
  }
  $context['message'] = $context['sandbox']['message'];

  // If requested save timers for eventual display
  if (variable_get('migrate_display_timers', 0)) {
    global $timers;
    foreach ($timers as $name => $timerec) {
      if (isset($timerec['time'])) {
        // Timer values are divided by 1000 before being accumulated.
        if (isset($context['sandbox']['times'][$name])) {
          $context['sandbox']['times'][$name] += $timerec['time']/1000;
        }
        else {
          $context['sandbox']['times'][$name] = $timerec['time']/1000;
        }
      }
    }
  }

  // When all done, display the timers
  if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
    arsort($context['sandbox']['times']);
    foreach ($context['sandbox']['times'] as $name => $total) {
      drupal_set_message("$name: " . round($total, 2));
    }
  }
}
/**
 * Feedback callback: append a message produced while importing or clearing
 * to the global $_migrate_messages list.
 */
function _migrate_process_message($message) {
  $GLOBALS['_migrate_messages'][] = $message;
}