
Thread Safe Coding in ThingWorx, Part 1: The Java Extension Approach


Thread Safe Coding, Part 1: The Java Extension Approach

Written by Desheng Xu and edited by @vtielebein 

 

Overview

Time and again, customers report that one of their favorite ThingWorx features is using JavaScript to write services. However, JavaScript has no built-in semaphore or lock mechanism to enable thread-safe concurrent processing, as the Java language does. This article demonstrates why thread-safe coding is necessary and how to use a Java extension for this purpose. Part 2 presents an alternative approach using database locks.

 

Demo Use Case

Let's use a highly abstracted use case to demo thread-safe code practices:

  • There are tens of machines in a factory, and each machine's PLC emits a signal whenever an issue occurs at run-time.
  • The customer expects a dashboard that shows today's total count of issues from all machines in real time.
  • The customer also expects the timestamp of each issue to be logged (regardless of the machine).

Similar use cases might be to:

  • Show the total product counts from each sub-line in the current shift.
  • Show the total rentals of bicycles from all remote sites.
  • Show the total issues of distant cash machines across the country.

 

Modeling

Thing: DashboardCounter, which includes:

  • 1 Property: name:counter, type:integer, logged:true, default value:0
  • 3 services:
    • IncreaseCounter(): increase counter value by 1
    • GetCounter(): return current counter value
    • ResetCounter(): set counter value to 0
  • 1 Subscription: a subscription to the data change event of the property counter, which will print the new value and timestamp to the log.

imageimage

 

GetCounter

var result = me.counter;

 

IncreaseCounter

me.counter = me.counter + 1;

var result = me.counter;

 

ResetCounter

me.counter = 0;

var result = 0;

 

image

Subscription MonitorCounter

Logger.info(eventData.newValue.value+":"+eventData.newValue.time.getTime());

 

ValueStream

For simplicity, the value stream entity is not included in the attachment. Please go ahead and assign a value stream to this Thing to monitor the property values.

 

Test Tool

A small test tool, mulreqs, is attached here, along with some useful extensions and ThingWorx entities. The mulreqs tool reads the path of its configuration file from the OS environment variable MULTI_REQUEST_CONFIG.

 

In Linux/MacOS:

export MULTI_REQUEST_CONFIG="./config.json"

In the config.json file, you can use the following configuration:

 

 

 

{
    "host":"twx85.desheng.io",
    "port":443,
    "protocol":"https",
    "endpoint":"/Thingworx/Things/DashboardCounter/services/IncreaseCounter",
    "headers":{
        "Content-Type":"application/json",
        "Accept": "application/json",
        "AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
    },
    "payload":{},
    "round_break":50000,
    "req_break":0,
    "round_size":50,
    "total_round":20
}

 

 

 

host, port, protocol, and headers identify the ThingWorx server and how to authenticate against it.

endpoint defines which service is called during the test.

payload is not in use at this moment, but you have to keep it in the file.

total_round is how many rounds of the test you want to run.

round_size defines how many requests are sent simultaneously during each round.

round_break is the pause between rounds in microseconds, so 50000 in the above example means 50 ms.

req_break is the delay between requests within a round. A value of 0 means the requests in a round are sent to the server simultaneously.

 

With the above configuration, the service is expected to execute 20 * 50 = 1000 times in total. So if the initial value is 0, counter should be 1000 at the end, and if the value stream is initially empty, it should contain a history of values from 1 to 1000.
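
As a quick sanity check of that arithmetic, the numbers from config.json can be plugged into a few lines of Java. This is only an illustrative sketch: the class and method names are made up for this example and are not part of the mulreqs tool.

```java
import java.util.concurrent.TimeUnit;

class MulreqsPlan {
    // Values copied from the config.json shown above.
    static final int TOTAL_ROUND = 20;
    static final int ROUND_SIZE = 50;
    static final long ROUND_BREAK_MICROS = 50_000;

    /** Total number of service calls the whole run is expected to make. */
    static int expectedRequests() {
        return TOTAL_ROUND * ROUND_SIZE;
    }

    /** The round_break value converted from microseconds to milliseconds. */
    static long roundBreakMillis() {
        return TimeUnit.MICROSECONDS.toMillis(ROUND_BREAK_MICROS);
    }

    public static void main(String[] args) {
        System.out.println("expected requests: " + expectedRequests());
        System.out.println("round break (ms):  " + roundBreakMillis());
    }
}
```

Any final counter value below the expected request count means that some increments were lost.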

 

Run Test

Use the following command to perform the test:

.<your path>/mulreqs

Execution output will look like:
image

 

Check Result

image

You will be surprised that the final value is 926 instead of 1000. (Caution: this value differs from test to test and can be any value between 1 and 1000.)

image

Now, look at the value stream by using QueryPropertyHistory. Many values are missing, and while the number of stored records varies between tests, it is unlikely to match the final counter value (926). Notice that the most recent values are 926, 925, 921, and 918; the values 919, 920, 922, 923, and 924 are all missing. Next, we check the script log for errors, and there are none. The log contains only the print statements we deliberately placed there.

image

So, we have observed two symptoms here:

  1. The final value from property counter doesn't have the expected value.
  2. The value stream doesn't have the expected history of the counter property changes.

What's the reason behind each symptom, and which one is a thread-safe issue?

 

Understanding Timestamp Granularity

ThingWorx facilitates the collection of time series data and solutions centered around such data by allowing for use of the timestamp as the primary key. However, a timestamp will always have a minimal granularity definition when you process it. In ThingWorx, the minimal granularity or unit of a timestamp is one millisecond.

 

Looking again at the log generated by the subscription, we see that several data points (922, 923, 924, 925) share the same timestamp (1596089891147), which is Thursday, July 30, 2020, 6:18:11.147 AM GMT. When these data points are flushed into the database, later data points overwrite earlier ones because they all have the same timestamp. So data point 922 went into the value stream first, was overwritten by 923, then by 924, and then by 925. The next data point, 926, has a new timestamp (1596089891148), 1 ms after the previous one. Therefore, data points 925 and 926 are stored while 922, 923, and 924 are not. These lost data points are therefore NOT a thread-safety issue.
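
The overwrite behaviour can be imitated with an ordinary map keyed by the millisecond timestamp. The sketch below is only an analogy: a TreeMap stands in for the value stream table and is not how ThingWorx stores data internally. The class name is made up for illustration, and the timestamp and values are the ones from the log above.

```java
import java.util.Map;
import java.util.TreeMap;

class TimestampClash {
    static final long T = 1596089891147L; // timestamp taken from the log above

    /** Stores values the way a timestamp-keyed table would: one row per millisecond. */
    static Map<Long, Integer> store() {
        Map<Long, Integer> stream = new TreeMap<>();
        // Four updates arrive within the same millisecond; each put replaces the previous row.
        stream.put(T, 922);
        stream.put(T, 923);
        stream.put(T, 924);
        stream.put(T, 925);
        // The next update lands one millisecond later and gets its own row.
        stream.put(T + 1, 926);
        return stream;
    }

    public static void main(String[] args) {
        store().forEach((ts, value) -> System.out.println(ts + " -> " + value));
    }
}
```

Five writes leave only two rows behind, one per distinct key. No locking would change that, because the writes are not racing; they simply share a primary key.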

 

The reason why some of these data points have the same timestamp in this example is because multiple machines write to the same value stream. The right approach is to log data points at the individual machine level, with a different value stream per machine.

 

However, what happens if one machine emits data too frequently? If data points from the same machine still clash on timestamps, then the signal frequency is too high. The recommended approach is to down-sample the updates, since any frequency above 1000 Hz (more than one update per millisecond) will lose data points in the same way.
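
One possible way to down-sample is to forward a sample only if enough time has passed since the last forwarded one. The following is a minimal sketch under that assumption. DownSampler and its methods are hypothetical names, not a ThingWorx API, and whether excess samples should be dropped or aggregated instead depends on the signal.

```java
class DownSampler {
    private final long minIntervalMs;
    private long lastAcceptedMs;
    private boolean hasAccepted;

    DownSampler(long minIntervalMs) {
        this.minIntervalMs = minIntervalMs;
    }

    /** Returns true if the sample taken at timestampMs should be forwarded. */
    synchronized boolean accept(long timestampMs) {
        if (hasAccepted && timestampMs - lastAcceptedMs < minIntervalMs) {
            return false; // too close to the previously forwarded sample
        }
        lastAcceptedMs = timestampMs;
        hasAccepted = true;
        return true;
    }

    /** Counts how many of the given timestamps would be forwarded. */
    static int countAccepted(long[] timestampsMs, long minIntervalMs) {
        DownSampler sampler = new DownSampler(minIntervalMs);
        int accepted = 0;
        for (long ts : timestampsMs) {
            if (sampler.accept(ts)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] burst = {147, 147, 147, 147, 148, 150, 152};
        System.out.println("forwarded with 1 ms spacing: " + countAccepted(burst, 1));
        System.out.println("forwarded with 2 ms spacing: " + countAccepted(burst, 2));
    }
}
```

With a minimum spacing of 1 ms, every forwarded sample has a distinct millisecond timestamp, so no two of them can overwrite each other.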

 

Real Thread Safe Issue from Demo Use Case

The real thread-safety issue is that the final value of the counter is an arbitrary number. Let's take another look at the code:

me.counter = me.counter + 1;

This single line of code actually consists of three steps:

  • Step 1: read current value of me.counter
  • Step 2: increase this value
  • Step 3: set me.counter with new value.

In a multi-threaded environment, these three steps are not performed as a single atomic operation, which leads to a race condition: two threads can read the same value, and one increment is then lost. The way to solve this issue is to use a locking mechanism to serialize access to the property: acquire the lock, perform the three steps, and then release the lock. This can be done using either a Java extension or a Database Thing that leverages the database lock mechanism.
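
Because a real race is non-deterministic, it can help to replay the unlucky ordering by hand. The sketch below uses no threads at all. It simply executes the three steps of two hypothetical callers, A and B, in the order that loses an update, and then in the serialized order that a lock would enforce. The class and variable names are made up for illustration.

```java
class LostUpdate {
    /** Both callers read before either one writes: one increment is lost. */
    static int interleaved() {
        int counter = 0;         // stands in for me.counter
        int readByA = counter;   // A, step 1: read the current value
        int readByB = counter;   // B, step 1: read the same old value
        int newA = readByA + 1;  // A, step 2: increase
        int newB = readByB + 1;  // B, step 2: increase
        counter = newA;          // A, step 3: write back
        counter = newB;          // B, step 3: overwrites A's result with the same number
        return counter;
    }

    /** Each caller finishes all three steps before the next one starts. */
    static int serialized() {
        int counter = 0;
        int readByA = counter;   // A runs steps 1 to 3 without interruption
        counter = readByA + 1;
        int readByB = counter;   // B now sees A's result
        counter = readByB + 1;
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved());
        System.out.println("serialized:  " + serialized());
    }
}
```

Two calls yield a counter of 1 in the interleaved order and 2 in the serialized order. Over 1000 concurrent requests, this same kind of loss can produce a final value such as 926.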

 

Use Java Extension to Handle Thread Safe Challenge

This tutorial assumes that the Eclipse plug-in for ThingWorx extension development is already installed. The following will guide you through creating a simple Java extension step by step:

  1. Create a Java Extension Project
    image
    Choose the minimal ThingWorx version to support and select the corresponding SDK. Let's name it JavaExtLocker, though it's best to use lower-case in the project name.

  2. Add a ThingWorx Template in the src Folder
    image
    Right-click the src folder and add a Thing Template.

  3. Add a Thing property
    image
    Right-click the Java source file created in the above step and click the menu option called Thingworx Source, then select Add Property.

  4. Add Three Services: IncreaseCounter, GetCounter, ResetCounter
    image
    Right-click the Java source file and select the menu option called Thingworx Source, then select Add Service. See above for the IncreaseCounter service details.

    Repeat these same steps to add GetCounter and ResetCounter:
    image
    image
  5. (Optionally) Add a Generated Serial ID
    image
  6. Add Code to the Three Services
    @SuppressWarnings("deprecation")
    @ThingworxServiceDefinition(name = "IncreaseCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
    @ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
    public synchronized Integer IncreaseCounter() throws Exception {
    	_logger.trace("Entering Service: IncreaseCounter");
    	// Read, increment, and write back while holding the lock on this Thing instance.
    	int current_value = ((IntegerPrimitive)(this.getPropertyValue("Counter"))).getValue();
    	current_value++;
    	this.setPropertyValue("Counter", new IntegerPrimitive(current_value));

    	_logger.trace("Exiting Service: IncreaseCounter");
    	return current_value;
    }

    @ThingworxServiceDefinition(name = "GetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
    @ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
    public synchronized Integer GetCounter() throws Exception {
    	_logger.trace("Entering Service: GetCounter");
    	// Synchronized as well, so a read never overlaps with an increment or reset.
    	int current_value = ((IntegerPrimitive)(this.getPropertyValue("Counter"))).getValue();
    	_logger.trace("Exiting Service: GetCounter");
    	return current_value;
    }

    @SuppressWarnings("deprecation")
    @ThingworxServiceDefinition(name = "ResetCounter", description = "", category = "", isAllowOverride = false, aspects = {"isAsync:false" })
    @ThingworxServiceResult(name = "Result", description = "", baseType = "INTEGER", aspects = {})
    public synchronized Integer ResetCounter() throws Exception {
    	_logger.trace("Entering Service: ResetCounter");
    	this.setPropertyValue("Counter", new IntegerPrimitive(0));
    	_logger.trace("Exiting Service: ResetCounter");
    	return 0;
    }
    image
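    The key here is the synchronized modifier. Because all three services are synchronized instance methods, only one thread at a time can execute any of them on a given Thing, so the read, increment, and write in IncreaseCounter complete as one unit and no update is lost.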

  7. Build the Application
    Use 'gradle build' to generate a build of the extension.

  8. Import the Extension into ThingWorx
    image

  9. Create Thing Based on New Thing Template
    image
  10. Check the New Thing Property and Service Definition
    imageimage

  11. Use the Same Test Tool to Run the Test Again
    {
        "host":"twx85.desheng.io",
        "port":443,
        "protocol":"https",
        "endpoint":"/Thingworx/Things/DeoLockerThing/services/IncreaseCounter",
        "headers":{
            "Content-Type":"application/json",
            "Accept": "application/json",
            "AppKey":"5cafe6eb-adba-41df-a7d6-4fc8088125c1"
        },
        "payload":{},
        "round_break":50000,
        "req_break":0,
        "round_size":50,
        "total_round":20
    }

    Just change the endpoint to point to the new thing.

    image

  12. Check the Test Result
    image
    Repeat the same test several times to ensure the results are consistent and expected (and don't forget to reset the counter between tests).
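
The effect of the synchronized modifier from step 6 can also be checked outside ThingWorx. The sketch below is plain Java with no ThingWorx classes. Counter is a made-up stand-in for the Thing, and the thread pool loosely imitates the test tool by firing 20 rounds of 50 concurrent calls. It is an approximation of the test above, not a reproduction of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SynchronizedCounterDemo {
    /** Plain-Java stand-in for the Thing: all three methods lock on the same instance. */
    static class Counter {
        private int value;

        synchronized int increase() { value++; return value; }
        synchronized int get() { return value; }
        synchronized int reset() { value = 0; return 0; }
    }

    /** Fires rounds * roundSize concurrent increments and returns the final value. */
    static int run(int rounds, int roundSize) {
        Counter counter = new Counter();
        ExecutorService pool = Executors.newFixedThreadPool(roundSize);
        try {
            for (int r = 0; r < rounds; r++) {
                List<Callable<Integer>> round = new ArrayList<>();
                for (int i = 0; i < roundSize; i++) {
                    round.add(counter::increase);
                }
                pool.invokeAll(round); // blocks until every call of this round has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the test", e);
        } finally {
            pool.shutdown();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("final value: " + run(20, 50));
    }
}
```

Removing the synchronized keyword from increase() makes the final value unreliable, just as in the JavaScript version of the service.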

Summary of Java Extension Approach

The Java extension approach shown here uses the synchronized keyword to make a sequence of several actions thread-safe. Other options are to use a ReentrantLock or a Semaphore for the same purpose, but the synchronized keyword approach is much cleaner.
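
For comparison, here is a rough sketch of what the two alternatives could look like in plain Java, using java.util.concurrent.locks.ReentrantLock and java.util.concurrent.Semaphore from the standard library. The counter classes and the small hammer helper are illustrative names only, and no ThingWorx API is involved.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class ExplicitLockCounters {
    /** Counter guarded by an explicit ReentrantLock. */
    static class LockCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private int value;

        int increase() {
            lock.lock();
            try { return ++value; } finally { lock.unlock(); } // always release the lock
        }

        int get() {
            lock.lock();
            try { return value; } finally { lock.unlock(); }
        }
    }

    /** Counter guarded by a Semaphore with a single permit. */
    static class SemaphoreCounter {
        private final Semaphore permit = new Semaphore(1);
        private int value;

        int increase() {
            permit.acquireUninterruptibly();
            try { return ++value; } finally { permit.release(); }
        }

        int get() {
            permit.acquireUninterruptibly();
            try { return value; } finally { permit.release(); }
        }
    }

    /** Runs the given increment perThread times on each of the given number of threads. */
    static void hammer(Runnable increment, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increment.run();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
    }

    static int runWithLock(int threads, int perThread) {
        LockCounter counter = new LockCounter();
        hammer(counter::increase, threads, perThread);
        return counter.get();
    }

    static int runWithSemaphore(int threads, int perThread) {
        SemaphoreCounter counter = new SemaphoreCounter();
        hammer(counter::increase, threads, perThread);
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("ReentrantLock: " + runWithLock(8, 1000));
        System.out.println("Semaphore:     " + runWithSemaphore(8, 1000));
    }
}
```

Both variants need an explicit try/finally to guarantee the lock or permit is released, which is the extra ceremony that synchronized avoids.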

 

However, the Java extension lock will NOT work in the 9.0 horizontal architecture, since Java doesn't have a distributed lock: synchronized only coordinates threads inside a single JVM. IgniteLocker wouldn't work in the current horizontal architecture, either. So if you need a thread-safe counter in a 9.0+ horizontal architecture, leverage the Database Thing, as discussed in Part 2.

Comments

Very interesting post. I had similar "fun" when assigning an infotable, e.g. adding a row. Even the write to the Thing property (x := y) can cause a Java exception because of concurrency. Therefore, concurrency and thread safety deserve more focus. In my old days (1990++) it was good practice to write new values into a fast queue with room for, e.g., 10k objects and then have a second FIFO service that just loops over the queued objects. If objects come IN faster than they go OUT, the queue starts filling (hopefully without overflowing), and when the load goes down again the queue empties. But this was long before cloud and hyperscale. How does the Kafka message bus deal with this common topic?

Thanks for these articles @dxu .  I've been thinking about this however and it is making me question how to tackle commonly used counter properties on a Thing, when the triggering event would be asynchronous?  How then to handle this when I would want to increment a property based on another property being updated and using a subscription?  As I understand how the thread pools would work, there would be a risk that two or more updates received close together could be executed at the same time in different threads.

 

Here is an example of what I'm thinking of.

  1. Energy meter sends messages containing energy consumed
  2. In ThingWorx I want to receive these and add them together to form an energy meter index of total consumed energy
  3. EnergyDemand (being consumed) is bound to a Remote Thing
  4. I put a subscription on EnergyDemand DataChange event to trigger incrementing the total
  5. The totalling service gets the current total from a Property
  6. Then adds the new consumed energy
  7. And writes the total back to the EnergyIndex Property

5. 6. and 7. aren't thread safe however, so I understand that there is a risk that one of the consumption values could be overwritten, and then my index would progressively become more and more incorrect.

 

Have I appropriately understood this?  If so, shouldn't there be a thread-safe ThingWorx service which allows incrementing/decrementing Properties? (ie: IncrementProperty( amount ))

 

 

 

 
 

Hi, Geva:

    You are right, the use case you mentioned potentially has the thread-safe issue. However, the potential thread-safe issue can only be a real issue in two circumstances:

#1) Your energy meter sends (or receives) energy demands very frequently, or

#2) You have several sources to send energy demands to the same meter.

 

    In reality, most energy meters will receive demand info on a daily basis (more or less). The thread-safe concern wouldn't be a real issue.

 

    The example used in this article is to handle "counter", but you can replace it with any floating number and use the same pattern to build an extension if you want to make sure to handle requests without thread-safe concern.

   Hope this answers your question.

Desheng

 

 

Hi, Stefan:

    Thingworx always uses multi-threads to process queues (like event queue, WS execution queue, etc). You can't designate a single thread to process those queues and therefore you have to put thread-safe in design consideration.

    Theoretically, you can limit a single thread to process queues in Thingworx to avoid the thread-safe issue but you can imagine it would be a huge performance draw-back.

 

    Kafka is a publish-subscribe message system and for sure you can use it as a queue system too. It will not tell you how to process or consume the messages it delivered. That's the logic you have to build in your client and the thread-safe concern should be addressed too.

 

Desheng

 

Thanks Desheng, yes it does answer my question.

I agree about the frequency and target aspects - if an energy meter was sending every 10 minutes as they normally would, then this would never present itself.  What I understand is that it is more of a risk when operating in the millisecond timescale (really fast data ingestion, platform code execution).  So in ThingWorx code, if you were planning on reading, modifying, and writing back a property value from the Thing Model, then you should try to do all the steps as close together as possible; as putting a lot of time between the read and write would increase the risk of collisions if you had different services which happened to update the same properties.

Greg

Version history
Last update: Nov 30, 2020 09:08 PM