cancel
Showing results for 
Search instead for 
Did you mean: 
cancel
Showing results for 
Search instead for 
Did you mean: 

Concurrency / Synchronisation / ConcurrentModificationException

SOLVED
Gravel

Concurrency / Synchronisation / ConcurrentModificationException

Hi everybody,

 

Thingworx for sure supports concurrency. I quite often see all the different Thread IDs in the Script Log.

But I reached a point in Thingworx Script Development where I get concurrent conflicts and race conditions. Where are the synchronisation primitives to handle this?

 

The use case:

We have continous running machine providing it's data to twx via the KepServer. Tags of main interest are running meter, speed and timestamps of certain events. 

Now we want to do some aggregation in memory and so I implemented an info table as property of the thing and a timer that frequently calls a service that just accumulates the total meters and the time to the last row of the infotable.

 

For certain events (e.g. change of reel) we now need to create a new row in the infotable and now I run into trouble, as the service to accumulate and the events leading to a new row are not on the same thread.

This lead to synchronisation issues, why I implemented a second infotable as a queue where the events are just stored. Afterwards the timer checks the queue for new rows and creates them. So now creation of row and the accumulation are on the same thread again. Should be OK, ... but it is not. I still get "ConcurrentModificationException".

 

The test/challange:

I found an article here that gave me some info on this issue:

https://community.ptc.com/t5/ThingWorx-Developers/Concurrency/m-p/502299

"To get a lock you need to update a persistent property, if you update a persistent no one will update that Thing until the service you are it's done."

 

With this information I did a quite brutal test (see attached Thing):

  • This thing has a persistant and a not persistant property of type Integer.
  • The service IncrementCounterAsyn increments both properties, first the persisted one.
  • Between the two increments it calls a passive wait "pause(100)" which still should leave the thing locked
  • The service ConcurrentIncrementCounter call the service IncrementCounterAsync 100 times in a loop.
  • Waits for 2 Seconds and than returns the CounterValues ..

As the access to CounterMtx should lock the Thing all the async services should not be finished after 2 Sek. But they are ?

The final result for Counter & CounterMtx should be 100, but it never is. Indeed this is a "nice" random number generator. ;-)

 

The question:

Ok how can I lock the Thing, that the final result is 100 for both Counter and CounterMtx??

I know that this test is quite stupid, but it's usefull to show and reproduce the issue.

 

 

 

1 ACCEPTED SOLUTION

Accepted Solutions

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hello @atondorf ,

 

I had created a similar JAVA  extension using the ReentrantLock as well,  yes , it works locally and works for updating Thingworx's persistent property concurrently as well.   But  it doesn't work for updating ThingWorx DataTable concurrently.  

For example,  a DataTable which has several fields named : PropA, PropB , PropC .... PropG.   Now multiple requests update the DataTable simultaneously,  requestA updates PropA,  requestB updates PropB ..... .  The code would be : 

me.lock();

try{
   var entry = 
   me.FindDataTableEntries({
       values: query_value /* INFOTABLE */
   });
     
   //update certain Prop* field 

   me.UpdateDataTableEntry({
       tags: undefined /* TAGS */,
       location: undefined /* LOCATION */,
       source: undefined /* STRING */,
       sourceType: undefined /* STRING */,
       values: station1Clone /* INFOTABLE */
   });    

}catch(err){
   logger.debug(err);
}finally{
   me.unlock();
}

The expected result would be the PropA ... PropG get updated with new value , but actually only some of the fields get updated . 

 

I also tried your code and got the same inconsistent result . 

 

You can try the attached file to reproduce it : 

1. Run ResetTestData  to reset the test data. 

2. Run ConCurrentUpdateWithoutQuery to call async services to simulate the concurrent requests. 

3. Check the table entry whose key = station1,  you'll see only some of the Prop* field get updated with new value new_xxx. 

 

If you check the log, you'll find these async  services do get executed sequentially.  So I guess  either FindDataTableEntries read from certain old cache, or UpdateDataTableEntry is async somehow. 

 

View solution in original post

10 REPLIES 10

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hi atondorf,

 

I don't think that a persisted property can be used as a mutex.

 

You may want to have a look at the Extension SDK. I have seen in the past Custom Java Extensions for "Concurrent Queue", "Atomic" properties and other "Lock" Things ....

 

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hi smainente,

 

a look at the Extension-SDK sounds great! I already searched a lot, but did not find a lot extensions with source code. Even GitHub has only 44 projects with java code for thingworx. No concurrenct extension therer.

And as I am not a great Java-Dev and SDK Documentation is only very rudimentary I do not dare to implement this by myself, yet. Any Tipp where you saw these concurrenct extensions?

Highlighted

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hi @atondorf 

 

A customers and/or consultants probably sent me those extensions - I'm pretty sure they are not public.

 

Alternatively, if you want to do in-memory calculations of ingested data, you may want to have a look at How can I use statistical transforms for my property values in ThingWorx ? - this feature requires ThingWorx Analytics.

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hi  smainente,

 

these extensions are not public? But they should be! No even more: They should not be extensions at all, but part of the default scope of operation of Thingworx.

I think it's irresponsible to publish a concurrent sytem (Thingworx is one) without synchronisation mechanisms.

I gathered a lot of experience and situations, where Thingworx does not behave deterministic and predictable and I ran into timing issues.

E.g:
1.) I have a S7-PLC connected to Kepware and this connected to a Thing. 
There is a single DB in the PLC providing some data, which changes in block. The order of the "PropertyChanged" events is not predictable. So I am not sure when all properties have the new value set! Currently as an ugly workaround I call a "pause(100)" in the subscription handler of the triggering Tag, to ensure all properties have been set by the other TWEventProcessor Threads ... 

