How to copy entries from one Stream to another Stream

Suppose we have two Streams, Stream1 and Stream2, that use the same DataShape, StreamDS.

The StreamDS DataShape has two fields: Id (number) and Name (string).

We want to copy all the entries from Stream1 to Stream2.

Steps:

1. Open the Stream1 Stream in Composer and run the GetStreamEntriesWithData service.

2. In the popup, click the Create DataShape from Result option to create a new DataShape named GetStreamEntriesDS.

3. Create a Service and use JavaScript like the code below (comments added for details):

// Create a temporary InfoTable to hold the output of the GetStreamEntriesWithData service
var paramsForInfotable = {
  infoTableName: "InfoTable" /* STRING */,
  dataShapeName: "GetStreamEntriesDS" /* DATASHAPENAME */
};

// result: INFOTABLE
var InfotableForCopy = Resources["InfoTableFunctions"].CreateInfoTableFromDataShape(paramsForInfotable);

// Save the output of the GetStreamEntriesWithData service to the temporary InfoTable InfotableForCopy
var paramsForGetStreamEntriesWithDataService = {
  oldestFirst: false /* BOOLEAN */,
  maxItems: 10000 /* NUMBER */
};

// result: INFOTABLE dataShape: "GetStreamEntriesDS"
InfotableForCopy = Things["Stream1"].GetStreamEntriesWithData(paramsForGetStreamEntriesWithDataService);

// Read the data from the InfoTable row by row and add it to the new Stream
var tableLength = InfotableForCopy.rows.length;

for (var x = 0; x < tableLength; x++) {
  // Current row of the source Stream (index x)
  var row = InfotableForCopy.rows[x];

  // values: INFOTABLE (DataShape: StreamDS)
  var values = Things["Stream2"].CreateValues();
  values.Id = row.Id; // NUMBER
  values.Name = row.Name; // STRING

  var paramsForAddStreamEntryService = {
    sourceType: row.sourceType /* STRING */,
    values: values /* INFOTABLE */,
    location: row.location /* LOCATION */,
    source: row.source /* STRING */,
    timestamp: row.timestamp /* DATETIME */,
    tags: row.tags /* TAGS */
  };

  // AddStreamEntry(tags:TAGS, timestamp:DATETIME, source:STRING, values:INFOTABLE(StreamDS), location:LOCATION):NOTHING
  Things["Stream2"].AddStreamEntry(paramsForAddStreamEntryService);
}

// Return the entries that were read from Stream1 as the service result
var result = InfotableForCopy;

Comments

Very nice.

However, outside of the Neo4J data store, this is probably best done directly in the database.

Hi Pai,

Why? If you later want to change the database provider, you will have a problem. We should not have to care about the underlying database. There is already too much to deal with on a big IoT project.

Best Regards,

Carles.

My thought is that you wouldn't do an action like this too often, so you can do it very quickly with an update query.

If there is no real-time need for this duplicated data, this would be a way to offload processing from the JVM, and if you have deployed the runtime and the data store separately, it also takes load off the runtime server.

The logical layer is the logical layer, and the data layer is the data layer.

But of course, for sporadic operations it can be an option.

Hi Ankit,

I am trying to add entries to a Stream using a for loop.

But when I check the entries with the GetStreamEntriesWithData service, only the entry from the last iteration is saved in the Stream.

It looks as if the first entry is deleted from the Stream on the second iteration of the loop, and so on, so that in the end only the last entry shows up in the Stream.

Can you please suggest how to add multiple entries to a Stream?

I am also providing my code. Please tell me if I am doing anything wrong.

For your reference, here is my code:

// timestamp:DATETIME
var timestamp = new Date();

var Types = new Array();
Types = ['A','B','C','D','E'];

for (var i = 0; i < Types.length; i++)
{
  var values = Things["test"].CreateValues();

  var params_sql = {
    pIncidentType: Types[i] /* STRING */
  };

  // result: INFOTABLE dataShape: "undefined"
  var SQL_Qry = me.GetIncidentsTypeDataQry(params_sql);

  values.Type = Types[i]; //STRING
  values.Count = SQL_Qry.count; //NUMBER

  // location:LOCATION
  var location = new Object();
  location.latitude = 0;
  location.longitude = 0;
  location.elevation = 0;
  location.units = "WGS84";

  var params = {
    sourceType : "Thing",
    timestamp : timestamp,
    source : me.name,
    values : values,
    location : location
  };

  Things["test"].AddStreamEntry(params);
}

Appreciate your help.

Thanks,

Rajee

In your for loop, every value gets the same timestamp, because you read it once before the loop. A Stream can hold only one entry per timestamp.

Even if you move the timestamp inside the for loop, you will still not get all the entries: the loop runs so fast that more than one iteration will get the same timestamp (the same millisecond).

One option is to add a millisecond to the initial timestamp on each loop iteration:

timestamp = dateAddMilliseconds(timestamp,1);

This will create a distinct row for each entry. The other option would be to use a DataTable and set the primary key to something other than the timestamp.

Carles.
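
A minimal sketch of the one-millisecond offset described above, written in plain JavaScript so that it can be run outside ThingWorx. The helper name spreadTimestamps is hypothetical; inside a ThingWorx service, the dateAddMilliseconds(timestamp, 1) call shown above plays the same role as the getTime() + i arithmetic used here.

```javascript
// Hypothetical helper (not a ThingWorx API): returns `count` Date objects,
// each one millisecond later than the previous one, starting at `start`.
function spreadTimestamps(start, count) {
  var stamps = [];
  for (var i = 0; i < count; i++) {
    stamps.push(new Date(start.getTime() + i));
  }
  return stamps;
}

var types = ['A', 'B', 'C', 'D', 'E'];
var stamps = spreadTimestamps(new Date(), types.length);

// Pair each type with its own timestamp, so no two entries share one.
var entries = types.map(function (type, i) {
  return { timestamp: stamps[i], type: type };
});
```

In a real service, each element of entries would correspond to one AddStreamEntry call with its own timestamp.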

I usually do what Carles suggests and add a millisecond, but in my loop I increase and subtract first, and then run it up to the current date and time to get a 'historical' series.

Thanks for your reply.

It's working now.

Thanks for your reply.

Issue resolved by adding a millisecond to the timestamp.

Be careful with this approach: if you receive data too frequently, you will run into the same problem again.

Millisecond 0 -> you receive 1000 entries -> you have added data up to millisecond 1000 in the future.

Millisecond 500 -> you receive 1000 entries -> as you have already added data up to millisecond 1000, you will start overwriting the entries above millisecond 500.

Carles.
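
The overlap described above can be reproduced with a small in-memory model. This is plain JavaScript, not ThingWorx code: the Stream is modelled as an object keyed by the millisecond timestamp (an assumption that mirrors the one-entry-per-timestamp behaviour described earlier), and addBatch and addBatchGuarded are hypothetical helpers. The guarded variant remembers the last timestamp written and never starts below it, which is one possible way to avoid the overwrite, at the cost of timestamps drifting further from the real arrival time.

```javascript
// Model of a stream: one entry per millisecond timestamp (later writes win).
function addBatch(stream, startMs, count, label) {
  for (var i = 0; i < count; i++) {
    stream[startMs + i] = label; // same key => the previous entry is replaced
  }
}

// Guarded variant: never start at or before the last timestamp already used.
function addBatchGuarded(stream, state, startMs, count, label) {
  var next = Math.max(startMs, state.lastMs + 1);
  for (var i = 0; i < count; i++) {
    stream[next + i] = label;
  }
  state.lastMs = next + count - 1;
}

// Naive approach: the second batch overlaps the first one.
var naive = {};
addBatch(naive, 0, 1000, 'first');    // occupies milliseconds 0..999
addBatch(naive, 500, 1000, 'second'); // occupies 500..1499, replacing 500..999
var naiveCount = Object.keys(naive).length;

// Guarded approach: the second batch is shifted past the first one.
var guarded = {};
var state = { lastMs: -1 };
addBatchGuarded(guarded, state, 0, 1000, 'first');    // 0..999
addBatchGuarded(guarded, state, 500, 1000, 'second'); // shifted to 1000..1999
var guardedCount = Object.keys(guarded).length;
```

With the naive version, only 1500 of the 2000 entries survive; with the guard, all 2000 are kept. A DataTable with a primary key other than the timestamp, as mentioned earlier in the thread, avoids the issue without shifting timestamps.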

Version history
Last update: Sep 21, 2016 02:11 AM