How to run bulk commands in parallel

Ahmad_Ben_Maall
Star Contributor

Hello, I am trying to learn how to use the Bulk Action Framework in Nuxeo, based on this document: https://doc.nuxeo.com/nxdoc/1010/bulk-action-framework/.

But so far I have not found how to process documents in parallel.

For example, I defined the following action (just as a test), which sleeps for one second per document:

...
public class MyComputation extends AbstractBulkComputation {
...
    @Override
    protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
        for (DocumentModel doc : loadDocuments(session, ids)) {
            try {
                // Simulate one second of work per document.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it, then stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
...
}

When I run this action on 10 documents, the BulkStatus only reaches the COMPLETED state after 10 seconds.

How can I parallelize the processing so that, in my example, the result comes back in about one second?

Thanks.

1 ACCEPTED ANSWER

Ahmad_Ben_Maall
Star Contributor

I think we can define the parallelism in the streamProcessor extension point, using the defaultConcurrency and defaultPartitions attributes.

For example, the following configuration allows 6 buckets to be processed in parallel:

<extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
  <streamProcessor name="syllabsIndexing" class="com.adequat.nuxeo.glm.addons.MyActionProcessor" logConfig="bulk" defaultConcurrency="6" defaultPartitions="6">
    <policy name="syllabsIndexing" maxRetries="10" delay="1s" maxDelay="60s" continueOnFailure="true" />
  </streamProcessor>
</extension>
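
To illustrate why the number of buckets matters as much as the concurrency, here is a minimal sketch in plain Java. It does not use any Nuxeo API: the class name BucketParallelismSketch, the methods toBuckets and run, and the bucket sizes are all hypothetical. It only assumes, as described above, that a bucket is the unit of parallel work and that each bucket is handled by a single thread. With 10 documents in a single bucket, 6 threads do not help, because the whole bucket is processed sequentially; with one document per bucket, the buckets can be spread over the available threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BucketParallelismSketch {

    /** Splits the ids into consecutive buckets of at most bucketSize elements. */
    static List<List<String>> toBuckets(List<String> ids, int bucketSize) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < ids.size(); i += bucketSize) {
            int end = Math.min(i + bucketSize, ids.size());
            buckets.add(new ArrayList<>(ids.subList(i, end)));
        }
        return buckets;
    }

    /**
     * Processes each bucket on a pool of `concurrency` threads, sleeping
     * millisPerDoc for every document, and returns the elapsed time in ms.
     */
    static long run(List<String> ids, int bucketSize, int concurrency, long millisPerDoc)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        long start = System.nanoTime();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<String> bucket : toBuckets(ids, bucketSize)) {
                futures.add(pool.submit(() -> {
                    // Documents inside one bucket are handled one after another.
                    for (String id : bucket) {
                        try {
                            Thread.sleep(millisPerDoc);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }));
            }
            // Wait until every bucket has been processed.
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ids.add("doc-" + i);
        }
        // Same 6 threads in both runs; only the bucket size changes.
        System.out.println("1 bucket of 10 docs: " + run(ids, 10, 6, 100) + " ms");
        System.out.println("10 buckets of 1 doc: " + run(ids, 1, 6, 100) + " ms");
    }
}
```

If the real framework behaves like this sketch, a test with only 10 documents may still run sequentially when all of them end up in the same bucket, so it is worth checking the bucket size configured for the action as well as the concurrency and partitions.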