2.) I want to do some aggregation of values on the fly "In Memory" grouped by the state of a Thing, where the state depends on some properties, RunStatus, ProductNr, ShiftNr. I calculate the aggregation in an infotable with a new row for each change of the status. Multiple events try to access the same infotable unsynchronised => non deterministic behaviour.

 

So I urgently need this kind of extension. 

 

 

Re: Concurrency / Synchronisation / ConcurrentModificationException

By not public, I meant that these extensions have been developed by customers, and have not been shared publicly  (github or other public code sharing platform) - I have been exposed to these extensions in the context of technical support investigations.

 

The Extension SDK is not necessarily the ideal solution for this use case, I just want to be sure that you were aware of all the techniques available.

 

I understand your problematic, but I don't have an immediate solution in mind.

If you don't receive any suggestions in the community I will open a support case for you.

Re: Concurrency / Synchronisation / ConcurrentModificationException

I was also struggling at concurrency handling , it's a so important feature and should be part of the product instead of extension . 

Re: Concurrency / Synchronisation / ConcurrentModificationException

@smainente ,

 

I doubt that even using the extended SDK will not solve the problem. .  I created a Java extension and use the ReentrantLock for serialize the concurrent service .  

I tested it by executing multiple Async services which increasing a persistent property in Thingworx,  it works. 

 

However,  it DOES NOT work for data table(maybe stream as well) .   The ReentrantLock can ensure the async services get executed one by one,  the later service's FindDataTableEntries may still get the old data of Data Table although the data has been updated by the previous service.    It seems certain old cache is still used by FindDataTableEntries , or UpdateDataTableEntry has something Async. 

 

Regards,

Sean

Re: Concurrency / Synchronisation / ConcurrentModificationException

Dear Sean,

 

had the same idea and just implemented a "LockableTS" ... and tested it. 

Seems to be working for me, but I am currently not on a persisted table but a local in memory only.

For performance I moved the persistance to a custom datatable, that is only updated one / minute.

 

So finally it's working! Even my brutal test from the beginning is working.

For everybody interested, here is the code:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import com.thingworx.logging.LogUtilities;
import com.thingworx.metadata.annotations.ThingworxServiceDefinition;
import com.thingworx.metadata.annotations.ThingworxServiceParameter;
import com.thingworx.metadata.annotations.ThingworxServiceResult;
import org.slf4j.Logger;

public class LockableTS {

	private final ReentrantLock mtx = new ReentrantLock(true);

	@ThingworxServiceDefinition(name = "lock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
	@ThingworxServiceResult(name = "Result", description = "", baseType = "NOTHING", aspects = {})
	public void lock() {
	    mtx.lock();
	}

	@ThingworxServiceDefinition(name = "tryLock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
	@ThingworxServiceResult(name = "Result", description = "", baseType = "BOOLEAN", aspects = {})
	public Boolean tryLock( @ThingworxServiceParameter(name = "TimeOut", description = "", baseType = "LONG") Long TimeOut) 
	throws Exception {
	    return mtx.tryLock((long)TimeOut, TimeUnit.MILLISECONDS);
	}

	@ThingworxServiceDefinition(name = "unlock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
	@ThingworxServiceResult(name = "Result", description = "", baseType = "NOTHING", aspects = {})
	public void unlock() {
	   mtx.unlock();
	}

	@ThingworxServiceDefinition(name = "isLocked", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
	@ThingworxServiceResult(name = "Result", description = "", baseType = "BOOLEAN", aspects = {})
	public Boolean isLocked() {
	    return mtx.isLocked();
	}
}

Usage in the code:

try {
    me.lock();
    do_something_critical();
} finally {
   me.unlock();
}

The try/finally is important, if the do_something_critical can throw exceptions.
Otherwise the lock will be left locked.

Re: Concurrency / Synchronisation / ConcurrentModificationException

Hello @atondorf ,

 

I had created a similar JAVA  extension using the ReentrantLock as well,  yes , it works locally and works for updating Thingworx's persistent property concurrently as well.   But  it doesn't work for updating ThingWorx DataTable concurrently.  

For example,  a DataTable which has several fields named : PropA, PropB , PropC .... PropG.   Now multiple requests update the DataTable simultaneously,  requestA updates PropA,  requestB updates PropB ..... .  The code would be : 

me.lock();

try{
   var entry = 
   me.FindDataTableEntries({
       values: query_value /* INFOTABLE */
   });
     
   //update certain Prop* field 

   me.UpdateDataTableEntry({
       tags: undefined /* TAGS */,
       location: undefined /* LOCATION */,
       source: undefined /* STRING */,
       sourceType: undefined /* STRING */,
       values: station1Clone /* INFOTABLE */
   });    

}catch(err){
   logger.debug(err);
}finally{
   me.unlock();
}

The expected result would be the PropA ... PropG get updated with new value , but actually only some of the fields get updated . 

 

I also tried your code and got the same inconsistent result . 

 

You can try the attached file to reproduce it : 

1. Run ResetTestData  to reset the test data. 

2. Run ConCurrentUpdateWithoutQuery to call async services to simulate the concurrent requests. 

3. Check the table entry whose key = station1,  you'll see only some of the Prop* field get updated with new value new_xxx. 

 

If you check the log, you'll find these async  services do get executed sequentially.  So I guess  either FindDataTableEntries read from certain old cache, or UpdateDataTableEntry is async somehow. 

 

View solution in original post