Skip to main content

Subscribe Topics

Now that we are connected successfully to the Watchtower service, let's see how we can stream down some Topics from the HCS. See the protobuf definition.

info

Find a working example under Arkhia Quick start

hedera/reactjs/demo-app/src/watchtower/streaming.topics.tsx section.

Payload Static Reference
export interface TopicID {  'shardNum'?: (number | string | Long);  'realmNum'?: (number | string | Long);  'topicNum'?: (number | string | Long);}export interface Timestamp {  'seconds'?: (number | string | Long);  'nanos'?: (number);}export interface TopicQuery {  'topicID'?: (TopicID | null);  'consensusStartTime'?: (Timestamp | null);  'consensusEndTime'?: (Timestamp | null);  'limit'?: (number | string | Long);}export interface SubscribeTopicPayload {  subscribe: string;  body: TopicQuery;}const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;const payloadBody: TopicQuery = {    consensusStartTime: { // start date         nanos: 0,        seconds: `0`    },    limit: 100,  // limit of topics streamed down    topicID: {        realmNum: `0`,  // realm and shard are still zero by default for now        shardNum: `0`,        topicNum: 'any_topic_id' // Here we add the topic Id we want to subscribe    }}const payload: SubscribeTopicPayload = {    subscribe: subscribeTopicRoute,    body: payloadBody}
Payload breakdown

To subscribe this stream, our payload should match reference protobuf definition.

View Definition
message ConsensusTopicQuery {    /**     * A required topic ID to retrieve messages for.     */    .proto.TopicID topicID = 1;    /**     * Include messages which reached consensus on or after this time. Defaults to current time if     * not set.     */    .proto.Timestamp consensusStartTime = 2;    /**     * Include messages which reached consensus before this time. If not set it will receive     * indefinitely.     */    .proto.Timestamp consensusEndTime = 3;    /**     * The maximum number of messages to receive before stopping. If not set or set to zero it will     * return messages indefinitely.     */    uint64 limit = 4;}

By looking into the defination, we can see that we need to provide a topicID, consensusStartTime (optional), consensusEndTime (optional) and limit (optional) to get the latest messages.

For forming the payload, we should also be aware of basic_types.proto, timestamp.proto used in the defination.

View Definition
/** * Unique identifier for a topic (used by the consensus service) */message TopicID {    /**     * The shard number (nonnegative)     */    int64 shardNum = 1;    /**     * The realm number (nonnegative)     */    int64 realmNum = 2;    /**     * Unique topic identifier within a realm (nonnegative).     */    int64 topicNum = 3;}/** * An exact date and time. This is the same data structure as the protobuf Timestamp.proto (see the * comments in https://github.com/google/protobuf/blob/master/src/google/protobuf/timestamp.proto) */message Timestamp {    /**     * Number of complete seconds since the start of the epoch     */    int64 seconds = 1;    /**     * Number of nanoseconds since the start of the last second     */    int32 nanos = 2;}

All we need is to build a payload as below:

import React, { useEffect, useState } from "react";import io from 'socket.io-client';function StreamingInit () {  // Make sure you match the url for testnet/mainnet   const demoTopicId = `<YOUR_TOPIC_ID>`; // https://explorer.arkhia.io/#/testnet/topics  const watchtowerURL = `wss://<WATCHTOWER_URL>?api_key=<ARKHIA_API_KEY>`;  const socket = io(watchtowerURL);  const [socketStatus, setSocketStatus] = useState(``);  const [socketConnect, setSocketConnect] = useState(false);  // Available services in page 2 of these docs  const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;  const getSubscriptionPayload =  (topic_id: string, limit: string) => {      const subscriptionPayload = {          subscribe: subscribeTopicRoute,          body: {              consensusStartTime: { // start date                   nanos: 0,                  seconds: `0`              },              limit: limit,  // limit of topics streamed down              topicID: {                  realmNum: `0`,  // realm and shard are still zero by default for now                  shardNum: `0`,                  topicNum: topic_id // Here we add the topic Id we want to subscribe              }          }      };  };  const subscribeToTopic = () => {      const requestPayload =  getSubscriptionPayload(demoTopicId , `100`); // get latest 100      socket.emit(`subscribe`, requestPayload, (msg: any) => {                     socket.on(msg.listeners.data, function (message: any) {              console.log(`Topic messages streaming down...`);              console.log(message)          });          socket.on(msg.listeners.error, function (message: any) {              setSocketError(message);              console.log(`Error`);              console.log(message)          });      });  };  useEffect(() => {      socket.on(`connect`, () => {          setSocketConnect(true);          socket.on(`status`, (msg) => {              console.log(`status`, msg);              setSocketStatus(`Watchtower is successfully connected`);          });          socket.emit(`list-services`, (services: any) => {              console.log(`Listing available services`, services);          });      });      socket.on(`disconnect`, (msg: any) => {          setSocketConnect(false);          setSocketStatus(`Watchtower is disconnected.`)          console.log(`Disconnected.`)      });      socket.on(`error`, (message: any) => {          setSocketConnect(false);          console.error(message);          setSocketStatus(message)      })  }, [socket]);  return (      <>          <div>              {                  socketStatus && (                      <>                          <h6>Watchtower status</h6>                          <div>{socketStatus}</div>                      </>                  )              }              {                  socketConnect &&  (                      <>                         <button onClick={subscribeToTopic}>                      </>                  )              }          </div>      </>  );}export default StreamingInit;