Subscribe Topics
Now that we are successfully connected to the Watchtower service, let's see how we can stream topic messages from the Hedera Consensus Service (HCS). See the protobuf definition.
info
Find a working example under Arkhia Quick start
hedera/reactjs/demo-app/src/watchtower/streaming.topics.tsx
section.
Payload Static Reference
// NOTE(review): `Long` is not declared in this snippet — presumably the 64-bit
// integer type from long.js used by generated protobuf typings. Import it
// wherever this reference is pasted.

/** Identifies a topic; mirrors the `TopicID` protobuf message. */
export interface TopicID {
  shardNum?: number | string | Long;
  realmNum?: number | string | Long;
  topicNum?: number | string | Long;
}

/** A point in time; mirrors the `Timestamp` protobuf message. */
export interface Timestamp {
  /** Number of complete seconds since the start of the epoch. */
  seconds?: number | string | Long;
  /** Number of nanoseconds since the start of the last second. */
  nanos?: number;
}

/** Body of a topic subscription; mirrors the `ConsensusTopicQuery` protobuf message. */
export interface TopicQuery {
  /** Topic to retrieve messages for. */
  topicID?: TopicID | null;
  /** Include messages which reached consensus on or after this time. */
  consensusStartTime?: Timestamp | null;
  /** Include messages which reached consensus before this time. */
  consensusEndTime?: Timestamp | null;
  /** Maximum number of messages to receive before stopping. */
  limit?: number | string | Long;
}

/** Envelope emitted on the socket: the service route plus the query body. */
export interface SubscribeTopicPayload {
  subscribe: string;
  body: TopicQuery;
}

const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;

const payloadBody: TopicQuery = {
  // Start date
  consensusStartTime: {
    nanos: 0,
    seconds: `0`,
  },
  // Maximum number of messages streamed down
  limit: 100,
  topicID: {
    // Realm and shard are still zero by default for now
    realmNum: `0`,
    shardNum: `0`,
    // Here we add the topic Id we want to subscribe to
    topicNum: 'any_topic_id',
  },
};

const payload: SubscribeTopicPayload = {
  subscribe: subscribeTopicRoute,
  body: payloadBody,
};
Payload breakdown
To subscribe to this stream, our payload must match the reference protobuf definition.
View Definition
/**
 * Query selecting the messages of a single topic, optionally bounded by
 * consensus time (start/end) and by a maximum message count.
 */
message ConsensusTopicQuery {
    /**
     * A required topic ID to retrieve messages for.
     */
    .proto.TopicID topicID = 1;

    /**
     * Include messages which reached consensus on or after this time. Defaults to current time if
     * not set.
     */
    .proto.Timestamp consensusStartTime = 2;

    /**
     * Include messages which reached consensus before this time. If not set it will receive
     * indefinitely.
     */
    .proto.Timestamp consensusEndTime = 3;

    /**
     * The maximum number of messages to receive before stopping. If not set or set to zero it will
     * return messages indefinitely.
     */
    uint64 limit = 4;
}
Looking at the definition, we can see that we need to provide a topicID
, consensusStartTime
(optional), consensusEndTime
(optional) and limit
(optional) to get the latest messages.
For forming the payload, we should also be aware of basic_types.proto
, timestamp.proto
used in the definition.
View Definition
/**
 * Unique identifier for a topic (used by the consensus service)
 */
message TopicID {
    /**
     * The shard number (nonnegative)
     */
    int64 shardNum = 1;

    /**
     * The realm number (nonnegative)
     */
    int64 realmNum = 2;

    /**
     * Unique topic identifier within a realm (nonnegative).
     */
    int64 topicNum = 3;
}

/**
 * An exact date and time, expressed as whole seconds plus a nanosecond
 * remainder. This is the same data structure as the protobuf Timestamp.proto
 * (see the comments in
 * https://github.com/google/protobuf/blob/master/src/google/protobuf/timestamp.proto)
 */
message Timestamp {
    /**
     * Number of complete seconds since the start of the epoch
     */
    int64 seconds = 1;

    /**
     * Number of nanoseconds since the start of the last second
     */
    int32 nanos = 2;
}
All we need is to build a payload as below:
- React.js
- Node.js
- HTML/Javascript
import React, { useEffect, useRef, useState } from "react";
import io from 'socket.io-client';

// Make sure you match the url for testnet/mainnet
const demoTopicId = `<YOUR_TOPIC_ID>`; // https://explorer.arkhia.io/#/testnet/topics
const watchtowerURL = `wss://<WATCHTOWER_URL>?api_key=<ARKHIA_API_KEY>`;

// Available services in page 2 of these docs
const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;

/**
 * Acknowledgement passed to the `subscribe` callback. Only the fields this
 * component reads are declared: the event names to listen on for streamed
 * data and for errors.
 */
interface SubscribeAck {
  listeners: {
    data: string;
    error: string;
  };
}

