Accessing kafka container in Buildkite pipeline

32 views Asked by At

I have the following setup (simplified version) using Test Containers for my integration tests:

public class IntegrationTestFactory : WebApplicationFactory<Program>, IAsyncLifetime
{
    private static readonly string NetworkName = Guid.NewGuid().ToString();

    private readonly INetwork _network = new NetworkBuilder()
        .WithName(NetworkName)
        .Build();

    private readonly PostgreSqlContainer _postgreSqlContainer = new PostgreSqlBuilder()
        .WithImage("postgres:14.7")
        .WithName("psql")
        .WithHostname("psql")
        .WithCleanUp(true)
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(5432))
        .WithPortBinding(5432, 5432)
        .WithEnvironment(
            new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
            {
                { "POSTGRES_DB", “test-db” },
                { "POSTGRES_USER", "test-db-user“ },
                { "POSTGRES_PASSWORD", "test-db-password“ }
            }))
        .WithNetwork(NetworkName)
        .Build();

    private readonly IContainer _zookeeperContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-zookeeper:5.5.0")
        .WithHostname("zookeeper")
        .WithName("zookeeper")
        .WithCleanUp(true)
        .WithEnvironment(
            new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
            {
                { "ZOOKEEPER_CLIENT_PORT", "2181" },
                { "ZOOKEEPER_TICK_TIME", "2000" }
            }))
        .WithPortBinding(2181, 2181)
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(2181))
        .WithNetwork(NetworkName)
        .Build();

    private readonly IContainer _kafkaContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-kafka:5.5.0")
        .WithHostname("broker")
        .WithName("broker")
        .WithNetwork(NetworkName)
        .WithEnvironment(
            new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
            {
                { "KAFKA_BROKER_ID", "1" },
                { "KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181" },
                { "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" },
                { "KAFKA_LISTENERS", "PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092" },
                { "KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092" },
                { "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1" },
                { "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0" },
                { "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1" },
                { "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1" },
                { "KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true" }
           }))
        .WithPortBinding(9092, 9092)
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(9092))
        .WithCleanUp(true)
        .Build();

    private readonly IContainer _schemaRegistryContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-schema-registry:5.5.0")
        .WithHostname("schema-registry")
        .WithName("schema-registry")
        .WithCleanUp(true)
        .WithEnvironment(
            new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
            {
                { "SCHEMA_REGISTRY_HOST_NAME", "schema-registry" },
                { "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:29092" },
                { "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081" }
            }))
        .WithPortBinding(8081, 8081)
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8081))
        .WithNetwork(NetworkName)
        .Build();

    private readonly IContainer _restProxyContainer = new ContainerBuilder()
        .WithImage("confluentinc/cp-kafka-rest:6.2.0")
        .WithHostname("rest-proxy")
        .WithName("rest-proxy")
        .WithCleanUp(true)
        .WithEnvironment(
            new ReadOnlyDictionary<string, string>(new Dictionary<string, string>
            {
                { "KAFKA_REST_HOST_NAME", "rest-proxy" },
                { "KAFKA_REST_BOOTSTRAP_SERVERS", "broker:29092" },
                { "KAFKA_REST_LISTENERS", "http://0.0.0.0:8082" },
                { "KAFKA_REST_SCHEMA_REGISTRY_URL", "http://schema-registry:8081" }
            }))
        .WithPortBinding(8082, 8082)
        .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(8082))
        .WithNetwork(NetworkName)
        .Build();

    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        builder.ConfigureTestServices(services =>
        {
            var isBuildkite = Environment.GetEnvironmentVariable("BUILDKITE") == "true";
            var host = isBuildkite ? "172.17.0.1" : "localhost";
        
            // Replace DbContext
            AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
            services.RemoveDbContext<ApplicationDbContext>();
            var appConnectionString =
             $"Host={host};Port=5432;Database=test-db;Username=test-db-user;Password=test-db-password;”;
        
            services.AddDbContext<ApplicationDbContext>(options => options.UseNpgsql(appConnectionString, sqlOptions =>
            {
                sqlOptions.EnableRetryOnFailure();
                sqlOptions.UseNodaTime();
            }));
        
            // Override Kafka config
            services.AddKafkaProducer(new ConfigurationBuilder().Build(), options =>
            {
                options.TopicName = "MyTopicName";
                options.RetryCount = 1;
                options.RetryTimeoutInSeconds = 1;
                options.BootstrapServers = $"{host}:9092";
                options.SchemaRegistry = $"http://{host}:8081";
            });
        });
    }
}

I can run my integration tests on my local machine with the host value localhost. When I try to run these tests on Buildkite pipeline, I confirm that I can connect to PostgreSql instance by using the host 172.17.0.1 but can not access to Kafka broker. I tried multiple host values as the BootstrapServers like 172.17.0.1:9092, 172.17.0.1:29092, broker:9092, broker: 29092, localhost:9092, localhost:29092 but no luck. When I try to run it with 172.17.0.1:9092, I get the following error:

localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

What should be my BootstrapServers value to access to my Kafka container in Buildkite pipeline?

0

There are 0 answers