2007年8月28日火曜日

Pentaho Spoon ETLのJobをWeb Appサーバーで動かす

Pentaho のETLツールのひとつであるSpoonはそれ自体でETLのデザインと実行、
タイマー起動まで行えるので良いと考えている。
しかし訳があり、
 UNIX機(Solaris)
 GUI無し
 Cron起動等のバッチはダメ
 Webアプリなら良い
と言う制約からWebアプリでPentahoで作成したJobを動かす様にした。

やった事
  1. PentahoのソースKettle-src-2.5.0.zipをSourceForgeから取得する。
  2. be.ibridge.kettle.panのソースを見て、SpoonのJobの起動方法を理解し、コピペ等してWebアプリから呼ばれるクラスを作成する。
  3. WebアプリのLibにPentaho KettleのJarファイルをごっそり入れる。とってくる元は、Kettle/libext,Kettle/libswt,Kettle/libである。
参考ファイル
 作成したクラスファイルの中身
  注意: Jobはリポジトリからではなくファイルから読み込む前提で、Jobファイルとログファイルのパスもコード中にハードコードしている。もちろん改良の余地あり。

package daemon;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LocalVariables;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.Result;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.exception.KettleJobException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.job.Job;
import be.ibridge.kettle.job.JobEntryLoader;
import be.ibridge.kettle.job.JobMeta;
import be.ibridge.kettle.pan.CommandLineOption;
import be.ibridge.kettle.repository.RepositoriesMeta;
import be.ibridge.kettle.repository.Repository;
//import be.ibridge.kettle.repository.RepositoryDirectory;
import be.ibridge.kettle.repository.RepositoryMeta;
import be.ibridge.kettle.repository.UserInfo;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.version.BuildVersion;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;

