I have to integrate my project with Salesforce. More specifically, I need to be able to bulk-import some salesforce.com data into our system, Accounts for example. I've done enough R&D to conclude that the SF Bulk API is the way to go. Our project is a Java system, and we are already using Camel, which happens to have a Salesforce component that seems to work quite nicely.
This question is as much about Salesforce as it is about Camel.
The SF Bulk API is asynchronous by nature. That means I submit a job, then I have to poll for its status, and once the status becomes COMPLETED I can pull the results. So far so good, but what I need to understand is how, from a Camel standpoint, I can initiate this polling after I submit my job. As you can see in my example I am using a delayer, but that is suboptimal, since some batches may take hours to be ready.
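Conceptually, something like the following is what I'm after: keep calling getBatch until Salesforce says the batch is done, and only then move on. This is just a rough sketch of my own (the direct: endpoint name is a placeholder, I'm assuming loopDoWhile is available in our Camel version, and the BatchStateEnum check is my guess at how to test for completion), so I don't know whether it's the idiomatic way:

from("direct:pollBatchStatus") // body is expected to be the BatchInfo from createBatchQuery
    .loopDoWhile(new Predicate() {
        public boolean matches(Exchange exchange)
        {
            // keep polling until the batch reaches a terminal state
            BatchInfo info = exchange.getIn().getBody(BatchInfo.class);
            return info.getState() != BatchStateEnum.COMPLETED
                && info.getState() != BatchStateEnum.FAILED;
        }
    })
        .delay(30000)               // back off between polls instead of one fixed delay
        .to("salesforce:getBatch")  // refreshes the BatchInfo in the body
    .end();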
Furthermore, as you can see, I am holding on to the batch info in the bi variable. That is because it does not seem to be passed along by getQueryResultIds. Is there any way in Camel to preserve some parts of the exchange as it gets pushed down the pipeline?
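One thing I considered (but have not verified) is stashing the BatchInfo in an exchange property instead of the bi array, on the assumption that properties, unlike the body, survive the rest of the route; the property name here is just something I made up:

// right after salesforce:getBatch
.process(new Processor() {
    public void process(Exchange exchange) throws Exception
    {
        BatchInfo batchInfo = exchange.getIn().getBody(BatchInfo.class);
        // keep the batch info on the exchange itself rather than in a field
        exchange.setProperty("batchInfo", batchInfo);
        exchange.getOut().setBody(batchInfo);
    }
})
// ... later, after salesforce:getQueryResultIds ...
.process(new Processor() {
    public void process(Exchange exchange) throws Exception
    {
        BatchInfo batchInfo = exchange.getProperty("batchInfo", BatchInfo.class);
        exchange.getOut().setHeader(SalesforceEndpointConfig.JOB_ID, batchInfo.getJobId());
        exchange.getOut().setHeader(SalesforceEndpointConfig.BATCH_ID, batchInfo.getId());
    }
})

Is that the intended mechanism, or is there something better?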
Here is the code I've written to bulk-import accounts (I took some proprietary pieces out):
// holds the BatchInfo so it can be referenced later in the route
final BatchInfo[] bi = new BatchInfo[1];

from("timer://runOnce?repeatCount=1&delay=10")
    // build the bulk query job definition
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception
        {
            JobInfo jobInfo = new JobInfo();
            jobInfo.setContentType(ContentType.CSV);
            jobInfo.setOperation(OperationEnum.QUERY);
            jobInfo.setObject("Account");
            jobInfo.setConcurrencyMode(ConcurrencyModeEnum.PARALLEL);
            exchange.getOut().setBody(jobInfo);
        }
    })
    // create the job, then submit the query batch
    .to("salesforce:createJob")
    .to("salesforce:createBatchQuery?sObjectQuery=select Id,Name,Type,BillingCity,BillingState," +
        "BillingPostalCode,BillingCountry,Phone from Account")
    // fixed delay before checking the batch status; this is the part I want to replace with real polling
    .delay(10000)
    .to("salesforce:getBatch")
    // remember the BatchInfo (job id and batch id) for later steps
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception
        {
            BatchInfo batchInfo = exchange.getIn().getBody(BatchInfo.class);
            bi[0] = batchInfo;
            exchange.getOut().setBody(batchInfo);
        }
    })
.to("salesforce:getQueryResultIds")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
if (exchange.getException() != null)
{
exchange.getException().printStackTrace();
}
System.out.println(exchange.getIn().getBody());
Collection resultIds = exchange.getIn().getBody(Collection.class);
String resultId = (String) resultIds.iterator().next();
exchange.getOut().setHeader(SalesforceEndpointConfig.RESULT_ID, resultId);
exchange.getOut().setHeader(SalesforceEndpointConfig.JOB_ID, bi[0].getJobId());
exchange.getOut().setHeader(SalesforceEndpointConfig.BATCH_ID, bi[0].getId());
exchange.getOut().setBody(exchange.getIn().getBody());
}
})
.to("salesforce:getQueryResult")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
InputStream is = exchange.getIn().getBody(InputStream.class);
CachedOutputStream cos = new CachedOutputStream(exchange);
BufferedReader r = new BufferedReader(new InputStreamReader(is));
PrintWriter pw = new PrintWriter(cos);
boolean header = false;
String line = null;
com.google.gson.stream.JsonWriter writer = new com.google.gson.stream.JsonWriter(pw);
Gson gson = new Gson();
writer.beginArray();
while ((line = r.readLine()) != null)
{
if (!header)
{
header = true;
continue;
}
String[] split = line.split(",");
JsonObject account = new JsonObject();
JsonObject headers = new JsonObject();
JsonObject data = new JsonObject();
account.add("headers", headers);
headers.addProperty("category", "entity");
headers.addProperty("type", "Account");
account.add("data", data);
data.addProperty("accountId", split[0].substring(1, split[0].length() - 1));
data.addProperty("name", split[1].substring(1, split[1].length() - 1));
data.addProperty("accountType", split[2].substring(1, split[2].length() - 1));
gson.toJson(account, writer);
}
writer.endArray();
writer.flush();
exchange.getOut().setBody(cos.getInputStream());
}
})
.to("stream:out");
As you can see, what I am doing here is bulk-importing Account data from SF and converting it into JSON; in this simplified version I just write it to stream:out rather than saving it to a file. I simplified things for the purposes of this question.
I know that for a real-world scenario I will have to replace the "from timer" part of the route with a SEDA or VM endpoint.
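For example, I'm picturing something along these lines (the seda endpoint name is just a placeholder of mine), where the import is kicked off from elsewhere in the application:

// the route starts from an in-memory queue instead of a timer
from("seda:accountBulkImport")
    // ... same processors and salesforce: endpoints as in the route above ...
    .to("stream:out");

// somewhere in the application code:
ProducerTemplate template = camelContext.createProducerTemplate();
template.sendBody("seda:accountBulkImport", "start");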
Any help is greatly appreciated.