const EventEmitter = require('events');
|
const LoadBalancer = require('../load-balancer');
|
const Loader = require('../../loader');
|
const ForkProcess = require('../child/forkProcess');
|
const Channel = require('../../const/channel');
|
const Helper = require('../../utils/helper');
|
const Conf = require('../../config/cache');
|
|
/**
 * Pool of forked child processes used to run job files.
 *
 * Children are tracked in `this.children` (a plain object keyed by pid) and
 * registered with a LoadBalancer instance (`this.LB`) that is asked to pick a
 * child whenever a job has to be dispatched. The pool never holds more than
 * `this.max` children.
 */
class ChildPoolJob extends EventEmitter {

  /**
   * @param {Object} [opt] - Pool options.
   * @param {Array} [opt.weights] - Load-balancer weight per pool slot
   *   (slot i is the i-th child added). Slots for which
   *   `Helper.validValue` is falsy fall back to a weight of 1.
   */
  constructor(opt = {}) {
    super();
    const options = Object.assign({
      weights: [],
    }, opt);

    this.config = {};
    // boundId -> pid, see getBoundChild().
    this.boundMap = new Map();
    // pid -> child process wrapper (plain object, so it has NO `.length`).
    this.children = {};
    // `min` and `strategy` are stored but not read anywhere else in this class.
    this.min = 3;
    this.max = 6;
    this.strategy = 'polling';
    // One weight per possible slot, so the array always has `max` entries.
    this.weights = Array.from({ length: this.max }, (_, i) => {
      return Helper.validValue(options.weights[i]) ? options.weights[i] : 1;
    });

    const lbOpt = {
      algorithm: LoadBalancer.Algorithm.polling,
      targets: [],
    };
    this.LB = new LoadBalancer(lbOpt);

    const cfg = Conf.getValue('jobs');
    if (cfg) {
      this.config = cfg;
    }

    this._initEvents();
  }

  /**
   * Register listeners that drop a child from the pool when an exit or error
   * event carrying its pid is emitted on this pool.
   * NOTE(review): presumably the forked-process wrapper emits these events on
   * the host it was constructed with - confirm against ForkProcess.
   */
  _initEvents() {
    this.on(Channel.events.childProcessExit, (data) => {
      this._removeChild(data.pid);
    });
    this.on(Channel.events.childProcessError, (data) => {
      this._removeChild(data.pid);
    });
  }

  /**
   * Remove a child from the load balancer and from the pool.
   *
   * @param {number|string} pid - Pid of the child to remove.
   */
  _removeChild(pid) {
    // The count is taken BEFORE the delete below, mirroring _childCreated().
    const length = Object.keys(this.children).length;
    const lbOpt = {
      id: pid,
      weight: this.weights[length - 1],
    };
    this.LB.del(lbOpt);
    delete this.children[pid];
  }

  /**
   * Create child processes and add them to the pool.
   *
   * The request is capped so that the pool never exceeds `this.max` children.
   *
   * @param {number} [number=3] - How many children to create.
   * @returns {Promise<string[]>} Pids of ALL children currently in the pool
   *   (not only the newly created ones).
   * @throws {Error} (as a rejection) when `number` is negative or greater
   *   than `this.max`, or when the pool already exceeds `this.max`.
   */
  async create(number = 3) {
    return this._createChildren(number);
  }

  /**
   * Synchronous implementation behind create(). It exists so that getChild()
   * can populate an empty pool and use the result immediately, which is not
   * possible through the Promise returned by create().
   *
   * @param {number} number - How many children to create.
   * @returns {string[]} Pids of all children currently in the pool.
   * @throws {Error} On an invalid `number` or an over-full pool.
   */
  _createChildren(number) {
    if (number < 0 || number > this.max) {
      throw new Error(`[ee-core] [jobs/child-pool] The number is invalid !`);
    }

    // `children` is an object keyed by pid, so count its keys.
    const currentNumber = Object.keys(this.children).length;
    if (currentNumber > this.max) {
      throw new Error(`[ee-core] [jobs/child-pool] The number of current processes number: ${currentNumber} is greater than the maximum: ${this.max} !`);
    }

    // Only fill the remaining free slots.
    const spawnCount = Math.min(number, this.max - currentNumber);

    // The same options object is handed to every child created by this call.
    const options = {
      processArgs: {
        type: 'childPoolJob'
      }
    };
    for (let i = 1; i <= spawnCount; i++) {
      const task = new ForkProcess(this, options);
      this._childCreated(task);
    }

    return Object.keys(this.children);
  }

