Commit 52d86364 authored by Caleb Weeks's avatar Caleb Weeks

final implementing broad changes to restubbing / posting mechanism

parent 151fb96f
const SC = require("sling-connector");
const _=require("lodash");
const defaultSite = _.get(global.bl,["config","environment","defaultSite"]);
const defaultSC = new SC(_.get(global.bl.config,[defaultSite, "modes", "author", "sling"]));
const log = global.bl.logger.get("blacklight.post-data");
/*
The purpose of this queue is simply to post data to the indicated SC
*/
module.exports.maxRetries = 10;
module.exports.retryInterval = 500; // 500 ms between retrying failed job
module.exports.taskInterval = 100; // 500 ms between queue processing
module.exports.process = (params, cb)=>{
var {path, form , attempt} = params;
let sc = defaultSC;
if(!path || !form){
cb('Insufficient data provided to the post-data queue');
}else{
let targetPath = path;
let formParams = form;
let submitRequest = (touchErr)=>{
if(touchErr){
log.error("Couldn't touch. Not trying POST", {path, form});
cb(touchErr);
}else{
log.debug('Attempting POST to ' + path);
sc.post(targetPath, formParams, (err)=>{
if(err){
if(attempt < module.exports.maxRetries){
log.warning(`Failed to POST to ${path} on attempt ${attempt}`);
}else{
log.error(`Failed to POST to ${path} on attempt ${attempt}`, {targetPath, formParams});
}
}
cb(err);
});
}
};
//we only want to go with the touch approach when we're under or on a page
if(_.includes(targetPath, "/jcr:content")){
//do a 'touch' to ensure there are no current conflicts
let jcrPath = targetPath.split('/jcr:content')[0] + '/jcr:content';
//first ensure the path exists
sc.getSling(`/bin/exists.json?path=${jcrPath}`, (err, data) => {
if (!data || !data.exists) {
log.debug(`${jcrPath} doesn't exist. Not touching.`);
//page doesn't exst, can't touch
submitRequest();
}else{
log.debug(`${jcrPath} exists. Touching.`);
//make sure all is good before proceeding
module.exports.touch(sc, jcrPath, submitRequest);
}
});
}else{
submitRequest();
}
}
};
//keep retring saving to the path until success, or until 30 seconds has passed
module.exports.touch = (sc, path, touchCb, attempt) => {
attempt = attempt || 0;
if(attempt < 60){
sc.post(path, {'bl:touch': (new Date()).getTime() + ""}, (err)=>{
if(!err){
log.debug(`Successfully touched ${path} on attempt ${attempt}`);
touchCb();
}else{
setTimeout(()=>{module.exports.touch(sc, path, touchCb, attempt + 1)}, 500);
}
});
}else{
let touchErr = `Failed to successfully touch ${path} after 60 attempts`;
log.error(touchErr)
touchCb(touchErr);
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ const log = global.bl.logger.get("blacklight.restub");
var defaultSite = _.get(global.bl,["config","environment","defaultSite"]);
var defaultSC = new SC(_.get(global.bl.config,[defaultSite, "modes", "author", "sling"]));
var isBlQuery = /\/bl:query$/;
/**
......@@ -22,6 +23,20 @@ module.exports = function restubFromBlQuery(params, cb){
var {blQueryPath, sc} = params;
sc = sc || defaultSC;
// function to add jobs to the post update queue
let addPostJob = (path, form) =>{
const postQueue = global.bl.queues.get("blacklight.edit.post-data");
let tasks={prototype:{}, tasks:[{path, form}]};
log.debug('Restub query job', tasks);
postQueue.makeJob(tasks,(err,job)=>{
if(err){log.error("Couldn't add job to 'post-data' queue", err);}
else{
log.info("Success adding job to 'post-data' queue:", job.id);
}
});
};
if(!isBlQuery.test(blQueryPath)){cb("Provided 'blQueryPath' does not end with 'bl:query': " + blQueryPath); return;}
......@@ -36,7 +51,7 @@ module.exports = function restubFromBlQuery(params, cb){
//we remove the note about processing when the processing is done
let origCb = cb;
cb = (cbErr) => {
sc.post(path, {'bl:processing@Delete': "true" }, _.noop);
addPostJob(path, {'bl:processing@Delete': "true" });
origCb(cbErr)
}
......@@ -339,27 +354,6 @@ module.exports = function restubFromBlQuery(params, cb){
// TODO: update local bl:query params to exclude local page, upon delete of that local page
//start dealing with deactivation requests and create promises for it
var deactivatePage = function(path) {
return new Promise((resolve) => {
sc.post("/bin/replicate.json", {path: path, cmd: 'deactivate'}, (err) => {resolve();});
});
}
//deactive one at a time to try and fix org.apache.sling.api.resource.PersistenceException
var deactivatePages = function(paths) {
var p = Promise.resolve();
paths.forEach(function(path){
p = p.then(function(){
return deactivatePage(path);
});
});
return p;
};
var deactivatePromise = deactivatePages(deactivations);
let throttledRequests = [];
let reqIdx = 0;
......@@ -387,85 +381,20 @@ module.exports = function restubFromBlQuery(params, cb){
});
//keep retring saving to the path until success, or until 30 seconds has passed
let touch = (path, touchCb, attempt) => {
attempt = attempt || 0;
if(attempt < 60){
sc.post(path, {'bl:touch': (new Date()).getTime() + ""}, (err)=>{
if(!err){
log.info(`Successfully touched ${path} on attempt ${attempt}`);
touchCb();
}else{
setTimeout(()=>{touch(path, touchCb, attempt + 1)}, 500);
}
});
}else{
let touchErr = `Failed to successfully touch ${path} after 60 attempts`;
log.error(touchErr)
touchCb(touchErr);
}
}
let next = function(err){
if(!err){
if(throttledRequests.length){
let submitThrottledRequest = (touchErr)=>{
if(touchErr){
log.error('Not trying stub POST', {targetPath, formParams});
next(touchErr);
}else{
var formParams = throttledRequests.shift();
sc.post(targetPath, formParams, (err)=>{
if(err){
log.error('Error processing stub POST', {targetPath, formParams});
}
next(err);
});
}
};
//we only want to go with the touch approach when we're under or on a page
if(_.includes(targetPath, "/jcr:content")){
//do a 'touch' to ensure there are no current conflicts
let jcrPath = targetPath.split('/jcr:content')[0] + '/jcr:content';
//first ensure the path exists
sc.getSling(`/bin/exists.json?path=${jcrPath}`, (err, data) => {
if (!data || !data.exists) {
console.log(`${jcrPath} doesn't exist. Not touching.`);
//page doesn't exst, can't touch
submitThrottledRequest();
}else{
console.log(`${jcrPath} exists. Touching.`);
//make sure all is good before proceeding
touch(jcrPath, submitThrottledRequest);
}
});
}else{
submitThrottledRequest();
}
}else{
cb();
}
}else{
cb(err);
return;
}
}
//wait for deactivations to finish
deactivatePromise.then(() => {
next();
/***************************/
// actually add our params to the queue for processing
/***************************/
_.each(deactivations, deactivation => {
addPostJob("/bin/replicate.json", {path: path, cmd: 'deactivate'});
});
_.each(throttledRequests, (throttled)=>{
addPostJob(targetPath, throttled);
});
log.info('Added ' + (_.size(deactivations) + _.size(throttledRequests)) + ' post update jobs.' );
cb();
});
}
});
......
......@@ -4,6 +4,7 @@ const _=require("lodash");
const defaultSite = _.get(global.bl,["config","environment","defaultSite"]);
const defaultSC = new SC(_.get(global.bl.config,[defaultSite, "modes", "author", "sling"]));
const config = global.bl.config.environment;
const log = global.bl.logger.get("blacklight.restub-move");
/*
......@@ -23,7 +24,7 @@ module.exports = function restubFromMove(params, cb){
var {moveInfo, sc} = params;
sc = sc || defaultSC;
console.log('made it to the restubFromMove', moveInfo);
log.debug('made it to the restubFromMove', moveInfo);
if(moveInfo && moveInfo.originalPath && moveInfo.newPath && moveInfo.distanceMoved === 0){
let originalNodeName = _.last(moveInfo.originalPath.split('/'));
......@@ -55,7 +56,7 @@ module.exports = function restubFromMove(params, cb){
let url = buildQueryUrl(prop);
sc.getSling(url,{leaveMangledNames:true}, (err, matches)=>{
if(err){
console.log("Couldn't query Sling service for " + prop, err);
log.error("Couldn't query Sling service for " + prop, err);
queryCb(err);
}else{
if(matches.length){
......@@ -68,7 +69,7 @@ module.exports = function restubFromMove(params, cb){
}
});
}else{
console.log('No ' + prop + ' entries found for this move');
log.warn('No ' + prop + ' entries found for this move');
}
}
......@@ -79,7 +80,7 @@ module.exports = function restubFromMove(params, cb){
/////////////////////////////
let doneQuerying = (err)=>{
console.log('done querying: ', {err, updates, renames});
log.debug('done querying: ', {err, updates, renames});
if(err){
cb(err)
}else{
......@@ -89,7 +90,6 @@ module.exports = function restubFromMove(params, cb){
let nextOp = ()=>{
if(allOperations.length){
let curOp = allOperations.shift();
console.log('curOp', curOp);
let formParams = {}, slingTarget;
if(curOp.prop){
formParams[curOp.prop] = curOp.updatedValue;
......@@ -99,7 +99,7 @@ module.exports = function restubFromMove(params, cb){
formParams[':dest'] = curOp.to;
slingTarget = curOp.from;
}
console.log('post info', {slingTarget, formParams});
log.debug('post info', {slingTarget, formParams});
sc.post(slingTarget, formParams, (err)=>{
if(err){
cb(err);
......@@ -131,7 +131,7 @@ module.exports = function restubFromMove(params, cb){
let oldStubPrefix = pathParts.splice(0, pathParts.length - (pageMode ? 2 : 1)).join('/');
let oldStubbedNodeName = pathParts[0];
console.log('looking for rename', {path, oldStubPrefix, oldStubbedNodeName});
log.debug('looking for rename', {path, oldStubPrefix, oldStubbedNodeName});
if(oldStubbedNodeName === originalNodeName){
renames.push({from: `${oldStubPrefix}/${originalNodeName}`, to: `${oldStubPrefix}/${newNodeName}`});
}
......
var SC = require("sling-connector");
var restubFromBlQuery = require("./lib/restub-from-blquery");
var restubFromMove = require("./lib/restub-from-move");
var postData = require("./lib/post-data");
var performMigration = require("./lib/migrations").performMigration;
var _=require("lodash");
const _=require("lodash");
const log = global.bl.logger.get("blacklight.queues");
var queues = module.exports;
var config=global.bl.config;
......@@ -12,7 +14,6 @@ var docx=/":docx/g ;
var jcrIndex=/\[(\d)\]\"\:/g;
// console.log("SITE:", global.bl.modules)
// TODO: store details of source connection
......@@ -37,7 +38,7 @@ queues.transfer={
var sourceSC=_.get(global.bl.config,[task.sourceSite,"modes",task.sourceMode, "sling"]);
sourceSC = new SC(sourceSC);
console.log("Transfering: ", task.url, "to", targetSC.baseUri);
log.debug("Transfering: ", task.url, "to", targetSC.baseUri);
var path = task.url.replace(/\/$/,"") + "/jcr:content"
var sourcePath = path + ".infinity.json";
......@@ -55,7 +56,7 @@ queues.transfer={
targetSC.import(path,data,{}, function(err, result, resp, respBody){
if(err){
console.error("Transfer error: ", path, "\n", data, "\nErr:",err,"\n");
log.error("Transfer error: ", path, "\n", data, "\nErr:",err,"\n");
cb({message:"ERROR replicating:" + sourcePath, error: err}); return;
}
cb(null, targetSC.baseUri + sourcePath)
......@@ -66,9 +67,9 @@ queues.transfer={
complete:(err, task, job, results)=>{
if(err){
console.log("Back in bl.edit.queue.js. Transfer failed:", err);
log.error("Back in bl.edit.queue.js. Transfer failed:", err);
}else{
console.log("OK. Back in bl.edit.queue.js. Transfer complete for job:",job.id);
log.info("OK. Back in bl.edit.queue.js. Transfer complete for job:",job.id);
}
}
}
......@@ -100,9 +101,9 @@ queues["mapped-stubs"]={
complete:(err, task, job, results)=>{
if(err){
console.log("mapped-stub queue failed when processing changes to collection data:", err);
log.error("mapped-stub queue failed when processing changes to collection data:", err);
}else{
console.log("mapped-stub job complete, id:",job.id);
log.info("mapped-stub job complete, id:",job.id);
}
}
}
......@@ -127,9 +128,9 @@ queues["mapped-stubs-rename"]={
complete:(err, task, job, results)=>{
if(err){
console.log("rename mapped-stub queue failed:", err);
log.error("rename mapped-stub queue failed:", err);
}else{
console.log("mapped-stub job complete, id:",job.id);
log.info("mapped-stub job complete, id:",job.id);
}
}
}
......@@ -137,6 +138,37 @@ queues["mapped-stubs-rename"]={
/*************************************************************************************************/
/*************************************************************************************************/
/*************************************************************************************************/
queues["post-data"]={
description: "Queue for POSTing data to a sling connector",
displayMap: (task, taskPrototype)=>{
return {title:task.path, link: task.path}
},
on:{
process:(task, job, cb)=>{
postData.process({path: task.path, form: task.form, attempt: task.attemptCount}, cb);
},
complete:(err, task, job, results)=>{
if(err){
log.error(`post-data queue failed after ${task.attemptCount + 1} attempts:`, err);
}else{
log.info(`post-data job complete on attempt ${task.attemptCount + 1}, id:`,job.id);
}
}
},
maxRetries : postData.maxRetries,
retryInternal : postData.retryInterval,
taskInterval: postData.taskInterval
}
/*************************************************************************************************/
/*************************************************************************************************/
......@@ -158,9 +190,9 @@ queues.migrations={
complete:(err, task, job, results)=>{
if(err){
console.log("migrations job failed:", job.id, err);
log.error("migrations job failed:", job.id, err);
}else{
console.log("migrations job complete, id:", job.id);
log.info("migrations job complete, id:", job.id);
}
}
}
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment