Hi everybody,
Thingworx for sure supports concurrency. I quite often see all the different Thread IDs in the Script Log.
But I reached a point in Thingworx Script Development where I get concurrent conflicts and race conditions. Where are the synchronisation primitives to handle this?
We have a continuously running machine that provides its data to ThingWorx via the KepServer. The tags of main interest are running meters, speed and timestamps of certain events.
Now we want to do some aggregation in memory, so I implemented an infotable as a property of the Thing and a timer that frequently calls a service which just accumulates the total meters and the time into the last row of the infotable.
For certain events (e.g. change of reel) we now need to create a new row in the infotable, and here I run into trouble, as the accumulating service and the events leading to a new row are not on the same thread.
This led to synchronisation issues, which is why I implemented a second infotable as a queue where the events are just stored. Afterwards the timer checks the queue for new rows and creates them. So now the creation of rows and the accumulation are on the same thread again. Should be OK, ... but it is not. I still get "ConcurrentModificationException".
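To make the queue idea concrete, here is a minimal plain-Java sketch of the same pattern, outside ThingWorx. All names (ReelAggregator, Row, onReelChange, onTimerTick, snapshot) are hypothetical and not ThingWorx API. It assumes, without having verified it, that the exception comes from overlapping timer ticks or from a reader iterating the table while it is being modified, so the tick and the read are both synchronized and the event side only touches a thread-safe queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the Thing with its two infotables.
final class ReelAggregator {
    // One aggregation row, comparable to one infotable row.
    static final class Row {
        final String reelId;
        double meters;
        long millis;

        Row(String reelId, double meters, long millis) {
            this.reelId = reelId;
            this.meters = meters;
            this.millis = millis;
        }
    }

    // Event threads only ever touch this thread-safe queue.
    private final ConcurrentLinkedQueue<String> pendingReels = new ConcurrentLinkedQueue<>();
    // The rows are only read or written while holding this object's monitor.
    private final List<Row> rows = new ArrayList<>();

    // Called from any event thread (e.g. change of reel).
    void onReelChange(String reelId) {
        pendingReels.offer(reelId);
    }

    // Called by the timer; synchronized so overlapping ticks cannot interleave.
    synchronized void onTimerTick(double deltaMeters, long deltaMillis) {
        String reelId;
        while ((reelId = pendingReels.poll()) != null) {
            rows.add(new Row(reelId, 0.0, 0L)); // open a new row per queued event
        }
        if (rows.isEmpty()) {
            return; // nothing to accumulate into yet
        }
        Row last = rows.get(rows.size() - 1);
        last.meters += deltaMeters;
        last.millis += deltaMillis;
    }

    // Readers get copies, so they never iterate the live list.
    synchronized List<Row> snapshot() {
        List<Row> copy = new ArrayList<>();
        for (Row r : rows) {
            copy.add(new Row(r.reelId, r.meters, r.millis));
        }
        return copy;
    }
}
```

The point of the design is that the live list is never handed out: anything that wants to display or persist the rows works on a copy.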
I found an article here that gave me some info on this issue:
https://community.ptc.com/t5/ThingWorx-Developers/Concurrency/m-p/502299
"To get a lock you need to update a persistent property, if you update a persistent no one will update that Thing until the service you are it's done."
With this information I did a quite brutal test (see attached Thing):
As the access to CounterMtx should lock the Thing, the async services should not all be finished after 2 seconds. But they are!?
The final result for Counter & CounterMtx should be 100, but it never is. Indeed this is a "nice" random number generator. ;-)
OK, how can I lock the Thing so that the final result is 100 for both Counter and CounterMtx?
I know that this test is quite stupid, but it's useful to show and reproduce the issue.
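For readers without the attachment, the effect can be reproduced in plain Java. This is only a sketch of the general lost-update problem, not the attached Thing: CounterDemo and its methods are hypothetical names, and the thread pool stands in for the async service calls. The unlocked variant may end below the expected count, while the locked variant should always reach it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the test Thing; nothing here is ThingWorx API.
final class CounterDemo {
    private final ReentrantLock mtx = new ReentrantLock();
    private int counter = 0;

    // Unprotected read-modify-write: concurrent callers can overwrite each other.
    void incrementUnsafe() {
        int read = counter;
        Thread.yield(); // widen the race window
        counter = read + 1;
    }

    // Same update, but only one thread at a time may run it.
    void incrementLocked() {
        mtx.lock();
        try {
            int read = counter;
            Thread.yield();
            counter = read + 1;
        } finally {
            mtx.unlock();
        }
    }

    // Fires `tasks` increments from a small thread pool and returns the final count.
    static int run(int tasks, boolean useLock) {
        CounterDemo demo = new CounterDemo();
        Runnable task = useLock ? demo::incrementLocked : demo::incrementUnsafe;
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < tasks; i++) {
            pool.execute(task);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        demo.mtx.lock(); // read under the lock for a clean visibility guarantee
        try {
            return demo.counter;
        } finally {
            demo.mtx.unlock();
        }
    }
}
```

Calling `CounterDemo.run(100, true)` corresponds to what the test expects from CounterMtx; `CounterDemo.run(100, false)` corresponds to the unprotected Counter.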
Hi atondorf,
I don't think that a persisted property can be used as a mutex.
You may want to have a look at the Extension SDK. I have seen in the past Custom Java Extensions for "Concurrent Queue", "Atomic" properties and other "Lock" Things ....
Hi smainente,
a look at the Extension SDK sounds great! I already searched a lot, but did not find many extensions with source code. Even GitHub has only 44 projects with Java code for ThingWorx, and no concurrency extension among them.
And as I am not a great Java dev and the SDK documentation is only very rudimentary, I do not dare to implement this by myself yet. Any tip where you saw these concurrency extensions?
Hi @atondorf
Customers and/or consultants probably sent me those extensions - I'm pretty sure they are not public.
Alternatively, if you want to do in-memory calculations of ingested data, you may want to have a look at "How can I use statistical transforms for my property values in ThingWorx?" - this feature requires ThingWorx Analytics.
Hi smainente,
these extensions are not public? But they should be! Even more: they should not be extensions at all, but part of the default scope of operation of ThingWorx.
I think it's irresponsible to publish a concurrent system (ThingWorx is one) without synchronisation mechanisms.
I have gathered a lot of experience with situations where ThingWorx does not behave deterministically and predictably, and where I ran into timing issues.
E.g:
1.) I have an S7 PLC connected to Kepware, and this is connected to a Thing. There is a single DB in the PLC providing some data, which changes as a block. The order of the "PropertyChanged" events is not predictable, so I cannot be sure when all properties have the new value set! Currently, as an ugly workaround, I call a "pause(100)" in the subscription handler of the triggering tag, to ensure all properties have been set by the other TWEventProcessor threads ...
2.) I want to do some aggregation of values on the fly, in memory, grouped by the state of a Thing, where the state depends on some properties: RunStatus, ProductNr, ShiftNr. I calculate the aggregation in an infotable with a new row for each change of the status. Multiple events try to access the same infotable unsynchronised => non-deterministic behaviour.
So I urgently need this kind of extension.
By not public, I meant that these extensions have been developed by customers and have not been shared publicly (GitHub or another public code sharing platform) - I have been exposed to these extensions in the context of technical support investigations.
The Extension SDK is not necessarily the ideal solution for this use case; I just wanted to be sure that you are aware of all the techniques available.
I understand your problem, but I don't have an immediate solution in mind.
If you don't receive any suggestions in the community I will open a support case for you.
I was also struggling with concurrency handling. It's such an important feature that it should be part of the product instead of an extension.
I doubt that even using the Extension SDK will solve the problem. I created a Java extension and used a ReentrantLock to serialize the concurrent services.
I tested it by executing multiple async services which increase a persistent property in ThingWorx, and it works.
However, it DOES NOT work for a data table (maybe streams as well). The ReentrantLock can ensure the async services get executed one by one, but the later service's FindDataTableEntries may still get the old data of the data table although the data has been updated by the previous service. It seems some old cache is still used by FindDataTableEntries, or UpdateDataTableEntry does something asynchronously.
Regards,
Sean
Dear Sean,
I had the same idea and just implemented a "LockableTS" ... and tested it.
It seems to be working for me, but I am currently not on a persisted table, only on a local in-memory one.
For performance I moved the persistence to a custom data table that is only updated once per minute.
So finally it's working! Even my brutal test from the beginning is working.
For everybody interested, here is the code:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

import com.thingworx.logging.LogUtilities;
import com.thingworx.metadata.annotations.ThingworxServiceDefinition;
import com.thingworx.metadata.annotations.ThingworxServiceParameter;
import com.thingworx.metadata.annotations.ThingworxServiceResult;

import org.slf4j.Logger;

public class LockableTS {

    private final ReentrantLock mtx = new ReentrantLock(true);

    @ThingworxServiceDefinition(name = "lock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
    @ThingworxServiceResult(name = "Result", description = "", baseType = "NOTHING", aspects = {})
    public void lock() {
        mtx.lock();
    }

    @ThingworxServiceDefinition(name = "tryLock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
    @ThingworxServiceResult(name = "Result", description = "", baseType = "BOOLEAN", aspects = {})
    public Boolean tryLock(
            @ThingworxServiceParameter(name = "TimeOut", description = "", baseType = "LONG") Long TimeOut) throws Exception {
        return mtx.tryLock((long) TimeOut, TimeUnit.MILLISECONDS);
    }

    @ThingworxServiceDefinition(name = "unlock", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
    @ThingworxServiceResult(name = "Result", description = "", baseType = "NOTHING", aspects = {})
    public void unlock() {
        mtx.unlock();
    }

    @ThingworxServiceDefinition(name = "isLocked", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false"})
    @ThingworxServiceResult(name = "Result", description = "", baseType = "BOOLEAN", aspects = {})
    public Boolean isLocked() {
        return mtx.isLocked();
    }
}
Usage in the code:
me.lock();    // acquire before the try, so unlock() only runs if the lock is held
try {
    do_something_critical();
} finally {
    me.unlock();
}
The try/finally is important, if the do_something_critical can throw exceptions.
Otherwise the lock will be left locked.
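The same try/finally pattern also applies when the lock is taken with a timeout. The following is a plain-Java sketch under that assumption; GuardedSection and runWithTimeout are hypothetical names and not part of the extension. It gives up when the lock cannot be acquired in time and always releases the lock once the critical code has run, even if that code throws.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper illustrating tryLock with a timeout; not ThingWorx API.
final class GuardedSection {
    private final ReentrantLock mtx = new ReentrantLock(true);

    // Runs `critical` under the lock. Returns false if the lock could not be
    // acquired within `timeoutMs` milliseconds or the wait was interrupted.
    boolean runWithTimeout(Runnable critical, long timeoutMs) {
        boolean acquired;
        try {
            acquired = mtx.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
        if (!acquired) {
            return false; // never call unlock() on a lock we do not hold
        }
        try {
            critical.run();
            return true;
        } finally {
            mtx.unlock(); // released even if critical.run() throws
        }
    }

    boolean isLocked() {
        return mtx.isLocked();
    }
}
```

Checking the return value matters here: calling unlock() after a failed tryLock would throw an IllegalMonitorStateException.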
Hello @atondorf ,
I created a similar Java extension using a ReentrantLock as well. Yes, it works locally, and it also works for updating a ThingWorx persistent property concurrently. But it doesn't work for updating a ThingWorx DataTable concurrently.
For example, take a DataTable which has several fields named PropA, PropB, PropC ... PropG. Now multiple requests update the DataTable simultaneously: requestA updates PropA, requestB updates PropB, and so on. The code would be:
me.lock();
try {
    var entry = me.FindDataTableEntries({
        values: query_value /* INFOTABLE */
    });
    // update certain Prop* field
    me.UpdateDataTableEntry({
        tags: undefined /* TAGS */,
        location: undefined /* LOCATION */,
        source: undefined /* STRING */,
        sourceType: undefined /* STRING */,
        values: station1Clone /* INFOTABLE */
    });
} catch (err) {
    logger.debug(err);
} finally {
    me.unlock();
}
The expected result would be that PropA ... PropG all get updated with the new value, but actually only some of the fields get updated.
I also tried your code and got the same inconsistent result.
You can try the attached file to reproduce it :
1. Run ResetTestData to reset the test data.
2. Run ConCurrentUpdateWithoutQuery to call async services to simulate the concurrent requests.
3. Check the table entry whose key = station1; you'll see that only some of the Prop* fields get updated with the new value new_xxx.
If you check the log, you'll find these async services do get executed sequentially. So I guess either FindDataTableEntries reads from some old cache, or UpdateDataTableEntry is somehow async.
I think I probably found a workaround to bypass the cache: just call GetDataTableEntries() before calling FindDataTableEntries; then FindDataTableEntries will get the latest values.
Hi All,
I'm doing a test migration from TW 7.3 to TW 8.4, and yes, you are right, my old way of mutex blocking doesn't work anymore.
Do you have the Java extension with LockableTS on GitHub? Or do you want me to pack it all together and publish the extension?
Best Regards,
Carles.
Hi all,
I've done it myself; here is the post on the community that describes it: https://community.ptc.com/t5/ThingWorx-Developers/ThingWorx-Concurrency-Extension/m-p/669151
Hi @atondorf, I don't know if you are aware, but the code you posted (which is the base of my extension) creates a single mutex shared by all Things, not one mutex for each Thing. My extension does create one per Thing, in case you want to review it.
Best Regards and thanks,
Carles.
Hi @CarlesColl,
thanks for the important hint ... no, I was not aware that the mutex in my code is shared.
So how is this? Does ThingWorx not create instances of a shape class for each Thing?
This might explain some issues I faced. Therefore I currently use a "Resource" with some services that store mutexes and also a Queue<JSON> in Maps keyed by Thing name ... so quite similar to your approach, but not that elegant.
Maybe we can work together on this ... but it seems you are the far better Java developer.
Greetings
Andreas
Hi Andreas,
There isn't one ThingShape instance for each Thing; a shape is instantiated only once, which is the reason you always get the context of the calling Thing. It's more like a kind of static class...
For sure we can work on it! Ping me whenever you want.
Regards,
Carles.
I just noticed your post. I simply use an enum singleton object to manage lock/unlock with a ConcurrentHashMap<String, ReentrantLock>, but your implementation is much more elegant.
Regards,
Sean
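For readers who want to see the shape of that idea, here is a minimal sketch of an enum singleton holding one ReentrantLock per Thing name. It is not Sean's or Carles's actual code: ThingLocks, forThing, lock and unlock are hypothetical names, and how a shape service obtains the name of the calling Thing is deliberately left out, so the name is simply passed in as a string.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical registry: one fair lock per Thing name, shared JVM-wide.
enum ThingLocks {
    INSTANCE;

    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Returns the lock for this Thing, creating it atomically on first use.
    ReentrantLock forThing(String thingName) {
        return locks.computeIfAbsent(thingName, name -> new ReentrantLock(true));
    }

    // Blocks until the calling thread holds the lock of the given Thing.
    void lock(String thingName) {
        forThing(thingName).lock();
    }

    // Releases the lock of the given Thing; the calling thread must hold it.
    void unlock(String thingName) {
        forThing(thingName).unlock();
    }
}
```

Because computeIfAbsent is atomic on a ConcurrentHashMap, two threads asking for the same Thing name at the same time still end up with the same lock object, while different Things never block each other. Note that the map only grows in this sketch; entries for deleted Things are never removed.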