Newer
Older
Mike Ryan
committed
// $Id$
define('MIGRATE_ACCESS_BASIC', 'basic migration tools');
define('MIGRATE_ACCESS_ADVANCED', 'advanced migration tools');
define('MIGRATE_MESSAGE_ERROR', 1);
define('MIGRATE_MESSAGE_WARNING', 2);
define('MIGRATE_MESSAGE_NOTICE', 3);
define('MIGRATE_MESSAGE_INFORMATIONAL', 4);
define('MIGRATE_STATUS_SUCCESS', 1);
define('MIGRATE_STATUS_FAILURE', 2);
define('MIGRATE_STATUS_TIMEDOUT', 3);
define('MIGRATE_STATUS_CANCELLED', 4);
define('MIGRATE_STATUS_IN_PROGRESS', 5);
Mike Ryan
committed
/**
* @file
* This module provides tools at "administer >> content >> migrate"
* for analyzing data from various sources and importing them into Drupal tables.
*/
/**
* Call a migrate hook
*/
function migrate_invoke_all($hook) {
// Let modules do any one-time initialization (e.g., including migration support files)
static $inited = FALSE;
if (!$inited) {
module_invoke_all('migrate_init');
$inited = TRUE;
}
Mike Ryan
committed
$args = func_get_args();
Mike Ryan
committed
unset($args[0]);
$return = array();
$modulelist = module_implements($hookfunc);
foreach ($modulelist as $module) {
Mike Ryan
committed
$result = call_user_func_array($function, $args);
if (isset($result) && is_array($result)) {
$return = array_merge_recursive($return, $result);
}
Mike Ryan
committed
$return[] = $result;
}
}
return $return;
}
Mike Ryan
committed
/**
* Call a destination hook (e.g., hook_migrate_prepare_node). Use this version
* for hooks with the precise signature below, so that the object can be passed by
* reference.
*
* @param $hook
* @param $object
* @param $tblinfo
* @param $row
* @return
*/
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
// We could have used module_invoke_all, but unfortunately
// module_invoke_all passes all arguments by value.
$errors = array();
$hook = 'migrate_' . $hook;
foreach (module_implements($hook) as $module_name) {
$function = $module_name . '_' . $hook;
if (function_exists($function)) {
timer_start($function);
$errors = array_merge($errors, (array)$function($object, $tblinfo, $row));
timer_stop($function);
}
}
return $errors;
}
/**
* Save a new or updated content set
*
* @param $content_set
* An array or object representing the content set. This is passed by reference (so
* when adding a new content set the ID can be set)
* @param $options
* Array of additional options for saving the content set. Currently:
* base_table: The base table of the view - if provided, we don't need
* to load the view.
* base_database: The database of the base table - if base_table is present
* and base_database omitted, it defaults to 'default'
* The ID of the content set that was saved, or NULL if nothing was saved
function migrate_save_content_set(&$content_set, $options = array()) {
// Deal with objects internally (but remember if we need to put the parameter
// back to an array)
if (is_array($content_set)) {
Mike Ryan
committed
$was_array = TRUE;
$content_set = (object) $content_set;
// Update or insert the content set record as appropriate
drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
drupal_write_record('migrate_content_sets', $content_set);
}
// Create or modify map and message tables
Mike Ryan
committed
$maptablename = migrate_map_table_name($content_set->mcsid);
$msgtablename = migrate_message_table_name($content_set->mcsid);
// If the caller tells us the base table of the view, we don't need
// to load the view (which would not work when called from hook_install())
if (isset($options['base_table'])) {
$tablename = $options['base_table'];
if (isset($options['base_database'])) {
$tabledb = $options['base_database'];
}
else {
$tabledb = 'default';
}
}
else {
// Get the proper field definition for the sourcekey
$view = views_get_view($content_set->view_name);
if (!$view) {
drupal_set_message(t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $content_set->view_name)));
return NULL;
}
// Must do this to load the database
$view->init_query();
if (isset($view->base_database)) {
$tabledb = $view->base_database;
}
else {
$tabledb = 'default';
}
$tablename = $view->base_table;
}
Mike Ryan
committed
db_set_active($tabledb);
Mike Ryan
committed
db_set_active('default');
// If the PK of the content set is defined, make sure we have a mapping table
if (isset($content_set->sourcekey) && $content_set->sourcekey) {
$sourcefield = $sourceschema['fields'][$content_set->sourcekey];
Mike Ryan
committed
// The field name might be <table>_<column>...
if (!$sourcefield) {
$sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
Mike Ryan
committed
$sourcefield = $sourceschema['fields'][$sourcekey];
}
Mike Ryan
committed
// But - we don't want serial fields to behave serially, so change to int
if ($sourcefield['type'] == 'serial') {
$sourcefield['type'] = 'int';
}
$schema_change = FALSE;
if (!db_table_exists($maptablename)) {
$schema = _migrate_map_table_schema($sourcefield);
db_create_table($ret, $maptablename, $schema);
// Expose map table to views
tw_add_tables(array($maptablename));
tw_add_fk($maptablename, 'destid');
Mike Ryan
committed
$schema = _migrate_message_table_schema($sourcefield);
db_create_table($ret, $msgtablename, $schema);
// Expose messages table to views
tw_add_tables(array($msgtablename));
tw_add_fk($msgtablename, 'sourceid');
$schema_change = TRUE;
Mike Ryan
committed
// TODO: Deal with varchar->int case where there is existing non-int data
$desired_schema = _migrate_map_table_schema($sourcefield);
$actual_schema = $inspect[$maptablename];
if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
$ret = array();
db_drop_primary_key($ret, $maptablename);
db_change_field($ret, $maptablename, 'sourceid', 'sourceid',
Mike Ryan
committed
$sourcefield, array('primary key' => array('sourceid')));
tw_perform_analysis($maptablename);
$schema_change = TRUE;
Mike Ryan
committed
}
$desired_schema = _migrate_message_table_schema($sourcefield);
$actual_schema = $inspect[$msgtablename];
if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
$ret = array();
db_drop_index($ret, $msgtablename, 'sourceid');
db_change_field($ret, $msgtablename, 'sourceid', 'sourceid',
Mike Ryan
committed
$sourcefield, array('indexes' => array('sourceid' => array('sourceid'))));
tw_perform_analysis($maptablename);
$schema_change = TRUE;
Mike Ryan
committed
}
// Make sure the schema gets updated to reflect changes
if ($schema_change) {
cache_clear_all('schema', 'cache');
}
Mike Ryan
committed
if ($was_array) {
$content_set = (array)$content_set;
Mike Ryan
committed
}
}
function migrate_save_content_mapping(&$mapping) {
drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
drupal_write_record('migrate_content_mappings', $mapping);
function migrate_delete_content_set($mcsid) {
// First, remove the map and message tables from the Table Wizard, and drop them
$ret = array();
Mike Ryan
committed
$maptable = migrate_map_table_name($mcsid);
$msgtable = migrate_message_table_name($mcsid);
if (db_table_exists($maptable)) {
tw_remove_tables(array($maptable, $msgtable));
db_drop_table($ret, $maptable);
db_drop_table($ret, $msgtable);
}
// Then, delete the content set data
$sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d";
db_query($sql, $mcsid);
$sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d";
db_query($sql, $mcsid);
}
function migrate_delete_content_mapping($mcmid) {
$sql = "DELETE FROM {migrate_content_mappings} WHERE mcmid=%d";
db_query($sql, $mcmid);
}
/**
* Convenience function for generating a message array
*
* @param $message
* Text describing the error condition
* @param $type
* One of the MIGRATE_MESSAGE constants, identifying the level of error
* @return
* Structured array suitable for return from an import hook
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
$error = array(
'level' => $type,
'message' => $message,
);
return $error;
}
/**
* Add a mapping from source ID to destination ID for the specified content set
*
* @param $mcsid
* ID of the content set being processed
* @param $sourceid
* Primary key value from the source
* @param $destid
* Primary key value from the destination
*/
function migrate_add_mapping($mcsid, $sourceid, $destid) {
static $maptables = array();
if (!isset($maptables[$mcsid])) {
Mike Ryan
committed
$maptables[$mcsid] = migrate_map_table_name($mcsid);
$mapping = new stdClass;
$mapping->sourceid = $sourceid;
$mapping->destid = $destid;
drupal_write_record($maptables[$mcsid], $mapping);
/**
* Clear migrated objects from the specified content set
*
* @param $mcsid
* ID of the content set to clear
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_clear($mcsid, &$messages = array(), &$options = array()) {
$itemlimit = $options['itemlimit'];
$timelimit = $options['timelimit'];
$idlist = $options['idlist'];
$lastfeedback = time();
if (isset($options['feedback'])) {
$feedback = $options['feedback']['function'];
$frequency = $options['feedback']['frequency'];
$frequency_unit = $options['feedback']['frequency_unit'];
}
Mike Ryan
committed
$result = db_query("SELECT *
FROM {migrate_content_sets}
WHERE mcsid=%d", $mcsid);
$tblinfo = db_fetch_object($result);
$description = $tblinfo->description;
if ($tblinfo->semaphore) {
$messages[] = t('Content set !content_set has an operation already in progress.',
array('!content_set' => $description));
return MIGRATE_STATUS_IN_PROGRESS;
}
else {
$tblinfo->semaphore = TRUE;
drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
}
Mike Ryan
committed
$desttype = $tblinfo->desttype;
$view_name = $tblinfo->view_name;
$contenttype = $tblinfo->contenttype;
$sourcekey = $tblinfo->sourcekey;
Mike Ryan
committed
$maptable = migrate_map_table_name($mcsid);
$msgtablename = migrate_message_table_name($mcsid);
$processstart = microtime(TRUE);
$status = MIGRATE_STATUS_IN_PROGRESS;
// If we're being called on a content set that isn't flagged for clearing, temporarily flag it
$original_clearing = $tblinfo->clearing;
if (!$original_clearing) {
$sql = "UPDATE {migrate_content_sets} SET clearing=1 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
Mike Ryan
committed
$deleted = 0;
if ($idlist) {
$sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($idlist)";
Mike Ryan
committed
$sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
}
timer_start('delete query');
if ($itemlimit) {
$deletelist = db_query_range($sql, 0, $itemlimit);
Mike Ryan
committed
$deletelist = db_query($sql);
}
timer_stop('delete query');
Mike Ryan
committed
while ($row = db_fetch_object($deletelist)) {
// Recheck clearing flag - permits dynamic interruption of jobs
Mike Ryan
committed
$sql = "SELECT clearing FROM {migrate_content_sets} WHERE mcsid=%d";
$clearing = db_result(db_query($sql, $mcsid));
if (!$clearing) {
$status = MIGRATE_STATUS_CANCELLED;
break;
}
// Check for time out if there is time info present
if (isset($timelimit) && time() >= $timelimit) {
$status = MIGRATE_STATUS_TIMEDOUT;
break;
}
if (isset($feedback)) {
if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
($frequency_unit == 'items' && $deleted >= $frequency)) {
$message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status);
$feedback($message);
$lastfeedback = time();
$deleted = 0;
Mike Ryan
committed
}
Mike Ryan
committed
// @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
migrate_invoke_all("delete_$contenttype", $row->destid);
Mike Ryan
committed
timer_start('clear map/msg');
db_query("DELETE FROM {" . $maptable . "} WHERE sourceid=%d", $row->sourceid);
db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid=%d AND level=%d",
$row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
timer_stop('clear map/msg');
$deleted++;
}
if ($status == MIGRATE_STATUS_IN_PROGRESS) {
$status = MIGRATE_STATUS_SUCCESS;
}
$message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status);
if ($status == MIGRATE_STATUS_SUCCESS) {
Mike Ryan
committed
// Mark that we're done
$tblinfo->clearing = 0;
drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
Mike Ryan
committed
// Remove old messages before beginning new import process
db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
}
if (isset($feedback)) {
$feedback($message);
}
else {
$messages[] = $message;
}
watchdog('migrate', $message);
if (!$original_clearing) {
$sql = "UPDATE {migrate_content_sets} SET clearing=0 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
$tblinfo->semaphore = FALSE;
drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
return $status;
Mike Ryan
committed
}
/**
* Import objects from the specified content set
*
* @param $mcsid
* ID of the content set to clear
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_import($mcsid, &$messages = array(), &$options = array()) {
$itemlimit = $options['itemlimit'];
$timelimit = $options['timelimit'];
$idlist = $options['idlist'];
$lastfeedback = time();
if (isset($options['feedback'])) {
$feedback = $options['feedback']['function'];
$frequency = $options['feedback']['frequency'];
$frequency_unit = $options['feedback']['frequency_unit'];
}
Mike Ryan
committed
$result = db_query("SELECT *
FROM {migrate_content_sets}
WHERE mcsid=%d", $mcsid);
$tblinfo = db_fetch_object($result);
$description = $tblinfo->description;
if ($tblinfo->semaphore) {
$messages[] = t('Content set !content_set has an operation already in progress.',
array('!content_set' => $description));
return MIGRATE_STATUS_IN_PROGRESS;
}
else {
$tblinfo->semaphore = TRUE;
drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
}
Mike Ryan
committed
$desttype = $tblinfo->desttype;
$view_name = $tblinfo->view_name;
$contenttype = $tblinfo->contenttype;
$sourcekey = $tblinfo->sourcekey;
Mike Ryan
committed
$maptable = migrate_map_table_name($mcsid);
$msgtablename = migrate_message_table_name($mcsid);
Mike Ryan
committed
$processstart = microtime(TRUE);
$status = MIGRATE_STATUS_IN_PROGRESS;
// If we're being called on a content set that isn't flagged for importing, temporarily flag it
$original_importing = $tblinfo->importing || $tblinfo->scanning;
if (!$original_importing) {
$sql = "UPDATE {migrate_content_sets} SET importing=1 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
Mike Ryan
committed
$collist = db_query("SELECT srcfield, destfield, default_value
FROM {migrate_content_mappings}
WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')
ORDER BY mcmid",
Mike Ryan
committed
$mcsid);
$fields = array();
while ($row = db_fetch_object($collist)) {
$fields[$row->destfield]['srcfield'] = $row->srcfield;
$fields[$row->destfield]['default_value'] = $row->default_value;
}
$tblinfo->fields = $fields;
$tblinfo->maptable = $maptable;
// We pick up everything in the input view that is not already imported, and
Mike Ryan
committed
// not already errored out
// Emulate views execute(), so we can scroll through the results ourselves
$view = views_get_view($view_name);
if (!$view) {
$messages[] = t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $view_name));
return MIGRATE_STATUS_FAILURE;
}
Mike Ryan
committed
$view->build();
Mike Ryan
committed
// Let modules modify the view just prior to executing it.
foreach (module_implements('views_pre_execute') as $module) {
$function = $module . '_views_pre_execute';
$function($view);
}
$viewdb = $view->base_database;
Mike Ryan
committed
// Add a left join to the map table, and only include rows not in the map
$join = new views_join;
// Views prepends <base_table>_ to column names other than the base table's
// primary key - we need to strip that here for the join to work. But, it's
// common for tables to have the tablename beginning field names (e.g.,
// table cms with PK cms_id). Deal with that as well...
$baselen = drupal_strlen($view->base_table);
if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
// So, which case is it? Ask the schema module...
db_set_active($viewdb);
$inspect = schema_invoke('inspect', db_prefix_tables('{'. $view->base_table .'}'));
db_set_active('default');
$tableschema = $inspect[$view->base_table];
$sourcefield = $tableschema['fields'][$sourcekey];
if (!$sourcefield) {
$joinkey = drupal_substr($sourcekey, $baselen + 1);
$sourcefield = $tableschema['fields'][$joinkey];
if (!$sourcefield) {
$messages[] = t("In view !view, can't find key !key for table !table",
array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table));
return MIGRATE_STATUS_FAILURE;
}
}
else {
$joinkey = $sourcekey;
}
else {
$joinkey = $sourcekey;
}
$join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
Mike Ryan
committed
$view->query->add_relationship($maptable, $join, $view->base_table);
$view->query->add_where(0, "$maptable.sourceid IS NULL", $view->base_table);
// Ditto for the errors table
$join = new views_join;
$join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
Mike Ryan
committed
$view->query->add_relationship($msgtablename, $join, $view->base_table);
$view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);
// If running over a selected list of IDs, pass those in to the query
if ($idlist) {
$view->query->add_where($view->options['group'], $view->base_table . ".$sourcekey IN ($idlist)",
Mike Ryan
committed
$view->base_table);
}
// We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
$query = $view->query->query();
$query = db_rewrite_sql($query, $view->base_table, $view->base_field,
Mike Ryan
committed
array('view' => &$view));
$args = $view->build_info['query_args'];
$replacements = module_invoke_all('views_query_substitutions', $view);
$query = str_replace(array_keys($replacements), $replacements, $query);
if (is_array($args)) {
foreach ($args as $id => $arg) {
$args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
}
}
// Now, make the current db name explicit if content set is pulling tables from another DB
if ($viewdb <> 'default') {
global $db_url;
$url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
$currdb = drupal_substr($url['path'], 1);
$query = str_replace('{' . $maptable . '}',
Mike Ryan
committed
$currdb . '.' . '{' . $maptable . '}', $query);
$query = str_replace('{' . $msgtablename . '}',
Mike Ryan
committed
$currdb . '.' . '{' . $msgtablename . '}', $query);
db_set_active($viewdb);
}
Mike Ryan
committed
//drupal_set_message($query);
timer_start('execute view query');
if ($itemlimit) {
$importlist = db_query_range($query, $args, 0, $itemlimit);
Mike Ryan
committed
$importlist = db_query($query, $args);
}
timer_stop('execute view query');
if ($viewdb != 'default') {
db_set_active('default');
}
$imported = 0;
timer_start('db_fetch_object');
while ($row = db_fetch_object($importlist)) {
timer_stop('db_fetch_object');
// Recheck importing flag - permits dynamic interruption of cron jobs
$sql = "SELECT importing,scanning FROM {migrate_content_sets} WHERE mcsid=%d";
$checkrow = db_fetch_object(db_query($sql, $mcsid));
$importing = $checkrow->importing;
$scanning = $checkrow->scanning;
if (!($importing || $scanning)) {
$status = MIGRATE_STATUS_CANCELLED;
break;
}
Mike Ryan
committed
// Check for time out if there is time info present
if (isset($timelimit) && time() >= $timelimit) {
$status = MIGRATE_STATUS_TIMEDOUT;
break;
}
Mike Ryan
committed
if (isset($feedback)) {
if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
($frequency_unit == 'items' && $imported >= $frequency)) {
$message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status);
$feedback($message);
$lastfeedback = time();
$imported = 0;
}
}
Mike Ryan
committed
timer_start('import hooks');
$errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
Mike Ryan
committed
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
timer_stop('import hooks');
// Ok, we're done. Preview the node or save it (if no errors).
if (count($errors)) {
$success = TRUE;
foreach ($errors as $error) {
if (!isset($error['level'])) {
$error['level'] = MIGRATE_MESSAGE_ERROR;
}
if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
$success = FALSE;
}
db_query("INSERT INTO {" . $msgtablename . "}
(sourceid, level, message)
VALUES('%s', %d, '%s')",
$row->$sourcekey, $error['level'], $error['message']);
}
if ($success) {
$imported++;
}
}
else {
$imported++;
}
timer_start('db_fetch_object');
}
timer_stop('db_fetch_object');
if ($status == MIGRATE_STATUS_IN_PROGRESS) {
$status = MIGRATE_STATUS_SUCCESS;
}
$message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status);
if ($status == MIGRATE_STATUS_SUCCESS) {
// Remember we're done
Mike Ryan
committed
if ($importing) {
$tblinfo->importing = 0;
Mike Ryan
committed
}
$tblinfo->lastimported = date('Y-m-d H:i:s');
Mike Ryan
committed
}
if (isset($feedback)) {
$feedback($message);
}
else {
$messages[] = $message;
}
watchdog('migrate', $message);
if (!$original_importing) {
$sql = "UPDATE {migrate_content_sets} SET importing=0 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
$tblinfo->semaphore = FALSE;
drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
return $status;
Mike Ryan
committed
}
/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
migrate_content_process_all(time());
}
function migrate_content_process_all_batch($starttime, $limit, $idlist, &$context) {
$messages = array();
// A zero max_execution_time means no limit - but let's set a reasonable
// limit anyway
$starttime = time();
$maxexectime = ini_get('max_execution_time');
if (!$maxexectime) {
$maxexectime = 240;
}
// Initialize the Batch API context
$context['finished'] = 0;
// The Batch API progress bar will reflect the number of operations being
// done (clearing/importing/scanning)
if (!isset($context['sandbox']['numops'])) {
$numops = 0;
$sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE clearing=1";
$numops = db_result(db_query($sql));
$sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE importing=1 OR scanning=1";
$numops += db_result(db_query($sql));
$context['sandbox']['numops'] = $numops;
$context['sandbox']['numopsdone'] = 0;
}
// For the timelimit, subtract more than enough time to clean up
$options = array(
'itemlimit' => $limit,
'timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20)),
'idlist' => $idlist,
);
$status = migrate_content_process_all($messages, $options);
foreach ($messages as $message) {
if (!isset($context['sandbox']['message'])) {
$context['sandbox']['message'] = $message . '<br />';
}
else {
$context['sandbox']['message'] .= $message . '<br />';
}
$context['message'] = $context['sandbox']['message'];
$context['results'][] = $message;
$context['sandbox']['numopsdone'] += $options['opcount'];
}
// If we did not arrive via a timeout, we must have finished all operations
if ($status != MIGRATE_STATUS_TIMEDOUT) {
$context['finished'] = 1;
}
else {
// Not done, report what percentage done we are (in terms of number of operations)
$context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
}
// If requested save timers for eventual display
if (variable_get('migrate_display_timers', 0)) {
global $timers;
foreach ($timers as $name => $timerec) {
if (isset($timerec['time'])) {
$context['sandbox']['times'][$name] += $timerec['time']/1000;
}
}
// When all done, display the timers
if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
global $timers;
arsort($context['sandbox']['times']);
foreach ($context['sandbox']['times'] as $name => $total) {
drupal_set_message("$name: " . round($total, 2));
}
}
}
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
}
/**
* Process all enabled migration processes
*
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* opcount - Number of clearing or import operations performed
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_all(&$messages = array(), &$options = array()) {
// First, perform any clearing actions in reverse order
$result = db_query("SELECT mcsid
FROM {migrate_content_sets}
WHERE clearing=1
ORDER BY weight DESC");
$context['sandbox']['timedout'] = FALSE;
if (!isset($options['opcount'])) {
$options['opcount'] = 0;
}
while ($row = db_fetch_object($result)) {
$status = migrate_content_process_clear($row->mcsid, $messages, $options);
if ($status != MIGRATE_STATUS_SUCCESS) {
break;
}
$options['opcount']++;
}
// Then, any import actions going forward
$result = db_query("SELECT mcsid
FROM {migrate_content_sets}
WHERE importing=1 OR scanning=1
ORDER BY weight");
while ($row = db_fetch_object($result)) {
$status = migrate_content_process_import($row->mcsid, $messages, $options);
if ($status != MIGRATE_STATUS_SUCCESS) {
break;
}
$options['opcount']++;
}
return $status;
}
function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $status = MIGRATE_STATUS_SUCCESS) {
$time = (microtime(TRUE) - $starttime);
if ($time > 0) {
$perminute = round(60*$numitems/$time);
$time = round($time, 1);
$perminute = '?';
}
if ($import) {
switch ($status) {
case MIGRATE_STATUS_SUCCESS:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - done importing '!description'";;
break;
case MIGRATE_STATUS_FAILURE:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - failure importing '!description'";;
break;
case MIGRATE_STATUS_TIMEDOUT:
case MIGRATE_STATUS_IN_PROGRESS:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - continuing importing '!description'";
break;
case MIGRATE_STATUS_CANCELLED:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - cancelled importing '!description'";
break;
}
}
else {
switch ($status) {
case MIGRATE_STATUS_SUCCESS:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - done clearing '!description'";;
break;
case MIGRATE_STATUS_FAILURE:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - failure clearing '!description'";;
break;
case MIGRATE_STATUS_TIMEDOUT:
case MIGRATE_STATUS_IN_PROGRESS:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - continuing clearing '!description'";
break;
case MIGRATE_STATUS_CANCELLED:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - cancelled clearing '!description'";
break;
}
}
$message = t($basetext,
array('!numitems' => $numitems, '!time' => $time, '!perminute' => $perminute,
'!description' => $description));
return $message;
}
Mike Ryan
committed
/*
Mike Ryan
committed
*/
function migrate_init() {
// Loads the hooks for the supported modules.
// TODO: Be more lazy - only load when really needed
$path = drupal_get_path('module', 'migrate') .'/modules';
$files = drupal_system_listing('.*\.inc$', $path, 'name', 0);
foreach ($files as $module_name => $file) {
if (module_exists($module_name)) {
include_once($file->filename);
}
}
Mike Ryan
committed
drupal_add_css(drupal_get_path('module', 'migrate') .'/migrate.css');
}
Mike Ryan
committed
/**
* Implementation of hook_action_info().
Mike Ryan
committed
*/
/* Revisit
Mike Ryan
committed
function migrate_action_info() {
$info['migrate_content_process_clear'] = array(
Mike Ryan
committed
'type' => 'migrate',
'description' => t('Clear a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_import'] = array(
'type' => 'migrate',
'description' => t('Import a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_all_action'] = array(
Mike Ryan
committed
'type' => 'migrate',
'description' => t('Perform all active migration processes'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
return $info;
}
Mike Ryan
committed
Mike Ryan
committed
/**
Mike Ryan
committed
*/
function migrate_cron() {
if (variable_get('migrate_enable_cron', 0)) {
$path = drupal_get_path('module', 'migrate') . '/migrate_pages.inc';
include_once($path);
// Elevate privileges so node deletion/creation works in cron
session_save_session(FALSE);
global $user;
$saveuser = $user;
$user = user_load(array('uid' => 1));
$messages = array();
// A zero max_execution_time means no limit - but let's set a reasonable
// limit anyway
$starttime = variable_get('cron_semaphore', 0);
$maxexectime = ini_get('max_execution_time');
if (!$maxexectime) {
$maxexectime = 240;
}
$options = array('timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20)));
migrate_content_process_all($messages, $options);
$user = $saveuser;
session_save_session(TRUE);
Mike Ryan
committed
}
/**
* Implementation of hook_perm().
*/
function migrate_perm() {
return array(MIGRATE_ACCESS_BASIC, MIGRATE_ACCESS_ADVANCED);
Mike Ryan
committed
}
/**
* Implementation of hook_help().
*/
function migrate_help($page, $arg) {
switch ($page) {
case 'admin/content/migrate':
return theme('advanced_help_topic', 'migrate', 'about', 'icon') .
t('Click the question marks like this one to read the migrate module help topics.');
case 'admin/content/migrate/content_sets':
return t('Define sets of mappings from imported tables to Drupal content. These are the
migrations which are later processed.');
Moshe Weitzman
committed
case 'admin/content/migrate/process':
return t('View and manage import processes here. Processes that are enabled for processing
are checked - they can be cancelled by unchecking, or new processes begun by checking,
then clicking Submit. Any checked process will run in the background (via cron)
automatically - you may also run them interactively or in drush. A process that is
actively running will be <span class="migrate-running">highlighted</span>.');
Moshe Weitzman
committed
case 'admin/content/migrate/tools':
return t('Besides content that is migrated into a new site, nodes may be manually
created during the testing process. Typically you will want to clear these before the
final migration - if you are <strong>absolutely positive</strong> that all nodes of a
given type should be deleted, you may do so here.');