Thread Safe Coding, Part 1: The Java Extension Approach
Written by Desheng Xu and edited by @vtielebein
Overview
Time and again, customers report that one of their favorite ThingWorx features is the ability to write services in JavaScript. However, JavaScript has no built-in semaphore or lock mechanism, nothing to enable thread-safe concurrent processing like you find in the Java language. This article demonstrates why thread-safe coding is necessary and how to use a Java extension for this purpose. Part 2 presents an alternative approach using database locks.
Demo Use Case
Let's use a highly abstracted use case to demo thread-safe code practices:
Similar use cases might be to:
Modeling
Thing: DashboardCounter, which includes:
Service: GetCounter
var result = me.counter;
Service: IncreaseCounter
me.counter = me.counter + 1;
var result = me.counter;
Service: ResetCounter
me.counter = 0;
var result = 0;
Subscription: MonitorCounter
Logger.info(eventData.newValue.value+":"+eventData.newValue.time.getTime());
ValueStream
For simplicity, the value stream entity is not included in the attachment. Assign a value stream to this Thing so that its property values are logged and can be monitored.
Test Tool
A small test tool, mulreqs, is attached here, along with some useful extensions and ThingWorx entities. mulreqs reads its configuration from the file named by the environment variable MULTI_REQUEST_CONFIG.
On Linux/macOS:
export MULTI_REQUEST_CONFIG="./config.json"
In the config.json file, you can use the following configuration:
{
"host":"twx85.desheng.io",
"port":443,
"protocol":"https",
"endpoint":"/Thingworx/Things/DashboardCounter/services/IncreaseCounter",
"headers":{
"Content-Type":"application/json",
"Accept": "application/json",
"AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
},
"payload":{},
"round_break":50000,
"req_break":0,
"round_size":50,
"total_round":20
}
host, port, protocol, and headers define the target ThingWorx server and the HTTP headers sent with each request (including the AppKey used for authentication).
endpoint defines which service is called during the test.
payload is not currently used, but the field must be present.
total_round is the number of test rounds to run.
round_size defines how many requests are sent simultaneously in each round.
round_break is the pause between rounds in microseconds, so 50000 in the above example means 50 ms.
req_break is the delay between requests within a round; 0 means all requests in a round are sent to the server simultaneously.
With the above configuration, the service is executed 20 * 50 = 1000 times. So if the counter starts at 0, it should be 1000 at the end, and if the value stream starts empty, it should contain a history of values from 1 to 1000.
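To make the round-based schedule concrete, here is a minimal Java sketch of a mulreqs-style driver. It is not the attached tool's source; RoundDriver is a hypothetical name, and the HTTP call to the endpoint is replaced by a Runnable so the sketch runs without a ThingWorx server. It assumes round_break is a pause between rounds and models only the req_break = 0 case, where every request in a round is released at the same moment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a mulreqs-style load driver (not the attached tool's code).
class RoundDriver {
    private final int totalRound;          // total_round: number of rounds
    private final int roundSize;           // round_size: concurrent requests per round
    private final long roundBreakMicros;   // round_break: pause between rounds, in microseconds

    RoundDriver(int totalRound, int roundSize, long roundBreakMicros) {
        this.totalRound = totalRound;
        this.roundSize = roundSize;
        this.roundBreakMicros = roundBreakMicros;
    }

    // Runs `request` totalRound * roundSize times in total.
    void run(Runnable request) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(roundSize);
        try {
            for (int round = 0; round < totalRound; round++) {
                CountDownLatch start = new CountDownLatch(1);
                CountDownLatch done = new CountDownLatch(roundSize);
                for (int i = 0; i < roundSize; i++) {
                    pool.execute(() -> {
                        try {
                            start.await();      // hold until the whole round is ready
                            request.run();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            done.countDown();
                        }
                    });
                }
                start.countDown();              // release the whole round at once (req_break = 0)
                done.await();                   // this sketch waits for the round to finish
                TimeUnit.MICROSECONDS.sleep(roundBreakMicros);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        new RoundDriver(20, 50, 50_000).run(calls::incrementAndGet);
        System.out.println(calls.get()); // 1000 (20 rounds * 50 requests)
    }
}
```

Counting with an AtomicInteger confirms the 20 * 50 = 1000 calls; the ThingWorx service being called is what decides whether all 1000 increments survive.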
Run Test
Use the following command to perform the test:
<your path>/mulreqs
Execution output will look like:
Check Result
Surprisingly, the final value is 926 instead of 1000. (Caution: this value differs from test to test and can be anywhere between 1 and 1000.)
Now look at the value stream using QueryPropertyHistory. Many values are missing, and while the number of entries varies from test to test, it is unlikely to equal the last value (926). Notice that the last four values are 926, 925, 921, and 918; the values 919, 920, 922, 923, and 924 are all missing. Next, we check the script log for errors, and there are none, only the print statements we deliberately placed in the logs.
So, we have observed two symptoms here:
1) The final counter value is not 1000.
2) Values are missing from the value stream.
What is the reason behind each symptom, and which one is a thread-safety issue?
Understanding Timestamp Granularity
ThingWorx facilitates the collection of time-series data, and of solutions built around such data, by using the timestamp as the primary key. However, any timestamp has a minimum granularity; in ThingWorx, the smallest unit of a timestamp is one millisecond.
Looking again at the log generated by the subscription, we see that several data points (922, 923, 924, 925) have the same timestamp (1596089891147), which is Thursday, July 30, 2020, 6:18:11.147 AM GMT. When these data points are flushed into the database, later data points overwrite earlier ones because they share the same timestamp. So data point 922 went into the value stream first, was overwritten by 923, then by 924, and then by 925. The next data point, 926, has a new timestamp (1596089891148), 1 ms later. Therefore, data points 925 and 926 are stored while 922, 923, and 924 are not. These lost data points are therefore NOT a thread-safety issue.
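The overwrite effect can be reproduced with a map keyed by millisecond timestamp. The following Java sketch is a hypothetical model of a value stream (ValueStreamModel is an illustrative name, not how ThingWorx actually stores data); it replays the data points and timestamps from the log above and keeps only the last write per millisecond.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a value stream keyed by millisecond timestamp,
// where writing to an existing timestamp replaces the earlier value.
class ValueStreamModel {
    private final Map<Long, Integer> history = new TreeMap<>();

    void write(long timestampMs, int value) {
        history.put(timestampMs, value); // same key => previous value is overwritten
    }

    Map<Long, Integer> history() {
        return history;
    }

    public static void main(String[] args) {
        ValueStreamModel stream = new ValueStreamModel();
        // Data points 922-925 share one millisecond; 926 arrives 1 ms later.
        stream.write(1596089891147L, 922);
        stream.write(1596089891147L, 923);
        stream.write(1596089891147L, 924);
        stream.write(1596089891147L, 925);
        stream.write(1596089891148L, 926);
        System.out.println(stream.history()); // {1596089891147=925, 1596089891148=926}
    }
}
```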
Several data points in this example share a timestamp because multiple machines write to the same value stream. The right approach is to log data points at the individual machine level, with a separate value stream per machine.
However, what if a single machine emits data too frequently? If data points from the same machine still have timestamp clashes, the signal frequency is too high. The recommended approach is to down-sample the update frequency, since any frequency higher than 1000 Hz (more than one data point per millisecond) will produce unexpected results like these.
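One simple form of down-sampling is a throttle that forwards at most one reading per interval and drops the rest. The following Java sketch is hypothetical (Downsampler is an illustrative name, not a ThingWorx API) and assumes readings arrive in non-decreasing timestamp order; with an interval of 1 ms or more, no two forwarded readings can share a millisecond timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical throttle: forwards a reading only if at least intervalMs
// have passed since the last forwarded reading; everything else is dropped.
class Downsampler {
    private final long intervalMs;
    private long lastForwardedMs;
    private boolean forwardedAny = false;

    Downsampler(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // synchronized so concurrent callers see a consistent lastForwardedMs.
    synchronized boolean accept(long timestampMs) {
        if (!forwardedAny || timestampMs - lastForwardedMs >= intervalMs) {
            lastForwardedMs = timestampMs;
            forwardedAny = true;
            return true;
        }
        return false; // too close to the previous forwarded reading
    }

    // Returns the timestamps that survive down-sampling, in input order.
    static List<Long> filter(long intervalMs, long... timestamps) {
        Downsampler d = new Downsampler(intervalMs);
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (d.accept(ts)) kept.add(ts);
        }
        return kept;
    }

    public static void main(String[] args) {
        // At most one reading per 10 ms, i.e. down-sampled to <= 100 Hz.
        System.out.println(filter(10, 0, 3, 9, 10, 15, 21)); // [0, 10, 21]
    }
}
```

A production pipeline might average the dropped readings instead of discarding them; dropping keeps the sketch short.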
Real Thread Safe Issue from Demo Use Case
The final value of the counter being an arbitrary number is the real thread-safety issue. If we look at the code again:
me.counter = me.counter + 1;
This piece of code can be split into three steps:
1) Read the current value of me.counter.
2) Add 1 to it.
3) Write the result back to me.counter.
In a multi-threaded environment, if these three steps are not performed as a single operation, two threads can read the same value and both write back that value plus 1, so one increment is lost. This is a race condition. The way to solve it is a locking mechanism that serializes access to the property: acquire the lock, perform the three steps, and then release the lock. This can be done using either a Java extension or a database Thing that leverages the database's lock mechanism.
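The lost-update race can be reproduced outside ThingWorx. The sketch below is a standalone Java model (CounterRaceDemo is a hypothetical name, and a plain field stands in for the counter property): unsafeIncrement performs the three steps with no lock, mirroring the JavaScript IncreaseCounter service, while safeIncrement performs the same steps under a synchronized lock.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the DashboardCounter race (not ThingWorx code).
class CounterRaceDemo {
    private int counter = 0;

    // Read-modify-write with no lock, like "me.counter = me.counter + 1;".
    void unsafeIncrement() {
        int current = counter;   // 1) read
        current = current + 1;   // 2) add 1
        counter = current;       // 3) write back
    }

    // The same three steps, made atomic by the object's monitor lock.
    synchronized void safeIncrement() {
        int current = counter;
        current = current + 1;
        counter = current;
    }

    synchronized int get() {
        return counter;
    }

    // Starts `threads` threads that each increment `perThread` times.
    static int run(boolean safe, int threads, int perThread) throws InterruptedException {
        CounterRaceDemo demo = new CounterRaceDemo();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (safe) demo.safeIncrement(); else demo.unsafeIncrement();
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) w.join();
        return demo.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 50 threads x 10,000 increments = 500,000 expected.
        System.out.println("unsafe: " + run(false, 50, 10_000)); // varies by run; may be below 500000
        System.out.println("safe:   " + run(true, 50, 10_000));  // always 500000
    }
}
```

The unsafe total changes from run to run, just like the ThingWorx test results; the synchronized total is always exact.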
Use Java Extension to Handle Thread Safe Challenge
This tutorial assumes that the Eclipse plug-in for ThingWorx extension development is already installed. The following will guide you through creating a simple Java extension step by step:
import com.thingworx.metadata.annotations.ThingworxServiceDefinition;
import com.thingworx.metadata.annotations.ThingworxServiceResult;
import com.thingworx.types.primitives.IntegerPrimitive;
// The services below go inside the Thing Template class generated by the Eclipse plug-in.
// All three are synchronized instance methods, so they share one lock (this Thing
// instance): only one of them can run at a time.
@SuppressWarnings("deprecation")
@ThingworxServiceDefinition(name = "IncreaseCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer IncreaseCounter() throws Exception {
    _logger.trace("Entering Service: IncreaseCounter");
    // Read, increment, and write back as one atomic unit while holding the lock.
    int current_value = ((IntegerPrimitive) this.getPropertyValue("Counter")).getValue();
    current_value++;
    this.setPropertyValue("Counter", new IntegerPrimitive(current_value));
    _logger.trace("Exiting Service: IncreaseCounter");
    return current_value;
}

@ThingworxServiceDefinition(name = "GetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer GetCounter() throws Exception {
    _logger.trace("Entering Service: GetCounter");
    // Waits for any in-progress increment or reset to finish before reading.
    int current_value = ((IntegerPrimitive) this.getPropertyValue("Counter")).getValue();
    _logger.trace("Exiting Service: GetCounter");
    return current_value;
}

@SuppressWarnings("deprecation")
@ThingworxServiceDefinition(name = "ResetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer ResetCounter() throws Exception {
    _logger.trace("Entering Service: ResetCounter");
    this.setPropertyValue("Counter", new IntegerPrimitive(0));
    _logger.trace("Exiting Service: ResetCounter");
    return 0;
}
{
"host":"twx85.desheng.io",
"port":443,
"protocol":"https",
"endpoint":"/Thingworx/Things/DeoLockerThing/services/IncreaseCounter",
"headers":{
"Content-Type":"application/json",
"Accept": "application/json",
"AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
},
"payload":{},
"round_break":50000,
"req_break":0,
"round_size":50,
"total_round":20
}
The test configuration is otherwise unchanged; just change the endpoint to point to the new Thing, DeoLockerThing.
Check the Test Result
Repeat the same test several times to confirm that the results are consistent and match the expected final value of 1000 (and don't forget to reset the counter between tests).
Summary of Java Extension Approach
The Java extension approach shown here uses the synchronized keyword to make a sequence of actions thread-safe. Other options are a ReentrantLock or a Semaphore, but the synchronized keyword is much cleaner.
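For comparison, here is a minimal sketch of the two alternatives, written as a plain Java class rather than a ThingWorx Thing (LockedCounters and its methods are hypothetical names). Both guard the same read-increment-write sequence; the explicit acquire/release and try/finally each one needs is what makes synchronized the cleaner option.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical plain-Java counters (not a ThingWorx Thing) showing the two
// alternatives to synchronized; both serialize the read-increment-write step.
class LockedCounters {
    private final ReentrantLock lock = new ReentrantLock();
    private final Semaphore permit = new Semaphore(1); // a single permit acts as a mutex
    private int lockCounter = 0;
    private int semaphoreCounter = 0;

    int increaseWithLock() {
        lock.lock();
        try {
            lockCounter = lockCounter + 1; // read, increment, write while holding the lock
            return lockCounter;
        } finally {
            lock.unlock(); // always release, even if the body throws
        }
    }

    int increaseWithSemaphore() throws InterruptedException {
        permit.acquire();
        try {
            semaphoreCounter = semaphoreCounter + 1;
            return semaphoreCounter;
        } finally {
            permit.release();
        }
    }

    int lockCount() {
        lock.lock();
        try {
            return lockCounter;
        } finally {
            lock.unlock();
        }
    }

    int semaphoreCount() {
        permit.acquireUninterruptibly();
        try {
            return semaphoreCounter;
        } finally {
            permit.release();
        }
    }

    // Calls both counters from `threads` threads, `perThread` times each;
    // returns {lockCount, semaphoreCount}.
    static int[] hammer(int threads, int perThread) throws InterruptedException {
        LockedCounters c = new LockedCounters();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        c.increaseWithLock();
                        c.increaseWithSemaphore();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        return new int[] { c.lockCount(), c.semaphoreCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = hammer(50, 20);
        System.out.println(counts[0] + " " + counts[1]); // 1000 1000
    }
}
```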
However, this Java extension lock will NOT work in the ThingWorx 9.0 horizontal architecture, because a Java lock only coordinates threads within a single JVM; Java has no built-in distributed lock. IgniteLocker wouldn't work in the current horizontal architecture, either. So, if you need a thread-safe counter in a 9.0+ horizontal architecture, leverage the database Thing instead, as discussed in Part 2.
Very interesting post. I had similar "fun" when assigning infotables, e.g. adding a row. Even a write to a Thing property (x := y) can cause a Java exception because of concurrency. Therefore, concurrency and thread safety deserve more focus. In my early days (1990s onward), it was good practice to write new values into a fast queue with room for, e.g., 10k objects, and then have a second service that consumes the queue in FIFO order in a loop. If objects arrive faster than they are processed, the queue fills up (hopefully without overflowing), and when the load goes down again, the queue empties. But this was long before the cloud and hyperscale. How does the Kafka message bus handle this common issue?
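The queue pattern described in this comment maps directly onto java.util.concurrent. The sketch below is a hypothetical illustration, not a ThingWorx or Kafka feature (QueuedCounter is an illustrative name): producers offer updates into a bounded queue with room for 10,000 entries, and a single consumer thread applies them in FIFO order, so the running total is only ever written by one thread and needs no lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical single-consumer update queue: many producers, one writer thread.
class QueuedCounter {
    private static final int POISON = Integer.MIN_VALUE; // sentinel that stops the consumer
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10_000);
    private long total = 0; // written only by the consumer thread
    private final Thread consumer;

    QueuedCounter() {
        consumer = new Thread(() -> {
            try {
                while (true) {
                    int delta = queue.take(); // blocks while the queue is empty
                    if (delta == POISON) return;
                    total += delta;           // applied strictly in FIFO order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
    }

    // Returns false instead of blocking when the queue is full ("overfill").
    // Deltas must not equal the POISON sentinel.
    boolean submit(int delta) {
        return queue.offer(delta);
    }

    // Lets the consumer drain everything queued so far, stops it, and returns the total.
    long shutdownAndGet() throws InterruptedException {
        queue.put(POISON);
        consumer.join();
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        QueuedCounter counter = new QueuedCounter();
        for (int delta = 1; delta <= 5; delta++) {
            counter.submit(delta);
        }
        System.out.println(counter.shutdownAndGet()); // 15
    }
}
```

Using offer() makes the overfill case explicit to the producer; put() would block the producer until space frees up instead.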
Thanks for these articles, @dxu. I've been thinking about this, however, and it makes me question how to handle commonly used counter properties on a Thing when the triggering event is asynchronous. How should I handle this when I want to increment a property based on another property being updated, using a subscription? As I understand how the thread pools work, there is a risk that two or more updates received close together could be executed at the same time in different threads.
Here is an example of what I'm thinking of.
Steps 5, 6, and 7 aren't thread-safe, however, so I understand there is a risk that one of the consumption values could be overwritten, and then my index would become progressively more incorrect.
Have I understood this correctly? If so, shouldn't there be a thread-safe ThingWorx service for incrementing/decrementing properties (i.e., IncrementProperty(amount))?
Hi, Geva:
You are right, the use case you mentioned potentially has a thread-safety issue. However, it only becomes a real issue in two circumstances:
#1) Your energy meter sends (or receives) energy demands very frequently, or
#2) You have several sources sending energy demands to the same meter.
In reality, most energy meters receive demand information roughly on a daily basis, so the thread-safety concern wouldn't be a real issue.
The example in this article handles a counter, but you can replace it with any floating-point number and use the same pattern to build an extension that handles requests without thread-safety concerns.
Hope this answers your question.
Desheng
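For the floating-point case mentioned in the reply above, the synchronized pattern carries over unchanged. A minimal plain-Java sketch follows; EnergyIndex is a hypothetical name, and in a real extension the body of add() would read and write a Thing property the way the IncreaseCounter service does.

```java
// Hypothetical plain-Java version of the synchronized pattern applied to a
// floating-point value, e.g. a cumulative energy index.
class EnergyIndex {
    private double index = 0.0;

    // The read, add, and write happen atomically under this object's monitor.
    synchronized double add(double amount) {
        index = index + amount;
        return index;
    }

    synchronized double get() {
        return index;
    }

    // Calls add(amount) from `threads` threads, `perThread` times each.
    static double accumulate(int threads, int perThread, double amount) throws InterruptedException {
        EnergyIndex meter = new EnergyIndex();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    meter.add(amount);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        return meter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 0.5 is exactly representable in binary, so this sum is exact.
        System.out.println(accumulate(10, 1000, 0.5)); // 5000.0
    }
}
```

No update is ever lost, but with amounts that are not exactly representable, the order in which threads add them can still change the last bits of the result from run to run.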
Hi, Stefan:
ThingWorx always uses multiple threads to process queues (such as the event queue, the WS execution queue, etc.). You can't designate a single thread to process those queues, so thread safety must be part of your design.
Theoretically, you could restrict ThingWorx to a single thread for processing queues to avoid thread-safety issues, but as you can imagine, that would be a huge performance drawback.
Kafka is a publish-subscribe messaging system, and you can certainly use it as a queue system too. However, it will not tell you how to process or consume the messages it delivers. That logic has to be built into your client, and the thread-safety concern has to be addressed there as well.
Desheng
Thanks Desheng, yes it does answer my question.
I agree about the frequency and target aspects: if an energy meter were sending every 10 minutes, as they normally would, this would never present itself. As I understand it, the risk is greater when operating on a millisecond timescale (really fast data ingestion, platform code execution). So in ThingWorx code, if you plan to read, modify, and write back a property value from the Thing Model, you should do all the steps as close together as possible, since putting a lot of time between the read and the write increases the risk of collisions if different services happen to update the same properties.
Greg