
Node does not exist when using JMS in a custom behavior

geemorin
Champ in-the-making
Hi,

I hope I'm in the right forum for that kind of question!

I'm currently working on a custom behavior that produces a JMS message containing the new node id whenever an "onContentUpdate" event is raised. I inject the JNDI names of the connection factory and destination into my bean, initialize them, create a session, create a producer and send my JMS message. The message is then consumed by a remote client, which fetches the document through the Rivet API.

My problem is that the JMS message seems to be sent before the transaction is committed, so the client sometimes consumes the message and tries to fetch the document before it is committed, and I get an exception saying "Node does not exist". I could loop until the node exists, but as you can guess that's not a viable solution. What I'd really like to know is how I can make sure the JMS messages are committed together with the document transaction!

I must admit I'm not really familiar with Spring. Any hint/suggestion will be appreciated!

Thank you

Here are snippets of my code:

*I removed the exception handling for easier reading

Here is the initialisation function:

public void init() {
      // Create behaviours
      this.onContentUpdate = new JavaBehaviour(this, "onContentUpdate", NotificationFrequency.TRANSACTION_COMMIT);

      // Bind behaviours to policies
      this.policyComponent.bindClassBehaviour(QName.createQName(NamespaceService.ALFRESCO_URI, "onContentUpdate"),
            ContentModel.TYPE_CONTENT, this.onContentUpdate);

      Properties properties = new Properties();
      properties.load(new FileReader(contextPropFile));
      Context ctx = new InitialContext(properties);
      Object objHandler = ctx.lookup(this.connectionFactoryJNDI);
      this.connectionFactory = (ConnectionFactory)objHandler;

      objHandler = ctx.lookup(this.destinationJNDI);
      this.destination = (Destination)objHandler;

      connection = connectionFactory.createConnection();
      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
      messageProducer = session.createProducer(destination);
}

Here is my custom behavior:

public void onContentUpdate(NodeRef updatedNodeRef, boolean newContent) {
   if (updatedNodeRef != null) {
      // Retrieve the document name in its metadata properties
      String docName = (String) this.nodeService.getProperty(updatedNodeRef, ContentModel.PROP_NAME);
               
      byte[] documentContent = null;

      if (this.sendDocumentContent) {
         // Retrieve the document content
         ContentReader contentReader = this.contentService.getReader(updatedNodeRef, ContentModel.PROP_CONTENT);

         // Then read it in a byte array
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         contentReader.getContent(out);

         documentContent = out.toByteArray();
         out.close();
      }

      if (newContent) {
         String creator = (String) this.nodeService.getProperty(updatedNodeRef, ContentModel.PROP_CREATOR);

         // The document has been added
         sendJMSEvent(updatedNodeRef.getId(), // Document id
            docName,  // Document name
            creator,  // Created by username
            EVENT_TYPE.DOCUMENT_ADDED,
            documentContent); // Document content
      }
      else {
         String modifier = (String) this.nodeService.getProperty(updatedNodeRef, ContentModel.PROP_MODIFIER);

         // The document has been updated
         sendJMSEvent(updatedNodeRef.getId(), // Document id
            docName,   // Document name
            modifier,  // Updated by username
            EVENT_TYPE.DOCUMENT_UPDATED,
            documentContent); // Document content
      }
   }
}

Here is the sendJMSEvent method:

private void sendJMSEvent(String documentId, String documentName, String user, EVENT_TYPE event_type, byte[] documentContent){
   BytesMessage bytesMessage = session.createBytesMessage();
   bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_ID, documentId);
   bytesMessage.setStringProperty(PROP_KEY_EVENT_TYPE, event_type.toString());
   
   switch(event_type){
      case DOCUMENT_ADDED:
      case DOCUMENT_UPDATED:
         bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_NAME, documentName);
         bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_CREATOR, user);
         break;
      case DOCUMENT_DELETED:
      default:
   }
   
   if(this.sendDocumentContent && documentContent != null){
      bytesMessage.writeBytes(documentContent);
   }
   
   messageProducer.send(bytesMessage);
}
5 REPLIES

mrogers
Star Contributor
You are about to walk into a minefield of distributed transaction processing considerations. While I'm sure it can be done, as I've said in other threads it won't be easy. For a start you would have to re-plumb all of Alfresco's existing transaction handling and caching implementations.

A much easier solution would be to use two separate transactions and drive your JMS message production from the result of your node creation transaction. It's a tiny bit less reliable but much easier.

Another solution would be to kick off a workflow which produces your JMS message and that would have full transactional guarantees.
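
The "two separate transactions" idea above can be modelled in plain Java. This is only a minimal sketch, not Alfresco or Spring code: `SimpleTransaction`, `registerAfterCommit` and `runUpdate` are hypothetical names invented for illustration, and a list stands in for the JMS destination. The point it shows is that the behaviour only queues the notification, and the notification is published once the node transaction has committed (and dropped if it rolls back).

```java
import java.util.ArrayList;
import java.util.List;

class AfterCommitSketch {

    /** Hypothetical stand-in for the repository transaction; not an Alfresco or Spring class. */
    static class SimpleTransaction {
        private final List<Runnable> afterCommitCallbacks = new ArrayList<>();
        private boolean finished = false;

        /** Queue work that must only run once the transaction has committed. */
        void registerAfterCommit(Runnable callback) {
            if (finished) {
                throw new IllegalStateException("transaction already finished");
            }
            afterCommitCallbacks.add(callback);
        }

        void commit() {
            finished = true;
            // In a real system the node data becomes visible here, before any callback runs.
            for (Runnable callback : afterCommitCallbacks) {
                callback.run();
            }
        }

        void rollback() {
            finished = true;
            // Nothing was committed, so nothing may be announced.
            afterCommitCallbacks.clear();
        }
    }

    /** Runs one simulated content update and returns the messages a consumer would see. */
    static List<String> runUpdate(String nodeId, boolean commit) {
        List<String> published = new ArrayList<>(); // stand-in for the JMS destination
        SimpleTransaction tx = new SimpleTransaction();

        // Inside the behaviour: do not send, only register the send for later.
        tx.registerAfterCommit(() -> published.add("DOCUMENT_UPDATED:" + nodeId));

        if (commit) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return published;
    }

    public static void main(String[] args) {
        System.out.println(runUpdate("node-1", true));  // prints [DOCUMENT_UPDATED:node-1]
        System.out.println(runUpdate("node-2", false)); // prints []
    }
}
```

The trade-off is the one mentioned above: because the send happens in a second step after the commit, a failure between the commit and the send loses the notification unless it is handled separately.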

geemorin
Champ in-the-making
Thank you for your prompt reply!!

I'm currently investigating whether I could use org.springframework.transaction.support.TransactionSynchronizationManager in combination with org.springframework.transaction.support.TransactionSynchronization, and send my JMS message from the afterCommit or afterCompletion callback. I think it might work, but again I've never worked with Spring, so it's progressing slowly but surely :) I'll keep you informed of my progression/regression ;)

Have a nice day!

geemorin
Champ in-the-making
So far, this solution seems to work. If you have any comments on how I could improve it, or if you see any problems I might run into, tell me! Thank you!

private class JMSSynchronization implements TransactionSynchronization{
       
        public JMSSynchronization(String documentId, String documentName, String user, EVENT_TYPE event_type, byte[] documentContent){
            super();
            this.documentId = documentId;
            this.documentName = documentName;
            this.user = user;
            this.event_type = event_type;
            this.documentContent = documentContent;
        }
       
        private String documentId;
        private String documentName;
        private String user;
        private EVENT_TYPE event_type;
        private byte[] documentContent;
       
        @Override
        public void afterCommit() {}

        @Override
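        public void afterCompletion(int status) {
            // afterCompletion also fires on rollback, so only notify when the commit succeeded
            if (status != TransactionSynchronization.STATUS_COMMITTED) {
                return;
            }
            System.out.println("Entering after completion…");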
            try {
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_ID, documentId);
                bytesMessage.setStringProperty(PROP_KEY_EVENT_TYPE, event_type.toString());
               
                switch(event_type){
                    case DOCUMENT_ADDED:
                    case DOCUMENT_UPDATED:
                        bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_NAME, documentName);
                        bytesMessage.setStringProperty(PROP_KEY_DOCUMENT_CREATOR, user);
                        break;
                    case DOCUMENT_DELETED:
                    default:
                }
               
                if(sendDocumentContent && documentContent != null){
                    bytesMessage.writeBytes(documentContent);
                }
               
                messageProducer.send(bytesMessage);
            }
            catch (JMSException e) {
                e.printStackTrace();
            }
            System.out.println("Leaving after completion…");
        }

        @Override
        public void beforeCommit(boolean arg0) {}

        @Override
        public void beforeCompletion() {}

        @Override
        public void resume() {}

        @Override
        public void suspend() {}
    }


private void sendJMSEvent(String documentId, String documentName, String user, EVENT_TYPE event_type, byte[] documentContent){
       JMSSynchronization jmsSynch = new JMSSynchronization(documentId, documentName, user, event_type, documentContent);
       TransactionSynchronizationManager.registerSynchronization(jmsSynch);      
   }

mrogers
Star Contributor
Your solution is sound.    But have you thought through the consequences of what happens if your JMS call in the callback fails?
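
One way to think about that failure case, sketched in plain Java under stated assumptions: by the time the callback runs, the repository transaction is already committed, so a failed send cannot roll anything back and the message has to be retried or parked. `ReliableSendSketch`, `Sender`, `sendAfterCommit` and `flushPending` are hypothetical names; in the code from this thread the `Sender` would wrap `messageProducer.send(...)`. The in-memory queue used here is lost on restart, so a real deployment would presumably need a durable store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReliableSendSketch {

    /** Hypothetical sender abstraction; stands in for the JMS producer call. */
    interface Sender {
        void send(String message) throws Exception;
    }

    private final Sender sender;
    private final int maxAttempts;
    // Messages that could not be delivered yet (in memory only in this sketch).
    private final Deque<String> pending = new ArrayDeque<>();

    ReliableSendSketch(Sender sender, int maxAttempts) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    /** Called after commit. Returns true if delivered, false if parked for a later retry. */
    boolean sendAfterCommit(String message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(message);
                return true;
            } catch (Exception e) {
                // The node is already committed; all we can do is try again.
            }
        }
        pending.addLast(message);
        return false;
    }

    /** Retries parked messages in order, stopping at the first failure. Returns the number delivered. */
    int flushPending() {
        int delivered = 0;
        while (!pending.isEmpty()) {
            try {
                sender.send(pending.peekFirst());
            } catch (Exception e) {
                break; // broker still unavailable; keep the rest for the next run
            }
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        boolean[] brokerUp = {false};
        List<String> received = new ArrayList<>();
        ReliableSendSketch sketch = new ReliableSendSketch(message -> {
            if (!brokerUp[0]) {
                throw new Exception("broker unavailable");
            }
            received.add(message);
        }, 3);

        System.out.println(sketch.sendAfterCommit("DOCUMENT_ADDED:node-1")); // prints false
        brokerUp[0] = true;
        System.out.println(sketch.flushPending()); // prints 1
        System.out.println(received);              // prints [DOCUMENT_ADDED:node-1]
    }
}
```

`flushPending` could be driven by a scheduled job. Since a send may succeed on the broker but still report a failure to the caller, the consumer should tolerate receiving the same notification twice.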

sradha
Champ on-the-rise
Hi,

I am trying to run a script that sends a JMS message to a server whenever any content is updated. Could you please describe the steps you followed?