Hi,
I am just getting started with QuestDB. When I saw that the technology can ingest millions of rows per second, I thought: finally, I've found what I need. I am trying the very simple example shown below. It works for a few seconds and then breaks with a connection failed error.
Can someone please help me understand what I am doing wrong? There are a lot of messages being streamed, but nowhere near a million per second.
import os
import sys
from dataclasses import asdict
from typing import List

import pandas as pd
from polygon import WebSocketClient
from polygon.websocket.models import WebSocketMessage, EquityTrade
from questdb.ingress import Sender
from dotenv import load_dotenv

load_dotenv('../.env')

QDB_CLIENT_CONF = (
    f"{os.environ.get('QUESTDB_PROTOCOL')}::"
    f"addr={os.environ.get('QUESTDB_ADDR')}:{os.environ.get('QUESTDB_PORT')};"
    f"username={os.environ.get('QUESTDB_USERNAME')};password={os.environ.get('QUESTDB_PASSWORD')};"
)


def handle_message(messages: List[WebSocketMessage]):
    with Sender.from_conf(QDB_CLIENT_CONF) as sender:
        for message in messages:
            data = asdict(message)
            if isinstance(message, EquityTrade):
                symbols = {'event_type': 'T', 'symbol': data.pop('symbol')}
                # Convert timestamp fields to datetime objects
                for date_col in ['timestamp', 'trf_timestamp']:
                    if data[date_col] is not None:
                        data[date_col] = pd.to_datetime(data[date_col], unit='ms')
                # Convert conditions list to string if it exists
                if data['conditions'] is not None:
                    data['conditions'] = ','.join(map(str, data['conditions']))
                # Send data to QuestDB
                sender.row('equity_trade', symbols=symbols, columns=data, at=data['timestamp'])
                print(
                    f"{message.symbol.rjust(6)}\t"
                    f"{pd.Timestamp(message.timestamp, unit='ms')}\t"
                    f"{message.price:>12,.2f}\t"
                    f"{message.size:>8,d}"
                )
        sender.flush()


if __name__ == '__main__':
    try:
        client = WebSocketClient(
            api_key=os.getenv('POLYGON_API_KEY'),
            subscriptions=['T.*']
        )
        client.run(handle_message)
    except KeyboardInterrupt:
        print("\nShutting down...")
        sys.exit(0)
Does it work if you use 127.0.0.1 instead? Normally, though, a bad address wouldn't work for a bit and then fail with error 49; it just wouldn't work at all.
Also, you won't achieve millions of rows per second with just a single sender; you will need multiple concurrent senders. We run our benchmarks using TSBS with a number of workers: for example, in the Influx benchmark we use 12 workers to reach that speed.
Usually, one sender per core is a good starting point, assuming you are maxing out each sender.
A single sender on its own can usually handle at least 100k rows/second.
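For illustration, here is a rough sketch of the multi-sender idea (the worker count, the queue, and the simplified row fields are assumptions for the example, not taken from your code; it reuses the QDB_CLIENT_CONF string defined above):

import queue
import threading

from questdb.ingress import Sender, TimestampNanos

NUM_WORKERS = 4  # roughly one sender per core is a reasonable starting point
row_queue = queue.Queue(maxsize=100_000)

def worker():
    # One long-lived sender per worker thread; the context manager stays open
    # for the lifetime of the thread, not re-entered per message.
    with Sender.from_conf(QDB_CLIENT_CONF) as sender:
        while True:
            item = row_queue.get()
            if item is None:  # shutdown signal
                break
            sender.row(
                'equity_trade',
                symbols={'symbol': item['symbol']},
                columns={'price': item['price'], 'size': item['size']},
                at=TimestampNanos.now(),
            )
            # no explicit flush(): let the client's auto-flush batch the rows

threads = [threading.Thread(target=worker, daemon=True) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

The websocket callback would then just put parsed rows on the queue, and each worker drains it over its own long-lived connection.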
Thanks @nwoolmer! I figured it out: the problem was the with Sender.from_conf(QDB_CLIENT_CONF) as sender line.
This opens thousands of short-lived connections (we can see this with netstat -anp tcp | grep 9000: many, many lines ending in TIME_WAIT) and ultimately exhausts system resources.
The solution is to ditch Python's context manager completely and initialize a single sender that stays open for the whole run.
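Roughly, the idea looks like this (a sketch with simplified row fields; establish() and close() do explicitly what the context manager was doing on every callback):

from questdb.ingress import Sender, TimestampNanos

sender = Sender.from_conf(QDB_CLIENT_CONF)
sender.establish()  # connect once, at program startup

def handle_message(messages):
    for message in messages:
        # build symbols/columns as in the original handler, then:
        sender.row(
            'equity_trade',
            symbols={'symbol': message.symbol},
            columns={'price': message.price, 'size': message.size},
            at=TimestampNanos.now(),
        )
    # no per-callback flush(); auto-flush batches the rows

# ... and at shutdown:
sender.close()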
Even better, now that I understand your amazing solution a bit better, I can initialize a few senders, keep them open for the duration of the program, and call sender.close() at the end.
If I may, a suggestion: the documentation could mention that calling the context manager thousands of times per second quickly exhausts system resources, because each context manager opens a new connection. The fix is to use a single sender (or a few senders) for the duration of the program. Imho this deserves high priority in the docs (I lost a few hours trying to solve it…)
Again, congrats on the product: it's amazing tech, really useful!
Yes, a good place to start is around one sender per table. Each sender can usually do 100k+ rows/second, and using one per table helps make sure all sends are transactional (transactions are per-table).
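As a sketch of that pattern (the quotes table here is hypothetical, just to show a second table):

from questdb.ingress import Sender, TimestampNanos

trade_sender = Sender.from_conf(QDB_CLIENT_CONF)  # only ever writes 'equity_trade'
quote_sender = Sender.from_conf(QDB_CLIENT_CONF)  # only ever writes 'equity_quote' (hypothetical)
trade_sender.establish()
quote_sender.establish()

trade_sender.row(
    'equity_trade',
    symbols={'symbol': 'AAPL'},
    columns={'price': 189.50, 'size': 100},
    at=TimestampNanos.now(),
)
quote_sender.row(
    'equity_quote',
    symbols={'symbol': 'AAPL'},
    columns={'bid': 189.45, 'ask': 189.55},
    at=TimestampNanos.now(),
)

# each sender's batches only ever touch one table, so every flush
# maps cleanly onto a single per-table transaction
trade_sender.close()
quote_sender.close()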
Usually we see this problem when somebody puts the with inside the for loop, but you at least skipped that mistake!
We haven't decided on the best way to go about this yet, so as you test, if you can think of a good approach, let us know and we can see if we can solve it for you and other users up front.
I would like to add that if you are using the HTTP transport for QuestDB, it is best to rely on auto-flush and only call flush() manually right before closing the connection. That way data is sent in batches (the default is 75k rows or 1,000 ms, whichever comes first, but this can be changed), which is more efficient.
If you call flush() after every message, it is unlikely you will see millions of writes per second.
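For reference, a sketch of an HTTP configuration string with the auto-flush thresholds set explicitly (the address and values are just examples):

from questdb.ingress import Sender, TimestampNanos

conf = (
    "http::addr=localhost:9000;"
    "auto_flush_rows=50000;"     # flush after 50k buffered rows...
    "auto_flush_interval=500;"   # ...or after 500 ms, whichever comes first
)

with Sender.from_conf(conf) as sender:
    sender.row(
        'equity_trade',
        symbols={'symbol': 'AAPL'},
        columns={'price': 189.50, 'size': 100},
        at=TimestampNanos.now(),
    )
    # rows accumulate in the client-side buffer and are sent in batches;
    # anything still buffered is flushed when the sender closes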