diff --git a/nixos/tests/all-tests.nix b/nixos/tests/all-tests.nix
index 1c354f312617..b3d2fbaee8ed 100644
--- a/nixos/tests/all-tests.nix
+++ b/nixos/tests/all-tests.nix
@@ -1488,6 +1488,7 @@ in
   teleport = handleTest ./teleport.nix { };
   teleports = runTest ./teleports.nix;
   thelounge = handleTest ./thelounge.nix { };
+  temporal = runTest ./temporal.nix;
   terminal-emulators = handleTest ./terminal-emulators.nix { };
   thanos = runTest ./thanos.nix;
   tiddlywiki = runTest ./tiddlywiki.nix;
diff --git a/nixos/tests/temporal.nix b/nixos/tests/temporal.nix
new file mode 100644
index 000000000000..80ad0540a673
--- /dev/null
+++ b/nixos/tests/temporal.nix
@@ -0,0 +1,311 @@
+(
+  { lib, pkgs, ... }:
+
+  {
+    name = "temporal";
+    meta.maintainers = [ pkgs.lib.maintainers.jpds ];
+
+    nodes = {
+      temporal =
+        { config, pkgs, ... }:
+        {
+          networking.firewall.allowedTCPPorts = [ 7233 ];
+
+          environment.systemPackages = [
+            (pkgs.writers.writePython3Bin "temporal-hello-workflow.py"
+              {
+                libraries = [ pkgs.python3Packages.temporalio ];
+              }
+              # Graciously taken from https://github.com/temporalio/samples-python/blob/main/hello/hello_activity.py
+              ''
+                import asyncio
+                from concurrent.futures import ThreadPoolExecutor
+                from dataclasses import dataclass
+                from datetime import timedelta
+
+                from temporalio import activity, workflow
+                from temporalio.client import Client
+                from temporalio.worker import Worker
+
+
+                # While we could use multiple parameters in the activity, Temporal strongly
+                # encourages using a single dataclass instead which can have fields added to it
+                # in a backwards-compatible way.
+                @dataclass
+                class ComposeGreetingInput:
+                    greeting: str
+                    name: str
+
+
+                # Basic activity that logs and does string concatenation
+                @activity.defn
+                def compose_greeting(input: ComposeGreetingInput) -> str:
+                    activity.logger.info("Running activity with parameter %s" % input)
+                    return f"{input.greeting}, {input.name}!"
+
+
+                # Basic workflow that logs and invokes an activity
+                @workflow.defn
+                class GreetingWorkflow:
+                    @workflow.run
+                    async def run(self, name: str) -> str:
+                        workflow.logger.info("Running workflow with parameter %s" % name)
+                        return await workflow.execute_activity(
+                            compose_greeting,
+                            ComposeGreetingInput("Hello", name),
+                            start_to_close_timeout=timedelta(seconds=10),
+                        )
+
+
+                async def main():
+                    # Uncomment the lines below to see logging output
+                    # import logging
+                    # logging.basicConfig(level=logging.INFO)
+
+                    # Start client
+                    client = await Client.connect("localhost:7233")
+
+                    # Run a worker for the workflow
+                    async with Worker(
+                        client,
+                        task_queue="hello-activity-task-queue",
+                        workflows=[GreetingWorkflow],
+                        activities=[compose_greeting],
+                        # Non-async activities require an executor;
+                        # a thread pool executor is recommended.
+                        # This same thread pool could be passed to multiple workers if desired.
+                        activity_executor=ThreadPoolExecutor(5),
+                    ):
+
+                        # While the worker is running, use the client to run the workflow and
+                        # print out its result. Note, in many production setups, the client
+                        # would be in a completely separate process from the worker.
+                        result = await client.execute_workflow(
+                            GreetingWorkflow.run,
+                            "World",
+                            id="hello-activity-workflow-id",
+                            task_queue="hello-activity-task-queue",
+                        )
+                        print(f"Result: {result}")
+
+
+                if __name__ == "__main__":
+                    asyncio.run(main())
+              ''
+            )
+            pkgs.temporal-cli
+          ];
+
+          services.temporal = {
+            enable = true;
+            settings = {
+              # Based on https://github.com/temporalio/temporal/blob/main/config/development-sqlite.yaml
+              log = {
+                stdout = true;
+                level = "info";
+              };
+              services = {
+                frontend = {
+                  rpc = {
+                    grpcPort = 7233;
+                    membershipPort = 6933;
+                    bindOnLocalHost = true;
+                    httpPort = 7243;
+                  };
+                };
+                matching = {
+                  rpc = {
+                    grpcPort = 7235;
+                    membershipPort = 6935;
+                    bindOnLocalHost = true;
+                  };
+                };
+                history = {
+                  rpc = {
+                    grpcPort = 7234;
+                    membershipPort = 6934;
+                    bindOnLocalHost = true;
+                  };
+                };
+                worker = {
+                  rpc = {
+                    grpcPort = 7239;
+                    membershipPort = 6939;
+                    bindOnLocalHost = true;
+                  };
+                };
+              };
+
+              persistence = {
+                defaultStore = "sqlite-default";
+                visibilityStore = "sqlite-visibility";
+                numHistoryShards = 1;
+                datastores = {
+                  sqlite-default = {
+                    sql = {
+                      user = "";
+                      password = "";
+                      pluginName = "sqlite";
+                      databaseName = "default";
+                      connectAddr = "localhost";
+                      connectProtocol = "tcp";
+                      connectAttributes = {
+                        mode = "memory";
+                        cache = "private";
+                      };
+                      maxConns = 1;
+                      maxIdleConns = 1;
+                      maxConnLifetime = "1h";
+                      tls = {
+                        enabled = false;
+                        caFile = "";
+                        certFile = "";
+                        keyFile = "";
+                        enableHostVerification = false;
+                        serverName = "";
+                      };
+                    };
+                  };
+                  sqlite-visibility = {
+                    sql = {
+                      user = "";
+                      password = "";
+                      pluginName = "sqlite";
+                      databaseName = "default";
+                      connectAddr = "localhost";
+                      connectProtocol = "tcp";
+                      connectAttributes = {
+                        mode = "memory";
+                        cache = "private";
+                      };
+                      maxConns = 1;
+                      maxIdleConns = 1;
+                      maxConnLifetime = "1h";
+                      tls = {
+                        enabled = false;
+                        caFile = "";
+                        certFile = "";
+                        keyFile = "";
+                        enableHostVerification = false;
+                        serverName = "";
+                      };
+                    };
+                  };
+                };
+              };
+              clusterMetadata = {
+                enableGlobalNamespace = false;
+                failoverVersionIncrement = 10;
+                masterClusterName = "active";
+                currentClusterName = "active";
+                clusterInformation = {
+                  active = {
+                    enabled = true;
+                    initialFailoverVersion = 1;
+                    rpcName = "frontend";
+                    rpcAddress = "localhost:7233";
+                    httpAddress = "localhost:7243";
+                  };
+                };
+              };
+
+              dcRedirectionPolicy = {
+                policy = "noop";
+              };
+
+              archival = {
+                history = {
+                  state = "enabled";
+                  enableRead = true;
+                  provider = {
+                    filestore = {
+                      fileMode = "0666";
+                      dirMode = "0766";
+                    };
+                    gstorage = {
+                      credentialsPath = "/tmp/gcloud/keyfile.json";
+                    };
+                  };
+                };
+                visibility = {
+                  state = "enabled";
+                  enableRead = true;
+                  provider = {
+                    filestore = {
+                      fileMode = "0666";
+                      dirMode = "0766";
+                    };
+                  };
+                };
+              };
+
+              namespaceDefaults = {
+                archival = {
+                  history = {
+                    state = "disabled";
+                    URI = "file:///tmp/temporal_archival/development";
+                  };
+                  visibility = {
+                    state = "disabled";
+                    URI = "file:///tmp/temporal_vis_archival/development";
+                  };
+                };
+              };
+            };
+          };
+        };
+    };
+
+    testScript = ''
+      temporal.wait_for_unit("temporal")
+      temporal.wait_for_open_port(6933)
+      temporal.wait_for_open_port(6934)
+      temporal.wait_for_open_port(6935)
+      temporal.wait_for_open_port(7233)
+      temporal.wait_for_open_port(7234)
+      temporal.wait_for_open_port(7235)
+
+      temporal.wait_until_succeeds(
+        "journalctl -o cat -u temporal.service | grep 'server-version' | grep '${pkgs.temporal.version}'"
+      )
+
+      temporal.wait_until_succeeds(
+        "journalctl -o cat -u temporal.service | grep 'Frontend is now healthy'"
+      )
+
+      import json
+      cluster_list_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster list --output json"))
+      assert cluster_list_json[0]['clusterName'] == "active"
+
+      cluster_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster describe --output json"))
+      assert cluster_describe_json['serverVersion'] in "${pkgs.temporal.version}"
+
+      temporal.log(temporal.wait_until_succeeds("temporal operator namespace create --namespace default"))
+
+      temporal.wait_until_succeeds(
+        "journalctl -o cat -u temporal.service | grep 'Register namespace succeeded'"
+      )
+
+      namespace_list_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace list --output json"))
+      assert len(namespace_list_json) == 2
+
+      namespace_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace describe --output json --namespace default"))
+      assert namespace_describe_json['namespaceInfo']['name'] == "default"
+      assert namespace_describe_json['namespaceInfo']['state'] == "NAMESPACE_STATE_REGISTERED"
+
+      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
+      assert len(workflow_json) == 0
+
+      out = temporal.wait_until_succeeds("temporal-hello-workflow.py")
+      assert "Result: Hello, World!" in out
+
+      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
+      assert workflow_json[0]['execution']['workflowId'] == "hello-activity-workflow-id"
+      assert workflow_json[0]['status'] == "WORKFLOW_EXECUTION_STATUS_COMPLETED"
+
+      temporal.log(temporal.succeed(
+        "systemd-analyze security temporal.service | grep -v '✓'"
+      ))
+    '';
+  }
+)
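
To exercise the new test locally, a minimal sketch (not part of the change above, assuming a plain nixpkgs checkout and the conventional nixosTests attribute path that the all-tests.nix entry wires up):

  # Nix expression selecting the new VM test from the nixpkgs root;
  # `nix-build -A nixosTests.temporal` from the checkout is equivalent.
  (import ./. { }).nixosTests.temporal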