public class KettleLuncher {

public static final String STRING_KITCHEN = "Kitchen";

public KettleLuncher() throws KettleException {
EnvUtil.environmentInit();
Thread parentThread = Thread.currentThread();
LocalVariables.getInstance().createKettleVariables(parentThread.getName(), null, false);

ArrayList args = new ArrayList();
args.add("-file=/home/rsos01/kettleJob/testKettleJob.kjb");
args.add("-logfile=/home/rsos01/logs/testKettleJob.log");
args.add("-norep=Y");

RepositoryMeta repinfo = null;
UserInfo userinfo = null;
Job job = null;

StringBuffer optionRepname;
StringBuffer optionUsername;
StringBuffer optionPassword;
StringBuffer optionJobname;
StringBuffer optionDirname;
StringBuffer optionFilename;
StringBuffer optionLoglevel;
StringBuffer optionLogfile;
StringBuffer optionLogfileOld;
StringBuffer optionListdir;
StringBuffer optionListjobs;
StringBuffer optionListrep;
StringBuffer optionNorep;
StringBuffer optionVersion;

CommandLineOption[] options = new CommandLineOption[]{
new CommandLineOption("rep", "Repository name", optionRepname = new StringBuffer())
, new CommandLineOption("user", "Repository username", optionUsername = new StringBuffer())
, new CommandLineOption("pass", "Repository password", optionPassword = new StringBuffer())
, new CommandLineOption("job", "The name of the transformation to launch", optionJobname = new StringBuffer())
, new CommandLineOption("dir", "The directory (don\'t forget the leading /)", optionDirname = new StringBuffer())
, new CommandLineOption("file", "The filename (Job XML) to launch", optionFilename = new StringBuffer())
, new CommandLineOption("level", "The logging level (Basic, Detailed, Debug, Rowlevel, Error, Nothing)", optionLoglevel = new StringBuffer())
, new CommandLineOption("logfile", "The logging file to write to", optionLogfile = new StringBuffer())
, new CommandLineOption("log", "The logging file to write to (deprecated)", optionLogfileOld = new StringBuffer(), false, true)
, new CommandLineOption("listdir", "List the directories in the repository", optionListdir = new StringBuffer(), true, false)
, new CommandLineOption("listjobs", "List the jobs in the specified directory", optionListjobs = new StringBuffer(), true, false)
, new CommandLineOption("listrep", "List the available repositories", optionListrep = new StringBuffer(), true, false)
, new CommandLineOption("norep", "Do not log into the repository", optionNorep = new StringBuffer(), true, false)
, new CommandLineOption("version", "show the version, revision and build date", optionVersion = new StringBuffer(), true, false)};

if (args.size() == 0) {
CommandLineOption.printUsage(options);
throw new KettleException("No Argments");
}
CommandLineOption.parseArguments(args, options);

LogWriter log;
if (Const.isEmpty(optionLogfile) && !Const.isEmpty(optionLogfileOld)) {
// if the old style of logging name is filled in, and the new one is not
// overwrite the new by the old
optionLogfile = optionLogfileOld;
}

if (Const.isEmpty(optionLogfile)) {
log = LogWriter.getInstance(LogWriter.LOG_LEVEL_BASIC);
} else {
log = be.ibridge.kettle.core.LogWriter.getInstance(optionLogfile.toString(), true, be.ibridge.kettle.core.LogWriter.LOG_LEVEL_BASIC);
}

if (!Const.isEmpty(optionLoglevel)) {
log.setLogLevel(optionLoglevel.toString());
log.logMinimal(STRING_KITCHEN, "Logging is at level : "+log.getLogLevelDesc());
}

if (!Const.isEmpty(optionVersion)) {
BuildVersion buildVersion = BuildVersion.getInstance();
log.logBasic("Pan", "Kettle version "+Const.VERSION+", build "+buildVersion.getVersion()+", build date : "+buildVersion.getBuildDate());
}

// Start the action...
//
if (!Const.isEmpty(optionRepname) && !Const.isEmpty(optionUsername)) {
log.logDetailed(STRING_KITCHEN, "Repository and username supplied");
}
log.logMinimal(STRING_KITCHEN, "Repository and username supplied");

/* Load the plugins etc.*/
StepLoader steploader = StepLoader.getInstance();
if (!steploader.read()) {
log.logError(STRING_KITCHEN, "Error loading steps... halting Kitchen!");
throw new KettleException("Error loading steps...");
}

/* Load the plugins etc.*/
JobEntryLoader jeloader = JobEntryLoader.getInstance();
if (!jeloader.read())
{
log.logError(STRING_KITCHEN, "Error loading job entries & plugins... halting Kitchen!");
}

Date start, stop;
Calendar cal;
SimpleDateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
cal=Calendar.getInstance();
start=cal.getTime();

log.logDebug(STRING_KITCHEN, "Allocate new job.");
JobMeta jobMeta = new JobMeta(log);

// In case we use a repository...
Repository repository = null;

try {
// Read kettle job specified on command-line?
if (!Const.isEmpty(optionRepname) || !Const.isEmpty(optionFilename)) {
// log.logDebug(STRING_KITCHEN, "Parsing command line options.");
// しばらくコメント
// }
// }
// Try to load if from file anyway.
if (!Const.isEmpty(optionFilename) && job == null) {
jobMeta = new JobMeta(log, optionFilename.toString(), null);
job = new Job(log, steploader, null, jobMeta);
}
} else if ("Y".equalsIgnoreCase(optionListrep.toString())) {
RepositoriesMeta ri = new RepositoriesMeta(log);
if (ri.readData()) {
System.out.println("List of repositories:");
for (int i = 0; i < ri.nrRepositories(); i++) {
RepositoryMeta rinfo = ri.getRepository(i);
System.out.println("#" + (i + 1) + " : " + rinfo.getName() + " [" + rinfo.getDescription() + "] ");
}
} else {
System.out.println("ERROR: Unable to read/parse the repositories XML file.");
}
}
} catch (KettleException e) {
job = null;
jobMeta = null;
System.out.println("Processing stopped because of an error: " + e.getMessage());
}

if (job == null) {
if (!"Y".equalsIgnoreCase(optionListjobs.toString()) &&
!"Y".equalsIgnoreCase(optionListdir.toString()) &&
!"Y".equalsIgnoreCase(optionListrep.toString())) {
System.out.println("ERROR: Kitchen can't continue because the job couldn't be loaded.");
}
throw new KettleException("Error job couldn't be loaded...");
}

Result result = null;

int returnCode = 0;

try {
// Add Kettle variables for the job thread...
//LocalVariables.getInstance().createKettleVariables(job.getName(), parentThread.getName(), true);
// Set the arguments on the job metadata as well...
if (args.size() == 0) {
job.getJobMeta().setArguments(null);
} else {
job.getJobMeta().setArguments((String[]) args.toArray(new String[args.size()]));
}

result = job.execute(); // Execute the selected job.
job.endProcessing("end", result); // The bookkeeping...
} catch (KettleJobException je) {
if (result == null) {
result = new Result();
}
result.setNrErrors(1L);

try {
job.endProcessing("error", result);
} catch (KettleJobException je2) {
log.logError(job.getName(), "A serious error occured : " + je2.getMessage());
returnCode = 2;
}
} finally {
if (repository != null) {
repository.disconnect();
}
}

log.logMinimal(STRING_KITCHEN, "Finished!");

if (result != null && result.getNrErrors() != 0) {
log.logError(STRING_KITCHEN, "Finished with errors");
returnCode = 1;
}

cal=Calendar.getInstance();
stop=cal.getTime();
String begin=df.format(start).toString();
String end =df.format(stop).toString();

log.logMinimal(STRING_KITCHEN, "Start="+begin+", Stop="+end);
long millis=stop.getTime()-start.getTime();
log.logMinimal(STRING_KITCHEN, "Processing ended after "+(millis/1000)+" seconds.");

}

public KettleLuncher(String kettelJob) {
}
}

Webアプリに加えたJarファイル（Kettle/libext、Kettle/libswt、Kettle/lib から取得）:

kettle.jar
CacheDB.jar
commands.jar
common.jar
edtftpj-1.5.4.jar
jackcess-1.1.5.jar
jakarta-oro-2.0.8.jar
javadbf.jar
jface.jar
js.jar
jsch-0.1.24.jar
jug-lgpl-2.0.0.jar
jxl.jar
mail.jar
runtime.jar
commons-codec-1.3.jar
commons-fileupload-1.0.jar
commons-httpclient-3.0.1.jar
commons-lang-2.2.jar
commons-net-1.4.1.jar
asjava.zip
jtds-1.2.jar
nzjdbc.jar
ojdbc14.jar
orai18n.jar
rdbthin.jar
sapdbc.jar
unijdbc.jar
xdbjdbc.jar
jcommon-1.0.8.jar
libformula-0.1.3.jar
pentaho-1.2.0.jar
activation.jar
simple-jndi-0.11.1.jar
log4j-1.2.8.jar
commons-logging-1.1.jar
commons-vfs-1.0.jar

0 件のコメント: