cancel
Showing results for 
Search instead for 
Did you mean: 

[TEST] [LTS-2023] elasticsearch instance not indexing documents

cheshire
Champ on-the-rise
Champ on-the-rise

hallo,

i am currently trying to test the use of ElasticSearchService.scroll combined with the WorkManager.schedule to query document instead of using CoreSession.query, it does not seem to index documents either created of modified

currently:

  • through debugging in real situation i was able to confirm that the work planned executes properly under normal circumstances 
  • debugging the test reveals that the event triggers ElasticSearchService.scroll, but no result is returned
  • SELECT * FROM CustomType query return nothing, even though the document is created beforehand (CoreSession.createDocument assigns an ID to the template)
  • here is my test
  
  public void should_ArchiveDocument_WhenStale() throws IOException {

    // given an stale document
    final var template = session.createDocumentModel("/", "custom", "CustomType");
    final var midnight = ZonedDateTime.now().with(LocalTime.MIDNIGHT);
    final var yesterday = GregorianCalendar.from(midnight.minusDays(1));
    template.setPropertyValye("custom:date", yesterday); 
    session.createDocument(template);
    session.save();
    awaitCompletion();

    // when archiving
    eventService.fireEvent("custom_archive", new EventContextImpl(session, session.getPrincipal()));
    awaitCompletion();

    // then document is archived
    final var expected = session.query(ARCHIVED)
        .stream().map(DoyenDocument::new)
        .toList();

    Assertions.assertThat(expected)
        .describedAs("should archive the document")
        .hasSize(1) <- fails here
        .element(0)
        .matches(it -> "custom".equals(it.getTitle()), "should be named 'custom'")
        .matches(it -> "archived".equals(it.state()), "should be archived");

  }

  private void awaitCompletion() {
    eventService.waitForAsyncCompletion();
    Awaitility.await("WorkManager.awaitCompletion")
        .atMost(Duration.FIVE_SECONDS)
        .pollInterval(100, TimeUnit.MILLISECONDS)
        .until(() -> workManager.awaitCompletion(99, TimeUnit.MILLISECONDS));
    elasticSearchAdmin.prepareWaitForIndexing().get();
    elasticSearchAdmin.refresh();
  }
  • i am deploying this for my tests
@RunWith(FeaturesRunner.class)
@Features({
    MockitoFeature.class,
    RestServerFeature.class,
    RuntimeStreamFeature.class,
    RepositoryElasticSearchFeature.class })
@Deploy("org.nuxeo.ecm.jwt")
@Deploy("org.nuxeo.ecm.core.management")
@Deploy("org.nuxeo.ecm.platform.suggestbox.core")
@Deploy("org.nuxeo.ecm.platform.thumbnail")
@Deploy("org.nuxeo.ecm.platform.convert")
@Deploy("org.nuxeo.ecm.platform.rendition.core")
@Deploy("org.nuxeo.ecm.platform.io.core")
@Deploy("org.nuxeo.ecm.default.config")
@Deploy("my-core-bundle-symbolic-name")
@Deploy("studio.extensions.my-studio-project")
@Deploy("org.nuxeo.ecm.platform.notification")
@Deploy("my-core-test-bundle-symbolic-name:OSGI-INF/default-general-settings-contrib.xml")
@Deploy("org.nuxeo.web.resources.core")
  •  this is what is executed from the triggered event
    documentService.scrollAndSchedule(session, "SELECT * FROM CustomType WHERE custom:date < NOW()", documents -> {
      for (final var document : documents) {
        session.followTransition(document, "archiving");
        session.saveDocument(document);
      }
    });
  • here is the method being called
    
    public Set<String> scrollAndSchedule(final CoreSession session, final String query, final Consumer<EsScrollResult> consumer) {

        // Get the ElasticSearchService
        final var ess = Framework.getService(ElasticSearchService.class);
        final var workManager = Framework.getService(WorkManager.class);

        // Build the NXQL query with pagination settings
        final var nxql = new NxQueryBuilder(session)
                .nxql(query).fetchFromElasticsearch()
                .limit(CoreConstants.queryLimit());

        // Initialize the scroll with the NXQL query
        var response = ess.scroll(nxql, CoreConstants.queryTimeout());

        try {

            // Initialize the page counter
            var pages = 0;

            // Initalize the work IDs set
            final var works = new HashSet<String>();

            // Loop through the recovered documents page until there are no more documents or the upper limit is reached
            while (!response.getDocuments().isEmpty() && pages < CoreConstants.LOOP_UPPER_LIMIT) {

                // create the work to be excuted in an asynchronous parallel manner
                // TODO: use the bulk action framework
                final var work = ConsumerWork.<EsScrollResult>builder()
                        .data(response)
                        .consumer(consumer)
                        .build();

                // Add the work ID to the set of works
                works.add(work.getId());

                // executes the consumer function on the current page of documents
                workManager.schedule(work);

                // Scroll to the next page of documents
                response = ess.scroll(response);

                // Increment the number of pages processed
                pages++;
            }

            return works;

        } finally {

            // Clear the scroll context to free resources
            ess.clearScroll(response);
        }
    }
  • here is the work used
@Slf4j
@Setter
@Accessors(fluent = true)
@EqualsAndHashCode(callSuper = true)
public class ConsumerWork<T> extends AbstractWork {

  private static final long serialVersionUID = ClassUtils.hash(ConsumerWork.class);

  private final transient T data;
  private final transient Consumer<T> consumer;

  @Builder
  public ConsumerWork(
      T data,
      Consumer<T> consumer) {
    super(UUID.randomUUID().toString());
    this.data = data;
    this.data = data;
  }

  
  public String getTitle() {
    return String.format("Consuming data with id [%s]", getId());
  }

  
  public void work() {
    if (ObjectUtils.allNotNull(data, consumer)) {
      consumer.accept(data);
    } else {
      log.warn("[data] or [consumer] was not provided");
    }
  }

  
  public String getCategory() {
    return "consumeData";
  }

}

the results i am getting currently:

  • when using CoreSession.query it passes
  • with ElasticSearchService.scroll, no exception is thrown, it simply expects 1 archived element and finds none
0 REPLIES 0