package twx.extension.test; import com.thingworx.metadata.annotations.ThingworxBaseTemplateDefinition; import com.thingworx.things.Thing; import com.thingworx.types.primitives.StringPrimitive; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; @ThingworxBaseTemplateDefinition(name = "GenericThing") public class KFKBKRTemplate extends Thing implements Runnable { private KafkaStream m_stream; private int m_threadNumber; private Thing m_thing; public KFKBKRTemplate(KafkaStream a_stream, int a_threadNumber,Thing thing){ m_threadNumber = a_threadNumber; m_stream = a_stream; m_thing = thing; } @Override public void run() { // TODO Auto-generated method stub try { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()) { String value = new String(it.next().message()); m_thing.setPropertyValue("ConsumeMessage", new StringPrimitive(value)); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }