yesterday - last edited yesterday
Hello,
I am currently trying to test the use of ElasticSearchService.scroll combined with WorkManager.schedule to query documents instead of using CoreSession.query. It does not seem to index documents that were either created or modified.
currently:
public void should_ArchiveDocument_WhenStale() throws IOException {
// given an stale document
final var template = session.createDocumentModel("/", "custom", "CustomType");
final var midnight = ZonedDateTime.now().with(LocalTime.MIDNIGHT);
final var yesterday = GregorianCalendar.from(midnight.minusDays(1));
template.setPropertyValye("custom:date", yesterday);
session.createDocument(template);
session.save();
awaitCompletion();
// when archiving
eventService.fireEvent("custom_archive", new EventContextImpl(session, session.getPrincipal()));
awaitCompletion();
// then document is archived
final var expected = session.query(ARCHIVED)
.stream().map(DoyenDocument::new)
.toList();
Assertions.assertThat(expected)
.describedAs("should archive the document")
.hasSize(1) <- fails here
.element(0)
.matches(it -> "custom".equals(it.getTitle()), "should be named 'custom'")
.matches(it -> "archived".equals(it.state()), "should be archived");
}
/**
 * Waits, in order, on the event service, the work manager and the Elasticsearch admin
 * so that assertions run after pending background activity has been given time to finish.
 */
private void awaitCompletion() {
    // step 1: event service
    eventService.waitForAsyncCompletion();

    // step 2: poll the work manager every 100 ms (each probe waits up to 99 ms),
    // giving up after five seconds
    final var workManagerSettled = Awaitility.await("WorkManager.awaitCompletion")
            .atMost(Duration.FIVE_SECONDS)
            .pollInterval(100, TimeUnit.MILLISECONDS);
    workManagerSettled.until(() -> workManager.awaitCompletion(99, TimeUnit.MILLISECONDS));

    // step 3: wait for indexing, then refresh the index
    elasticSearchAdmin.prepareWaitForIndexing().get();
    elasticSearchAdmin.refresh();
}
@RunWith(FeaturesRunner.class)
@Features({
MockitoFeature.class,
RestServerFeature.class,
RuntimeStreamFeature.class,
RepositoryElasticSearchFeature.class })
@Deploy("org.nuxeo.ecm.jwt")
@Deploy("org.nuxeo.ecm.core.management")
@Deploy("org.nuxeo.ecm.platform.suggestbox.core")
@Deploy("org.nuxeo.ecm.platform.thumbnail")
@Deploy("org.nuxeo.ecm.platform.convert")
@Deploy("org.nuxeo.ecm.platform.rendition.core")
@Deploy("org.nuxeo.ecm.platform.io.core")
@Deploy("org.nuxeo.ecm.default.config")
@Deploy("my-core-bundle-symbolic-name")
@Deploy("studio.extensions.my-studio-project")
@Deploy("org.nuxeo.ecm.platform.notification")
@Deploy("my-core-test-bundle-symbolic-name:OSGI-INF/default-general-settings-contrib.xml")
@Deploy("org.nuxeo.web.resources.core")
// Follows the "archiving" transition on every CustomType document whose custom:date is in the past.
// NOTE(review): the lambda captures `session` and is handed to a scheduled work; confirm that
// this session may be used from wherever the work actually executes.
documentService.scrollAndSchedule(session, "SELECT * FROM CustomType WHERE custom:date < NOW()", documents -> {
    // the consumer receives a scroll page; its documents are read through getDocuments(),
    // the same accessor scrollAndSchedule uses on the scroll response
    for (final var document : documents.getDocuments()) {
        session.followTransition(document, "archiving");
        session.saveDocument(document);
    }
});
/**
 * Scrolls through the results of an NXQL query and schedules one {@code ConsumerWork} per
 * non-empty page, handing each page to {@code consumer}.
 *
 * @param session  session used to build the NXQL query
 * @param query    NXQL query to scroll over
 * @param consumer callback given to every scheduled work together with its page
 * @return the IDs of the works that were scheduled
 */
public Set<String> scrollAndSchedule(final CoreSession session, final String query, final Consumer<EsScrollResult> consumer) {
    final var searchService = Framework.getService(ElasticSearchService.class);
    final var scheduler = Framework.getService(WorkManager.class);

    // page size comes from CoreConstants.queryLimit()
    final var pagedQuery = new NxQueryBuilder(session)
            .nxql(query).fetchFromElasticsearch()
            .limit(CoreConstants.queryLimit());

    // open the scroll; `page` always refers to the most recently fetched page
    var page = searchService.scroll(pagedQuery, CoreConstants.queryTimeout());
    try {
        final var scheduledIds = new HashSet<String>();
        // stop on the first empty page or once LOOP_UPPER_LIMIT pages have been handled
        for (var handled = 0;
                !page.getDocuments().isEmpty() && handled < CoreConstants.LOOP_UPPER_LIMIT;
                handled++) {
            // TODO: use the bulk action framework
            final var pageWork = ConsumerWork.<EsScrollResult>builder()
                    .data(page)
                    .consumer(consumer)
                    .build();
            scheduledIds.add(pageWork.getId());
            scheduler.schedule(pageWork);
            // advance to the next page
            page = searchService.scroll(page);
        }
        return scheduledIds;
    } finally {
        // release the scroll context held for the last fetched page
        searchService.clearScroll(page);
    }
}
@Slf4j
@Setter
@Accessors(fluent = true)
@EqualsAndHashCode(callSuper = true)
public class ConsumerWork<T> extends AbstractWork {
private static final long serialVersionUID = ClassUtils.hash(ConsumerWork.class);
private final transient T data;
private final transient Consumer<T> consumer;
@Builder
public ConsumerWork(
T data,
Consumer<T> consumer) {
super(UUID.randomUUID().toString());
this.data = data;
this.data = data;
}
public String getTitle() {
return String.format("Consuming data with id [%s]", getId());
}
public void work() {
if (ObjectUtils.allNotNull(data, consumer)) {
consumer.accept(data);
} else {
log.warn("[data] or [consumer] was not provided");
}
}
public String getCategory() {
return "consumeData";
}
}
The results I am getting currently:
Find what you came for
We want to make your experience in Hyland Connect as valuable as possible, so we put together some helpful links.