Kafka Listeners

En esta entrada describimos los listeners de kafka, un software que ha adquirido importancia dentro del mundo del software médico y sanitario.

Necesitamos en KAFKA establecer advertised.listeners (o KAFKA_ADVERTISED_LISTENERS  utilizando imágenes Docker) en la dirección externa (host / IP) para que los clientes puedan conectarse correctamente. De lo contrario, intentarán conectarse a la dirección del host interno, y si no se puede acceder, entonces surgen problemas. Hay que diferenciar primero entre LISTENERS y ADVERTISED_LISTENERS: 

  • LISTENERS son a qué interfaces se une Kafka
  • ADVERTISED_LISTENERS es cómo los clientes pueden conectarse.

images / docker01.png

En esta publicación, hablaré sobre por qué esto es necesario y luego mostraré cómo hacerlo, en base a un par de escenarios: Docker y AWS.

¿Alguien está escuchando?

Kafka es un sistema distribuido. Los datos se leen y se escriben en el líder para una partición determinada, que podría estar en cualquiera de los brokers de un cluster. Cuando se inicia un cliente (productor / consumidor), solicitará metadatos sobre qué broker es el líder de una partición, y puede hacerlo desde cualquier broker. Los metadatos devueltos incluirán los endpoints disponibles para el broker líder de esa partición, y el cliente luego usará esos endpoints para conectarse al broker para leer / escribir datos según sea necesario.

Son estos endpoints los que causan problemas. En una sola máquina, ejecutando ‘bare metal’ (sin VM, sin Docker), todo podría ser el nombre de host (o solo localhost ) y es fácil. Pero una vez que pasa a configuraciones de red más complejas y múltiples nodos, es algo más complejo.

Supongamos que tienes más de una red. Esto podría ser cosas como:

  • Redes internas de Docker más máquina host
  • Brokers en la nube (p. Ej., AWS EC2) y máquinas “on premise” (o incluso en otra nube)

Debes decirle a Kafka cómo los brokers pueden comunicarse entre sí, pero también asegurarse de que los clientes externos (productores / consumidores) puedan comunicarse con el broker que necesitan.

La clave es que cuando ejecutas un cliente, el broker que le pasas como parámetro es justo a dónde irá y obtendrá los metadatos sobre los brokers en el clúster . El host y la IP reales a los que se conectará para leer / escribir datos se basan en los datos que el broker devuelve en esa conexión inicial, incluso si es solo un nodo y el broker devuelto es el mismo al que está conectado.

Para configurar esto correctamente, debe comprender que los brokes de Kafka pueden tener múltiples listners . Un listener es una combinación de

  1. Host / IP
  2. Puerto
  3. Protocolo

Veamos algunas configuraciones. A menudo, el protocolo también se usa para el nombre del listener, pero aquí hagámoslo bien y claro usando nombres abstractos para los oyentes:

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092 
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092 
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT 
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

Estoy usando los nombres de configuración de Docker; sería una configuración análoga usando server.properties directamente. Estos elementos son:

  • KAFKA_LISTENERS (listeners) es una lista de listeners separados por comas, y el host / ip y el puerto al que se une Kafka para escuchar. Para redes más complejas, esta podría ser una dirección IP asociada con una interfaz de red dada en una máquina. El valor predeterminado es 0.0.0.0, lo que significa escuchar en todas las interfaces.
  • KAFKA_ADVERTISED_LISTENERS (advertised.listeners) es una lista de listeners separados por comas con su host / ip y puerto. Estos son los metadatos que se devuelven a los clientes.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP (listener.security.protocol.map) define pares clave / valor para el protocolo de seguridad a utilizar, por nombre de listener.
  • KAFKA_INTER_BROKER_LISTENER_NAME ( inter.broker.listener.name ) Los brokers de Kafka se comunican entre sí , generalmente en la red interna (por ejemplo, la red Docker, AWS VPC, etc.). Para definir qué listener utilizar para esta comunicación, especifiamos esta variable. El host / IP utilizado debe ser accesible desde la máquina del broker a otros.

Es posible que los clientes de Kafka no sean locales en la red del broker, y aquí es donde entran los listeners adicionales.

