kschltz
Posted on September 1, 2021
So this is probably some niche stuff, but from time to time I have to configure kafka clients (consumer, producer, admin) and more often then not I completely forgot what the configuration entries are, or what value should they map to, so I thought to myself, what if I translate kafka configuration classes to malli schemas?
Then I could validate the whole thing and have it tell exactly how stupid I am and what's wrong with the configuration I provided, so this is a first draft
(ns foo
(:require [malli.core :as malli]
[malli.error :as malli.error]
[malli.generator :as mg])
(:import (org.apache.kafka.clients.producer ProducerConfig)
(org.apache.kafka.common.config ConfigDef ConfigDef$ConfigKey)
(org.apache.kafka.common.config.types Password)))
(defn assert-model [model x]
(when-some [errors (malli.error/humanize (malli/explain model x))]
(throw (ex-info "Invalid data" {:errors errors}))))
;; For some of those more odd cases we'd want custom generators
(def types->custom-generators
{"CLASS" {:gen/elements [Object]}
"PASSWORD" {:gen/elements [(Password. "supersecret")]}})
;; We can have predicates for each type defined in config classes
(def types->pred
{"LIST" [:sequential some?]
"STRING" string?
"LONG" integer?
"INT" int?
"CLASS" [:and some? [:fn (fn [x] (class? x))]]
"PASSWORD" [:and some? [:fn (fn [x] (instance? Password x))]]
"DOUBLE" double?
"SHORT" int?
"BOOLEAN" boolean?})
(defn config-def->opts-model [^ConfigDef config-def]
(->> (.configKeys config-def)
(map (fn [[configuration-name ^ConfigDef$ConfigKey cfgk]]
(let [type-name (str (.type cfgk))]
[configuration-name
(merge {:optional true} (get types->custom-generators type-name))
(get types->pred type-name)])))
(into [:map {:closed true} ])))
(def producer-opts-model
(config-def->opts-model (ProducerConfig/configDef)))
We can now validate entries
;; throws clojure.lang.ExceptionInfo: Invalid data {:errors {:name ["disallowed key"]}}
(assert-model producer-opts-model {:name "Kaue"})
Or even generate examples of whats accepted, with the caveat that the values generated will be only as good as the generators you provided
;; Give me an example of whats a valid producer config map
(mg/generate producer-opts-model {:size 1})
=>
{"send.buffer.bytes" 0,
"metrics.sample.window.ms" 0,
"sasl.kerberos.ticket.renew.window.factor" 3.0,
"client.dns.lookup" "5",
"ssl.endpoint.identification.algorithm" "o",
"transactional.id" "",
"ssl.provider" "",
"bootstrap.servers" [],
"security.providers" "R",
"ssl.protocol" "N",
"ssl.keystore.password" #object[org.apache.kafka.common.config.types.Password 0xc9dcd6 "[hidden]"],
"sasl.login.class" java.lang.Object,
"sasl.login.refresh.window.jitter" -0.5,
"connections.max.idle.ms" 0,
"metrics.num.samples" -1,
"ssl.truststore.certificates" #object[org.apache.kafka.common.config.types.Password 0xc9dcd6 "[hidden]"],
"ssl.cipher.suites" [],
"enable.idempotence" true,
"metadata.max.age.ms" 0,
"max.block.ms" -1,
"ssl.keystore.type" "o",
"retries" 0,
"socket.connection.setup.timeout.ms" -1,
"delivery.timeout.ms" -1,
"buffer.memory" 0,
"max.in.flight.requests.per.connection" 0,
"ssl.secure.random.implementation" "j",
"ssl.truststore.type" "",
"transaction.timeout.ms" 0,
"sasl.kerberos.min.time.before.relogin" 0,
"sasl.kerberos.ticket.renew.jitter" 2.0,
"compression.type" "F"}
Posted on September 1, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.