/**
 * Builds the request emitted on the `subscribe` event.
 *
 * @param topic_id Topic number to subscribe to.
 * @param limit Maximum number of messages to stream down.
 * @returns The route plus the query body expected by the service.
 */
const getSubscriptionPayload = (topic_id: string, limit: string) => {
  return {
    subscribe: subscribeTopicRoute,
    body: {
      // Start date
      consensusStartTime: {
        nanos: 0,
        seconds: `0`,
      },
      // Maximum number of messages streamed down
      limit: limit,
      topicID: {
        // Realm and shard are still zero by default for now
        realmNum: `0`,
        shardNum: `0`,
        // Here we add the topic Id we want to subscribe to
        topicNum: topic_id,
      },
    },
  };
};

/**
 * Converts an arbitrary socket message into text that is safe to render;
 * React throws when asked to render a plain object as a child.
 */
const toDisplayText = (value: unknown): string => {
  if (typeof value === `string`) {
    return value;
  }
  if (value instanceof Error) {
    return value.message;
  }
  return JSON.stringify(value) ?? String(value);
};

/**
 * Demo component: opens one Watchtower socket for the lifetime of the
 * component, shows the connection status, and lets the user subscribe to
 * `demoTopicId` once connected.
 */
function StreamingInit () {
  // The socket lives in a ref so re-renders never open another connection.
  const socketRef = useRef<ReturnType<typeof io> | null>(null);
  const [socketStatus, setSocketStatus] = useState(``);
  const [socketError, setSocketError] = useState(``);
  const [socketConnect, setSocketConnect] = useState(false);

  const subscribeToTopic = () => {
    const socket = socketRef.current;
    if (!socket) {
      return;
    }

    const requestPayload = getSubscriptionPayload(demoTopicId, `100`); // get latest 100
    socket.emit(`subscribe`, requestPayload, (msg: SubscribeAck) => {
      // Drop handlers left by an earlier click so messages are not handled twice.
      socket.off(msg.listeners.data);
      socket.off(msg.listeners.error);

      socket.on(msg.listeners.data, (message: unknown) => {
        console.log(`Topic messages streaming down...`);
        console.log(message);
      });
      socket.on(msg.listeners.error, (message: unknown) => {
        setSocketError(toDisplayText(message));
        console.log(`Error`);
        console.log(message);
      });
    });
  };

  useEffect(() => {
    const socket = io(watchtowerURL);
    socketRef.current = socket;

    socket.on(`connect`, () => {
      setSocketConnect(true);
      socket.emit(`list-services`, (services: unknown) => {
        console.log(`Listing available services`, services);
      });
    });

    // Registered once, outside `connect`, so reconnects do not stack handlers.
    socket.on(`status`, (msg: unknown) => {
      console.log(`status`, msg);
      setSocketStatus(`Watchtower is successfully connected`);
    });

    socket.on(`disconnect`, () => {
      setSocketConnect(false);
      setSocketStatus(`Watchtower is disconnected.`);
      console.log(`Disconnected.`);
    });

    socket.on(`error`, (message: unknown) => {
      setSocketConnect(false);
      console.error(message);
      setSocketStatus(toDisplayText(message));
    });

    // Close the connection on unmount; listeners go first so the resulting
    // `disconnect` event cannot update state of an unmounted component.
    return () => {
      socket.removeAllListeners();
      socket.disconnect();
      socketRef.current = null;
    };
  }, []);

  return (
    <div>
      {socketStatus && (
        <>
          <h6>Watchtower status</h6>
          <div>{socketStatus}</div>
        </>
      )}
      {socketError && (
        <>
          <h6>Watchtower error</h6>
          <div>{socketError}</div>
        </>
      )}
      {socketConnect && (
        <button onClick={subscribeToTopic}>Subscribe to topic</button>
      )}
    </div>
  );
}

export default StreamingInit;
const io = require('socket.io-client');

// Make sure you match the url for testnet/mainnet
const demoTopicId = `<YOUR_TOPIC_ID>`; // https://explorer.arkhia.io/#/testnet/topics
const watchtowerURL = `wss://<WATCHTOWER_URL>?api_key=<ARKHIA_API_KEY>`;
const socket = io(watchtowerURL);

// Available services in page 2 of these docs
const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;

/**
 * Builds the request emitted on the `subscribe` event.
 *
 * @param topic_id Topic number to subscribe to.
 * @param limit Maximum number of messages to stream down.
 * @returns The route plus the query body expected by the service.
 */
const getSubscriptionPayload = (topic_id, limit) => {
  return {
    subscribe: subscribeTopicRoute,
    body: {
      // Start date
      consensusStartTime: {
        nanos: 0,
        seconds: `0`
      },
      // Maximum number of messages streamed down
      limit: limit,
      topicID: {
        // Realm and shard are still zero by default for now
        realmNum: `0`,
        shardNum: `0`,
        // Here we add the topic Id we want to subscribe to
        topicNum: topic_id
      }
    }
  };
};

