-
Notifications
You must be signed in to change notification settings - Fork 0
/
producers.py
160 lines (126 loc) · 4.76 KB
/
producers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import boto
from boto.regioninfo import RegionInfo
from boto.kinesis.exceptions import ResourceNotFoundException
import os
from configparser import ConfigParser
from kafka import KafkaProducer
import json
class StreamProducer:
    """
    A client stream producer facade supporting the following stream
    platforms:
    ['kinesis', 'kafka']
    The default stream_platform is 'kinesis'.
    """

    def __init__(self, stream_name, stream_platform='kinesis', hosts=None, conn=None):
        """
        :param stream_name: stream (kinesis) or topic (kafka) name, handed to
            the underlying producer as its second constructor argument
        :param stream_platform: 'kinesis' (default) or 'kafka'
        :param hosts: for kafka
        :param conn: for kinesis
        :raises ValueError: if the platform is unknown or the argument
            required by the chosen platform is missing
        """
        ConnectParameterValidation.validate(stream_platform, conn, hosts)
        self.type = stream_platform

        if stream_platform == 'kinesis':
            conn_args = conn, stream_name
        elif stream_platform == 'kafka':
            conn_args = hosts, stream_name
        else:
            # Defensive only: validate() above already rejects other platforms.
            raise ValueError('unknown platform!')

        self._producer = self._get_producer()(*conn_args)

    def put_records(self, messages, part_key=None):
        """
        Push several messages through the underlying producer.

        :param messages: iterable of messages
        :param part_key: partition key; mandatory for kinesis
        :raises AssertionError: if the platform is kinesis and part_key is None
        """
        self._require_part_key(part_key)
        self._producer.put_records(messages, part_key)
        print('DONE!')

    def put_record(self, message, part_key=None):
        """
        Push a single message through the underlying producer.

        :param message: the message to send
        :param part_key: partition key; mandatory for kinesis
        :raises AssertionError: if the platform is kinesis and part_key is None
        """
        self._require_part_key(part_key)
        # part_key must be forwarded: KinesisProducer.put_record requires it,
        # and KafkaProducerWrapper.put_record accepts it as an optional argument.
        self._producer.put_record(message, part_key)

    def _require_part_key(self, part_key):
        """Reject a missing part_key when the platform is kinesis."""
        # Raised explicitly (rather than via an `assert` statement) so the check
        # still runs under `python -O`; the exception type is kept for callers.
        if self.type == 'kinesis' and part_key is None:
            raise AssertionError("For kinesis app the part_key arg should not be None")

    def _get_producer(self):
        """
        Return the producer class matching self.type.

        :raises ValueError: if self.type is neither 'kinesis' nor 'kafka'
        """
        if self.type == 'kinesis':
            return KinesisProducer
        elif self.type == 'kafka':
            return KafkaProducerWrapper
        else:
            raise ValueError('! unknown stream type: {}'.format(self.type))
class KinesisProducer:
    """
    Producer that pushes messages into an AWS Kinesis Stream
    through a caller-supplied connection object.
    """

    def __init__(self, kinesis_con, stream_name):
        """
        :param kinesis_con: connection object exposing put_record(stream, msg, key)
        :param stream_name: name of the target stream
        """
        self.kinesis_con = kinesis_con
        self.stream_name = stream_name

    def put_record(self, msg, part_key):
        """Send one message to the stream under the given partition key."""
        connection = self.kinesis_con
        connection.put_record(self.stream_name, msg, part_key)

    def put_records(self, msgs, part_key):
        """Send every message in msgs, one put_record call per message."""
        for payload in msgs:
            self.put_record(payload, part_key)
class KinesisStreamHealthCheck:
    """
    Health checker reporting on the operability of a given
    Kinesis stream.
    """

    def __init__(self, stream_conn, stream_name):
        """
        :param stream_conn: connection object exposing describe_stream(name)
        :param stream_name: name of the stream to inspect
        """
        self.stream_name = stream_name
        self._stream_connection = stream_conn

    def check_active(self):
        """Return True when the reported stream status is 'ACTIVE'."""
        status = self._check_status()
        return status == 'ACTIVE'

    def check_deleting(self):
        """Return True when the reported stream status is 'DELETING'."""
        status = self._check_status()
        return status == 'DELETING'

    def _check_status(self):
        """Fetch the stream description and return its 'StreamStatus' entry."""
        response = self._stream_connection.describe_stream(self.stream_name)
        return response.get('StreamDescription').get('StreamStatus')
class KafkaProducerWrapper:
    """
    Kafka stream producer wrapping a KafkaProducer bound to one topic.
    """

    def __init__(self, hosts, topic_name):
        """
        :param hosts: passed to KafkaProducer as bootstrap_servers
        :param topic_name: topic every record is sent to
        """
        self.producer = KafkaProducer(
            bootstrap_servers=hosts,
            value_serializer=self._to_json_bytes,
        )
        self.topic_name = topic_name

    @staticmethod
    def _to_json_bytes(value):
        """Serialize a value as UTF-8 encoded JSON."""
        return json.dumps(value).encode('utf-8')

    def put_record(self, msg, part_key=None):
        """Send one message to the topic; part_key is used as the partition."""
        topic = self.topic_name
        self.producer.send(topic, msg, partition=part_key)

    def put_records(self, msgs, part_key=None):
        """Send every message in msgs, then flush the producer."""
        for item in msgs:
            self.put_record(item, part_key=part_key)
        # NOTE(review): SOURCE lost its indentation; flush is assumed to run
        # once after the loop rather than per message — confirm against the repo.
        self.producer.flush()
class ConnectionConfig:
    """
    Maps a stream platform name to the name of its connection argument.
    """

    def __init__(self, platform):
        """
        :param platform: platform name, e.g. 'kinesis' or 'kafka'
        """
        self.platform = platform

    def get(self):
        """
        Return 'conn' for kinesis and 'hosts' for kafka.

        :raises ValueError: for any other platform
        """
        chosen = self.platform
        if chosen == 'kinesis':
            return 'conn'
        if chosen == 'kafka':
            return 'hosts'
        raise ValueError('platform {} not supported!'.format(chosen))
class ConnectParameterValidation:
    """
    Validates the platform / connection arguments given to a stream producer.
    """

    @staticmethod
    def validate(platform, conn, hosts):
        """
        Check that the platform is supported and its required argument is set.

        :param platform: 'kinesis' or 'kafka'
        :param conn: must be truthy when platform is 'kinesis'
        :param hosts: must be truthy when platform is 'kafka'
        :raises ValueError: on an unknown platform or a missing argument
        :return: None when the arguments are valid
        """
        _allowed_platforms = ('kinesis', 'kafka')
        if platform not in _allowed_platforms:
            raise ValueError("unknown platform '{}'! supported: {}".format(
                platform,
                _allowed_platforms))

        if platform == 'kinesis' and not conn:
            raise ValueError('conn argument must be specified for a {} app'.format(
                platform
            ))
        if platform == 'kafka' and not hosts:
            # The missing argument here is `hosts`, so the message names it
            # (it previously said `conn`, which misdirected the caller).
            raise ValueError('hosts argument must be specified for a {} app'.format(
                platform
            ))