  /**
   * Register a freshly created child in the pool and in the load balancer.
   * The weight is taken from the slot matching the pool size after insertion.
   *
   * @param {Object} childProcess - Child wrapper exposing a `pid` property.
   */
  _childCreated(childProcess) {
    const pid = childProcess.pid;
    this.children[pid] = childProcess;

    const length = Object.keys(this.children).length;
    const lbTask = {
      id: pid,
      weight: this.weights[length - 1],
    };
    this.LB.add(lbTask);
  }

  /**
   * Run a job file on a child taken from the pool.
   *
   * @param {string} filepath - Job path, resolved through Loader.getFullpath.
   * @param {Object} [params={}] - Parameters forwarded to the job.
   * @returns {Object} The child the job was dispatched to.
   * @throws {Error} When no child could be obtained.
   */
  run(filepath, params = {}) {
    const jobPath = Loader.getFullpath(filepath);
    const childProcess = this.getChild();
    childProcess.dispatch('run', jobPath, params);

    return childProcess;
  }

  /**
   * Promise-returning variant of run().
   *
   * @param {string} filepath - Job path, resolved through Loader.getFullpath.
   * @param {Object} [params={}] - Parameters forwarded to the job.
   * @returns {Promise<Object>} Resolves with the child the job was dispatched to.
   */
  async runPromise(filepath, params = {}) {
    return this.run(filepath, params);
  }

  /**
   * Get the child bound to `boundId`, binding one on first use.
   *
   * If the previously bound child is no longer in the pool (it exited or
   * errored), a new child is picked and the binding is replaced instead of
   * returning `undefined`.
   *
   * @param {*} boundId - Caller-chosen key identifying the binding.
   * @returns {Object} The bound child.
   * @throws {Error} When no child could be obtained.
   */
  getBoundChild(boundId) {
    const boundPid = this.boundMap.get(boundId);
    if (boundPid) {
      const bound = this.children[boundPid];
      if (bound) {
        return bound;
      }
      // Stale binding: fall through and rebind to a live child.
    }

    // Pick a child and remember the binding.
    const proc = this.getChild();
    this.boundMap.set(boundId, proc.pid);

    return proc;
  }

  /**
   * Look up a child by pid.
   *
   * @param {number|string} pid - Pid to look up.
   * @returns {Object|null} The child, or null when the pid is not in the pool.
   */
  getChildByPid(pid) {
    return this.children[pid] || null;
  }

  /**
   * Get a child from the pool, creating one first when the pool is empty.
   *
   * @returns {Object} A child from the pool.
   * @throws {Error} When no child could be obtained.
   */
  getChild() {
    let proc;

    if (Object.keys(this.children).length === 0) {
      // Empty pool: create one child synchronously. Going through the async
      // create() here would hand back a Promise instead of the pid list.
      const pids = this._createChildren(1);
      proc = this.children[pids[0]];
    } else {
      // Let the load balancer choose among the existing children.
      const picked = this.LB.pickOne();
      proc = picked ? this.children[picked.id] : undefined;
    }

    if (!proc) {
      const errorMessage = `[ee-core] [jobs/child-pool] Failed to obtain the child process !`;
      throw new Error(errorMessage);
    }

    return proc;
  }

  /**
   * Get the pids of all children currently in the pool.
   *
   * @returns {string[]} Pids (object keys, hence strings).
   */
  getPids() {
    return Object.keys(this.children);
  }

  /**
   * Kill every child in the pool.
   *
   * @param {string} [type='parallel'] - 'sequence' staggers the kills one
   *   second apart (first kill after 1s); any other value kills all children
   *   immediately.
   */
  killAll(type = 'parallel') {
    let i = 1;
    Object.keys(this.children).forEach((key) => {
      const proc = this.children[key];
      if (!proc) {
        return;
      }
      if (type === 'sequence') {
        setTimeout(() => {
          proc.kill();
        }, i * 1000);
        i++;
      } else {
        proc.kill();
      }
    });
  }
}
|
|
module.exports = ChildPoolJob;
|