Python Avro - Kafka Producer -
problem in short:
I am writing a Python Avro Kafka producer. I can produce a Kafka message in Avro format when the schema is just the header Avro type. But when header is used as part of the complex "rawmessage" Avro type, I don't see a way to create the message and publish it to the Kafka topic. Can anyone help?
avro:
rawmessage - parent
{"namespace": "com.example", "type": "record", "name": "rawmessage", "fields": [ { "name": "header", "type": "header" }, { "name": "payload", "type": "string" } ] } header -- child
{"namespace": "com.example", "type": "record", "name": "header", "fields": [ { "name": "channeltype", "type": ["null", "string"] , "default" : null }, { "name": "flowtype", "type": ["null", "string"] , "default" : null }, { "name": "type", "type": "string" , "default" : "unassigned" }, { "name": "alerttype", "type": ["null", "string"] , "default" : null }, { "name": "id", "type": "string" , "default" : "unassigned" }, { "name": "version", "type": "string" , "default" : "unassigned" }, { "name": "apikey", "type": ["null", "string"] , "default" : null }, { "name": "sourcefile", "type": ["null", "string"] , "default" : null }, { "name": "createdtimestamp", "type": ["null", "string"] , "default" : null }, { "name": "enrichedheaders", "type": ["null", {"type": "map", "values": "string"}] } ] } python code::
"""Publish Avro-encoded "rawmessage" records to a Kafka topic.

The "rawmessage" schema refers to the "header" record by name only, so the
two .avsc files are loaded through one shared ``avro.schema.Names`` registry:
the child ("header") first, then the parent ("rawmessage").
"""
import io
import json
import random

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import BinaryEncoder, DatumReader, DatumWriter
from kafka import KafkaConsumer, KafkaProducer


def load_schema(path, names):
    """Parse the .avsc file at *path*, registering its named types in *names*.

    Named types already present in *names* can be referenced by name from the
    schema being loaded.
    """
    with open(path) as schema_file:
        return avro.schema.make_avsc_object(json.load(schema_file), names)


# send messages synchronously
bootstrap_servers_pt_list = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers_pt_list)

# kafka topic
topic = 'test.all.avro'

# Shared registry of named types. The child schema must be loaded before the
# parent so that the parent's reference to "header" can be resolved.
known_names = avro.schema.Names()

# path to the header.avsc avro schema (child)
schema_path = "/<path>/header.avsc"
schema = load_schema(schema_path, known_names)

header = {
    "channeltype": "channeltype",
    "flowtype": "flowtype",
    "type": "type",
    "alerttype": "alerttype",
    "id": "id",
    "version": "version",
    "apikey": "apikey",
    "sourcefile": "aws_nrch_part_00000.dat",
    "createdtimestamp": "20170504100126",
    "enrichedheaders": {
        "filename": "20170504100126_eml_to_aws_nrch_part_00000.dat",
        "timestamp": "20170504100126",
    },
}

# path to the rawmessage.avsc avro schema (parent)
schema_path = "/users/repo/rawmessage.avsc"
schema_raw = load_schema(schema_path, known_names)

rawmessage = {
    "header": header,
    "payload": "payload",
}

# The writer depends only on the schema, so build it once outside the loop.
writer = DatumWriter(schema_raw)

for _ in range(10):
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    # Encode the full parent record (header + payload), not just the header.
    writer.write(rawmessage, encoder)
    raw_bytes = bytes_writer.getvalue()
    producer.send(topic, raw_bytes)

# send() is asynchronous; flush so queued messages are delivered before exit.
producer.flush()

# question::
How do I use "header" as part of the "rawmessage" Avro schema? How can I make this code work? Is it possible in Python Avro to have parent and child Avro formats? (I know it can be done in Java.)
Comments
Post a Comment