Quote Manager Basic
The complete project code can be found in sj-trading, and the complete example Jupyter notebook can be found in quote_manager_usage.
This project is created with uv. If you are not familiar with using uv to create a project and manage dependencies, it is recommended to start with the environment setup chapter.
Before writing the quote manager, we need two dependencies. We will use the Polars package to process the quote data, so it must be added to the project. This tutorial also shows how to use Polars to quickly calculate technical indicators for multiple commodities, so we add the polars_talib package as well.
Add Polars dependencies
uv add polars polars_talib
If you are not familiar with Polars, refer to the official Polars documentation to learn how to use it.
polars_talib is a Polars extension package that exposes the full functionality of the ta-lib library as Polars expressions, so technical indicators can be calculated directly in Polars. It is developed by the Shioaji author; detailed usage can be found in polars_ta_extension.
Polars is an efficient DataFrame package suited to processing large amounts of data, and it uses multiple cores without any additional configuration. In this example, we will see how to use a Shioaji quote manager to obtain quote data and use Polars for parallel computation: converting each commodity's ticks into K lines and calculating technical indicators for multiple commodities in parallel.
Add quote.py
Add a quote.py file in src/sj_trading/ with the following code:
import shioaji as sj
from typing import List


class QuoteManager:
    def __init__(self, api: sj.Shioaji):
        self.api = api
        self.api.quote.set_on_tick_stk_v1_callback(self.on_stk_v1_tick_handler)
        self.api.quote.set_on_tick_fop_v1_callback(self.on_fop_v1_tick_handler)
        self.ticks_stk_v1: List[sj.TickSTKv1] = []
        self.ticks_fop_v1: List[sj.TickFOPv1] = []

    def on_stk_v1_tick_handler(self, _exchange: sj.Exchange, tick: sj.TickSTKv1):
        self.ticks_stk_v1.append(tick)

    def on_fop_v1_tick_handler(self, _exchange: sj.Exchange, tick: sj.TickFOPv1):
        self.ticks_fop_v1.append(tick)
This part is relatively simple: the handler functions that receive quote data should do as little work as possible. We define a QuoteManager class and register two callback functions during initialization, on_stk_v1_tick_handler and on_fop_v1_tick_handler. These functions are called whenever quote data arrives, and they store the incoming ticks in ticks_stk_v1 and ticks_fop_v1 respectively.
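The registration flow can be tried without a Shioaji connection. The following is only a minimal sketch: FakeTick, FakeQuote, MiniQuoteManager and the emit method are hypothetical stand-ins invented for illustration, not part of Shioaji, and the prices are made-up numbers.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeTick:
    """Hypothetical stand-in for sj.TickSTKv1; only two fields are modelled."""
    code: str
    close: float


class FakeQuote:
    """Hypothetical stand-in for api.quote: stores one callback and fires it."""

    def __init__(self) -> None:
        self._on_tick: Optional[Callable[[str, FakeTick], None]] = None

    def set_on_tick_stk_v1_callback(self, func: Callable[[str, FakeTick], None]) -> None:
        self._on_tick = func

    def emit(self, exchange: str, tick: FakeTick) -> None:
        # Simulates the quote source delivering one tick to the registered callback.
        if self._on_tick is not None:
            self._on_tick(exchange, tick)


class MiniQuoteManager:
    def __init__(self, quote: FakeQuote) -> None:
        self.ticks_stk_v1: List[FakeTick] = []
        # The bound method is registered as the callback, as in QuoteManager.__init__.
        quote.set_on_tick_stk_v1_callback(self.on_stk_v1_tick_handler)

    def on_stk_v1_tick_handler(self, _exchange: str, tick: FakeTick) -> None:
        # Append only; any heavier processing is deferred to later calls.
        self.ticks_stk_v1.append(tick)


quote = FakeQuote()
manager = MiniQuoteManager(quote)
quote.emit("TSE", FakeTick("2330", 600.0))
quote.emit("TSE", FakeTick("2317", 105.5))
buffered_codes = [t.code for t in manager.ticks_stk_v1]
```

Keeping the handler down to a single append means the callback returns quickly, and the conversion to a DataFrame can happen whenever the caller asks for data.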
Add QuoteManager subscribe and unsubscribe methods
from typing import List, Set  # Set is needed for subscribed_stk_tick


class QuoteManager:
    def __init__(self, api: sj.Shioaji):
        # skip
        self.subscribed_stk_tick: Set[str] = set()

    def subscribe_stk_tick(self, codes: List[str], recover: bool = False):
        for code in codes:
            contract = self.api.Contracts.Stocks[code]
            # Subscribe only to known contracts that are not subscribed yet
            if contract is not None and code not in self.subscribed_stk_tick:
                self.api.quote.subscribe(contract, "tick")
                self.subscribed_stk_tick.add(code)

    def unsubscribe_stk_tick(self, codes: List[str]):
        for code in codes:
            contract = self.api.Contracts.Stocks[code]
            if contract is not None and code in self.subscribed_stk_tick:
                self.api.quote.unsubscribe(contract, "tick")
                self.subscribed_stk_tick.remove(code)

    def unsubscribe_all_stk_tick(self):
        for code in self.subscribed_stk_tick:
            contract = self.api.Contracts.Stocks[code]
            if contract is not None:
                self.api.quote.unsubscribe(contract, "tick")
        self.subscribed_stk_tick.clear()
In the code above, we add the subscribe_stk_tick method. It takes a list of commodity codes, calls Shioaji's subscribe method for each one to subscribe to its market data, and adds the code to subscribed_stk_tick. subscribed_stk_tick is a Set that stores the codes already subscribed, which avoids duplicate subscriptions and makes it easy to unsubscribe from all subscribed commodities later.
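The effect of the set on duplicate subscriptions can be checked with a small stand-alone sketch. FakeQuoteAPI and SubscriptionBook are hypothetical names invented for this illustration, and a plain dict stands in for api.Contracts.Stocks; none of this is Shioaji's own API.

```python
from typing import Dict, List, Set, Tuple


class FakeQuoteAPI:
    """Hypothetical stand-in for api.quote that only records the calls it gets."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    def subscribe(self, contract: str, quote_type: str) -> None:
        self.calls.append(("subscribe", contract))

    def unsubscribe(self, contract: str, quote_type: str) -> None:
        self.calls.append(("unsubscribe", contract))


class SubscriptionBook:
    def __init__(self, quote: FakeQuoteAPI, contracts: Dict[str, str]) -> None:
        self.quote = quote
        self.contracts = contracts  # code -> contract (assumption: a plain dict)
        self.subscribed: Set[str] = set()

    def subscribe(self, codes: List[str]) -> None:
        for code in codes:
            contract = self.contracts.get(code)  # None for unknown codes
            if contract is not None and code not in self.subscribed:
                self.quote.subscribe(contract, "tick")
                self.subscribed.add(code)

    def unsubscribe_all(self) -> None:
        for code in self.subscribed:
            self.quote.unsubscribe(self.contracts[code], "tick")
        self.subscribed.clear()


api_quote = FakeQuoteAPI()
book = SubscriptionBook(api_quote, {"2330": "contract-2330", "2317": "contract-2317"})
book.subscribe(["2330", "2330", "2317", "9999"])  # duplicate and unknown codes are skipped
book.subscribe(["2330"])                          # already subscribed, so no new call
subscribe_calls = [c for c in api_quote.calls if c[0] == "subscribe"]
book.unsubscribe_all()
unsubscribe_calls = [c for c in api_quote.calls if c[0] == "unsubscribe"]
```

Five requested codes result in only two subscribe calls, and unsubscribe_all needs no argument because the set already knows what is active.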
Add QuoteManager get ticks method
import polars as pl  # required for pl.DataFrame and pl.col


class QuoteManager:
    def __init__(self, api: sj.Shioaji):
        # skip
        self.df_stk: pl.DataFrame = pl.DataFrame(
            [],
            schema=[
                ("datetime", pl.Datetime),
                ("code", pl.Utf8),
                ("price", pl.Float64),
                ("volume", pl.Int64),
                ("tick_type", pl.Int8),
            ],
        )

    def get_df_stk(self) -> pl.DataFrame:
        # Take the buffered ticks and reset the buffer in one assignment
        poped_ticks, self.ticks_stk_v1 = self.ticks_stk_v1, []
        if poped_ticks:
            df = pl.DataFrame([tick.to_dict() for tick in poped_ticks]).select(
                pl.col("datetime", "code"),
                pl.col("close").cast(pl.Float64).alias("price"),
                pl.col("volume").cast(pl.Int64),
                pl.col("tick_type").cast(pl.Int8),
            )
            # Append the new ticks to the accumulated DataFrame
            self.df_stk = self.df_stk.vstack(df)
        return self.df_stk
In __init__ we define df_stk, a Polars DataFrame that stores the tick data of all subscribed stocks. The get_df_stk method converts the buffered ticks_stk_v1 list into a Polars DataFrame, appends it to df_stk, and returns the result. At this point, we have a DataFrame that can be used to calculate technical indicators.
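The buffer handling in get_df_stk follows a swap-and-reset pattern: take the pending list, replace it with an empty one, then append what was taken to the accumulated data. Below is a plain-Python sketch of that pattern only. TickBuffer, pending, history and drain are hypothetical names, lists stand in for the DataFrame, and the sketch makes no claim about thread safety.

```python
from typing import List


class TickBuffer:
    """Hypothetical illustration of the swap-and-reset pattern."""

    def __init__(self) -> None:
        self.pending: List[dict] = []   # filled by the tick callback
        self.history: List[dict] = []   # plays the role of df_stk

    def on_tick(self, tick: dict) -> None:
        self.pending.append(tick)

    def drain(self) -> List[dict]:
        # Swap: keep a reference to the filled list, install a fresh empty one.
        popped, self.pending = self.pending, []
        if popped:
            self.history.extend(popped)  # plays the role of vstack
        return self.history


buf = TickBuffer()
buf.on_tick({"code": "2330", "close": 600.0})
buf.on_tick({"code": "2330", "close": 601.0})
first = len(buf.drain())    # two ticks moved into history
buf.on_tick({"code": "2317", "close": 105.5})
second = len(buf.drain())   # one more tick appended
third = len(buf.drain())    # nothing pending, history unchanged
```

Because each tick leaves the pending list exactly once, calling the drain method repeatedly never duplicates rows in the accumulated data.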
Add QuoteManager get kbar method
class QuoteManager:
    def get_df_stk_kbar(
        self, unit: str = "1m", exprs: List[pl.Expr] = []
    ) -> pl.DataFrame:
        df = self.get_df_stk()
        df = df.group_by(
            pl.col("datetime").dt.truncate(unit),
            pl.col("code"),
            maintain_order=True,
        ).agg(
            pl.col("price").first().alias("open"),
            pl.col("price").max().alias("high"),
            pl.col("price").min().alias("low"),
            pl.col("price").last().alias("close"),
            pl.col("volume").sum().alias("volume"),
        )
        if exprs:
            df = df.with_columns(exprs)
        return df
In the get_df_stk_kbar method, we call get_df_stk to get the ticks DataFrame, group the data by truncated datetime and code, and aggregate each group into K line data (open, high, low, close, and volume). Finally, we return the K line DataFrame. We keep an exprs parameter so that users can pass in additional expressions for further calculations.
Here 1m means 1 minute. For 5-minute K lines, change the unit to 5m; for 1-hour K lines, use 1h. For other units, refer to the truncate API documentation.
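To make the grouping concrete, here is a plain-Python sketch of what truncating timestamps and aggregating per (bucket, code) produces. It is an illustration of the idea rather than the Polars implementation: truncate and to_kbars are hypothetical helpers, the bucket width is assumed to be a number of minutes that divides 60, the ticks are assumed to arrive in time order, and the prices are made up.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Tick = Tuple[datetime, str, float, int]  # (datetime, code, price, volume)


def truncate(ts: datetime, minutes: int) -> datetime:
    """Floor ts to the start of its bucket (assumes minutes divides 60)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)


def to_kbars(ticks: List[Tick], minutes: int) -> Dict[Tuple[datetime, str], dict]:
    bars: Dict[Tuple[datetime, str], dict] = {}
    for ts, code, price, volume in ticks:
        key = (truncate(ts, minutes), code)
        bar = bars.get(key)
        if bar is None:
            # First tick of the bucket sets every price field.
            bars[key] = {"open": price, "high": price, "low": price,
                         "close": price, "volume": volume}
        else:
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price          # last tick seen so far
            bar["volume"] += volume
    return bars  # dicts keep insertion order, similar to maintain_order=True


ticks = [
    (datetime(2024, 1, 2, 9, 0, 5), "2330", 600.0, 3),
    (datetime(2024, 1, 2, 9, 0, 40), "2330", 602.0, 1),
    (datetime(2024, 1, 2, 9, 0, 50), "2317", 105.0, 2),
    (datetime(2024, 1, 2, 9, 1, 10), "2330", 601.0, 4),
    (datetime(2024, 1, 2, 9, 4, 59), "2330", 599.0, 2),
]
bars_1m = to_kbars(ticks, 1)
bars_5m = to_kbars(ticks, 5)
```

With a 1-minute unit the five ticks fall into four bars; with a 5-minute unit all four 2330 ticks collapse into a single bar, and 2317 keeps its own bar because code is part of the grouping key.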
Add custom technical indicator calculation method
import polars as pl
import polars_talib as plta

quote_manager.get_df_stk_kbar("5m", [
    pl.col("close").ta.ema(5).over("code").fill_nan(None).alias("ema5"),
    plta.macd(pl.col("close"), 12, 26, 9).over("code").struct.field("macd").fill_nan(None),
])
In this part, we use polars_talib to calculate technical indicators and add them to the K line data. Here we calculate two indicators, ema and macd; for more indicators, refer to the polars_ta_extension supported indicators list.
In these expressions, over("code") partitions the data by commodity code so that each commodity is calculated independently. Even though all commodities share one DataFrame, their results do not affect each other. The over partitions are computed in parallel automatically, so the calculation stays fast even with a large number of commodities. alias then names the result column ema5. The macd indicator returns a struct with multiple fields, and here we extract its macd field.
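The partitioning behaviour of over("code") can be illustrated in plain Python: split the rows by code, compute the indicator on each series separately, and write the results back to the original row positions. This is only a sketch of the idea. ema and ema_over are hypothetical helpers, the EMA is seeded with the simple average of the first period values (a common convention that has not been checked against polars_talib output), None marks rows without enough history, and the closes are made-up numbers.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def ema(values: List[float], period: int) -> List[Optional[float]]:
    """EMA seeded with the mean of the first `period` values; None before that."""
    out: List[Optional[float]] = [None] * len(values)
    if len(values) < period:
        return out
    alpha = 2.0 / (period + 1)
    prev = sum(values[:period]) / period
    out[period - 1] = prev
    for i in range(period, len(values)):
        prev = alpha * values[i] + (1 - alpha) * prev
        out[i] = prev
    return out


def ema_over(codes: List[str], closes: List[float], period: int) -> List[Optional[float]]:
    # Collect the row positions belonging to each code.
    positions: Dict[str, List[int]] = defaultdict(list)
    for i, code in enumerate(codes):
        positions[code].append(i)
    result: List[Optional[float]] = [None] * len(codes)
    for idxs in positions.values():
        series = ema([closes[i] for i in idxs], period)
        for i, value in zip(idxs, series):
            result[i] = value  # write back to the original row position
    return result


# Two codes interleaved in one table, as in the K line DataFrame.
codes = ["A", "B", "A", "B", "A", "B"]
closes = [10.0, 100.0, 12.0, 110.0, 14.0, 120.0]
ema2 = ema_over(codes, closes, period=2)
```

The values computed for code A are the same as running ema on A's closes alone, so B's much larger prices never leak into A's indicator even though the rows are interleaved.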
Because this part only takes expressions, it is very lightweight: you can pass in whatever expressions you need, including your own indicators built from Polars expressions. This part just provides an interface for calculation and a simple usage example.
Add QuoteManager backfill missed ticks method
# BaseContract must be imported; in current shioaji releases it is expected
# to live in shioaji.contracts (verify against your installed version).
from shioaji.contracts import BaseContract


class QuoteManager:
    def fetch_ticks(self, contract: BaseContract) -> pl.DataFrame:
        code = contract.code
        ticks = self.api.ticks(contract)
        # Reshape the historical ticks to the same schema as df_stk
        df = pl.DataFrame(ticks.dict()).select(
            pl.from_epoch("ts", time_unit="ns").dt.cast_time_unit("us").alias("datetime"),
            pl.lit(code).alias("code"),
            pl.col("close").alias("price"),
            pl.col("volume").cast(pl.Int64),
            pl.col("tick_type").cast(pl.Int8),
        )
        return df

    def subscribe_stk_tick(self, codes: List[str], recover: bool = False):
        for code in codes:
            # skip
            if recover:
                df = self.fetch_ticks(contract)
                if not df.is_empty():
                    code_ticks = [t for t in self.ticks_stk_v1 if t.code == code]
                    if code_ticks:
                        # Keep only historical ticks older than the first live tick
                        t_first = code_ticks[0].datetime
                        df = df.filter(pl.col("datetime") < t_first)
                        self.df_stk = self.df_stk.vstack(df)
                    else:
                        self.df_stk = self.df_stk.vstack(df)
In the subscribe_stk_tick method, we check whether the recover parameter is true. If it is, we call fetch_ticks to get the historical tick data, use filter to drop the ticks that have already been received through the subscription, and use vstack to append the remaining historical ticks to the df_stk DataFrame.
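The backfill rule can be sketched in plain Python: historical ticks are kept only if they are strictly older than the first live tick already buffered for the same code. The backfill function and the tuple layout are hypothetical, chosen for illustration, and the data is made up.

```python
from datetime import datetime
from typing import List, Tuple

Tick = Tuple[datetime, str, float]  # (datetime, code, price)


def backfill(history: List[Tick], live: List[Tick], code: str) -> List[Tick]:
    """Return the historical ticks that should be added for `code`."""
    live_for_code = [t for t in live if t[1] == code]
    if live_for_code:
        t_first = live_for_code[0][0]
        # Drop history that overlaps with what the subscription already delivered.
        return [t for t in history if t[0] < t_first]
    return history  # nothing received live yet, keep all history


history = [
    (datetime(2024, 1, 2, 9, 0, 1), "2330", 600.0),
    (datetime(2024, 1, 2, 9, 0, 2), "2330", 601.0),
    (datetime(2024, 1, 2, 9, 0, 3), "2330", 602.0),  # also received live
]
live = [
    (datetime(2024, 1, 2, 9, 0, 3), "2330", 602.0),
    (datetime(2024, 1, 2, 9, 0, 4), "2330", 603.0),
]
kept = backfill(history, live, "2330")
merged = kept + live
kept_other = backfill(history, live, "2317")  # no live ticks for this code
```

Note that the strict comparison also drops any historical tick that shares its timestamp with the first live tick, so two distinct trades at exactly that instant would be represented only by the live one.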
We have now completed a quote manager that can subscribe to market data, backfill missed ticks, and calculate technical indicators. Next, we will integrate all the code and use it in a Jupyter Lab environment.
The complete QuoteManager can be found in quote.py.
The complete example Jupyter notebook can be found in quote_manager_usage.