For a project I needed to construct test data containing Chinese characters (not strict JSON), in the following format:
{'userid': 0, 'ts': '2022-08-03 16:33:38.487973', 'user_name': '中国人'}
After sending it, every message consumed came back as Unicode escape sequences, and specifying utf-8 did not help. At first I suspected the Kafka producer's value_serializer was configured incorrectly, but it turned out the real cause was that json.dumps was not being used properly in the code.
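The reason is that json.dumps escapes every non-ASCII character by default (ensure_ascii=True), so the Chinese text is already turned into \uXXXX sequences before the bytes ever reach Kafka. A quick check in a Python shell shows the difference:

import json

data = {"userid": 0, "user_name": "中国人"}

# Default behaviour: non-ASCII characters are escaped to \uXXXX
print(json.dumps(data))
# {"userid": 0, "user_name": "\u4e2d\u56fd\u4eba"}

# ensure_ascii=False keeps the Chinese characters as-is
print(json.dumps(data, ensure_ascii=False))
# {"userid": 0, "user_name": "中国人"}

The full producer script with the fixed serializer: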
# -*- coding: utf-8 -*-
import datetime
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(sasl_mechanism='PLAIN',
                         security_protocol='SASL_PLAINTEXT',
                         sasl_plain_username='xxxxx',
                         sasl_plain_password='xxxxxxxx',
                         bootstrap_servers=['xxxxxxxxxxx'],
                         # specifying ensure_ascii=False here keeps Chinese characters
                         # instead of \uXXXX escapes
                         value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'),
                         api_version=(2, 0, 0))  # documented form is a tuple
try:
    # produce asynchronously
    for i in range(100):
        now_time = str(datetime.datetime.now())
        send_json = {
            "userid": i,
            "ts": now_time,
            "user_name": "中国人"
        }
        print(send_json)
        future = producer.send('xxxxxxxxxxx', send_json)
        try:
            record_metadata = future.get(timeout=2)
        except KafkaError:
            # Decide what to do if produce request failed...
            print("send error!")
            continue  # record_metadata is not set on failure, so skip the prints below
        time.sleep(1)
        print(record_metadata.partition)
        print(record_metadata.offset)
finally:
    producer.close()
With this change, the consumed message goes from {"userid": 1, "ts": "2022-08-03 16:12:26.595478", "user_name": "\u4e2d\u56fd\u4eba"} to {"userid": 1, "ts": "2022-08-03 16:33:39.576068", "user_name": "中国人"}.
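For completeness, here is a minimal consumer sketch to verify the result; the topic name, credentials and group id are placeholders matching the producer above, and value_deserializer decodes the message bytes as UTF-8 before parsing the JSON:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'xxxxxxxxxxx',                       # same placeholder topic as the producer
    bootstrap_servers=['xxxxxxxxxxx'],
    sasl_mechanism='PLAIN',
    security_protocol='SASL_PLAINTEXT',
    sasl_plain_username='xxxxx',
    sasl_plain_password='xxxxxxxx',
    group_id='demo-group',               # placeholder group id
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for msg in consumer:
    # with ensure_ascii=False on the producer side, user_name prints as 中国人
    print(msg.value)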
Another mistake beginners easily make:
1. Forcing a dict into a string with str() in Python produces single-quoted keys and values. Sending that to Kafka is unfriendly to downstream consumers; for example, a Java or Flink SQL consumer may fail to parse it. Use standard serialization with json.dumps instead, as the comparison below shows.
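A quick comparison of the two approaches:

import json

record = {"userid": 1, "user_name": "中国人"}

# str() just reuses Python's repr: single quotes, not valid JSON
print(str(record))
# {'userid': 1, 'user_name': '中国人'}

# json.dumps produces standard JSON with double quotes
print(json.dumps(record, ensure_ascii=False))
# {"userid": 1, "user_name": "中国人"}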