Thread Safe Coding, Part 1: The Java Extension Approach
Written by Desheng Xu and edited by @vtielebein
Overview
Time and again, customers report that one of their favorite ThingWorx features is the ability to write services in JavaScript. However, JavaScript has no built-in semaphore or lock mechanism for thread-safe concurrent processing like the one Java provides. This article demonstrates why thread-safe coding is necessary and how to use a Java extension for this purpose. Part 2 presents an alternative approach using database locks.
Demo Use Case
Let's use a highly abstracted use case to demo thread-safe code practices:
There are tens of machines in a factory, and a PLC emits a signal whenever an issue occurs at run time.
The customer expects a dashboard that shows, in real time, today's total count of issues from all machines.
The customer also expects the timestamp of each issue to be logged, regardless of which machine raised it.
Similar use cases might be to:
Show the total product counts from each sub-line in the current shift.
Show the total rentals of bicycles from all remote sites.
Show the total number of issues from remote cash machines across the country.
Modeling
Thing: DashboardCounter, which includes:
1 Property: name:counter, type:integer, logged:true, default value:0
3 services:
IncreaseCounter(): increase the counter value by 1
GetCounter(): return current counter value
ResetCounter(): set counter value to 0
1 Subscription: a subscription to the DataChange event of the counter property, which prints the new value and its timestamp to the log.
GetCounter
var result = me.counter;
IncreaseCounter
me.counter = me.counter + 1;
var result = me.counter;
ResetCounter
me.counter = 0;
var result = 0;
Subscription MonitorCounter
Logger.info(eventData.newValue.value+":"+eventData.newValue.time.getTime());
ValueStream
For simplicity, the value stream entity is not included in the attachment. Please go ahead and assign a value stream to this Thing to monitor the property values.
Test Tool
A small test tool, mulreqs, is attached here, along with some useful extensions and ThingWorx entities. The mulreqs tool reads the path of its configuration file from the environment variable MULTI_REQUEST_CONFIG.
In Linux/MacOS:
export MULTI_REQUEST_CONFIG="./config.json"
In the config.json file, use the following configuration:
{
"host":"twx85.desheng.io",
"port":443,
"protocol":"https",
"endpoint":"/Thingworx/Things/DashboardCounter/services/IncreaseCounter",
"headers":{
"Content-Type":"application/json",
"Accept": "application/json",
"AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
},
"payload":{},
"round_break":50000,
"req_break":0,
"round_size":50,
"total_round":20
}
host, port, protocol, and headers define the target ThingWorx server and the request headers, including the AppKey used for authentication.
endpoint defines which service is called during the test.
payload is not currently used, but the key must be kept in the file.
total_round is the number of test rounds to run.
round_size defines how many requests will be sent simultaneously during each round.
round_break is the pause between rounds, in microseconds, so 50000 in the example above means 50 ms.
req_break is the delay between requests within a round. A value of 0 means that all requests in a round are sent to the server simultaneously.
With the configuration above, the service is executed 20 × 50 = 1000 times. So, if the counter starts at 0, it should be 1000 at the end, and if the value stream starts out empty, it should contain a history from 1 to 1000.
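The mulreqs source is not reproduced in this article. As a rough illustration of the load pattern this configuration describes, here is a minimal Java sketch. RoundDriver and its parameters are hypothetical names that mirror round_size, total_round, and round_break, and the sketch assumes that each round waits for all of its requests to finish before pausing. The request is a plain Runnable, so an HTTP call to the IncreaseCounter endpoint could be plugged in; here it only counts invocations, which confirms the 20 × 50 = 1000 arithmetic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical load driver in the spirit of mulreqs (not its actual source).
final class RoundDriver {
    // Runs totalRound rounds; in each round, roundSize copies of request start together.
    static void run(Runnable request, int roundSize, int totalRound, long roundBreakMicros)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(roundSize);
        try {
            for (int round = 0; round < totalRound; round++) {
                CountDownLatch start = new CountDownLatch(1);
                CountDownLatch done = new CountDownLatch(roundSize);
                for (int i = 0; i < roundSize; i++) {
                    pool.execute(() -> {
                        try {
                            start.await();   // hold each request until the round begins (req_break = 0)
                            request.run();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            done.countDown();
                        }
                    });
                }
                start.countDown();                              // release all requests of this round at once
                done.await();                                   // wait until every request in the round finished
                TimeUnit.MICROSECONDS.sleep(roundBreakMicros);  // round_break
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        run(calls::incrementAndGet, 50, 20, 50_000);
        System.out.println(calls.get()); // 1000
    }
}
```

The pool has exactly roundSize threads so that every request in a round can be waiting on the start latch at the same moment; a smaller pool would still finish, but the requests in a round would no longer all start together.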
Run Test
Use the following command to perform the test:
<your path>/mulreqs
Execution output will look like:
Check Result
Surprisingly, the final value is 926 instead of 1000. (Caution: this value differs from test to test and can be any value from 1 to 1000.)
Now, look at the value stream by using QueryPropertyHistory. Many values are missing, and while the number of entries varies between tests, it is unlikely to match the final value (926). Notice that the last four values are 926, 925, 921, and 918; the values 919, 920, 922, 923, and 924 are all missing. Next, we check the script log for errors, and there are none: it contains only the messages we deliberately print from the subscription.
So, we have observed two symptoms here:
The final value from property counter doesn't have the expected value.
The value stream doesn't have the expected history of the counter property changes.
What's the reason behind each symptom, and which one is a thread-safe issue?
Understanding Timestamp Granularity
ThingWorx facilitates the collection of time series data, and solutions built around such data, by using the timestamp as the primary key. However, a timestamp always has a minimum granularity, and in ThingWorx that unit is one millisecond.
Looking again at the log generated by the subscription, we see that several data points (922, 923, 924, 925) share the same timestamp (1596089891147), which is GMT Thursday, July 30, 2020, 6:18:11.147 AM. When these data points are flushed into the database, each later data point overwrites the earlier ones because they all have the same timestamp: 922 goes into the value stream first, is overwritten by 923, then by 924, and then by 925. The next data point, 926, has a new timestamp (1596089891148), 1 ms later. Therefore, data points 925 and 926 are stored while 922, 923, and 924 are not. These lost data points are therefore NOT a thread-safety issue.
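To make the overwrite behavior concrete, the sketch below models a value stream as a Java TreeMap keyed by millisecond timestamp and replays the data points from the log. This is only an analogy for the primary-key behavior described above, not how ThingWorx persists value streams internally; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a "value stream" whose primary key is the millisecond timestamp.
final class TimestampCollision {
    // Writes each (timestamp, value) pair in order; assumes both arrays have the same length.
    static TreeMap<Long, Integer> store(long[] timestamps, int[] values) {
        TreeMap<Long, Integer> stream = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            stream.put(timestamps[i], values[i]); // put() replaces any value already stored under this key
        }
        return stream;
    }

    public static void main(String[] args) {
        long[] ts = {1596089891147L, 1596089891147L, 1596089891147L, 1596089891147L, 1596089891148L};
        int[] vals = {922, 923, 924, 925, 926};
        Map<Long, Integer> stream = store(ts, vals);
        System.out.println(stream); // {1596089891147=925, 1596089891148=926}
    }
}
```

Five writes produce only two stored entries, which matches the gap observed in QueryPropertyHistory.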
These data points share a timestamp because multiple machines write to the same value stream. The right approach is to log data points at the individual machine level, with a separate value stream per machine.
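Using the same map analogy, a per-machine layout gives each machine its own timestamp-keyed stream, so two machines reporting in the same millisecond no longer collide. The class below is a hypothetical sketch of that layout, not a ThingWorx API; the machine IDs are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one timestamp-keyed stream per machine.
final class PerMachineStreams {
    private final Map<String, TreeMap<Long, Integer>> streams = new HashMap<>();

    // Collisions can now happen only within a single machine's stream.
    void log(String machineId, long timestampMs, int value) {
        streams.computeIfAbsent(machineId, id -> new TreeMap<>()).put(timestampMs, value);
    }

    // Total number of data points kept across all machines.
    int totalPoints() {
        int total = 0;
        for (TreeMap<Long, Integer> stream : streams.values()) {
            total += stream.size();
        }
        return total;
    }

    public static void main(String[] args) {
        PerMachineStreams p = new PerMachineStreams();
        long t = 1596089891147L;
        p.log("Machine1", t, 1); // same millisecond, different machines:
        p.log("Machine2", t, 1); // both points survive because they live in separate streams
        System.out.println(p.totalPoints()); // 2
    }
}
```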
However, what if a single machine emits data too frequently? If data points from the same machine still clash on timestamps, the signal frequency is too high. The recommended approach is to down-sample the update frequency, because at any frequency above 1000 Hz some data points must share a millisecond timestamp and overwrite each other, producing results like these.
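One way to down-sample is to aggregate raw signals into fixed time windows and log a single data point, the signal count, per window. The sketch below assumes that event timestamps arrive in ascending order; countPerWindow is a hypothetical helper, and the window size is an assumption to tune to the dashboard's needs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical down-sampler: at most one data point per window of windowMs milliseconds.
final class Downsampler {
    // Collapses ascending event timestamps (ms) into {windowStart, count} pairs.
    static List<long[]> countPerWindow(long[] eventTimesMs, long windowMs) {
        List<long[]> out = new ArrayList<>();
        for (long t : eventTimesMs) {
            long windowStart = t - Math.floorMod(t, windowMs);
            if (!out.isEmpty() && out.get(out.size() - 1)[0] == windowStart) {
                out.get(out.size() - 1)[1]++;              // same window: add to its count
            } else {
                out.add(new long[] {windowStart, 1});     // new window: start a new data point
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] events = {1000, 1000, 1003, 1010};
        for (long[] point : countPerWindow(events, 10)) {
            System.out.println(point[0] + ": " + point[1]);
        }
        // 1000: 3
        // 1010: 1
    }
}
```

Logging counts per window keeps the total intact while guaranteeing distinct timestamps, as long as the window is at least 1 ms.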
Real Thread Safe Issue from Demo Use Case
The fact that the final value of the counter is an arbitrary number is the real thread-safety issue. Take another look at the code:
me.counter = me.counter + 1;
This single line actually performs three steps:
Step 1: read current value of me.counter
Step 2: increase this value
Step 3: set me.counter with new value.
In a multi-threaded environment, failing to perform these three steps as a single atomic operation leads to a race condition: two threads can read the same value, both increase it, and both write back the same result, so one increment is lost. The solution is a locking mechanism that serializes access to the property: acquire the lock, perform the three steps, and then release the lock. This can be done with either a Java extension or a database Thing that leverages the database's locking mechanism.
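The three steps can be reproduced in plain Java outside ThingWorx. In this minimal sketch (class and method names are hypothetical), increaseUnsafe() performs the read, increase, and write exactly as listed, while increaseSafe() performs the same steps inside a synchronized method, the locking approach the Java extension below relies on.

```java
import java.util.ArrayList;
import java.util.List;

final class RaceDemo {
    private int counter = 0;

    // Unsafe: the same three steps as me.counter = me.counter + 1, with no lock.
    void increaseUnsafe() {
        int current = counter;   // Step 1: read
        current = current + 1;   // Step 2: increase
        counter = current;       // Step 3: write (may overwrite another thread's update)
    }

    // Safe: the object's intrinsic lock turns the three steps into one critical section.
    synchronized void increaseSafe() {
        int current = counter;
        current = current + 1;
        counter = current;
    }

    synchronized int get() {
        return counter;
    }

    // Hypothetical helper: starts `threads` threads that each call op `perThread` times, then joins them.
    static void hammer(Runnable op, int threads, int perThread) throws InterruptedException {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    op.run();
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            t.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RaceDemo unsafe = new RaceDemo();
        hammer(unsafe::increaseUnsafe, 50, 20_000);
        RaceDemo safe = new RaceDemo();
        hammer(safe::increaseSafe, 50, 20_000);
        System.out.println("unsafe: " + unsafe.get()); // can be less than 1000000 and varies between runs
        System.out.println("safe:   " + safe.get());   // always 1000000
    }
}
```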
Use Java Extension to Handle Thread Safe Challenge
This tutorial assumes that the Eclipse plug-in for ThingWorx extension development is already installed. The following will guide you through creating a simple Java extension step by step:
Create a Java Extension Project
Choose the minimum ThingWorx version to support and select the corresponding SDK. This example names the project JavaExtLocker, although lower-case project names are recommended.
Add a Thing Template in the src Folder
Right-click the src folder and add a Thing Template.
Add a Thing Property
Right-click the Java source file created in the previous step, click the ThingWorx Source menu option, and then select Add Property.
Add Three Services: IncreaseCounter, GetCounter, ResetCounter
Right-click the Java source file, click the ThingWorx Source menu option, and then select Add Service. See above for the IncreaseCounter service details. Repeat the same steps to add GetCounter and ResetCounter.
(Optionally) Add a Generated Serial ID
Add Code to the Three Services
// Imports used by the services below (the plug-in normally generates these):
import com.thingworx.metadata.annotations.ThingworxServiceDefinition;
import com.thingworx.metadata.annotations.ThingworxServiceResult;
import com.thingworx.types.primitives.IntegerPrimitive;

// Inside the Thing Template class generated by the plug-in:
@SuppressWarnings("deprecation")
@ThingworxServiceDefinition(name = "IncreaseCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer IncreaseCounter() throws Exception {
    _logger.trace("Entering Service: IncreaseCounter");
    // Read, increase, and write the property while holding this Thing's lock.
    int current_value = ((IntegerPrimitive) this.getPropertyValue("Counter")).getValue();
    current_value++;
    this.setPropertyValue("Counter", new IntegerPrimitive(current_value));
    _logger.trace("Exiting Service: IncreaseCounter");
    return current_value;
}

@ThingworxServiceDefinition(name = "GetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer GetCounter() throws Exception {
    _logger.trace("Entering Service: GetCounter");
    // Synchronized so a read never interleaves with an in-progress increase or reset.
    int current_value = ((IntegerPrimitive) this.getPropertyValue("Counter")).getValue();
    _logger.trace("Exiting Service: GetCounter");
    return current_value;
}

@SuppressWarnings("deprecation")
@ThingworxServiceDefinition(name = "ResetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
@ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
public synchronized Integer ResetCounter() throws Exception {
    _logger.trace("Entering Service: ResetCounter");
    this.setPropertyValue("Counter", new IntegerPrimitive(0));
    _logger.trace("Exiting Service: ResetCounter");
    return 0;
}
The key here is the synchronized modifier: only one thread at a time can execute a synchronized method on a given Thing instance, so the read, increase, and write steps in IncreaseCounter can no longer interleave and lose updates.
Build the Application
Use 'gradle build' to generate a build of the extension.
Import the Extension into ThingWorx
Create Thing Based on New Thing Template
Check the New Thing Property and Service Definition
Use the Same Test Tool to Run the Test Again
{
"host":"twx85.desheng.io",
"port":443,
"protocol":"https",
"endpoint":"/Thingworx/Things/DeoLockerThing/services/IncreaseCounter",
"headers":{
"Content-Type":"application/json",
"Accept": "application/json",
"AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
},
"payload":{},
"round_break":50000,
"req_break":0,
"round_size":50,
"total_round":20
}
Just change the endpoint to point to the new thing.
Check the Test Result
Repeat the same test several times to ensure the results are consistent and expected (and don't forget to reset the counter between tests).
Summary of Java Extension Approach
The Java extension approach shown here uses the synchronized keyword to make a sequence of operations thread-safe. A ReentrantLock or a Semaphore can serve the same purpose, but the synchronized keyword is much cleaner.
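For comparison, the two alternatives can be sketched in plain Java outside ThingWorx, with the property replaced by an int field; the class names are hypothetical. Both guard the same read, increase, and write sequence, but each must release its lock explicitly in a finally block, which is the extra ceremony that synchronized avoids.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

final class LockAlternatives {
    // Variant 1: explicit ReentrantLock; unlock() in finally so an exception cannot leak the lock.
    static final class LockCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private int counter = 0;

        int increase() {
            lock.lock();
            try {
                counter = counter + 1;   // read, increase, write while holding the lock
                return counter;
            } finally {
                lock.unlock();
            }
        }
    }

    // Variant 2: a Semaphore with a single permit used as a mutual-exclusion lock.
    static final class SemaphoreCounter {
        private final Semaphore permit = new Semaphore(1);
        private int counter = 0;

        int increase() throws InterruptedException {
            permit.acquire();            // blocks until the single permit is available
            try {
                counter = counter + 1;
                return counter;
            } finally {
                permit.release();
            }
        }
    }
}
```

The explicit locks do offer features that synchronized lacks, such as ReentrantLock.tryLock() and Semaphore.tryAcquire() with timeouts. Like synchronized, though, they only coordinate threads inside a single JVM.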
However, the Java extension lock will NOT work in the ThingWorx 9.0 horizontal architecture, because a Java lock only serializes threads within a single JVM and Java has no built-in distributed lock. IgniteLocker does not work in the current horizontal architecture either. So, to implement a thread-safe counter in a 9.0+ horizontal architecture, leverage the database Thing, as discussed in Part 2.