Streamer Functions
Prerequisitesโ
Connecting to the WebSocket for market and portfolio updates is streamlined through two primary Feeder functions. Both functions are designed to simplify the process of subscribing to essential data streams, ensuring users have quick and easy access to the information they need.
You need to have the SDK installed for the specific language you are using. For detailed installation instructions and repository links, refer to the Installing the Upstox SDK guide.
MarketDataStreamerV3โ
The MarketDataStreamerV3 interface is designed for effortless connection to the market WebSocket, enabling users to receive instantaneous updates on various instruments. The following example demonstrates how to quickly set up and start receiving market updates for selected instrument keys:
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
def on_message(message):
print(message)
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration), ["NSE_INDEX|Nifty 50", "NSE_INDEX|Nifty Bank"], "full")
streamer.on("message", on_message)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3(["MCX_FO|426268", "MCX_FO|427608"], "full");
streamer.connect();
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log(feed);
});
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
Set<String> instrumentKeys = new HashSet<>();
instrumentKeys.add("NSE_INDEX|Nifty 50");
instrumentKeys.add("NSE_INDEX|Nifty Bank");
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient, instrumentKeys, Mode.FULL);
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
System.out.println(marketUpdate);
}
});
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config, ["NSE_INDEX|Nifty 50", "NSE_INDEX|Nifty Bank"], "full");
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
In this example, you first authenticate using an access token, then instantiate MarketDataStreamerV3 with specific instrument keys and a subscription mode. Upon connecting, the streamer listens for market updates, which are logged to the console as they arrive.
Feel free to adjust the access token placeholder and any other specifics to better fit your actual implementation or usage scenario.
Exploring the MarketDataStreamerV3 Functionalityโ
Modesโ
- ltpc: ltpc provides information solely about the most recent trade, encompassing details such as the last trade price, time of the last trade, quantity traded, and the closing price from the previous day.
- full: The full option offers comprehensive information, including the latest trade prices, D5 depth, 1-minute, 30-minute, and daily candlestick data, along with some additional details.
- option_greeks: Contains only option greeks.
- full_d30: full_d30 includes Full mode data plus 30 market level quotes.
Functionsโ
- constructor MarketDataStreamerV3(apiClient, instrumentKeys, mode): Initializes the streamer with optional instrument keys and mode (
full,ltpc,full_d30, oroption_greeks). - connect(): Establishes the WebSocket connection.
- subscribe(instrumentKeys, mode): Subscribes to updates for given instrument keys in the specified mode. Both parameters are mandatory.
- unsubscribe(instrumentKeys): Stops updates for the specified instrument keys.
- changeMode(instrumentKeys, mode): Switches the mode for already subscribed instrument keys.
- disconnect(): Ends the active WebSocket connection.
- auto_reconnect(enable, interval, retryCount): Customizes auto-reconnect functionality. Parameters include a flag to enable/disable it, the interval(in seconds) between attempts, and the maximum number of retries.
Eventsโ
- open: Emitted upon successful connection establishment.
- close: Indicates the WebSocket connection has been closed.
- message: Delivers market updates.
- error: Signals an error has occurred.
- reconnecting: Announced when a reconnect attempt is initiated.
- autoReconnectStopped: Informs when auto-reconnect efforts have ceased after exhausting the retry count.
The following documentation includes examples to illustrate the usage of these functions and events, providing a practical understanding of how to interact with the MarketDataStreamerV3 effectively.
Subscribing to Market Data on Connection Open with MarketDataStreamerV3โ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
def on_open():
streamer.subscribe(
["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full")
def on_message(message):
print(message)
streamer.on("open", on_open)
streamer.on("message", on_message)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Subscribe to instrument keys upon the 'open' event
streamer.on("open", () => {
streamer.subscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full");
});
// Handle incoming market data messages
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log(feed);
});
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.setOnOpenListener(new OnOpenListener() {
@Override
public void onOpen() {
System.out.println("Connection Established");
Set<String> instrumentKeys = new HashSet<>();
instrumentKeys.add("NSE_INDEX|Nifty 50");
instrumentKeys.add("NSE_INDEX|Nifty Bank");
marketDataStreamer.subscribe(instrumentKeys, Mode.FULL);
}
});
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
System.out.println(marketUpdate);
}
});
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
function on_open($streamer)
{
print("Connection Established");
$streamer->subscribe(["NSE_INDEX|Nifty 50", "NSE_INDEX|Nifty Bank"], "full");
}
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->on("open", 'on_open');
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Subscribing to Instruments with Delaysโ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
import time
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
def on_open():
streamer.subscribe(
["NSE_EQ|INE020B01018"], "full")
# Handle incoming market data messages\
def on_message(message):
print(message)
streamer.on("open", on_open)
streamer.on("message", on_message)
streamer.connect()
time.sleep(5)
streamer.subscribe(
["NSE_EQ|INE467B01029"], "full")
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Subscribe to the first set of instrument keys immediately upon connection
streamer.on("open", () => {
streamer.subscribe(["NSE_EQ|INE020B01018"], "full");
// Subscribe to another set of instrument keys after a delay
setTimeout(() => {
streamer.subscribe(["NSE_EQ|INE467B01029"], "full");
}, 5000); // 5-second delay before subscribing to the second set
});
// Handle incoming market data messages
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log(feed);
});
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.setOnOpenListener(new OnOpenListener() {
@Override
public void onOpen() {
System.out.println("Connection Established");
Set<String> instrumentKeys1 = new HashSet<>();
instrumentKeys1.add("NSE_INDEX|Nifty 50");
marketDataStreamer.subscribe(instrumentKeys1, Mode.FULL);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
Set<String> instrumentKeys2 = new HashSet<>();
instrumentKeys2.add("NSE_INDEX|Nifty Bank");
marketDataStreamer.subscribe(instrumentKeys2, Mode.FULL);
scheduler.shutdown();
}, 5, TimeUnit.SECONDS);
}
});
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
System.out.println(marketUpdate);
}
});
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
use function Amp\delay;
function on_open($streamer)
{
print("Connection Established");
$streamer->subscribe(["NSE_INDEX|Nifty 50"], "full");
delay(5);
$streamer->subscribe(["NSE_INDEX|Nifty Bank"], "full");
}
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->on("open", 'on_open');
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Subscribing and Unsubscribing Instrumentsโ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
import time
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
def on_open():
print("Connected. Subscribing to instrument keys.")
streamer.subscribe(
["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full")
# Handle incoming market data messages\
def on_message(message):
print(message)
streamer.on("open", on_open)
streamer.on("message", on_message)
streamer.connect()
time.sleep(5)
print("Unsubscribing from instrument keys.")
streamer.unsubscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"])
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Subscribe to instrument keys immediately upon connection
streamer.on("open", () => {
console.log("Connected. Subscribing to instrument keys.");
streamer.subscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full");
// Unsubscribe after a delay
setTimeout(() => {
console.log("Unsubscribing from instrument keys.");
streamer.unsubscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"]);
}, 5000); // Adjust delay as needed
});
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log("Market Update:", feed);
});
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.setOnOpenListener(new OnOpenListener() {
@Override
public void onOpen() {
System.out.println("Connection Established");
Set<String> instrumentKeys = new HashSet<>();
instrumentKeys.add("NSE_INDEX|Nifty 50");
instrumentKeys.add("NSE_INDEX|Nifty Bank");
marketDataStreamer.subscribe(instrumentKeys, Mode.FULL);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
marketDataStreamer.unsubscribe(instrumentKeys);
scheduler.shutdown();
}, 5, TimeUnit.SECONDS);
}
});
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
System.out.println(marketUpdate);
}
});
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
use function Amp\delay;
function on_open($streamer)
{
print("Connection Established");
$streamer->subscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full");
delay(5);
$streamer->unsubscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"]);
}
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->on("open", 'on_open');
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Subscribe, Change Mode and Unsubscribeโ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
import time
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
def on_open():
print("Connected. Subscribing to instrument keys.")
streamer.subscribe(
["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full")
# Handle incoming market data messages\
def on_message(message):
print(message)
streamer.on("open", on_open)
streamer.on("message", on_message)
streamer.connect()
time.sleep(5)
print("Changing subscription mode to ltpc...")
streamer.change_mode(
["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "ltpc")
time.sleep(5)
print("Unsubscribing from instrument keys.")
streamer.unsubscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"])
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Initially subscribe to instrument keys in 'full' mode
streamer.on("open", async () => {
console.log("Connected. Subscribing in full mode...");
streamer.subscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full");
// Change mode to 'ltpc' after a short delay
setTimeout(() => {
console.log("Changing subscription mode to ltpc...");
streamer.changeMode(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "ltpc");
}, 5000); // 5-second delay
// Unsubscribe after another delay
setTimeout(() => {
console.log("Unsubscribing...");
streamer.unsubscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"]);
}, 10000); // 10 seconds after subscription
});
// Setup event listeners to log messages, errors, and closure
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log("Market Update:", feed);
});
streamer.on("error", (error) => console.error("Error:", error));
streamer.on("close", () => console.log("Connection closed."));
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.setOnOpenListener(new OnOpenListener() {
@Override
public void onOpen() {
System.out.println("Connection Established");
Set<String> instrumentKeys = new HashSet<>();
instrumentKeys.add("NSE_INDEX|Nifty 50");
instrumentKeys.add("NSE_INDEX|Nifty Bank");
marketDataStreamer.subscribe(instrumentKeys, Mode.FULL);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
marketDataStreamer.changeMode(instrumentKeys, Mode.LTPC);
scheduler.shutdown();
}, 5, TimeUnit.SECONDS);
scheduler.schedule(() -> {
marketDataStreamer.unsubscribe(instrumentKeys);
scheduler.shutdown();
}, 10, TimeUnit.SECONDS);
}
});
marketDataStreamer.setOnMarketUpdateListener(new OnMarketUpdateV3Listener() {
@Override
public void onUpdate(MarketUpdateV3 marketUpdate) {
System.out.println(marketUpdate);
}
});
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
use function Amp\delay;
function on_open($streamer)
{
print("Connection Established");
$streamer->subscribe(["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "full");
delay(5);
$streamer->changeMode(
["NSE_EQ|INE020B01018", "NSE_EQ|INE467B01029"], "ltpc");
}
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->on("open", 'on_open');
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Disable Auto-Reconnectโ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
import time
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
def on_reconnection_halt(message):
print(message)
streamer.on("autoReconnectStopped", on_reconnection_halt)
# Disable auto-reconnect feature
streamer.auto_reconnect(False)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Disable auto-reconnect feature
streamer.autoReconnect(false);
streamer.on("autoReconnectStopped", (data) => {
console.log(data);
});
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.setOnAutoReconnectStoppedListener(new OnAutoReconnectStoppedListener() {
@Override
public void onHault(String message) {
System.out.println(message);
}
});
marketDataStreamer.autoReconnect(false);
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
function on_reconnectstopped($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->on("autoReconnectStopped", 'on_reconnectstopped');
$streamer->autoReconnect(false);
$streamer->connect();
EventLoop::run();
Modify Auto-Reconnect parametersโ
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.MarketDataStreamerV3(
upstox_client.ApiClient(configuration))
# Modify auto-reconnect parameters: enable it, set interval to 10 seconds, and retry count to 3
streamer.auto_reconnect(True, 10, 3)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = <ACCESS_TOKEN>;
const streamer = new UpstoxClient.MarketDataStreamerV3();
streamer.connect();
// Modify auto-reconnect parameters: enable it, set interval to 10 seconds, and retry count to 3
streamer.autoReconnect(true, 10, 3);
public class MarketFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final MarketDataStreamerV3 marketDataStreamer = new MarketDataStreamerV3(defaultClient);
marketDataStreamer.autoReconnect(true, 10, 3);
marketDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\MarketDataStreamerV3;
use Revolt\EventLoop;
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new MarketDataStreamerV3($config);
$streamer->autoReconnect(true, 10, 3);
$streamer->connect();
EventLoop::run();
PortfolioDataStreamerโ
Connecting to the Portfolio WebSocket for real-time order updates is straightforward with the PortfolioDataStreamer function. Below is a concise guide to get you started on receiving updates. For detailed API documentation, refer to the Portfolio Stream Feed API.
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
def on_message(message):
print(message)
def main():
configuration = upstox_client.Configuration()
access_token = <ACCESS_TOKEN>
configuration.access_token = access_token
streamer = upstox_client.PortfolioDataStreamer(
upstox_client.ApiClient(configuration))
streamer.on("message", on_message)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = "<ACCESS_TOKEN>";
const streamer = new UpstoxClient.PortfolioDataStreamer();
streamer.connect();
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log(feed);
});
public class PortfolioFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken(<ACCESS_TOKEN>);
final PortfolioDataStreamer portfolioDataStreamer = new PortfolioDataStreamer(defaultClient);
portfolioDataStreamer.setOnOrderUpdateListener(new OnOrderUpdateListener() {
@Override
public void onUpdate(OrderUpdate orderUpdate) {
System.out.println(orderUpdate);
}
});
portfolioDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\PortfolioDataStreamer;
use Revolt\EventLoop;
function on_message($streamer, $data)
{
print($data);
}
$config = Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new PortfolioDataStreamer($config);
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Position, Holding, and GTT order updates can be enabled by setting the corresponding flag to True in the constructor of the PortfolioDataStreamer class.
- Python SDK
- Node.js SDK
- Java SDK
- PHP SDK
import upstox_client
import data_token
def on_message(message):
print(message)
def on_open():
print("connection opened")
def main():
configuration = upstox_client.Configuration()
configuration.access_token = <ACCESS_TOKEN>
streamer = upstox_client.PortfolioDataStreamer(upstox_client.ApiClient(configuration),
order_update=True,
position_update=True,
holding_update=True,
gtt_update=True)
streamer.on("message", on_message)
streamer.on("open", on_open)
streamer.connect()
if __name__ == "__main__":
main()
let UpstoxClient = require("upstox-js-sdk");
let defaultClient = UpstoxClient.ApiClient.instance;
var OAUTH2 = defaultClient.authentications["OAUTH2"];
OAUTH2.accessToken = "<ACCESS_TOKEN>";
// Enable all update types: orders, positions, holdings, and GTT orders
const streamer = new UpstoxClient.PortfolioDataStreamer(true, true, true, true);
streamer.connect();
streamer.on("message", (data) => {
const feed = data.toString("utf-8");
console.log(feed);
});
import com.upstox.feeder.HoldingUpdate;
import com.upstox.feeder.PositionUpdate;
public class PortfolioFeederTest {
public static void main(String[] args) throws ApiException {
ApiClient defaultClient = Configuration.getDefaultApiClient();
OAuth oAuth = (OAuth) defaultClient.getAuthentication("OAUTH2");
oAuth.setAccessToken( < ACCESS_TOKEN >);
final PortfolioDataStreamer portfolioDataStreamer = new PortfolioDataStreamer(defaultClient, true, true, true, true);
portfolioDataStreamer.setOnOrderUpdateListener(new OnOrderUpdateListener() {
@Override
public void onUpdate(OrderUpdate orderUpdate) {
System.out.println(orderUpdate);
}
});
portfolioDataStreamer.setOnHoldingUpdateListener(new OnHoldingUpdateListener() {
@Override
public void onUpdate(HoldingUpdate holdingUpdate) {
System.out.println(holdingUpdate);
}
});
portfolioDataStreamer.setOnPositionUpdateListener(new OnPositionUpdateListener() {
@Override
public void onUpdate(PositionUpdate positionUpdate) {
System.out.println(positionUpdate);
}
});
portfolioDataStreamer.setOnGttUpdateListener(new OnGttUpdateListener() {
@Override
public void onUpdate(GttUpdate gttUpdate) {
System.out.println(gttUpdate);
}
});
portfolioDataStreamer.connect();
}
}
use Upstox\Client\Configuration;
use Upstox\Client\Feeder\PortfolioDataStreamer;
use Revolt\EventLoop;
function on_message($streamer,$data)
{
print("on_message= " . ($data) . "\n");
}
$config = Upstox\Client\Configuration::getDefaultConfiguration()->setAccessToken(<ACCESS_TOKEN>);
$streamer = new PortfolioDataStreamer(
$config,
orderUpdate: true,
holdingUpdate: true,
positionUpdate: true,
gttUpdate: true
);
$streamer->on("message", 'on_message');
$streamer->connect();
EventLoop::run();
Exploring the PortfolioDataStreamer Functionalityโ
Constructor Parametersโ
- api_client: Your API client instance
- order_update: Set to
Trueto receive real-time order updates (default:True) - position_update: Set to
Trueto receive position updates (default:False) - holding_update: Set to
Trueto receive holding updates (default:False) - gtt_update: Set to
Trueto receive GTT order updates (default:False)
Functionsโ
- constructor PortfolioDataStreamer(): Initializes the streamer.
- connect(): Establishes the WebSocket connection.
- disconnect(): Ends the active WebSocket connection.
- auto_reconnect(enable, interval, retryCount): Customizes auto-reconnect functionality. Parameters include a flag to enable/disable it, the interval(in seconds) between attempts, and the maximum number of retries.
Eventsโ
- open: Emitted upon successful connection establishment.
- close: Indicates the WebSocket connection has been closed.
- message: Delivers market updates.
- error: Signals an error has occurred.
- reconnecting: Announced when a reconnect attempt is initiated.
- autoReconnectStopped: Informs when auto-reconnect efforts have ceased after exhausting the retry count.