/**
 * Subscribes to `demoTopicId` and logs every streamed message or error.
 * The acknowledgement supplies the event names to listen on.
 */
const subscribeToTopic = () => {
  const requestPayload = getSubscriptionPayload(demoTopicId, `100`); // get latest 100
  socket.emit(`subscribe`, requestPayload, (msg) => {
    // This runs again after every reconnect; clear earlier handlers first so
    // each message is logged once.
    socket.off(msg.listeners.data);
    socket.off(msg.listeners.error);

    socket.on(msg.listeners.data, function (message) {
      console.log(`Topic messages streaming down...`);
      console.log(message);
    });
    socket.on(msg.listeners.error, function (message) {
      console.log(`Error`);
      console.log(message);
    });
  });
};

// Registered once, outside `connect`, so reconnects do not stack handlers.
socket.on(`status`, (msg) => {
  console.log(`Watchtower is successfully connected`);
  console.log(`status`, msg);
});

socket.on(`connect`, () => {
  subscribeToTopic();
});

socket.on(`disconnect`, (msg) => {
  console.log(`Watchtower is disconnected.`);
  console.log(`Disconnected.`);
});

socket.on(`error`, (message) => {
  // Logged once (the original printed the same message twice).
  console.error(message);
});
<!DOCTYPE html>
<html>
<head>
  <style>
    body {
      margin: 0;
      padding-bottom: 3rem;
      font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
    }

    #messages {
      list-style-type: none;
      margin: 0;
      padding: 0;
    }

    #messages>li {
      padding: 0.5rem 1rem;
      background: #868484;
    }

    #messages>li:nth-child(odd) {
      background: #efefef;
    }
  </style>
</head>
<body>
  <!-- Every status update, streamed message and error is appended here. -->
  <ul id="messages"></ul>

  <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.5.4/socket.io.min.js"></script>
  <script>
    // Make sure you match the url for testnet/mainnet
    const demoTopicId = `<YOUR_TOPIC_ID>`; // https://explorer.arkhia.io/#/testnet/topics
    const watchtowerURL = `wss://<WATCHTOWER_URL>?api_key=<ARKHIA_API_KEY>`;
    const socket = io(watchtowerURL);

    // Available services in page 2 of these docs
    const subscribeTopicRoute = `/com.hedera.mirror.api.proto.ConsensusService/subscribeTopic`;

    /**
     * Appends one "<event>: <message>" entry to the #messages list and scrolls
     * to the bottom of the page. The text is written with textContent, so
     * markup inside a streamed message is shown literally instead of being
     * interpreted as HTML.
     */
    function addMessage(event, message) {
      const list = document.getElementById('messages');
      const item = document.createElement('li');
      const text = typeof message === 'string' ? message : JSON.stringify(message);
      item.textContent = `${event}: ${text}`;
      list.appendChild(item);
      window.scrollTo(0, document.body.scrollHeight);
    }

    /**
     * Builds the request emitted on the `subscribe` event for the given topic
     * number and maximum number of messages.
     */
    const getSubscriptionPayload = (topic_id, limit) => {
      return {
        subscribe: subscribeTopicRoute,
        body: {
          // Start date
          consensusStartTime: {
            nanos: 0,
            seconds: `0`
          },
          // Maximum number of messages streamed down
          limit: limit,
          topicID: {
            // Realm and shard are still zero by default for now
            realmNum: `0`,
            shardNum: `0`,
            // Here we add the topic Id we want to subscribe to
            topicNum: topic_id
          }
        }
      };
    };

    /**
     * Subscribes to `demoTopicId`; the acknowledgement supplies the event
     * names to listen on for streamed data and for errors.
     */
    const subscribeToTopic = () => {
      const requestPayload = getSubscriptionPayload(demoTopicId, `100`); // get latest 100
      socket.emit(`subscribe`, requestPayload, (msg) => {
        // This runs again after every reconnect; clear earlier handlers first
        // so each message is shown once.
        socket.off(msg.listeners.data);
        socket.off(msg.listeners.error);

        socket.on(msg.listeners.data, function (message) {
          addMessage(`Message from: ${demoTopicId}`, `Topic messages streaming down...`);
          addMessage(`Message from: ${demoTopicId}`, message);
        });
        socket.on(msg.listeners.error, function (message) {
          console.log(`Error`);
          addMessage(`Error from: ${demoTopicId}`, { error: message });
        });
      });
    };

    // Registered once, outside `connect`, so reconnects do not stack handlers.
    socket.on('status', (msg) => {
      console.log('status', msg);
      addMessage('status', msg);
    });

    socket.on('connect', function () {
      socket.emit('list-services', function (services) {
        console.log('services', services);
        // addMessage('services', services);
      });
      subscribeToTopic();
    });
  </script>
</body>
</html>