Cada listener, cuando esté conectado, informará la dirección a la que se puede acceder. La dirección en la que llega a un broker depende de la red utilizada . Si se está conectando al broker desde una red interna, será un host / IP diferente al que se conecta externamente.

Al conectarse a un broker, el listener que se devolverá al cliente será el listener al que se conectó (según el puerto).

kafkacat es una herramienta útil para explorar esto. Usando -L puede ver los metadatos para el oyente al que se conectó. Basado en la misma configuración de escucha que la anterior ( LISTENER_BOB / LISTENER_FRED ), consulte las entradas respectivas para el broker 0 at : –

Al conectarse en el puerto 9092 (que asignamos como LISTENER_FRED ), la dirección del broker se devuelve como localhost

$ kafkacat -b kafka0:9092 \ -L 

Metadata for all topics (from broker -1: kafka0:9092/bootstrap): 1 brokers: broker 0 at localhost:9092

Conectándose en el puerto 29092 (que asignamos como LISTENER_BOB ), la dirección del broker se devuelve como kafka0 :

$ kafkacat -b kafka0:29092 \ -L 

Metadata for all topics (from broker 0: kafka0:29092/0): 1 brokers: broker 0 at kafka0:29092

También puede usar tcpdump para examinar el tráfico de un cliente que se conecta al broker y detectar el nombre de host que devuelve el broker.

¿Por qué me puedo conectar al broker, pero el cliente aún falla?

Incluso si puede realizar la conexión inicial con el broker, la dirección que se devuelve en los metadatos puede seguir siendo un hostname al que no puede acceder nuestro cliente.

Pensemos esto paso a paso.

  1. Tenemos un broker en AWS. Queremos enviarle un mensaje desde nuestra computadora portátil. Conocemos el nombre de host externo para la instancia EC2 ( ec2-54-191-84-122.us-west-2.compute.amazonaws.com ). Hemos creado la entrada necesaria en el grupo de seguridad para abrir el puerto del agente a nuestro tráfico entrante. Hacemos cosas inteligentes como verificar que nuestra máquina local pueda conectarse al puerto en la máquina:
    $ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092 
    found 0 associations found 1 connections: 1: flags=82<CONNECTED,PREFERRED> outif utun5 src 172.27.230.23 port 53352 dst 54.191.84.122 port 9092 rank info not available TCP aux info available Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!

    ¡Las cosas se ven bien! Ejecutamos:

    echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test

    Ahora … ¿qué pasa después?

  2. Nuestro PC resuelve ec2-54-191-84-122.us-west-2.compute.amazonaws.com éxito (a la dirección IP 54.191.84.122) y se conecta a la máquin en el puerto 9092
  3. El broker recibe la conexión entrante en el puerto 9092. Devuelve los metadatos al cliente, con el nombre de host ip-172-31-18-160.us-west-2.compute.internal porque este es el nombre de host del broker y el valor predeterminado para los listeners .
  4. El cliente intenta enviar datos al broker utilizando los metadatos que se le dieron. Dado que ip-172-31-18-160.us-west-2.compute.internal no se puede resolver desde Internet, falla.
    $ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
  5. Desconcertado, intentamos lo mismo desde la propia máquina de brokers:
    $ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >> $ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning foo

    ¡Funciona bien! Esto se debe a que nos estamos conectando al puerto 9092, que está configurado como escucha interna y, por lo tanto, informa su nombre de host como ip-172-31-18-160.us-west-2.compute.internal que se puede resolver desde la máquina del agente (¡ya que es su propio nombre de host!)

  6. Podemos hacer la vida aún más fácil usando kafkacat . Usando el indicador -L podemos ver los metadatos devueltos por el broker:
    $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L 
    
    Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap): 1 brokers: broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

    Claro como día, se devuelve el nombre de host interno . Esto también hace que este error aparentemente confuso tenga mucho más sentido: conectarse a un nombre de host y obtener un error de búsqueda en otro:

    $ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test % 
    
    ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known

    Aquí estamos usando kafkacat en modo consumidor ( -C ) de nuestra máquina local para intentar leer el tema. Como antes, debido a que estamos recuperando el nombre de host del listener interno del intermediario en los metadatos, el cliente no puede resolver ese nombre de host para leer / escribir.

Cómo: Conectarse a Kafka en Docker

images / docker01.png

