Python Avro - Kafka Producer


Problem in short:

I have a Python Avro Kafka producer. I can implement a complex-type Avro format and produce a Kafka message with it (using just the header Avro type). But when I use the header as part of the "rawmessage" Avro schema, I don't see a way to create the message and publish it to the Kafka topic. Can anyone help?

Avro schemas:

rawmessage (parent):

    {
      "namespace": "com.example",
      "type": "record",
      "name": "rawmessage",
      "fields": [
        {"name": "header", "type": "header"},
        {"name": "payload", "type": "string"}
      ]
    }

header (child):

    {
      "namespace": "com.example",
      "type": "record",
      "name": "header",
      "fields": [
        {"name": "channeltype", "type": ["null", "string"], "default": null},
        {"name": "flowtype", "type": ["null", "string"], "default": null},
        {"name": "type", "type": "string", "default": "unassigned"},
        {"name": "alerttype", "type": ["null", "string"], "default": null},
        {"name": "id", "type": "string", "default": "unassigned"},
        {"name": "version", "type": "string", "default": "unassigned"},
        {"name": "apikey", "type": ["null", "string"], "default": null},
        {"name": "sourcefile", "type": ["null", "string"], "default": null},
        {"name": "createdtimestamp", "type": ["null", "string"], "default": null},
        {"name": "enrichedheaders", "type": ["null", {"type": "map", "values": "string"}]}
      ]
    }
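The parent record refers to its child only by the name "header", so the Python parser has to see the header definition at the point where rawmessage uses it; parsing rawmessage.avsc on its own typically fails with a schema parse error because "header" is an unknown type. A minimal sketch of one workaround, assuming both .avsc files are plain JSON: splice the child record definition into the parent's field before handing the result to the parser. The helper name inline_named_type is hypothetical and is not part of the avro library.

```python
import copy
import json


def inline_named_type(parent, child):
    """Return a copy of the `parent` record schema in which the first field
    whose type is the child's name (e.g. "header" or "com.example.header")
    is replaced by the full child record definition.

    Hypothetical helper: only plain string references at the top level of
    the parent's fields are handled; references nested inside unions,
    arrays or maps are left untouched in this sketch.
    """
    child_names = {child["name"]}
    if "namespace" in child:
        child_names.add(child["namespace"] + "." + child["name"])

    combined = copy.deepcopy(parent)  # never mutate the caller's dict
    inlined = False
    for field in combined["fields"]:
        if isinstance(field["type"], str) and field["type"] in child_names:
            if inlined:
                # Avro allows a named type to be defined only once; later
                # uses keep the name reference, which now resolves.
                continue
            field["type"] = copy.deepcopy(child)
            inlined = True
    return combined
```

With the files from the question, this would mean json.load-ing header.avsc and rawmessage.avsc, calling inline_named_type(raw_json, header_json), and passing json.dumps(...) of the result to avro.schema.parse. Writing the header record inline inside rawmessage.avsc by hand yields the same combined schema.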

Python code:

    from kafka import KafkaConsumer, KafkaProducer
    import avro.schema
    import avro.io
    import io, random
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter

    # Send messages synchronously
    bootstrap_servers_pt_list = 'localhost:9092'
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers_pt_list)

    # Kafka topic
    topic = 'test.all.avro'

    # Path to header.avsc Avro schema
    schema_path = "/<path>/header.avsc"
    schema = avro.schema.parse(open(schema_path).read())

    header = {
        "channeltype": "channeltype",
        "flowtype": "flowtype",
        "type": "type",
        "alerttype": "alerttype",
        "id": "id",
        "version": "version",
        "apikey": "apikey",
        "sourcefile": "aws_nrch_part_00000.dat",
        "createdtimestamp": "20170504100126",
        "enrichedheaders": {"filename": "20170504100126_eml_to_aws_nrch_part_00000.dat", "timestamp": "20170504100126"}
    }

    # Path to rawmessage.avsc Avro schema
    schema_path = "rawmessage.avsc"
    schema = avro.schema.parse(open(schema_path).read())

    # Path to rawmessage.avsc Avro schema
    schema_path = "/users/repo/rawmessage.avsc"
    schema_raw = avro.schema.parse(open(schema_path).read())

    rawmessage = {
        "header": header,
        "payload": "payload"
    }

    for i in xrange(10):
        writer = avro.io.DatumWriter(schema)
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        writer.write(header, encoder)
        raw_bytes = bytes_writer.getvalue()
        producer.send(topic, raw_bytes)

Question:

How do I use the "header" part of the "rawmessage" Avro schema? How can I make this code work? Is it possible in Python Avro to have parent and child Avro schemas? (I know it can be done in Java.)

