how to get them quickly and easily using Java/Kotlin and Spring Boot + TinkoffApi

If you want to write a project related to market data or stock trading, and you are familiar with Java or Kotlin and have heard of Spring Boot, then this article is for you.

I am the author of a Spring Boot starter that makes it easy to integrate TinkoffInvestApi into your Spring Boot applications. The starter focuses on streaming various kinds of market data, so you only need to write the logic that processes it.

In this article I only want to introduce the starter and show the ideas behind it and some of its capabilities. Implementation details are left for a series of follow-up articles.

Requirements

  • You need to be a Tinkoff client, because we will be using a Tinkoff Investments token

  • JDK 17+

  • Spring Boot 3.0+

If you are not familiar with TinkoffInvestApi, you can read about it on its GitHub page.
In short, Tinkoff Invest API is a REST/gRPC interface for interacting with the Tinkoff Investments trading platform.

What tasks can be solved:

  • Analysis of securities quotes

  • Signals for buying or selling (For example, notifications about entering a position in Telegram)

  • Stock movement forecasts

  • Portfolio analysis

  • Automation of trading and creation of trading robots

  • Maintenance of own statistics system

  • Testing strategies on historical data

What problems do I see in using the Tinkoff Invest API directly?

  1. The code is complex to write and involves a lot of boilerplate

  2. As a result, the entry threshold is high

  3. Different people end up writing the same solutions to the same kinds of problems, and such solutions are better built once and shared with the community

Now let's try to solve a problem of this kind.

I would like to send a notification to Telegram when the dollar on the exchange approaches 100 rubles. If I use the Tinkoff Invest API directly, then in addition to the notification logic itself I will have to write a lot of large and intimidating code that has nothing to do with my task. Here is the example the SDK developers give for receiving market data.

An example in Java

Taken from here

private static void marketdataStreamExample(InvestApi api) {
  var randomFigi = randomFigi(api, 5);

  // Describe what to do with the data arriving in the stream
  StreamProcessor<MarketDataResponse> processor = response -> {
    if (response.hasTradingStatus()) {
      log.info("New trading status data: {}", response);
    } else if (response.hasPing()) {
      log.info("ping message");
    } else if (response.hasCandle()) {
      log.info("New candle data: {}", response);
    } else if (response.hasOrderbook()) {
      log.info("New order book data: {}", response);
    } else if (response.hasTrade()) {
      log.info("New trade data: {}", response);
    } else if (response.hasSubscribeCandlesResponse()) {
      var subscribeResult = response.getSubscribeCandlesResponse().getCandlesSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("candles", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeInfoResponse()) {
      var subscribeResult = response.getSubscribeInfoResponse().getInfoSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("trading statuses", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeOrderBookResponse()) {
      var subscribeResult = response.getSubscribeOrderBookResponse().getOrderBookSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("order book", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeTradesResponse()) {
      var subscribeResult = response.getSubscribeTradesResponse().getTradeSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("trades", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeLastPriceResponse()) {
      var subscribeResult = response.getSubscribeLastPriceResponse().getLastPriceSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("last prices", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    }
  };
  Consumer<Throwable> onErrorCallback = error -> log.error(error.toString());

  // Subscribe to a list of instruments. Non-blocking call
  // If errors need to be handled (a reconnect caused by the server or the client), providing an onErrorCallback is recommended
  api.getMarketDataStreamService().newStream("trades_stream", processor, onErrorCallback).subscribeTrades(randomFigi);
  api.getMarketDataStreamService().newStream("candles_stream", processor, onErrorCallback).subscribeCandles(randomFigi);
  api.getMarketDataStreamService().newStream("info_stream", processor, onErrorCallback).subscribeInfo(randomFigi);
  api.getMarketDataStreamService().newStream("orderbook_stream", processor, onErrorCallback).subscribeOrderbook(randomFigi);
  api.getMarketDataStreamService().newStream("last_prices_stream", processor, onErrorCallback).subscribeLastPrices(randomFigi);


  // Order book and candle streams have overloaded methods with default values:
  // order book depth = 10, candle interval = 1 minute
  api.getMarketDataStreamService().getStreamById("trades_stream").subscribeOrderbook(randomFigi);
  api.getMarketDataStreamService().getStreamById("candles_stream").subscribeCandles(randomFigi);
  api.getMarketDataStreamService().getStreamById("candles_stream").cancel();
  // Unsubscribe from the streams after a delay
  CompletableFuture.runAsync(()->{

    // Unsubscribe from a list of instruments. Non-blocking call
    api.getMarketDataStreamService().getStreamById("trades_stream").unsubscribeTrades(randomFigi);
    api.getMarketDataStreamService().getStreamById("candles_stream").unsubscribeCandles(randomFigi);
    api.getMarketDataStreamService().getStreamById("info_stream").unsubscribeInfo(randomFigi);
    api.getMarketDataStreamService().getStreamById("orderbook_stream").unsubscribeOrderbook(randomFigi);
    api.getMarketDataStreamService().getStreamById("last_prices_stream").unsubscribeLastPrices(randomFigi);

    // Close the stream
    api.getMarketDataStreamService().getStreamById("candles_stream").cancel();

  }, delayedExecutor)
    .thenRun(()->log.info("market data unsubscribe done"));


  // Each market data stream can deliver data for at most 300 instruments
  // If you need to subscribe to more, there are 2 options:
  // - open a new stream
  api.getMarketDataStreamService().newStream("new_stream", processor, onErrorCallback).subscribeCandles(randomFigi);
  // - unsubscribe from instruments in an existing stream, freeing room for new ones
  api.getMarketDataStreamService().getStreamById("new_stream").unsubscribeCandles(randomFigi);

  // Since version 1.4, calling newStream with the id of an already existing stream recreates that stream
  api.getMarketDataStreamService().newStream("candles_stream", processor, onErrorCallback)
    .subscribeCandles(randomFigi);
}

It looks incomprehensible – some streams, processors and a bunch of lambdas. How does a newbie who just wants to play around with market data make sense of it all? I suggest using my starter.

This is what receiving a dollar price change event would look like:

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceHandler implements BlockingLastPriceHandler {

    @Override
    public void handleBlocking(LastPrice lastPrice) {
       // send a notification to Telegram when the price reaches 100
    }
}

The entire internal implementation is hidden. You only write what should happen on each update: handleBlocking is called every time the dollar price changes on the exchange. By the way, if you use JDK 21+, BlockingLastPriceHandler is executed on a virtual thread. If your JDK is older than 21, I recommend the variant based on CompletableFuture:

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceAsyncHandler implements AsyncLastPriceHandler {

    @Override
    public CompletableFuture<Void> handleAsync(LastPrice lastPrice) {
        return CompletableFuture.runAsync(() -> {
            // send a notification to Telegram when the price reaches 100
        });
    }
}
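One detail the asynchronous variant leaves open is which thread pool runs the task. A minimal sketch, assuming the notification call is blocking network I/O: `CompletableFuture.runAsync(Runnable)` without an executor uses the common ForkJoinPool, so a small dedicated pool can be passed instead. The class name `NotificationTasks`, the pool size, and the thread name are hypothetical and not part of the starter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the starter.
final class NotificationTasks {

    // Dedicated daemon threads, so blocking I/O does not occupy the common ForkJoinPool.
    private static final ExecutorService IO_POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "notify-io");
        thread.setDaemon(true);
        return thread;
    });

    private NotificationTasks() {
    }

    // Runs the blocking task on the dedicated pool and returns a future that completes when it finishes.
    static CompletableFuture<Void> runBlocking(Runnable task) {
        return CompletableFuture.runAsync(task, IO_POOL);
    }
}
```

Inside handleAsync this could be used as `return NotificationTasks.runBlocking(() -> { /* send the notification */ });`.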

And if you want to do the same thing for, say, Sberbank shares, do you really need to create another class and copy-paste the processing?
No. Common logic, such as logging, can be written like this:

@HandleAllLastPrices(tickers = {"USDRUB", "SBER"})
class CommonLastPriceHandler implements AsyncLastPriceHandler {

    @Override
    public CompletableFuture<Void> handleAsync(LastPrice lastPrice) {
        return CompletableFuture.runAsync(() -> System.out.println("CommonLastPriceHandler: " + lastPrice));
    }
}

Or in configuration style:


    @Bean
    public BlockingLastPriceStreamProcessorAdapter coroutineLastPriceStreamProcessorAdapter() {
        return LastPriceStreamProcessorAdapterFactory
//            .runAfterEachLastPriceHandler(true) optional
//            .runBeforeEachLastPriceHandler(true) optional
                .withTickers(List.of("USDRUB", "SBER"))
                .createBlockingHandler(lastPrice -> System.out.println("LastPriceStreamProcessorAdapterFactory" + lastPrice)); // on JDK 21+ the BlockingHandler is executed on a virtual thread
    }

All handlers are created as Spring components, so you can write a hypothetical TelegramService separately and inject it into any handler:

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceHandler implements BlockingLastPriceHandler {
    private final TelegramService telegramService;

    // Constructor injection: a constructor has no return type
    public DollarLastPriceHandler(TelegramService telegramService) {
       this.telegramService = telegramService;
    }

    @Override
    public void handleBlocking(LastPrice lastPrice) {
       // send a notification to Telegram when the price reaches 100
       // telegramService.sendNotification()
    }
}
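The comment inside handleBlocking leaves the price check itself open. Below is a minimal sketch of one way to write it, assuming the price arrives as a units/nano pair (integer part plus billionths), as in the API's Quotation type. An exact `== 100` comparison on a streaming price would rarely match, and a plain `>= 100` check would fire on every update, so the sketch reports only the update that crosses the threshold from below. The class `ThresholdAlert` and its methods are hypothetical and not part of the starter.

```java
import java.math.BigDecimal;

// Hypothetical helper, not part of the starter.
final class ThresholdAlert {

    private final BigDecimal threshold;
    // Whether the previously seen price was already at or above the threshold.
    private boolean above;

    ThresholdAlert(BigDecimal threshold) {
        this.threshold = threshold;
    }

    // Converts a units/nano pair into a decimal price: units + nano / 10^9.
    static BigDecimal toBigDecimal(long units, int nano) {
        return BigDecimal.valueOf(units).add(BigDecimal.valueOf(nano, 9));
    }

    // Returns true only for the update that moves the price from below the threshold to at or above it.
    synchronized boolean shouldNotify(BigDecimal price) {
        boolean nowAbove = price.compareTo(threshold) >= 0;
        boolean crossed = nowAbove && !above;
        above = nowAbove;
        return crossed;
    }
}
```

A handler could keep one `ThresholdAlert` as a field and call telegramService only when `shouldNotify` returns true. The method is synchronized because updates may be handled on different threads.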

Trades, order books, last prices, portfolio updates, and more can be handled in a similar way. The only difference is in the names of the annotations and interfaces. More details can be found in the README. There are also two demo projects:

On Kotlin + gradle.kts
On Java + Maven

To make it all work:

  1. I recommend reading about the capabilities, concepts, and definitions in the official Tinkoff Invest API documentation. If you are already familiar with them, you can skip this step

  2. Create a Spring Boot project in any convenient way and add the dependencies:

For build.gradle.kts
implementation("io.github.dankosik:invest-api-java-sdk-starter:1.6.0-RC1")

And you will need one of:

implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-webflux")

For build.gradle
implementation 'io.github.dankosik:invest-api-java-sdk-starter:1.6.0-RC1'

And you will need one of:

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

For Maven
<dependency>
    <groupId>io.github.dankosik</groupId>
    <artifactId>invest-api-java-sdk-starter</artifactId>
    <version>1.6.0-RC1</version>
    <classifier>plain</classifier>
</dependency>

And you will need one of:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

  3. Set the token in application.yml, replacing the placeholder with a real one (how to get a token)

tinkoff:
  starter:
    apiToken:
      fullAccess:
        "your token"

  4. Take any example from the demo projects or, alternatively, print to the console every trade in Sberbank shares executed on the exchange

@HandleTrade(ticker = "SBER")
class BlockingSberTradeHandler implements BlockingTradeHandler {

    @Override
    public void handleBlocking(Trade trade) {
        System.out.println(trade);
    }
}
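Once trades arrive in a handler like this, printing can be replaced with your own statistics. As an illustration only, here is a small sketch of a volume-weighted average price (VWAP) accumulator, assuming each trade has already been reduced to a decimal price and an integer quantity. The class `VwapAccumulator` is hypothetical and not part of the starter or the SDK.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper, not part of the starter or the SDK.
final class VwapAccumulator {

    // Running sum of price * quantity over all trades seen so far.
    private BigDecimal turnover = BigDecimal.ZERO;
    // Running sum of quantity over all trades seen so far.
    private long volume;

    // Adds one trade; synchronized because trades may be handled on different threads.
    synchronized void add(BigDecimal price, long quantity) {
        turnover = turnover.add(price.multiply(BigDecimal.valueOf(quantity)));
        volume += quantity;
    }

    // Returns turnover / volume rounded to 4 decimal places.
    synchronized BigDecimal vwap() {
        if (volume == 0) {
            throw new IllegalStateException("no trades yet");
        }
        return turnover.divide(BigDecimal.valueOf(volume), 4, RoundingMode.HALF_UP);
    }
}
```

For example, one trade of quantity 1 at 100 and one of quantity 3 at 110 give a turnover of 430 over a volume of 4.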

Further plans:

  • Writing documentation (a wiki) for the starter.

  • Support for Tinkoff Invest API versions 1.7 and 1.8 (the starter is currently based on version 1.6)

  • After receiving feedback, I will add new features. So be sure to write if you have an interesting project or want to suggest improvements.