Ejecute dentro de Docker, deberá configurar dos listeners para Kafka:

  1. Comunicación dentro de la red Docker . Esto podría ser la comunicación entre intermediarios (es decir, entre intermediarios) y entre otros componentes que se ejecutan en Docker, como Kafka Connect, o clientes o productores de terceros.Para estas comunicaciones, necesitamos usar el nombre de host de los contenedores Docker . Cada contenedor de Docker en la misma red de Docker utilizará el nombre de host del contenedor del agente Kafka para alcanzarlo.
  2. Tráfico de red no Docker. Esto podría ser clientes que se ejecutan localmente en la máquina host Docker, por ejemplo. Se supone que se conectarán en localhost , a un puerto expuesto desde el contenedor Docker.Aquí está el fragmento de docker-compose:
kafka0:
image: "confluentinc/cp-enterprise-kafka:5.2.1"
ports:
- '9092:9092'
- '29094:29094'
depends_on:
- zookeeper
environment:
- KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://kafka0:9092,LISTENER_ALICE://kafka0:29094
- KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092,LISTENER_ALICE://never-gonna-give-you-up:29094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,LISTENER_ALICE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
  • Los clientes dentro de la red Docker se conectan utilizando el oyente “BOB”, con el puerto 29092 y el nombre de host kafka0 . Al hacerlo, recuperan el nombre de host kafka0 al cual conectarse. Cada contenedor de docker resolverá kafka0 utilizando la red interna de Docker y podrá comunicarse con el broker.
  • Los clientes externos a la red Docker se conectan utilizando el oyente “FRED”, con el puerto 9092 y el nombre de host localhost . El puerto 9092 está expuesto por el contenedor Docker y está disponible para conectarse. Cuando los clientes se conectan, se les da el nombre de host localhost para los metadatos del intermediario, y se conectan a esto cuando leen / escriben datos.
  • La configuración anterior no manejaría el escenario en el que un cliente externo a Docker y externo a la máquina host desea conectarse. Esto se debe a que ni kafka0 (el nombre de host interno de Docker) ni localhost (la dirección de bucle de retorno para la máquina host de Docker) serían resolubles.

Cómo: Conexión a Kafka en Cloud

Aquí se aplican exactamente los mismos conceptos que con Docker. La principal diferencia es que, mientras que con Docker las conexiones externas pueden estar solo en el host local (como arriba), con Kafka alojado en la nube (como en AWS) la conexión externa será de una máquina no local al broker y que necesita poder conectarse al broker.

Otra complicación es que, si bien las redes de Docker están muy segregadas de las del host, en IaaS a menudo el nombre de host externo se puede resolver internamente , lo que lo hace impredecible cuando realmente puede encontrar estos problemas.

Existen dos enfoques, dependiendo de si la dirección externa a través de la cual se va a conectar al intermediario también se puede resolver localmente para todos los intermediarios en la red (por ejemplo, VPC).

Opción 1: la dirección externa se puede resolver localmente

images / aws01.png

El listner existente, llamado PLAINTEXT , solo necesita sobrescribir para establecer el nombre de host anunciado (es decir, el que se pasa a los clientes entrantes)

advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092

Ahora las conexiones tanto internas como externas utilizarán ec2-54-191-84-122.us-west-2.compute.amazonaws.com para conectarse. Debido a que ec2-54-191-84-122.us-west-2.compute.amazonaws.com puede resolverse tanto local como externamente, las cosas funcionan bien.

Opción 2: la dirección externa NO se puede resolver localmente

Deberá configurar dos oyentes para Kafka:

  1. Comunicación dentro de la red AWS (VPC) . Esto podría ser la comunicación entre intermediarios (es decir, entre intermediarios) y entre otros componentes que se ejecutan en la VPC, como Kafka Connect o clientes o productores de terceros. Para estas comunicaciones, necesitamos usar la IP interna de la máquina EC2 (o el nombre de host, si DNS está configurado).
  2. Tráfico externo de AWS. Esto podría estar probando la conectividad desde una computadora portátil, o simplemente desde máquinas no alojadas en Amazon. En ambos casos, se debe usar la IP externa de la instancia (o el nombre de host, si DNS está configurado).

images / aws02.png

Aquí hay un ejemplo de configuración:

listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT 

advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 

inter.broker.listener.name=INTERNAL